You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by wh...@apache.org on 2016/01/08 08:07:15 UTC

[1/2] storm git commit: STORM-1404. Fix Mockito test failures in storm-kafka.

Repository: storm
Updated Branches:
  refs/heads/master ce31f4cfb -> 0cdc0f7ab


STORM-1404. Fix Mockito test failures in storm-kafka.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a4bc201c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a4bc201c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a4bc201c

Branch: refs/heads/master
Commit: a4bc201c49734712a2a05b8f18b557b562db0d3a
Parents: ce31f4c
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Jan 6 18:04:13 2016 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Thu Jan 7 23:07:01 2016 -0800

----------------------------------------------------------------------
 .../test/storm/kafka/bolt/KafkaBoltTest.java    | 85 +++++++++++++++-----
 1 file changed, 67 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a4bc201c/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index 87daab0..5d53a70 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -29,26 +29,36 @@ import backtype.storm.tuple.TupleImpl;
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.TupleUtils;
 import backtype.storm.utils.Utils;
+import com.google.common.collect.ImmutableList;
 import kafka.api.OffsetRequest;
+import kafka.api.FetchRequest;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.Message;
 import kafka.message.MessageAndOffset;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.*;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import storm.kafka.*;
 import storm.kafka.trident.GlobalPartitionInformation;
 
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.HashMap;
 import java.util.Properties;
+import java.util.concurrent.Future;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 public class KafkaBoltTest {
 
@@ -113,7 +123,7 @@ public class KafkaBoltTest {
     public void executeWithByteArrayKeyAndMessageSync() {
         boolean async = false;
         boolean fireAndForget = false;
-        bolt = generateDefaultSerializerBolt(async, fireAndForget);
+        bolt = generateDefaultSerializerBolt(async, fireAndForget, null);
         String keyString = "test-key";
         String messageString = "test-message";
         byte[] key = keyString.getBytes();
@@ -129,18 +139,26 @@ public class KafkaBoltTest {
     public void executeWithByteArrayKeyAndMessageAsync() {
         boolean async = true;
         boolean fireAndForget = false;
-        bolt = generateDefaultSerializerBolt(async, fireAndForget);
         String keyString = "test-key";
         String messageString = "test-message";
         byte[] key = keyString.getBytes();
         byte[] message = messageString.getBytes();
-        Tuple tuple = generateTestTuple(key, message);
+        final Tuple tuple = generateTestTuple(key, message);
+
+        final ByteBufferMessageSet mockMsg = mockSingleMessage(key, message);
+        simpleConsumer.close();
+        simpleConsumer = mockSimpleConsumer(mockMsg);
+        KafkaProducer<?, ?> producer = mock(KafkaProducer.class);
+        when(producer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Future>() {
+            @Override
+            public Future answer(InvocationOnMock invocationOnMock) throws Throwable {
+                Callback cb = (Callback) invocationOnMock.getArguments()[1];
+                cb.onCompletion(null, null);
+                return mock(Future.class);
+            }
+        });
+        bolt = generateDefaultSerializerBolt(async, fireAndForget, producer);
         bolt.execute(tuple);
-        try {
-            Thread.sleep(1000);                 
-        } catch (InterruptedException ex) {
-            Thread.currentThread().interrupt();
-        }
         verify(collector).ack(tuple);
         verifyMessage(keyString, messageString);
     }
@@ -150,18 +168,22 @@ public class KafkaBoltTest {
     public void executeWithByteArrayKeyAndMessageFire() {
         boolean async = true;
         boolean fireAndForget = true;
-        bolt = generateDefaultSerializerBolt(async, fireAndForget);
+        bolt = generateDefaultSerializerBolt(async, fireAndForget, null);
         String keyString = "test-key";
         String messageString = "test-message";
         byte[] key = keyString.getBytes();
         byte[] message = messageString.getBytes();
         Tuple tuple = generateTestTuple(key, message);
+        KafkaProducer<?, ?> producer = mock(KafkaProducer.class);
+        when(producer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Future>() {
+            @Override
+            public Future answer(InvocationOnMock invocationOnMock) throws Throwable {
+                Callback cb = (Callback) invocationOnMock.getArguments()[1];
+                cb.onCompletion(null, null);
+                return mock(Future.class);
+            }
+        });
         bolt.execute(tuple);
-        try {
-            Thread.sleep(1000);
-        } catch (InterruptedException ex) {
-            Thread.currentThread().interrupt();
-        }
         verify(collector).ack(tuple);
         verifyMessage(keyString, messageString);
     }
@@ -195,7 +217,8 @@ public class KafkaBoltTest {
         return bolt;
     }
 
-    private KafkaBolt generateDefaultSerializerBolt(boolean async, boolean fireAndForget) {
+    private KafkaBolt generateDefaultSerializerBolt(boolean async, boolean fireAndForget,
+                                                    KafkaProducer<?, ?> mockProducer) {
         Properties props = new Properties();
         props.put("acks", "1");
         props.put("bootstrap.servers", broker.getBrokerConnectionString());
@@ -207,6 +230,9 @@ public class KafkaBoltTest {
         bolt.prepare(config, null, new OutputCollector(collector));
         bolt.setAsync(async);
         bolt.setFireAndForget(fireAndForget);
+        if (mockProducer != null) {
+            Whitebox.setInternalState(bolt, "producer", mockProducer);
+        }
         return bolt;
     }
 
@@ -291,4 +317,27 @@ public class KafkaBoltTest {
         assertTrue(TupleUtils.isTick(tuple));
         return tuple;
     }
+
+    private static ByteBufferMessageSet mockSingleMessage(byte[] key, byte[] message) {
+        ByteBufferMessageSet sets = mock(ByteBufferMessageSet.class);
+        MessageAndOffset msg = mock(MessageAndOffset.class);
+        final List<MessageAndOffset> msgs = ImmutableList.of(msg);
+        doReturn(msgs.iterator()).when(sets).iterator();
+        Message kafkaMessage = mock(Message.class);
+        doReturn(ByteBuffer.wrap(key)).when(kafkaMessage).key();
+        doReturn(ByteBuffer.wrap(message)).when(kafkaMessage).payload();
+        doReturn(kafkaMessage).when(msg).message();
+        return sets;
+    }
+
+    private static SimpleConsumer mockSimpleConsumer(ByteBufferMessageSet mockMsg) {
+        SimpleConsumer simpleConsumer = mock(SimpleConsumer.class);
+        FetchResponse resp = mock(FetchResponse.class);
+        doReturn(resp).when(simpleConsumer).fetch(any(FetchRequest.class));
+        OffsetResponse mockOffsetResponse = mock(OffsetResponse.class);
+        doReturn(new long[] {}).when(mockOffsetResponse).offsets(anyString(), anyInt());
+        doReturn(mockOffsetResponse).when(simpleConsumer).getOffsetsBefore(any(kafka.javaapi.OffsetRequest.class));
+        doReturn(mockMsg).when(resp).messageSet(anyString(), anyInt());
+        return simpleConsumer;
+    }
 }


[2/2] storm git commit: Add CHANGELOG for STORM-1404.

Posted by wh...@apache.org.
Add CHANGELOG for STORM-1404.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0cdc0f7a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0cdc0f7a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0cdc0f7a

Branch: refs/heads/master
Commit: 0cdc0f7aba050eb1b5d8ac0c2adfa0849e16f66e
Parents: a4bc201
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Jan 7 23:06:53 2016 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Thu Jan 7 23:07:05 2016 -0800

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0cdc0f7a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1dc812d..87a737a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1404: Fix Mockito test failures in storm-kafka.
  * STORM-706: Clarify examples README for IntelliJ.
  * STORM-1396: Added backward compatibility method for File Download
  * STORM-695: storm CLI tool reports zero exit code on error scenario