You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/12 21:01:27 UTC
[07/10] storm git commit: Package name fixes
http://git-wip-us.apache.org/repos/asf/storm/blob/bfc5ffd5/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java b/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
new file mode 100644
index 0000000..69c541b
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/topology/StatefulBoltExecutorTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.topology;
+
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.spout.CheckpointSpout;
+import org.apache.storm.state.KeyValueState;
+import org.apache.storm.state.State;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.topology.IStatefulBolt;
+import org.apache.storm.topology.StatefulBoltExecutor;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.storm.spout.CheckPointState.Action.*;
+import static org.apache.storm.spout.CheckpointSpout.*;
+
+/**
+ * Unit tests for {@link StatefulBoltExecutor}
+ */
+public class StatefulBoltExecutorTest {
+ StatefulBoltExecutor<KeyValueState<String, String>> executor;
+ IStatefulBolt<KeyValueState<String, String>> mockBolt;
+ TopologyContext mockTopologyContext;
+ Tuple mockTuple;
+ Tuple mockCheckpointTuple;
+ Map<String, Object> mockStormConf = new HashMap<>();
+ OutputCollector mockOutputCollector;
+ State mockState;
+ Map<GlobalStreamId, Grouping> mockGlobalStream;
+ Set<GlobalStreamId> mockStreamIds;
+ @Before
+ public void setUp() throws Exception {
+ mockBolt = Mockito.mock(IStatefulBolt.class);
+ executor = new StatefulBoltExecutor<>(mockBolt);
+ GlobalStreamId mockGlobalStreamId = Mockito.mock(GlobalStreamId.class);
+ Mockito.when(mockGlobalStreamId.get_streamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
+ mockStreamIds = new HashSet<>();
+ mockStreamIds.add(mockGlobalStreamId);
+ mockTopologyContext = Mockito.mock(TopologyContext.class);
+ mockOutputCollector = Mockito.mock(OutputCollector.class);
+ mockGlobalStream = Mockito.mock(Map.class);
+ mockState = Mockito.mock(State.class);
+ Mockito.when(mockTopologyContext.getThisComponentId()).thenReturn("test");
+ Mockito.when(mockTopologyContext.getThisTaskId()).thenReturn(1);
+ Mockito.when(mockTopologyContext.getThisSources()).thenReturn(mockGlobalStream);
+ Mockito.when(mockTopologyContext.getComponentTasks(Mockito.anyString())).thenReturn(Collections.singletonList(1));
+ Mockito.when(mockGlobalStream.keySet()).thenReturn(mockStreamIds);
+ mockTuple = Mockito.mock(Tuple.class);
+ mockCheckpointTuple = Mockito.mock(Tuple.class);
+ executor.prepare(mockStormConf, mockTopologyContext, mockOutputCollector, mockState);
+ }
+
+ @Test
+ public void testHandleTupleBeforeInit() throws Exception {
+ Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
+ executor.execute(mockTuple);
+ Mockito.verify(mockBolt, Mockito.times(0)).execute(Mockito.any(Tuple.class));
+ }
+
+ @Test
+ public void testHandleTuple() throws Exception {
+ Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
+ executor.execute(mockTuple);
+ Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
+ Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(INITSTATE);
+ Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(0));
+ Mockito.doNothing().when(mockOutputCollector).ack(mockCheckpointTuple);
+ executor.execute(mockCheckpointTuple);
+ Mockito.verify(mockBolt, Mockito.times(1)).execute(mockTuple);
+ Mockito.verify(mockBolt, Mockito.times(1)).initState(Mockito.any(KeyValueState.class));
+ }
+
+ @Test
+ public void testRollback() throws Exception {
+ Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
+ executor.execute(mockTuple);
+ Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
+ Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(ROLLBACK);
+ Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(0));
+ Mockito.doNothing().when(mockOutputCollector).ack(mockCheckpointTuple);
+ executor.execute(mockCheckpointTuple);
+ Mockito.verify(mockState, Mockito.times(1)).rollback();
+ }
+
+ @Test
+ public void testCommit() throws Exception {
+ Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
+ executor.execute(mockTuple);
+ Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
+ Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(COMMIT);
+ Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(0));
+ Mockito.doNothing().when(mockOutputCollector).ack(mockCheckpointTuple);
+ executor.execute(mockCheckpointTuple);
+ Mockito.verify(mockBolt, Mockito.times(1)).preCommit(new Long(0));
+ Mockito.verify(mockState, Mockito.times(1)).commit(new Long(0));
+ }
+
+ @Test
+ public void testPrepareAndRollbackBeforeInitstate() throws Exception {
+ Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
+ executor.execute(mockTuple);
+ Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
+ Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(PREPARE);
+ Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
+ executor.execute(mockCheckpointTuple);
+ Mockito.verify(mockOutputCollector, Mockito.times(1)).fail(mockCheckpointTuple);
+
+ Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(ROLLBACK);
+ Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
+ Mockito.doNothing().when(mockOutputCollector).ack(mockCheckpointTuple);
+ executor.execute(mockCheckpointTuple);
+ Mockito.verify(mockState, Mockito.times(1)).rollback();
+ }
+
+ @Test
+ public void testCommitBeforeInitstate() throws Exception {
+ Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
+ Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
+ Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(COMMIT);
+ Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
+ executor.execute(mockCheckpointTuple);
+ Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(mockCheckpointTuple);
+
+ Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(ROLLBACK);
+ Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
+ executor.execute(mockCheckpointTuple);
+ Mockito.verify(mockState, Mockito.times(1)).rollback();
+ }
+
+ @Test
+ public void testPrepareAndCommit() throws Exception {
+ Mockito.when(mockTuple.getSourceStreamId()).thenReturn("default");
+ Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
+ Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(INITSTATE);
+ Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(0));
+ executor.execute(mockCheckpointTuple);
+
+ executor.execute(mockTuple);
+ Mockito.when(mockCheckpointTuple.getSourceStreamId()).thenReturn(CheckpointSpout.CHECKPOINT_STREAM_ID);
+ Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(PREPARE);
+ Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
+ executor.execute(mockCheckpointTuple);
+ executor.execute(mockTuple);
+ Mockito.when(mockCheckpointTuple.getValueByField(CHECKPOINT_FIELD_ACTION)).thenReturn(COMMIT);
+ Mockito.when(mockCheckpointTuple.getLongByField(CHECKPOINT_FIELD_TXID)).thenReturn(new Long(100));
+ executor.execute(mockCheckpointTuple);
+ Mockito.verify(mockState, Mockito.times(1)).commit(new Long(100));
+ Mockito.verify(mockBolt, Mockito.times(2)).execute(mockTuple);
+ Mockito.verify(mockOutputCollector, Mockito.times(1)).ack(mockTuple);
+ }
+}
\ No newline at end of file