Posted to dev@storm.apache.org by "Majid Hajibaba (JIRA)" <ji...@apache.org> on 2016/08/06 07:36:20 UTC

[jira] [Created] (STORM-2025) dropping messages in withTumblingWindow

Majid Hajibaba created STORM-2025:
-------------------------------------

             Summary: dropping messages in withTumblingWindow
                 Key: STORM-2025
                 URL: https://issues.apache.org/jira/browse/STORM-2025
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-core
    Affects Versions: 1.0.1
         Environment: ubuntu 14.0.1 LTS
            Reporter: Majid Hajibaba


When I use withTumblingWindow and processing a window takes longer than the interval at which new messages arrive, the bolt does not receive all of the input messages. The bolt and topology setup that reproduce this are:

// Running total of KafkaSpout tuples seen across all windows.
int count = 0;

@Override
public void execute(TupleWindow inputWindow) {
    try {
        List<Event> windowEvents = new ArrayList<>();
        for (Tuple tuple : inputWindow.get()) {
            if (tuple.getSourceComponent().equals("KafkaSpout")) {
                count++;
                windowEvents.add(ScenarioUtils.convertToKlugEvent(tuple.getString(0)));
            }
        }
        logger.info(count + "======= Process event ");
        // Simulate slow processing: 4 s, the same as the window length.
        Thread.sleep(4000);
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}



TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1 /*kafkaSpoutCount*/);
//builder.setSpout("KafkaSpout", new MyKafkaSpout("correlateTest"), 1);
builder.setBolt("WindowInputTest",
        new WindowInputTest(zookeeperHosts)
                .withTumblingWindow(new BaseWindowedBolt.Duration(4, TimeUnit.SECONDS)),
        1).shuffleGrouping("KafkaSpout");
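
For reference, the expectation behind this report is that a tumbling window places every tuple in exactly one window, so the window sizes should add up to the number of messages sent. The following is a plain-Java sketch of that expectation only. It is not Storm's windowing implementation, and the class name TumblingWindowSketch, its methods, and the 500 ms message spacing are made up for illustration. It assumes non-negative timestamps in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: hypothetical helper, NOT part of Storm.
class TumblingWindowSketch {

    // Groups event timestamps (ms, assumed non-negative) into consecutive,
    // non-overlapping windows of windowMs each. Map key = window start time.
    static Map<Long, List<Long>> assign(List<Long> timestampsMs, long windowMs) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestampsMs) {
            long windowStart = (ts / windowMs) * windowMs;
            windows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    // Sums the sizes of all windows; with tumbling windows this should
    // equal the number of input events.
    static int totalAcrossWindows(Map<Long, List<Long>> windows) {
        int total = 0;
        for (List<Long> window : windows.values()) {
            total += window.size();
        }
        return total;
    }

    public static void main(String[] args) {
        // Assumed input: one message every 500 ms for 12 s.
        List<Long> timestamps = new ArrayList<>();
        for (long t = 0; t < 12_000; t += 500) {
            timestamps.add(t);
        }
        // 4 s windows, matching the Duration used in the topology.
        Map<Long, List<Long>> windows = assign(timestamps, 4_000);
        System.out.println("windows=" + windows.size()
                + " total=" + totalAcrossWindows(windows)
                + " sent=" + timestamps.size());
    }
}
```

In the bolt from this report, the equivalent check is to compare the final value of count with the number of messages published to the Kafka topic. A smaller count is the symptom described here.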




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)