You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/07/04 14:54:03 UTC
[sling-org-apache-sling-distribution-journal-kafka] branch master
updated: SLING-8557 - Use a common class for pollers, improve test coverage
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 2c0930c SLING-8557 - Use a common class for pollers, improve test coverage
2c0930c is described below
commit 2c0930cb23d6f83f1f4eec3b945c663d23b8a703
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Thu Jul 4 16:53:53 2019 +0200
SLING-8557 - Use a common class for pollers, improve test coverage
---
.../journal/kafka/JsonRecordHandler.java | 60 +++++++++
.../journal/kafka/KafkaClientProvider.java | 4 +-
.../journal/kafka/KafkaMessagePoller.java | 143 ---------------------
...afkaJsonMessagePoller.java => KafkaPoller.java} | 93 +++++++-------
.../journal/kafka/ProtobufRecordHandler.java | 81 ++++++++++++
.../journal/kafka/KafkaJsonMessageSenderTest.java | 67 ++++++++++
...MessagePollerTest.java => KafkaPollerTest.java} | 61 ++++-----
...lerTest.java => ProtobufRecordHandlerTest.java} | 37 +-----
8 files changed, 294 insertions(+), 252 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java b/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java
new file mode 100644
index 0000000..14fd391
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+
+/** Kafka record callback that parses each JSON payload into {@code T} and passes it to a {@link MessageHandler}. */
+public class JsonRecordHandler<T> implements Consumer<ConsumerRecord<String, String>> {
+    private static final Logger LOG = LoggerFactory.getLogger(JsonRecordHandler.class);
+    /** Receives every successfully parsed message together with its {@link MessageInfo}. */
+    private final MessageHandler<T> handler;
+    /** Jackson reader bound to the message class supplied to the constructor. */
+    private final ObjectReader reader;
+    /** Creates the handler; {@code handler} and {@code clazz} must not be null (checked with requireNonNull). */
+    public JsonRecordHandler(MessageHandler<T> handler, Class<T> clazz) {
+        this.handler = requireNonNull(handler);
+        this.reader = new ObjectMapper().readerFor(requireNonNull(clazz));
+    }
+    /** Parses the record value and invokes the handler; an IOException raised while doing so is logged and the record is skipped. */
+    @Override
+    public void accept(ConsumerRecord<String, String> record) {
+        MessageInfo info = new KafkaMessageInfo(record);
+        String payload = record.value();
+        try {
+            T message = reader.readValue(payload);
+            handler.handle(info, message);
+        } catch (IOException e) {
+            // Pass the exception as the final argument so the parse error and its stack trace reach the log.
+            LOG.warn("Failed to parse payload {}", payload, e);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index 9cc9703..199e7fe 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -146,7 +146,7 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
} else {
consumer.seekToEnd(topicPartitions);
}
- Closeable poller = new KafkaMessagePoller(consumer, eventSender, adapters);
+ Closeable poller = KafkaPoller.createProtobufPoller(consumer, eventSender, adapters);
LOG.info("Created poller for reset {}, topicName {}, assign {}", reset, topicName, assign);
return poller;
}
@@ -167,7 +167,7 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
} else {
consumer.seekToEnd(topicPartitions);
}
- return new KafkaJsonMessagePoller<>(consumer, eventSender, handler, type);
+ return KafkaPoller.createJsonPoller(consumer, eventSender, handler, type);
}
@Override
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
deleted file mode 100644
index 52ae1a6..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.kafka;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.sling.distribution.journal.messages.Types;
-import org.apache.sling.distribution.journal.ExceptionEventSender;
-import org.apache.sling.distribution.journal.HandlerAdapter;
-import org.apache.sling.distribution.journal.MessageInfo;
-import com.google.protobuf.ByteString;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
-import static java.lang.Integer.parseInt;
-import static java.lang.String.format;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.time.Duration.ofHours;
-import static java.util.Objects.requireNonNull;
-
-public class KafkaMessagePoller implements Closeable {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaMessagePoller.class);
-
- private final Map<Class<?>, HandlerAdapter<?>> handlers = new HashMap<>();
-
- private final KafkaConsumer<String, byte[]> consumer;
-
- private volatile boolean running = true;
-
- private final String types;
-
- private final ExceptionEventSender eventSender;
-
- public KafkaMessagePoller(KafkaConsumer<String, byte[]> consumer, ExceptionEventSender eventSender, HandlerAdapter<?>... handlerAdapters) {
- this.consumer = requireNonNull(consumer);
- this.eventSender = requireNonNull(eventSender);
- for (HandlerAdapter<?> handlerAdapter : handlerAdapters) {
- handlers.put(handlerAdapter.getType(), handlerAdapter);
- }
- types = handlers.keySet().toString();
- startBackgroundThread(this::run, format("Message Poller %s", types));
- }
-
- @Override
- public void close() throws IOException {
- LOG.info("Shutdown poller for types {}", types);
- running = false;
- consumer.wakeup();
- }
-
- public void run() {
- LOG.info("Start poller for types {}", types);
- while(running) {
- try {
- consumer.poll(ofHours(1))
- .forEach(this::handleRecord);
- } catch (WakeupException e) {
- LOG.debug("Waked up {}", e.getMessage(), e);
- this.running = false;
- } catch(Exception e) {
- eventSender.send(e);
- LOG.error("Exception while receiving from kafka: {}", e.getMessage(), e);
- sleepAfterError();
- // Continue as KafkaConsumer should handle the error transparently
- }
- }
- consumer.close();
- LOG.info("Stop poller for types {}", types);
- }
-
- private void sleepAfterError() {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e1) {
- Thread.currentThread().interrupt();
- }
- }
-
- private void handleRecord(ConsumerRecord<String, byte[]> record) {
- getHandler(record)
- .ifPresent(handler->handleRecord(handler, record));
- }
-
- private Optional<HandlerAdapter<?>> getHandler(ConsumerRecord<String, byte[]> record) {
- try {
- int type = parseInt(getHeaderValue(record.headers(), "type"));
- int version = parseInt(getHeaderValue(record.headers(), "version"));
- Class<?> messageClass = Types.getType(type, version);
- Optional<HandlerAdapter<?>> handler = Optional.ofNullable(handlers.get(messageClass));
- if (!handler.isPresent()) {
- LOG.debug("No handler registered for type {}", messageClass.getName());
- }
- return handler;
- } catch (RuntimeException e) {
- LOG.info("No handler found for headers {}.", record.headers(), e);
- return Optional.empty();
- }
- }
-
- private void handleRecord(HandlerAdapter<?> handler, ConsumerRecord<String, byte[]> record) {
- try {
- MessageInfo info = new KafkaMessageInfo(record);
- ByteString payload = ByteString.copyFrom(record.value());
- handler.handle(info, payload);
- } catch (Exception e) {
- String msg = format("Error consuming message for types %s", types);
- LOG.warn(msg);
- }
- }
-
- private String getHeaderValue(Headers headers, String key) {
- Header header = Optional.ofNullable(headers.lastHeader(key))
- .orElseThrow(()->new IllegalArgumentException(format("Header with key %s not found", key)));
- return new String(header.value(), UTF_8);
- }
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
similarity index 54%
rename from src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
rename to src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
index e437ceb..671fa28 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
@@ -18,92 +18,97 @@
*/
package org.apache.sling.distribution.journal.kafka;
+import static java.time.Duration.ofHours;
+import static java.util.Objects.requireNonNull;
+import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+
import java.io.Closeable;
import java.io.IOException;
+import java.util.function.Consumer;
-import org.apache.sling.distribution.journal.ExceptionEventSender;
-import org.apache.sling.distribution.journal.MessageHandler;
-import org.apache.sling.distribution.journal.MessageInfo;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
-import static java.lang.String.format;
-import static java.time.Duration.ofHours;
-import static java.util.Objects.requireNonNull;
-
-public class KafkaJsonMessagePoller<T> implements Closeable {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaJsonMessagePoller.class);
-
- private volatile boolean running = true;
+public class KafkaPoller<T> implements Closeable {
- private final KafkaConsumer<String, String> consumer;
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaPoller.class);
- private final MessageHandler<T> handler;
+ private static final long ERROR_SLEEP_MS = 10000;
- private final ObjectReader reader;
+ private final KafkaConsumer<String, T> consumer;
+ private final Consumer<ConsumerRecord<String, T>> handler;
+
private final ExceptionEventSender eventSender;
+
+ private volatile boolean running = true;
+
+ long errorSleepMs;
- public KafkaJsonMessagePoller(KafkaConsumer<String, String> consumer, ExceptionEventSender eventSender, MessageHandler<T> handler, Class<T> clazz) {
+ public KafkaPoller(KafkaConsumer<String, T> consumer, ExceptionEventSender eventSender, Consumer<ConsumerRecord<String, T>> handler) {
+ this.handler = handler;
this.consumer = requireNonNull(consumer);
this.eventSender = requireNonNull(eventSender);
- this.handler = requireNonNull(handler);
- ObjectMapper mapper = new ObjectMapper();
- reader = mapper.readerFor(requireNonNull(clazz));
- startBackgroundThread(this::run, format("Message Json Poller for handler %s", handler));
+ this.errorSleepMs = ERROR_SLEEP_MS;
+ startBackgroundThread(this::run, "Message Poller");
+ }
+
+ public static Closeable createProtobufPoller(KafkaConsumer<String, byte[]> consumer, ExceptionEventSender eventSender, HandlerAdapter<?>... adapters) {
+ Consumer<ConsumerRecord<String, byte[]>> handler = new ProtobufRecordHandler(adapters);
+ return new KafkaPoller<byte[]>(consumer, eventSender, handler);
+ }
+
+ public static <T> Closeable createJsonPoller(KafkaConsumer<String, String> consumer, ExceptionEventSender eventSender, MessageHandler<T> handler, Class<T> clazz) {
+ Consumer<ConsumerRecord<String, String>> recordHandler = new JsonRecordHandler<T>(handler, clazz);
+ return new KafkaPoller<String>(consumer, eventSender, recordHandler);
}
@Override
public void close() throws IOException {
- LOG.info("Shutdown JSON poller for handler {}", handler);
+ LOG.info("Shutdown poller");
running = false;
consumer.wakeup();
}
public void run() {
- LOG.info("Start JSON poller for handler {}", handler);
+ LOG.info("Start poller");
while(running) {
try {
- consumer.poll(ofHours(1))
- .forEach(this::handleRecord);
+ consumer.poll(ofHours(1)).forEach(this::handle);
} catch (WakeupException e) {
- LOG.debug("Waked up while stopping {}", e.getMessage(), e);
- running = false;
- } catch(Exception e) {
+ LOG.debug("Waked up {}", e.getMessage(), e);
+ this.running = false;
+ } catch (Exception e) {
eventSender.send(e);
- LOG.error("Exception during recieve: {}", e.getMessage(), e);
+ LOG.error("Exception while receiving from kafka: {}", e.getMessage(), e);
sleepAfterError();
// Continue as KafkaConsumer should handle the error transparently
}
}
consumer.close();
- LOG.info("Stop JSON poller for handler {}", handler);
+ LOG.info("Stopped poller");
}
- private void sleepAfterError() {
+ public void handle(ConsumerRecord<String, T> record) {
try {
- Thread.sleep(10000);
- } catch (InterruptedException e1) {
- Thread.currentThread().interrupt();
+ handler.accept(record);
+ } catch (Exception e) {
+ LOG.warn("Error consuming message {}", record.headers());
}
}
- private void handleRecord(ConsumerRecord<String, String> record) {
- MessageInfo info = new KafkaMessageInfo(record);
- String payload = record.value();
+ private void sleepAfterError() {
try {
- T message = reader.readValue(payload);
- handler.handle(info, message);
- } catch (IOException e) {
- eventSender.send(e);
- LOG.error("Failed to parse payload {}", payload);
+ Thread.sleep(errorSleepMs);
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
}
}
+
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java b/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java
new file mode 100644
index 0000000..9a331f2
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import static java.lang.Integer.parseInt;
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.messages.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.ByteString;
+
+/** Routes Kafka records with protobuf payloads to the {@link HandlerAdapter} selected by their "type" and "version" headers. */
+public class ProtobufRecordHandler implements Consumer<ConsumerRecord<String, byte[]>> {
+    private static final Logger LOG = LoggerFactory.getLogger(ProtobufRecordHandler.class);
+    /** Registered adapters, keyed by the class each adapter reports from {@code getType()}. */
+    private final Map<Class<?>, HandlerAdapter<?>> handlers = new HashMap<>();
+
+    public ProtobufRecordHandler(HandlerAdapter<?>... handlerAdapters) {
+        for (HandlerAdapter<?> adapter : handlerAdapters) {
+            handlers.put(adapter.getType(), adapter);
+        }
+    }
+
+    /** Dispatches the record to its adapter; records whose message type has no registered adapter are skipped. */
+    @Override
+    public void accept(ConsumerRecord<String, byte[]> record) {
+        getHandler(record).ifPresent(adapter -> handleRecord(adapter, record));
+    }
+
+    private Optional<HandlerAdapter<?>> getHandler(ConsumerRecord<String, byte[]> record) {
+        int typeId = parseInt(getHeaderValue(record.headers(), "type"));
+        int versionId = parseInt(getHeaderValue(record.headers(), "version"));
+        Class<?> messageClass = Types.getType(typeId, versionId);
+        HandlerAdapter<?> adapter = handlers.get(messageClass);
+        if (adapter == null) {
+            LOG.debug("No handler registered for type {}", messageClass.getName());
+        }
+        return Optional.ofNullable(adapter);
+    }
+
+    private void handleRecord(HandlerAdapter<?> handler, ConsumerRecord<String, byte[]> record) {
+        MessageInfo messageInfo = new KafkaMessageInfo(record);
+        ByteString body = ByteString.copyFrom(record.value());
+        handler.handle(messageInfo, body);
+    }
+
+    private String getHeaderValue(Headers headers, String key) {
+        return Optional.ofNullable(headers.lastHeader(key))
+                .map(header -> new String(header.value(), UTF_8))
+                .orElseThrow(() -> new IllegalArgumentException(format("Header with key %s not found", key)));
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSenderTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSenderTest.java
new file mode 100644
index 0000000..27926bf
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSenderTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
+import org.apache.sling.distribution.journal.MessagingException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaJsonMessageSenderTest {
+ // Checks how KafkaJsonMessageSender.send behaves when the future returned by the mocked producer fails.
+ private static final String TOPIC = "topic";
+
+ @Mock
+ private ExceptionEventSender eventSender;
+
+ @Mock
+ private KafkaProducer<String, byte[]> producer;
+ // Instance under test; with @InjectMocks the @Mock fields of this class are the candidates Mockito injects into it.
+ @InjectMocks
+ private KafkaJsonMessageSender<Person> sender;
+ // Stubbed below as the return value of producer.send(...).
+ @Mock
+ private Future<RecordMetadata> record;
+ // Expects send to throw MessagingException when record.get() throws an ExecutionException.
+ @Test(expected = MessagingException.class)
+ public void testSendError() throws Exception {
+ when(producer.send(Mockito.any())).thenReturn(record);
+ when(record.get()).thenThrow(new ExecutionException(new IOException("Expected")));
+ Person person = new Person();
+ person.name = "name";
+ sender.send(TOPIC, person);
+ }
+ // Minimal message type passed to sender.send in the test above.
+ public static class Person {
+ public String name;
+ }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePollerTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java
similarity index 51%
copy from src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePollerTest.java
copy to src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java
index 8947696..93778d2 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java
@@ -18,28 +18,24 @@
*/
package org.apache.sling.distribution.journal.kafka;
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-import static org.hamcrest.CoreMatchers.equalTo;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.io.IOException;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.sling.distribution.journal.ExceptionEventSender;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -47,36 +43,41 @@ import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
-public class KafkaMessagePollerTest {
+public class KafkaPollerTest {
+
@Mock
- ExceptionEventSender eventSender;
+ private ExceptionEventSender eventSender;
@Mock
- KafkaConsumer<String, byte[]> consumer;
+ private KafkaConsumer<String, String> consumer;
- private Semaphore sem = new Semaphore(0);
+ @Mock
+ private Consumer<ConsumerRecord<String, String>> handler;
+ @SuppressWarnings("unchecked")
@Test
- public void testNoHeader() throws IOException, InterruptedException {
- ConsumerRecord<String, byte[]> record = new ConsumerRecord<String, byte[]>("topic", 1, 0l, 0l, TimestampType.CREATE_TIME, 0, 0, 0, "key", null);
+ public void testHandleError() throws Exception {
+ ConsumerRecord<String, String> record = new ConsumerRecord<String, String>("topic", 1, 0l, "", "");
when(consumer.poll(Mockito.any()))
- .thenReturn(records(Collections.singletonList(record)))
- .thenReturn(records(Collections.emptyList()));
- // Should display java.lang.IllegalArgumentException in log
- try (KafkaMessagePoller poller = new KafkaMessagePoller(consumer, eventSender, create(DiscoveryMessage.class, this::handle))) {
- Assert.assertThat(sem.tryAcquire(100, TimeUnit.MILLISECONDS), equalTo(false));
- }
- }
-
- private void handle(MessageInfo info, DiscoveryMessage message) {
- sem.release();
+ .thenReturn(records(Arrays.asList(record)))
+ .thenThrow(new KafkaException("Expected"))
+ .thenThrow(new WakeupException());
+ doThrow(new RuntimeException("Expected")).when(handler).accept(Mockito.any(ConsumerRecord.class));
+ KafkaPoller<String> poller = new KafkaPoller<String>(consumer, eventSender, handler);
+ poller.errorSleepMs = 100;
+ // Should see "Error consuming message" in the log
+ verify(handler, timeout(1000)).accept(Mockito.any(ConsumerRecord.class));
+ verify(eventSender, timeout(1000)).send(Mockito.any(KafkaException.class));
+ verify(consumer, timeout(1000)).close();
+ poller.close();
}
-
- private ConsumerRecords<String, byte[]> records(List<ConsumerRecord<String, byte[]>> records) {
- Map<TopicPartition, List<ConsumerRecord<String, byte[]>>> rm = new HashMap<>();
- for (ConsumerRecord<String, byte[]> record : records) {
+
+ private ConsumerRecords<String, String> records(List<ConsumerRecord<String, String>> records) {
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> rm = new HashMap<>();
+ for (ConsumerRecord<String, String> record : records) {
rm.put(new TopicPartition(record.topic(), record.partition()), Arrays.asList(record));
}
return new ConsumerRecords<>(rm);
}
+
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePollerTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandlerTest.java
similarity index 56%
rename from src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePollerTest.java
rename to src/test/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandlerTest.java
index 8947696..3c76ea7 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePollerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandlerTest.java
@@ -19,64 +19,35 @@
package org.apache.sling.distribution.journal.kafka;
import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.mockito.Mockito.when;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
-public class KafkaMessagePollerTest {
+public class ProtobufRecordHandlerTest {
@Mock
ExceptionEventSender eventSender;
@Mock
KafkaConsumer<String, byte[]> consumer;
- private Semaphore sem = new Semaphore(0);
-
- @Test
+ @Test(expected = IllegalArgumentException.class)
public void testNoHeader() throws IOException, InterruptedException {
ConsumerRecord<String, byte[]> record = new ConsumerRecord<String, byte[]>("topic", 1, 0l, 0l, TimestampType.CREATE_TIME, 0, 0, 0, "key", null);
- when(consumer.poll(Mockito.any()))
- .thenReturn(records(Collections.singletonList(record)))
- .thenReturn(records(Collections.emptyList()));
- // Should display java.lang.IllegalArgumentException in log
- try (KafkaMessagePoller poller = new KafkaMessagePoller(consumer, eventSender, create(DiscoveryMessage.class, this::handle))) {
- Assert.assertThat(sem.tryAcquire(100, TimeUnit.MILLISECONDS), equalTo(false));
- }
+ ProtobufRecordHandler handler = new ProtobufRecordHandler(create(DiscoveryMessage.class, this::handle));
+ handler.accept(record);
}
private void handle(MessageInfo info, DiscoveryMessage message) {
- sem.release();
- }
-
- private ConsumerRecords<String, byte[]> records(List<ConsumerRecord<String, byte[]>> records) {
- Map<TopicPartition, List<ConsumerRecord<String, byte[]>>> rm = new HashMap<>();
- for (ConsumerRecord<String, byte[]> record : records) {
- rm.put(new TopicPartition(record.topic(), record.partition()), Arrays.asList(record));
- }
- return new ConsumerRecords<>(rm);
}
}