You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/07/03 12:35:14 UTC
[sling-org-apache-sling-distribution-journal-kafka] branch master
updated: SLING-8554 - Increase test coverage
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 9bec200 SLING-8554 - Increase test coverage
9bec200 is described below
commit 9bec20065b276ac6c56f81c8382080c19a7a670b
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Wed Jul 3 14:35:04 2019 +0200
SLING-8554 - Increase test coverage
---
.../journal/kafka/JsonMessagingTest.java | 17 +++++
.../journal/kafka/KafkaMessagePollerTest.java | 82 ++++++++++++++++++++++
.../distribution/journal/kafka/MessagingTest.java | 18 ++++-
3 files changed, 115 insertions(+), 2 deletions(-)
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java
index 181ab35..3a81f7c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java
@@ -19,6 +19,7 @@
package org.apache.sling.distribution.journal.kafka;
import static org.hamcrest.Matchers.samePropertyValuesAs;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -73,10 +74,26 @@ public class JsonMessagingTest {
poller.close();
}
+ /**
+  * Sends a payload that is not a valid JSON representation of {@code Person}
+  * and verifies that the handler is never invoked for it.
+  */
+ @Test
+ public void testParseError() throws InterruptedException, IOException, IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException {
+     MessagingProvider provider = kafka.getProvider();
+     // try-with-resources closes the poller even when the assertion below fails
+     try (Closeable poller = provider.createJsonPoller(TOPIC_NAME, Reset.earliest, this::handle, Person.class)) {
+         JsonMessageSender<String> messageSender = provider.createJsonSender();
+
+         messageSender.send(TOPIC_NAME, "broken");
+         // Log should display "Failed to parse payload"
+         assertNotReceived();
+     }
+ }
+
/**
 * Waits up to 30 seconds for the semaphore to be released and fails the test
 * if no permit became available in that time.
 */
private void assertReceived() throws InterruptedException {
    boolean received = sem.tryAcquire(30, TimeUnit.SECONDS);
    assertTrue(received);
}
+ /**
+  * Waits one second and fails the test if the semaphore was released in that time.
+  */
+ private void assertNotReceived() throws InterruptedException {
+     boolean received = sem.tryAcquire(1, TimeUnit.SECONDS);
+     assertFalse(received);
+ }
+
private void handle(MessageInfo info, Person message) {
this.lastMessage = message;
this.sem.release();
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePollerTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePollerTest.java
new file mode 100644
index 0000000..8947696
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePollerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+/**
+ * Unit test for {@link KafkaMessagePoller} that runs against a mocked
+ * {@link KafkaConsumer} instead of a real broker.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaMessagePollerTest {
+    @Mock
+    ExceptionEventSender eventSender;
+
+    @Mock
+    KafkaConsumer<String, byte[]> consumer;
+
+    /** Released once for every message that reaches {@link #handle}. */
+    private final Semaphore sem = new Semaphore(0);
+
+    /**
+     * Feeds the poller a single record with a {@code null} value, created without
+     * explicit headers, and checks that the handler is not called for it.
+     */
+    @Test
+    public void testNoHeader() throws IOException, InterruptedException {
+        ConsumerRecord<String, byte[]> record = new ConsumerRecord<>("topic", 1, 0L, 0L, TimestampType.CREATE_TIME, 0, 0, 0, "key", null);
+        // First poll delivers the record, every later poll returns nothing
+        when(consumer.poll(Mockito.any()))
+            .thenReturn(records(Collections.singletonList(record)))
+            .thenReturn(records(Collections.emptyList()));
+        // Should display java.lang.IllegalArgumentException in log
+        try (KafkaMessagePoller poller = new KafkaMessagePoller(consumer, eventSender, create(DiscoveryMessage.class, this::handle))) {
+            Assert.assertThat(sem.tryAcquire(100, TimeUnit.MILLISECONDS), equalTo(false));
+        }
+    }
+
+    /** Message callback; only signals that a message was delivered. */
+    private void handle(MessageInfo info, DiscoveryMessage message) {
+        sem.release();
+    }
+
+    /**
+     * Wraps the given records into a {@link ConsumerRecords} instance, grouped by
+     * topic and partition.
+     *
+     * @param records records to wrap, may be empty
+     * @return the records grouped by their topic partition
+     */
+    private ConsumerRecords<String, byte[]> records(List<ConsumerRecord<String, byte[]>> records) {
+        Map<TopicPartition, List<ConsumerRecord<String, byte[]>>> byPartition = new HashMap<>();
+        for (ConsumerRecord<String, byte[]> record : records) {
+            TopicPartition partition = new TopicPartition(record.topic(), record.partition());
+            // Append instead of put so several records of one partition are all kept
+            byPartition.computeIfAbsent(partition, key -> new ArrayList<>()).add(record);
+        }
+        return new ConsumerRecords<>(byPartition);
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
index ae28d4c..672b751 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
@@ -34,6 +34,7 @@ import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.kafka.util.KafkaRule;
+import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
import org.junit.Before;
@@ -50,17 +51,18 @@ public class MessagingTest {
@ClassRule
public static KafkaRule kafka = new KafkaRule();
private MessagingProvider provider;
+ private HandlerAdapter<DiscoveryMessage> handler;
/**
 * Per-test setup: initialises the Mockito annotations of this class, picks a
 * fresh random topic name and fetches the messaging provider from the
 * class-level Kafka rule.
 */
@Before
public void before() {
MockitoAnnotations.initMocks(this);
topicName = "MessagingTest" + UUID.randomUUID().toString();
this.provider = kafka.getProvider();
+ this.handler = HandlerAdapter.create(DiscoveryMessage.class, this::handle); // DiscoveryMessage handler reused by the tests
}
@Test
public void testSendReceive() throws Exception {
- HandlerAdapter<DiscoveryMessage> handler = HandlerAdapter.create(DiscoveryMessage.class, this::handle);
Closeable poller = provider.createPoller(topicName, Reset.earliest, handler);
MessageSender<DiscoveryMessage> messageSender = provider.createSender();
@@ -73,12 +75,24 @@ public class MessagingTest {
}
@Test
+ public void testNoHandler() throws Exception {
+     // The poller is only given a handler for DiscoveryMessage, so a CommandMessage must not reach it
+     try (Closeable poller = provider.createPoller(topicName, Reset.earliest, handler)) {
+         CommandMessage command = CommandMessage.newBuilder()
+                 .setSubSlingId("subslingid")
+                 .setSubAgentName("agentname")
+                 .build();
+         MessageSender<CommandMessage> commandSender = provider.createSender();
+         commandSender.send(topicName, command);
+         assertNotReceived("Should not be received as we have no handler");
+     }
+ }
+
+ @Test
public void testAssign() throws Exception {
DiscoveryMessage msg = createMessage();
MessageSender<DiscoveryMessage> messageSender = provider.createSender();
messageSender.send(topicName, msg);
- HandlerAdapter<DiscoveryMessage> handler = HandlerAdapter.create(DiscoveryMessage.class, this::handle);
try (Closeable poller = provider.createPoller(topicName, Reset.earliest, handler)) {
assertReceived("Starting from earliest .. should see our message");
}