You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Jori <jo...@me.com> on 2017/02/28 08:59:35 UTC

Unit test with BaseRichBolt times out, but works with BaseBasicBolt

Hi,

I'm trying to implement unit tests for Storm bolts (Java). The code below works fine and ends with a success on Storm 1.0.3:

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 3.887 sec

However, when I use BaseRichParrotBolt instead of BaseBasicParrotBolt (in line 46), the assertions never run and it ends with the following exception:

13610 [main] ERROR o.a.s.testing4j - Error in cluster java.lang.AssertionError: Test timed out (10000ms) (not (every? exhausted? (spout-objects spouts)))

If you step through it with a debugger you'll see that the bolt does receive and emit tuples, but it seems like Testing.completeTopology never returns. I find this really odd because the bolts are virtually the same. All my bolts extend from BaseRichBolt so I'd really like to make it work for those as well. Any ideas?

Code:

import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.ILocalCluster;
import org.apache.storm.Testing;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.testing.CompleteTopologyParam;
import org.apache.storm.testing.MkClusterParam;
import org.apache.storm.testing.MockedSources;
import org.apache.storm.testing.TestJob;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Arrays;
import java.util.List;
import static junit.framework.Assert.*;
import org.junit.Test;

public class StormTestExample {
    private final static String EVENT = "event";
    private final static String SPOUT_ID = "spout";
    private final static String BOLT_ID = "parrot";
    private final static List<String> COMPONENT_IDS = Arrays.asList(SPOUT_ID, BOLT_ID);

    @Test
    public void testBasicTopology() {
        final MkClusterParam mkClusterParam = new MkClusterParam();
        mkClusterParam.setSupervisors(4);
        final Config daemonConf = new Config();
        daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, false);
        mkClusterParam.setDaemonConf(daemonConf);

        Testing.withSimulatedTimeLocalCluster(mkClusterParam, new TestJob() {
            @Override
            public void run(ILocalCluster cluster) {
                TopologyBuilder builder = new TopologyBuilder();
                builder.setSpout(SPOUT_ID, new TestSpout());
                builder.setBolt(BOLT_ID, new BaseBasicParrotBolt()).shuffleGrouping(SPOUT_ID);
                StormTopology topology = builder.createTopology();

                MockedSources mockedSources = new MockedSources();
                mockedSources.addMockData(SPOUT_ID, 
                        new Values("nathan"),
                        new Values("bob"), 
                        new Values("joey"), 
                        new Values("nathan"));

                final Config conf = new Config();
                conf.setNumWorkers(2);

                final CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
                completeTopologyParam.setMockedSources(mockedSources);
                completeTopologyParam.setStormConf(conf);

                final Map result = Testing.completeTopology(cluster, topology, completeTopologyParam);

                final Values expected = new Values(new Values("nathan"), new Values("bob"), new Values("joey"), 
                        new Values("nathan"));
                
                for (String component : COMPONENT_IDS) {
                    assertTrue("Error in " + component + " output", 
                               Testing.multiseteq(expected, Testing.readTuples(result, component)));
                }
            }
        });
    }

    private static class TestSpout extends BaseRichSpout {
        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            ofd.declare(new Fields(EVENT));
        }

        @Override
        public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
            throw new UnsupportedOperationException(); // Must override, but not needed for test.
        }

        @Override
        public void nextTuple() {
            throw new UnsupportedOperationException(); // Must override, but not needed for test.
        }
    }
     
    private static class BaseBasicParrotBolt extends BaseBasicBolt {
        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            ofd.declare(new Fields(EVENT));
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector boc) {
            boc.emit(new Values(tuple.getValue(0)));
        }
    }

     private static class BaseRichParrotBolt extends BaseRichBolt {
        private OutputCollector oc;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {
            ofd.declare(new Fields(EVENT));
        }
        
        @Override
        public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
            this.oc = oc;
        }
    
        @Override
        public void execute(Tuple tuple) {
            oc.emit(new Values(tuple.getValue(0)));
        }
        
        @Override
        public final Map<String, Object> getComponentConfiguration() {
            Config config = new Config();
            config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, null);
            return config;
        }
    }
}

I also posted this on SO but haven’t been getting any responses or many views. http://stackoverflow.com/questions/42491555/unit-testing-in-apache-storm-timeout-with-baserichbolt-but-not-with-basebasicb <http://stackoverflow.com/questions/42491555/unit-testing-in-apache-storm-timeout-with-baserichbolt-but-not-with-basebasicb> 



Thanks,
Jori