You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by wh...@apache.org on 2016/01/08 08:07:15 UTC
[1/2] storm git commit: STORM-1404. Fix Mockito test failures in
storm-kafka.
Repository: storm
Updated Branches:
refs/heads/master ce31f4cfb -> 0cdc0f7ab
STORM-1404. Fix Mockito test failures in storm-kafka.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a4bc201c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a4bc201c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a4bc201c
Branch: refs/heads/master
Commit: a4bc201c49734712a2a05b8f18b557b562db0d3a
Parents: ce31f4c
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Jan 6 18:04:13 2016 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Thu Jan 7 23:07:01 2016 -0800
----------------------------------------------------------------------
.../test/storm/kafka/bolt/KafkaBoltTest.java | 85 +++++++++++++++-----
1 file changed, 67 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a4bc201c/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index 87daab0..5d53a70 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -29,26 +29,36 @@ import backtype.storm.tuple.TupleImpl;
import backtype.storm.tuple.Values;
import backtype.storm.utils.TupleUtils;
import backtype.storm.utils.Utils;
+import com.google.common.collect.ImmutableList;
import kafka.api.OffsetRequest;
+import kafka.api.FetchRequest;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.*;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import storm.kafka.*;
import storm.kafka.trident.GlobalPartitionInformation;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.HashMap;
import java.util.Properties;
+import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
public class KafkaBoltTest {
@@ -113,7 +123,7 @@ public class KafkaBoltTest {
public void executeWithByteArrayKeyAndMessageSync() {
boolean async = false;
boolean fireAndForget = false;
- bolt = generateDefaultSerializerBolt(async, fireAndForget);
+ bolt = generateDefaultSerializerBolt(async, fireAndForget, null);
String keyString = "test-key";
String messageString = "test-message";
byte[] key = keyString.getBytes();
@@ -129,18 +139,26 @@ public class KafkaBoltTest {
public void executeWithByteArrayKeyAndMessageAsync() {
boolean async = true;
boolean fireAndForget = false;
- bolt = generateDefaultSerializerBolt(async, fireAndForget);
String keyString = "test-key";
String messageString = "test-message";
byte[] key = keyString.getBytes();
byte[] message = messageString.getBytes();
- Tuple tuple = generateTestTuple(key, message);
+ final Tuple tuple = generateTestTuple(key, message);
+
+ final ByteBufferMessageSet mockMsg = mockSingleMessage(key, message);
+ simpleConsumer.close();
+ simpleConsumer = mockSimpleConsumer(mockMsg);
+ KafkaProducer<?, ?> producer = mock(KafkaProducer.class);
+ when(producer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Future>() {
+ @Override
+ public Future answer(InvocationOnMock invocationOnMock) throws Throwable {
+ Callback cb = (Callback) invocationOnMock.getArguments()[1];
+ cb.onCompletion(null, null);
+ return mock(Future.class);
+ }
+ });
+ bolt = generateDefaultSerializerBolt(async, fireAndForget, producer);
bolt.execute(tuple);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
verify(collector).ack(tuple);
verifyMessage(keyString, messageString);
}
@@ -150,18 +168,22 @@ public class KafkaBoltTest {
public void executeWithByteArrayKeyAndMessageFire() {
boolean async = true;
boolean fireAndForget = true;
- bolt = generateDefaultSerializerBolt(async, fireAndForget);
+ bolt = generateDefaultSerializerBolt(async, fireAndForget, null);
String keyString = "test-key";
String messageString = "test-message";
byte[] key = keyString.getBytes();
byte[] message = messageString.getBytes();
Tuple tuple = generateTestTuple(key, message);
+ KafkaProducer<?, ?> producer = mock(KafkaProducer.class);
+ when(producer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Future>() {
+ @Override
+ public Future answer(InvocationOnMock invocationOnMock) throws Throwable {
+ Callback cb = (Callback) invocationOnMock.getArguments()[1];
+ cb.onCompletion(null, null);
+ return mock(Future.class);
+ }
+ });
bolt.execute(tuple);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
verify(collector).ack(tuple);
verifyMessage(keyString, messageString);
}
@@ -195,7 +217,8 @@ public class KafkaBoltTest {
return bolt;
}
- private KafkaBolt generateDefaultSerializerBolt(boolean async, boolean fireAndForget) {
+ private KafkaBolt generateDefaultSerializerBolt(boolean async, boolean fireAndForget,
+ KafkaProducer<?, ?> mockProducer) {
Properties props = new Properties();
props.put("acks", "1");
props.put("bootstrap.servers", broker.getBrokerConnectionString());
@@ -207,6 +230,9 @@ public class KafkaBoltTest {
bolt.prepare(config, null, new OutputCollector(collector));
bolt.setAsync(async);
bolt.setFireAndForget(fireAndForget);
+ if (mockProducer != null) {
+ Whitebox.setInternalState(bolt, "producer", mockProducer);
+ }
return bolt;
}
@@ -291,4 +317,27 @@ public class KafkaBoltTest {
assertTrue(TupleUtils.isTick(tuple));
return tuple;
}
+
+ private static ByteBufferMessageSet mockSingleMessage(byte[] key, byte[] message) {
+ ByteBufferMessageSet sets = mock(ByteBufferMessageSet.class);
+ MessageAndOffset msg = mock(MessageAndOffset.class);
+ final List<MessageAndOffset> msgs = ImmutableList.of(msg);
+ doReturn(msgs.iterator()).when(sets).iterator();
+ Message kafkaMessage = mock(Message.class);
+ doReturn(ByteBuffer.wrap(key)).when(kafkaMessage).key();
+ doReturn(ByteBuffer.wrap(message)).when(kafkaMessage).payload();
+ doReturn(kafkaMessage).when(msg).message();
+ return sets;
+ }
+
+ private static SimpleConsumer mockSimpleConsumer(ByteBufferMessageSet mockMsg) {
+ SimpleConsumer simpleConsumer = mock(SimpleConsumer.class);
+ FetchResponse resp = mock(FetchResponse.class);
+ doReturn(resp).when(simpleConsumer).fetch(any(FetchRequest.class));
+ OffsetResponse mockOffsetResponse = mock(OffsetResponse.class);
+ doReturn(new long[] {}).when(mockOffsetResponse).offsets(anyString(), anyInt());
+ doReturn(mockOffsetResponse).when(simpleConsumer).getOffsetsBefore(any(kafka.javaapi.OffsetRequest.class));
+ doReturn(mockMsg).when(resp).messageSet(anyString(), anyInt());
+ return simpleConsumer;
+ }
}
[2/2] storm git commit: Add CHANGELOG for STORM-1404.
Posted by wh...@apache.org.
Add CHANGELOG for STORM-1404.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0cdc0f7a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0cdc0f7a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0cdc0f7a
Branch: refs/heads/master
Commit: 0cdc0f7aba050eb1b5d8ac0c2adfa0849e16f66e
Parents: a4bc201
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Jan 7 23:06:53 2016 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Thu Jan 7 23:07:05 2016 -0800
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0cdc0f7a/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1dc812d..87a737a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1404: Fix Mockito test failures in storm-kafka.
* STORM-706: Clarify examples README for IntelliJ.
* STORM-1396: Added backward compatibility method for File Download
* STORM-695: storm CLI tool reports zero exit code on error scenario