Posted to dev@storm.apache.org by "James Xu (JIRA)" <ji...@apache.org> on 2013/12/15 04:22:06 UTC

[jira] [Created] (STORM-124) Tuple Lost in Trident Topology on Remote Cluster, while the same topology works correctly on local cluster

James Xu created STORM-124:
------------------------------

             Summary: Tuple Lost in Trident Topology on Remote Cluster, while the same topology works correctly on local cluster
                 Key: STORM-124
                 URL: https://issues.apache.org/jira/browse/STORM-124
             Project: Apache Storm (Incubating)
          Issue Type: Bug
            Reporter: James Xu
            Priority: Minor


https://github.com/nathanmarz/storm/issues/535

I have created a Trident topology. It runs correctly on a local cluster, but it produces random results when deployed on a remote cluster due to tuple loss in Trident.

A code example is below. ExtractFunction parses each kafka message and creates 4 tuples (of 5 fields each) from its content; CustomReduceAggregator then aggregates on the first 4 fields.
I generated 800 kafka messages during the test, so ExtractFunction is expected to receive 800 tuples and emit 800*4=3200 tuples for CustomReduceAggregator.
This works correctly on the local cluster.
On the remote cluster, although ExtractFunction emitted 3200 tuples, CustomReduceAggregator always receives only a few hundred tuples, so the result is incorrect.
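For reference, the per-message expansion described above can be sketched in plain Java. The `extract` helper and its field values are hypothetical stand-ins, since the real body of ExtractFunction is not shown in this report; the sketch only illustrates the expected 800 * 4 = 3200 tuple count:

```java
import java.util.ArrayList;
import java.util.List;

public class ExtractSketch {
    // Hypothetical stand-in for ExtractFunction: each kafka message is
    // assumed to expand into 4 records of 5 fields (a, b, c, d, e).
    static List<String[]> extract(String message) {
        List<String[]> rows = new ArrayList<String[]>();
        for (int i = 0; i < 4; i++) {
            rows.add(new String[] { message, "b" + i, "c" + i, "d" + i, "e" + i });
        }
        return rows;
    }

    public static void main(String[] args) {
        int emitted = 0;
        for (int msg = 0; msg < 800; msg++) {        // 800 test messages
            emitted += extract("msg-" + msg).size(); // 4 tuples per message
        }
        // The downstream aggregator should therefore see 800 * 4 = 3200 tuples.
        System.out.println(emitted);
    }
}
```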

Is this a bug in Storm/Trident, or is some configuration needed for my Trident topology?

    TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(new ZkHosts(zkServer + ":" + zkPort, zkBrokerPath), kafkaTopic);
    tridentKafkaConfig.forceStartOffsetTime(-2); // read from the earliest available offset

    TridentTopology topology = new TridentTopology();
    TridentState tridentState = topology.newStream("kafkaSpout" + System.currentTimeMillis(), new TransactionalTridentKafkaSpout(tridentKafkaConfig))
        .parallelismHint(1)
        // expand each kafka message into 4 tuples of 5 fields
        .each(new Fields("bytes"), new ExtractFunction(), new Fields("a", "b", "c", "d", "e"))
        .parallelismHint(1)
        // group on the first 4 fields before aggregating
        .groupBy(new Fields("a", "b", "c", "d"))
        .persistentAggregate(new CassandraMapState.Factory(StateType.TRANSACTIONAL, "cf_1"),
                new Fields("a", "b", "c", "d", "e"),
                new CustomReduceAggregator(),
                new Fields("value"))
        .parallelismHint(1);

    topology.newDRPCStream("profile", localDRPC)
        .each(new Fields("args"), new DrpcParamSplitFunction(), new Fields("a", "b", "c", "d"))
        .groupBy(new Fields("a", "b", "c", "d"))
        .stateQuery(tridentState, new Fields("a", "b", "c", "d"), new MapGet(), new Fields("value"));

    return topology.build();



--
This message was sent by Atlassian JIRA
(v6.1.4#6159)