You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/20 07:45:40 UTC

[2/4] storm git commit: STORM-2087: Make tests Java 7 compatible

STORM-2087: Make tests Java 7 compatible


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e47ccecb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e47ccecb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e47ccecb

Branch: refs/heads/1.x-branch
Commit: e47ccecba855889b75da40a3cceb190587552aef
Parents: 5c17a59
Author: Stig Rohde D�ssing <sd...@it-minds.dk>
Authored: Wed Dec 14 19:29:48 2016 +0100
Committer: Stig Rohde D�ssing <sd...@it-minds.dk>
Committed: Wed Dec 14 20:33:02 2016 +0100

----------------------------------------------------------------------
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 77 +++++++++++---------
 1 file changed, 41 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e47ccecb/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 8fa7b80..6983160 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -31,7 +31,6 @@ import org.mockito.ArgumentCaptor;
 import static org.junit.Assert.*;
 
 import java.util.Map;
-import java.util.stream.IntStream;
 import static org.mockito.Mockito.*;
 import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.*;
 
@@ -54,13 +53,13 @@ public class SingleTopicKafkaSpoutTest {
     void populateTopicData(String topicName, int msgCount) {
         kafkaUnitRule.getKafkaUnit().createTopic(topicName);
 
-        IntStream.range(0, msgCount).forEach(value -> {
+        for (int i = 0; i < msgCount; i++){
             KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
-                    topicName, Integer.toString(value),
-                    Integer.toString(value));
+                    topicName, Integer.toString(i),
+                    Integer.toString(i));
 
             kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
-        });
+        };
     }
 
     SpoutContext initializeSpout(int msgCount) {
@@ -102,30 +101,32 @@ public class SingleTopicKafkaSpoutTest {
         //play 1st tuple
         ArgumentCaptor<Object> messageIdToDoubleAck = ArgumentCaptor.forClass(Object.class);
         context.spout.nextTuple();
-        verify(context.collector).emit(anyObject(), anyObject(), messageIdToDoubleAck.capture());
+        verify(context.collector).emit(anyString(), anyList(), messageIdToDoubleAck.capture());
         context.spout.ack(messageIdToDoubleAck.getValue());
 
-        IntStream.range(0, messageCount/2).forEach(value -> {
+        for (int i = 0; i < messageCount/2; i++) {
             context.spout.nextTuple();
-        });
+        };
 
         context.spout.ack(messageIdToDoubleAck.getValue());
 
-        IntStream.range(0, messageCount).forEach(value -> {
+        for (int i = 0; i < messageCount; i++) {
             context.spout.nextTuple();
-        });
+        };
 
         ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
 
         verify(context.collector, times(messageCount)).emit(
                 eq(SingleTopicKafkaSpoutConfiguration.STREAM),
-                anyObject(),
+                anyList(),
                 remainingIds.capture());
-        remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+        for (Object id : remainingIds.getAllValues()) {
+            context.spout.ack(id);
+        }
 
-        context.spout.acked.values().forEach(item -> {
+        for(Object item : context.spout.acked.values()) {
             assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
-        });
+        };
     }
 
     @Test
@@ -134,22 +135,22 @@ public class SingleTopicKafkaSpoutTest {
         SpoutContext context = initializeSpout(messageCount);
 
 
-        IntStream.range(0, messageCount).forEach(value -> {
+        for (int i = 0; i < messageCount; i++) {
             context.spout.nextTuple();
             ArgumentCaptor<Object> messageId = ArgumentCaptor.forClass(Object.class);
             verify(context.collector).emit(
                     eq(SingleTopicKafkaSpoutConfiguration.STREAM),
                     eq(new Values(SingleTopicKafkaSpoutConfiguration.TOPIC,
-                            Integer.toString(value),
-                            Integer.toString(value))),
+                            Integer.toString(i),
+                            Integer.toString(i))),
             messageId.capture());
             context.spout.ack(messageId.getValue());
             reset(context.collector);
-        });
+        };
 
-        context.spout.acked.values().forEach(item -> {
+        for (Object item : context.spout.acked.values()) {
             assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
-        });
+        };
     }
 
     @Test
@@ -160,14 +161,14 @@ public class SingleTopicKafkaSpoutTest {
         //play and ack 1 tuple
         ArgumentCaptor<Object> messageIdAcked = ArgumentCaptor.forClass(Object.class);
         context.spout.nextTuple();
-        verify(context.collector).emit(anyObject(), anyObject(), messageIdAcked.capture());
+        verify(context.collector).emit(anyString(), anyList(), messageIdAcked.capture());
         context.spout.ack(messageIdAcked.getValue());
         reset(context.collector);
 
         //play and fail 1 tuple
         ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
         context.spout.nextTuple();
-        verify(context.collector).emit(anyObject(), anyObject(), messageIdFailed.capture());
+        verify(context.collector).emit(anyString(), anyList(), messageIdFailed.capture());
         context.spout.fail(messageIdFailed.getValue());
         reset(context.collector);
 
@@ -176,22 +177,24 @@ public class SingleTopicKafkaSpoutTest {
 
 
         //allow for some calls to nextTuple() to fail to emit a tuple
-        IntStream.range(0, messageCount + 5).forEach(value -> {
+        for (int i = 0; i < messageCount + 5; i++) {
             context.spout.nextTuple();
-        });
+        };
 
         ArgumentCaptor<Object> remainingMessageIds = ArgumentCaptor.forClass(Object.class);
 
         //1 message replayed, messageCount - 2 messages emitted for the first time
         verify(context.collector, times(messageCount - 1)).emit(
                 eq(SingleTopicKafkaSpoutConfiguration.STREAM),
-                anyObject(),
+                anyList(),
                 remainingMessageIds.capture());
-        remainingMessageIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+        for (Object id : remainingMessageIds.getAllValues()) {
+            context.spout.ack(id);
+        }
 
-        context.spout.acked.values().forEach(item -> {
+        for (Object item : context.spout.acked.values()) {
             assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
-        });
+        };
     }
 
     @Test
@@ -203,13 +206,13 @@ public class SingleTopicKafkaSpoutTest {
         //play 1st tuple
         ArgumentCaptor<Object> messageIdToFail = ArgumentCaptor.forClass(Object.class);
         context.spout.nextTuple();
-        verify(context.collector).emit(anyObject(), anyObject(), messageIdToFail.capture());
+        verify(context.collector).emit(anyString(), anyList(), messageIdToFail.capture());
         reset(context.collector);
 
         //play 2nd tuple
         ArgumentCaptor<Object> messageIdToAck = ArgumentCaptor.forClass(Object.class);
         context.spout.nextTuple();
-        verify(context.collector).emit(anyObject(), anyObject(), messageIdToAck.capture());
+        verify(context.collector).emit(anyString(), anyList(), messageIdToAck.capture());
         reset(context.collector);
 
         //ack 2nd tuple
@@ -221,20 +224,22 @@ public class SingleTopicKafkaSpoutTest {
         Thread.sleep(200);
 
         //allow for some calls to nextTuple() to fail to emit a tuple
-        IntStream.range(0, messageCount + 5).forEach(value -> {
+        for (int i = 0; i < messageCount + 5; i++) {
             context.spout.nextTuple();
-        });
+        };
 
         ArgumentCaptor<Object> remainingIds = ArgumentCaptor.forClass(Object.class);
         //1 message replayed, messageCount - 2 messages emitted for the first time
         verify(context.collector, times(messageCount - 1)).emit(
                 eq(SingleTopicKafkaSpoutConfiguration.STREAM),
-                anyObject(),
+                anyList(),
                 remainingIds.capture());
-        remainingIds.getAllValues().iterator().forEachRemaining(context.spout::ack);
+        for (Object id : remainingIds.getAllValues()) {
+            context.spout.ack(id);
+        };
 
-        context.spout.acked.values().forEach(item -> {
+        for (Object item : context.spout.acked.values()) {
             assertOffsetCommitted(messageCount - 1, (KafkaSpout.OffsetEntry) item);
-        });
+        };
     }
 }
\ No newline at end of file