Posted to user@storm.apache.org by Eugene <ed...@gmail.com> on 2014/07/31 20:27:40 UTC

Trident topology UI shows many processed tuples when in reality there shouldn't be any

My Trident topology uses kafka-spout to process messages from Kafka.
When I deploy the topology to a cluster, the UI immediately starts showing
that it's processing a large number of tuples. I haven't even started
sending messages to Kafka!
In the log file I see a lot of messages like this one:

""message":"Reading reply sessionid:0x34712e79b9204f3, packet::
clientPath:null serverPath:null finished:false header:: 225,4
 replyHeader:: 225,34401287364,0  request:: '/storm/
assignments/OfflineRuleTopology-83-1406827538,T  response::
#ffffffac ......"

What does this message mean?

I tried deleting the topic from Kafka, but it did not help.
My topology is as follows:


BrokerHosts brokerHosts = new ZkHosts("zooas31d-con-08");
TridentTopology topology = new TridentTopology();
TridentKafkaConfig spoutConfig = new TridentKafkaConfig(
        brokerHosts,
        "offline_events",
        "offline_events_client");

spoutConfig.scheme = new SchemeAsMultiScheme(new EventStringScheme());
StateFactory mongodb = new MongoDBStateFactory("cp", "tlc");
// should it be transactional?
TransactionalTridentKafkaSpout spout =
        new TransactionalTridentKafkaSpout(spoutConfig);

Stream inputStream = topology.newStream("offlineEvents", spout);
inputStream.each(new Fields("event"), new getProfile(), new Fields("profile"))
        .shuffle()
        .each(new Fields("event"), new getTargetList(), new Fields("targetList"))
        .each(new Fields("event"), new partitionRules(), new Fields("ruleFile"))
        .shuffle()
        .each(new Fields("event", "profile", "targetList", "ruleFile"),
                new executeRule(), new Fields("tacticId", "tactic"))
        .project(new Fields("tacticId", "tactic", "event"))
        .shuffle()
        .each(new Fields("tactic"), new filterAssignedTactic())
        .partitionPersist(mongodb, new Fields("tactic", "event"),
                new MongoDBStateUpdater());

Config config = new Config();

config.setDebug(false);
config.setNumAckers(1);
config.setNumWorkers(1);

Map statsdConfig = new HashMap();
statsdConfig.put(StatsdMetricConsumer.STATSD_HOST,
        "statsd.compute-1.amazonaws.com");
statsdConfig.put(StatsdMetricConsumer.STATSD_PORT, 8125);
statsdConfig.put(StatsdMetricConsumer.STATSD_PREFIX, "storm.metrics.");

// Register both consumers on the config that is actually submitted
// (the original snippet registered the statsd consumer on a second,
// unused Config instance, so it would never take effect).
config.registerMetricsConsumer(StatsdMetricConsumer.class, statsdConfig, 2);
config.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);

config.setMaxSpoutPending(5);
config.setMessageTimeoutSecs(60);
config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 20);
// Note: MAX_BATCH_SIZE_CONF governs rich spouts wrapped by
// RichSpoutBatchExecutor and likely has no effect on the Kafka Trident
// spout, whose batch size is driven by fetchSizeBytes on its config.
config.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 500);

StormSubmitter.submitTopology("OfflineRuleTopology", config, topology.build());
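
On the "// should it be transactional?" comment above: storm-kafka also
ships an opaque variant built from the same TridentKafkaConfig. As a
minimal sketch (only the spout construction changes), the trade-off is
that an opaque spout tolerates a batch's contents changing on replay,
e.g. after losing a Kafka node, while the transactional spout requires
every replayed batch to be identical:

// Opaque alternative to TransactionalTridentKafkaSpout; reuses spoutConfig.
// Exactly-once semantics then depend on pairing it with an opaque-capable
// State implementation on the partitionPersist side.
OpaqueTridentKafkaSpout opaqueSpout = new OpaqueTridentKafkaSpout(spoutConfig);
Stream inputStream = topology.newStream("offlineEvents", opaqueSpout);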



Is it something to do with Kafka/ZooKeeper? What am I missing here?
If the topology is endlessly reprocessing batches, why don't I see any
error messages, and why do new batches still get processed?
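
One way to check for replays directly (a sketch only; MongoDBState is
assumed here to be the class produced by MongoDBStateFactory): Trident
passes the transaction id to the State for every batch, so a txid that
keeps repeating in the logs would indicate replays rather than new batches.

import storm.trident.state.State;

public class MongoDBState implements State {
    @Override
    public void beginCommit(Long txid) {
        // A repeating txid here means the same batch is being replayed.
        System.err.println("beginCommit txid=" + txid);
    }

    @Override
    public void commit(Long txid) {
        System.err.println("commit txid=" + txid);
    }

    // ... MongoDB read/write methods used by MongoDBStateUpdater ...
}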



Thanks

Screenshot:
[image: Inline image 1]


-- 
Eugene Dvorkin
Software Engineer
New York City Storm User Group - organizer
WebMD
email: edvorkin@gmail.com
phone: 646-250-9649
eugenedvorkin.com
Connect with me on:
LinkedIn <http://www.linkedin.com/in/eugenedvorkin> Twitter
<http://twitter.com/edvorkin>

Re: Trident topology UI shows many processed tuples when in reality there shouldn't be any

Posted by Derek Dagit <de...@yahoo-inc.com>.
> ""message":"Reading reply sessionid:0x34712e79b9204f3, packet::
> clientPath:null serverPath:null finished:false header:: 225,4
>   replyHeader:: 225,34401287364,0  request:: '/storm/
> assignments/OfflineRuleTopology-83-1406827538,T  response::
> #ffffffac ......"

This looks like a ZooKeeper log message.

These will happen even when no tuples are flowing yet.
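
The UI counts themselves likely have a second source as well: Trident's
coordinator emits a new (possibly empty) batch every
TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, and those coordination
tuples are counted in the UI even when Kafka is empty. With the 20 ms
interval in the posted config, that is roughly 50 batches per second.
As an illustration only (500 is not a recommendation):

// A larger batch-emit interval means fewer empty coordination batches
// while idle, hence fewer "processed" tuples in the UI. Illustrative value.
config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 500);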
-- 
Derek
