You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/20 07:45:40 UTC
[2/4] storm git commit: STORM-2087: Make tests Java 7 compatible
STORM-2087: Make tests Java 7 compatible
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e47ccecb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e47ccecb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e47ccecb
Branch: refs/heads/1.x-branch
Commit: e47ccecba855889b75da40a3cceb190587552aef
Parents: 5c17a59
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Wed Dec 14 19:29:48 2016 +0100
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Wed Dec 14 20:33:02 2016 +0100
----------------------------------------------------------------------
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 77 +++++++++++---------
1 file changed, 41 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e47ccecb/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 8fa7b80..6983160 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -31,7 +31,6 @@ import org.mockito.ArgumentCaptor;
import static org.junit.Assert.*;
import java.util.Map;
-import java.util.stream.IntStream;
import static org.mockito.Mockito.*;
import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;
@@ -54,13 +53,13 @@ public class SingleTopicKafkaSpoutTest {
void populateTopicData(String topicName, int msgCount) {
kafkaUnitRule.getKafkaUnit().createTopic(topicName);
- IntStream.range(0, msgCount).forEach(value -> {
+ for (int i = 0; i < msgCount; i++){
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
- topicName, Integer.toString(value),
- Integer.toString(value));
+ topicName, Integer.toString(i),
+ Integer.toString(i));
kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
- });
+ };
}
SpoutContext initializeSpout(int msgCount) {
@@ -102,30 +101,32 @@ public class SingleTopicKafkaSpoutTest {
//play 1st tuple
ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
context.spout.nextTuple();
- verify(context.collector).emit(anyObject(), anyObject(), messageIdToDoubleAck.capture());
+ verify(context.collector).emit(anyString(), anyList(), messageIdToDoubleAck.capture());
context.spout.ack(messageIdToDoubleAck.getValue());
- IntStream.range(0, messageCount/2).forEach(value -> {
+ for (int i = 0; i < messageCount/2; i++) {
context.spout.nextTuple();
- });
+ };
context.spout.ack(messageIdToDoubleAck.getValue());
- IntStream.range(0, messageCount).forEach(value -> {
+ for (int i = 0; i < messageCount; i++) {
context.spout.nextTuple();
- });
+ };
ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
verify(context.collector, times(messageCount)).emit(
eq(SingleTopicKafkaSpoutConfiguration.STREAM),
- anyObject(),
+ anyList(),
remainingIds.capture());
- remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+ for (Object id : remainingIds.getAllValues()) {
+ context.spout.ack(id);
+ }
- context.spout.acked.values().forEach(item -> {
+ for(Object item : context.spout.acked.values()) {
assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
- });
+ };
}
@Test
@@ -134,22 +135,22 @@ public class SingleTopicKafkaSpoutTest {
SpoutContext context = initializeSpout(messageCount);
- IntStream.range(0, messageCount).forEach(value -> {
+ for (int i = 0; i < messageCount; i++) {
context.spout.nextTuple();
ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
verify(context.collector).emit(
eq(SingleTopicKafkaSpoutConfiguration.STREAM),
eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
- Integer.toString(value),
- Integer.toString(value))),
+ Integer.toString(i),
+ Integer.toString(i))),
messageId.capture());
context.spout.ack(messageId.getValue());
reset(context.collector);
- });
+ };
- context.spout.acked.values().forEach(item -> {
+ for (Object item : context.spout.acked.values()) {
assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
- });
+ };
}
@Test
@@ -160,14 +161,14 @@ public class SingleTopicKafkaSpoutTest {
//play and ack 1 tuple
ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
context.spout.nextTuple();
- verify(context.collector).emit(anyObject(), anyObject(), messageIdAcked.capture());
+ verify(context.collector).emit(anyString(), anyList(), messageIdAcked.capture());
context.spout.ack(messageIdAcked.getValue());
reset(context.collector);
//play and fail 1 tuple
ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
context.spout.nextTuple();
- verify(context.collector).emit(anyObject(), anyObject(), messageIdFailed.capture());
+ verify(context.collector).emit(anyString(), anyList(), messageIdFailed.capture());
context.spout.fail(messageIdFailed.getValue());
reset(context.collector);
@@ -176,22 +177,24 @@ public class SingleTopicKafkaSpoutTest {
//allow for some calls to nextTuple() to fail to emit a tuple
- IntStream.range(0, messageCount + 5).forEach(value -> {
+ for (int i = 0; i < messageCount + 5; i++) {
context.spout.nextTuple();
- });
+ };
ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class);
//1 message replayed, messageCount - 2 messages emitted for the first time
verify(context.collector, times(messageCount - 1)).emit(
eq(SingleTopicKafkaSpoutConfiguration.STREAM),
- anyObject(),
+ anyList(),
remainingMessageIds.capture());
- remainingMessageIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+ for (Object id : remainingMessageIds.getAllValues()) {
+ context.spout.ack(id);
+ }
- context.spout.acked.values().forEach(item -> {
+ for (Object item : context.spout.acked.values()) {
assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
- });
+ };
}
@Test
@@ -203,13 +206,13 @@ public class SingleTopicKafkaSpoutTest {
//play 1st tuple
ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
context.spout.nextTuple();
- verify(context.collector).emit(anyObject(), anyObject(), messageIdToFail.capture());
+ verify(context.collector).emit(anyString(), anyList(), messageIdToFail.capture());
reset(context.collector);
//play 2nd tuple
ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class);
context.spout.nextTuple();
- verify(context.collector).emit(anyObject(), anyObject(), messageIdToAck.capture());
+ verify(context.collector).emit(anyString(), anyList(), messageIdToAck.capture());
reset(context.collector);
//ack 2nd tuple
@@ -221,20 +224,22 @@ public class SingleTopicKafkaSpoutTest {
Thread.sleep(200);
//allow for some calls to nextTuple() to fail to emit a tuple
- IntStream.range(0, messageCount + 5).forEach(value -> {
+ for (int i = 0; i < messageCount + 5; i++) {
context.spout.nextTuple();
- });
+ };
ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
//1 message replayed, messageCount - 2 messages emitted for the first time
verify(context.collector, times(messageCount - 1)).emit(
eq(SingleTopicKafkaSpoutConfiguration.STREAM),
- anyObject(),
+ anyList(),
remainingIds.capture());
- remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+ for (Object id : remainingIds.getAllValues()) {
+ context.spout.ack(id);
+ };
- context.spout.acked.values().forEach(item -> {
+ for (Object item : context.spout.acked.values()) {
assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
- });
+ };
}
}
\ No newline at end of file