Posted to user@storm.apache.org by Michel Hummel <hu...@gmail.com> on 2016/11/24 23:00:25 UTC

Trident persistentAggregate/join

Hi everybody,

I want to use a simple topology that is basically composed of 2
spouts, and therefore 2 streams.

On one of the streams I apply a simple filter "F1", then groupBy() and
persistentAggregate() (with a MemoryMapState.Factory() and a basic
CombinerAggregator).
I then call newValuesStream() to push the results, followed by a
filter "F2".

On the other stream I apply a simple filter "F3".

Finally, I join the 2 streams (I join the outputs of F2 and F3).

Each stream works perfectly on its own: F2 as well as F3 produces
tuples (that is, when I build and test the 2 topologies with the
final join removed).

But if I add the join, the first stream (the one with
persistentAggregate()) "hangs", and no tuples come out of the
topology.

After hours of analysis, I think the problem comes from
persistentAggregate.

I don't know why, but it seems that the CombinerAggregator never emits
the combined tuples.


In my analysis, I see that the "combine" method is called for "some"
tuples. From what I have understood from the documentation, the init()
method of the CombinerAggregator as well as the "combine" method are
called for every tuple. For every tuple of the batch except the last,
persistentAggregate emits the result of the "zero" method as output,
and for the last one it should directly emit the result of the combine
method.
In my case, I think the final emit, which should send the result to
the next bolt, is never done: "combine" is called but no tuples come
out of persistentAggregate.
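
For reference, the Trident API overview states the CombinerAggregator
contract slightly differently: init() runs on each input tuple,
combine() merges the values until only one is left, and zero() is
emitted only when the partition contains no tuples. The sketch below
models just that contract in plain Java. The Combiner interface and
the CombinerSketch class are hypothetical stand-ins, not Storm types,
and the sketch does not model how persistentAggregate updates its
state, so it does not by itself explain the hang.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in with the same three methods as Trident's
// CombinerAggregator (Storm's init() takes a TridentTuple instead).
interface Combiner<I, T> {
    T init(I input);
    T combine(T left, T right);
    T zero();
}

class CombinerSketch {
    // Count-like combiner: every tuple contributes 1, values are summed.
    static final Combiner<String, Long> COUNT = new Combiner<String, Long>() {
        @Override public Long init(String input) { return 1L; }
        @Override public Long combine(Long left, Long right) { return left + right; }
        @Override public Long zero() { return 0L; }
    };

    // Folds one partition: zero() for an empty partition, otherwise
    // init() on each input and combine() until a single value is left.
    static <I, T> T aggregate(Combiner<I, T> combiner, List<I> partition) {
        if (partition.isEmpty()) {
            return combiner.zero();
        }
        T acc = combiner.init(partition.get(0));
        for (I input : partition.subList(1, partition.size())) {
            acc = combiner.combine(acc, combiner.init(input));
        }
        return acc;
    }

    public static void main(String[] args) {
        System.out.println(aggregate(COUNT, Arrays.asList("the", "the")));     // prints 2
        System.out.println(aggregate(COUNT, Collections.<String>emptyList())); // prints 0
    }
}
```

Under this reading, a non-empty group always ends with exactly one
combined value, so the missing DEBUG(11) output would not be expected
from the combiner contract alone.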

As I said at the beginning, if I build the first topology (without
the join), the aggregation is done as expected.

Does anyone have an idea?

I can probably extract a simple test case if necessary.

Any help would be appreciated,
Michel Hummel

Re: Trident persistentAggregate/join

Posted by Michel Hummel <hu...@gmail.com>.
Hello,
To illustrate the issue, here is an example of a hanging Trident topology:
===========================
        // Stream 1 source: a single sentence, batch size 6.
        final FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("sentence"), 6,
                new Values("the cow jumped over the moon"));

        // Stream 2 source: (word, count) pairs to join against.
        final FixedBatchSpout spout2 = new FixedBatchSpout(new Fields("word2", "count"), 6,
                new Values("the", 2),
                new Values("cow", 1),
                new Values("jumped", 1),
                new Values("over", 1),
                new Values("the", 1),
                new Values("moon", 1));

        final TridentTopology topology = new TridentTopology();
        // Split the sentence into words, count them per word with
        // persistentAggregate, then stream the updated counts.
        final Stream wordCounts1 = topology.newStream("spout", spout1)
                .filter(new Debug("1"))
                .each(new Fields("sentence"), new Split(), new Fields("word1"))
                .groupBy(new Fields("word1"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                .parallelismHint(2)
                .newValuesStream()
                .filter(new Debug("11"));
        final Stream wordCounts2 = topology.newStream("spout2", spout2)
                .filter(new Debug("2"));

        // Inner join of the two streams on the word.
        topology.join(wordCounts1, new Fields("word1"), wordCounts2, new Fields("word2"),
                new Fields("key", "a", "b"), JoinType.INNER)
                .filter(new Debug("J"));

        // Run locally for 30 seconds, then shut down.
        final Config stormConfig = new Config();
        final LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", stormConfig, topology.build());
        Utils.sleep(30 * 1000L);
        cluster.killTopology("test");
        cluster.shutdown();
==========================

As you can see in the output below, the aggregated tuples (11) are not
printed:
<Fri Nov 25 17:57:39 CET 2016> DEBUG(1): [the cow jumped over the moon]
<Fri Nov 25 17:57:39 CET 2016> DEBUG(2): [the, 2]
<Fri Nov 25 17:57:39 CET 2016> DEBUG(2): [cow, 1]
<Fri Nov 25 17:57:39 CET 2016> DEBUG(2): [jumped, 1]
<Fri Nov 25 17:57:39 CET 2016> DEBUG(2): [over, 1]
<Fri Nov 25 17:57:39 CET 2016> DEBUG(2): [the, 1]
<Fri Nov 25 17:57:39 CET 2016> DEBUG(2): [moon, 1]


If I replace the persistentAggregate with an aggregate, the topology works fine:
==========================
        // Stream 1 source: a single sentence, batch size 6.
        final FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("sentence"), 6,
                new Values("the cow jumped over the moon"));

        // Stream 2 source: (word, count) pairs to join against.
        final FixedBatchSpout spout2 = new FixedBatchSpout(new Fields("word2", "count"), 6,
                new Values("the", 2),
                new Values("cow", 1),
                new Values("jumped", 1),
                new Values("over", 1),
                new Values("the", 1),
                new Values("moon", 1));

        final TridentTopology topology = new TridentTopology();
        // Same pipeline as before, but with a per-batch aggregate
        // instead of persistentAggregate + newValuesStream.
        final Stream wordCounts1 = topology.newStream("spout", spout1)
                .filter(new Debug("1"))
                .each(new Fields("sentence"), new Split(), new Fields("word1"))
                .groupBy(new Fields("word1"))
                .aggregate(new Count(), new Fields("count"))
                .parallelismHint(2)
                .filter(new Debug("11"));
        final Stream wordCounts2 = topology.newStream("spout2", spout2)
                .filter(new Debug("2"));

        // Inner join of the two streams on the word.
        topology.join(wordCounts1, new Fields("word1"), wordCounts2, new Fields("word2"),
                new Fields("key", "a", "b"), JoinType.INNER)
                .filter(new Debug("J"));

        // Run locally for 30 seconds, then shut down.
        final Config stormConfig = new Config();
        final LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test", stormConfig, topology.build());
        Utils.sleep(30 * 1000L);
        cluster.killTopology("test");
        cluster.shutdown();
==========================
As you can see in the output below, the aggregated tuples (11) as well
as the joined tuples (J) are printed:
<Fri Nov 25 18:05:35 CET 2016> DEBUG(1): [the cow jumped over the moon]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(11): [over, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(2): [the, 2]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(2): [cow, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(2): [jumped, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(2): [over, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(2): [the, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(2): [moon, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(11): [the, 2]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(11): [moon, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(11): [jumped, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(11): [cow, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(J): [over, 1, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(J): [the, 2, 2]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(J): [the, 2, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(J): [moon, 1, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(J): [jumped, 1, 1]
<Fri Nov 25 18:05:35 CET 2016> DEBUG(J): [cow, 1, 1]

Does anyone have an idea?
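
For comparison, the six J lines in the working output match what an
inner join on the word would be expected to produce, including the two
rows for "the", which appears twice in spout2. The following plain-Java
sketch reproduces those rows without Storm. The InnerJoinSketch class
and its Row type are hypothetical names introduced only for
illustration, and the row order here is fixed by the loops, whereas the
order in a real topology may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class InnerJoinSketch {
    // Hypothetical two-field row: a word and a count.
    static final class Row {
        final String word;
        final long count;
        Row(String word, long count) { this.word = word; this.count = count; }
    }

    // Aggregated word counts of "the cow jumped over the moon" (stream 1).
    static final List<Row> LEFT = Arrays.asList(
            new Row("the", 2), new Row("cow", 1), new Row("jumped", 1),
            new Row("over", 1), new Row("moon", 1));

    // Tuples emitted by spout2 (stream 2).
    static final List<Row> RIGHT = Arrays.asList(
            new Row("the", 2), new Row("cow", 1), new Row("jumped", 1),
            new Row("over", 1), new Row("the", 1), new Row("moon", 1));

    // Nested-loop inner join on the word: one output row per matching pair,
    // formatted as [key, left count, right count].
    static List<String> innerJoin(List<Row> left, List<Row> right) {
        List<String> joined = new ArrayList<>();
        for (Row l : left) {
            for (Row r : right) {
                if (l.word.equals(r.word)) {
                    joined.add("[" + l.word + ", " + l.count + ", " + r.count + "]");
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        for (String row : innerJoin(LEFT, RIGHT)) {
            System.out.println(row);
        }
    }
}
```

With the persistentAggregate version, none of these rows appear because
stream 1 never emits the DEBUG(11) tuples the join is waiting for.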
