You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/07/03 12:35:14 UTC

[sling-org-apache-sling-distribution-journal-kafka] branch master updated: SLING-8554 - Increase test coverage

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bec200  SLING-8554 - Increase test coverage
9bec200 is described below

commit 9bec20065b276ac6c56f81c8382080c19a7a670b
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Wed Jul 3 14:35:04 2019 +0200

    SLING-8554 - Increase test coverage
---
 .../journal/kafka/JsonMessagingTest.java           | 17 +++++
 .../journal/kafka/KafkaMessagePollerTest.java      | 82 ++++++++++++++++++++++
 .../distribution/journal/kafka/MessagingTest.java  | 18 ++++-
 3 files changed, 115 insertions(+), 2 deletions(-)

diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java
index 181ab35..3a81f7c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java
@@ -19,6 +19,7 @@
 package org.apache.sling.distribution.journal.kafka;
 
 import static org.hamcrest.Matchers.samePropertyValuesAs;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -73,10 +74,26 @@ public class JsonMessagingTest {
         poller.close();
     }
     
+    @Test
+    public void testParseError() throws InterruptedException, IOException, IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException {
+        MessagingProvider provider = kafka.getProvider();
+        Closeable poller = provider.createJsonPoller(TOPIC_NAME, Reset.earliest, this::handle, Person.class);
+        JsonMessageSender<String> messageSender = provider.createJsonSender();
+        
+        messageSender.send(TOPIC_NAME, "broken");
+        // Log should display "Failed to parse payload"
+        assertNotReceived();
+        poller.close();
+    }
+    
     private void assertReceived() throws InterruptedException {
         assertTrue(sem.tryAcquire(30, TimeUnit.SECONDS));
     }
     
+    private void assertNotReceived() throws InterruptedException {
+        assertFalse(sem.tryAcquire(1, TimeUnit.SECONDS));
+    }
+    
     private void handle(MessageInfo info, Person message) {
         this.lastMessage = message;
         this.sem.release();
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePollerTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePollerTest.java
new file mode 100644
index 0000000..8947696
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePollerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaMessagePollerTest {
+    @Mock
+    ExceptionEventSender eventSender;
+    
+    @Mock
+    KafkaConsumer<String, byte[]> consumer;
+
+    private Semaphore sem = new Semaphore(0);
+
+    @Test
+    public void testNoHeader() throws IOException, InterruptedException {
+        ConsumerRecord<String, byte[]> record = new ConsumerRecord<String, byte[]>("topic", 1, 0l, 0l, TimestampType.CREATE_TIME, 0, 0, 0, "key", null);
+        when(consumer.poll(Mockito.any()))
+            .thenReturn(records(Collections.singletonList(record)))
+            .thenReturn(records(Collections.emptyList()));
+        // Should display java.lang.IllegalArgumentException in log
+        try (KafkaMessagePoller poller = new KafkaMessagePoller(consumer, eventSender, create(DiscoveryMessage.class, this::handle))) {
+            Assert.assertThat(sem.tryAcquire(100, TimeUnit.MILLISECONDS), equalTo(false));
+        }
+    }
+
+    private void handle(MessageInfo info, DiscoveryMessage message) {
+        sem.release();
+    }
+
+    private ConsumerRecords<String, byte[]> records(List<ConsumerRecord<String, byte[]>> records) {
+        Map<TopicPartition, List<ConsumerRecord<String, byte[]>>> rm = new HashMap<>();
+        for (ConsumerRecord<String, byte[]> record : records) {
+            rm.put(new TopicPartition(record.topic(), record.partition()), Arrays.asList(record));
+        }
+        return new ConsumerRecords<>(rm);
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
index ae28d4c..672b751 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
@@ -34,6 +34,7 @@ import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.kafka.util.KafkaRule;
+import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
 import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
 import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
 import org.junit.Before;
@@ -50,17 +51,18 @@ public class MessagingTest {
     @ClassRule
     public static KafkaRule kafka = new KafkaRule();
     private MessagingProvider provider;
+    private HandlerAdapter<DiscoveryMessage> handler;
     
     @Before
     public void before() {
         MockitoAnnotations.initMocks(this);
         topicName = "MessagingTest" + UUID.randomUUID().toString();
         this.provider = kafka.getProvider();
+        this.handler = HandlerAdapter.create(DiscoveryMessage.class, this::handle);
     }
     
     @Test
     public void testSendReceive() throws Exception {
-        HandlerAdapter<DiscoveryMessage> handler = HandlerAdapter.create(DiscoveryMessage.class, this::handle);
         Closeable poller = provider.createPoller(topicName, Reset.earliest, handler);
         MessageSender<DiscoveryMessage> messageSender = provider.createSender();
         
@@ -73,12 +75,24 @@ public class MessagingTest {
     }
     
     @Test
+    public void testNoHandler() throws Exception {
+        try (Closeable poller = provider.createPoller(topicName, Reset.earliest, handler)) {
+            MessageSender<CommandMessage> messageSender = provider.createSender();
+            CommandMessage msg = CommandMessage.newBuilder()
+                .setSubSlingId("subslingid")
+                .setSubAgentName("agentname")
+                .build();
+            messageSender.send(topicName, msg);
+            assertNotReceived("Should not be received as we have no handler");
+        }
+    }
+    
+    @Test
     public void testAssign() throws Exception {
         DiscoveryMessage msg = createMessage();
         MessageSender<DiscoveryMessage> messageSender = provider.createSender();
         messageSender.send(topicName, msg);
         
-        HandlerAdapter<DiscoveryMessage> handler = HandlerAdapter.create(DiscoveryMessage.class, this::handle);
         try (Closeable poller = provider.createPoller(topicName, Reset.earliest, handler)) {
             assertReceived("Starting from earliest .. should see our message");
         }