Posted to dev@flink.apache.org by Szabó Péter <ne...@gmail.com> on 2015/06/03 15:32:36 UTC

Fwd: Discussion: Storm Comparability Layer

---------- Forwarded message ----------
From: Szabó Péter <ne...@gmail.com>
Date: 2015-06-03 15:31 GMT+02:00
Subject: Re: Discussion: Storm Comparability Layer
To: Márton Balassi <ba...@gmail.com>


Hey, Matthias,

Of course, you can remove my last commit. I just wanted to remove the
failing tests and some unnecessary comments. Please remove those comments in
your commit as well.

As for StormSpoutCollector, I used a Queue with a LinkedList implementation,
because the list we keep is a queue in nature: we put records into it and
remove the head from time to time. The collector implements Iterator
because I wanted to use something like next() and hasNext() in the
StormSpoutWrapper. I think emphasizing this iterator nature makes the code
more readable.
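A minimal, self-contained Java sketch of the collector design described above: a Queue backed by a LinkedList serves as the FIFO buffer, and the wrapper drains it through Iterator's hasNext()/next(). The class name BufferingSpoutCollector and its emit() method are hypothetical; the real StormSpoutCollector also has to bridge Storm's and Flink's collector APIs, which is left out here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;

// Hypothetical stand-in for StormSpoutCollector (not the real class): the
// spout side appends emitted tuples, the wrapper side drains them via Iterator.
public class BufferingSpoutCollector implements Iterator<List<Object>> {

    // Queue interface, LinkedList implementation: add at the tail, take from the head.
    private final Queue<List<Object>> buffer = new LinkedList<>();

    // Called for every tuple the spout emits; null tuples are rejected so that
    // poll() returning null can only mean "buffer empty".
    public void emit(List<Object> tuple) {
        buffer.add(Objects.requireNonNull(tuple));
    }

    @Override
    public boolean hasNext() {
        return !buffer.isEmpty();
    }

    @Override
    public List<Object> next() {
        List<Object> head = buffer.poll();
        if (head == null) {
            throw new NoSuchElementException("no buffered tuple");
        }
        return head;
    }

    public static void main(String[] args) {
        BufferingSpoutCollector collector = new BufferingSpoutCollector();
        collector.emit(Arrays.<Object>asList("a", 1));
        collector.emit(Arrays.<Object>asList("b", 2));

        // Wrapper side: drain everything emitted so far, in FIFO order.
        List<List<Object>> drained = new ArrayList<>();
        while (collector.hasNext()) {
            drained.add(collector.next());
        }
        System.out.println(drained); // prints [[a, 1], [b, 2]]
    }
}
```

Matthias's alternative would drop "implements Iterator" and let the wrapper read a package-private buffer directly. That avoids the extra method calls he mentions (which the JIT will often inline anyway), at the cost of coupling the wrapper to the buffer's concrete type.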

Peter

2015-06-03 14:16 GMT+02:00 Márton Balassi <ba...@gmail.com>:

> Hey Matthias,
>
> We can undo Peter's commit if that helps you and have yours instead. You
> can simply remove that commit in a rebase. Besides this, let us push to the
> same branch while trying not to break the history; I will squash the commits
> once again if it gets too bulky.
>
> I would like to bring the discussion to the mailing list, so the community
> sees that you are actively working on this. Are you OK with reposting
> this thread to the dev mailing list?
>
> On Wed, Jun 3, 2015 at 2:09 PM, Matthias J. Sax <
> mjsax@informatik.hu-berlin.de> wrote:
>
>> Hi,
>>
>> I just saw that Peter pushed a new commit. It makes it hard for me to
>> push my changes. Can we undo the last commit?
>>
>> If I understand correctly, it only removes StormFiniteSpoutWrapper and
>> disables the failing tests. Do we want to delete StormFiniteSpoutWrapper?
>> I would rather keep it.
>>
>> -Matthias
>>
>> On 06/03/2015 01:58 PM, Matthias J. Sax wrote:
>> > Hi,
>> >
>> > I have a few questions about the current status ("storm" branch from
>> > Marton).
>> >
>> > StormSpoutCollector:
>> >   - is there any specific advantage in using a Queue instead of a
>> > LinkedList for the internal buffer?
>> >   - Why are we implementing the Iterator interface and marking
>> > flinkCollectionDelegates as private?
>> >     -> I would rather drop the interface and make the variable "package
>> > private" to access it directly (avoids "unnecessary" method calls)
>> >
>> > StormSpoutWrapper:
>> >   - do we still need "isRunning" and "cancel()"? The new API should make
>> > them obsolete from my point of view.
>> >   - I would avoid "busy wait" in "next()" and apply a "not-emit" penalty
>> > within the while-loop:
>> >
>> >>      long sleep = 1;
>> >>      while(!stormCollector.hasNext()) {
>> >>              Thread.sleep(sleep);
>> >>              sleep *= 2;
>> >>              spout.nextTuple();
>> >>      }
>> >
>> > StormFiniteSpoutWrapper:
>> >   - remove member variable "isDefined" --> this is redundant information
>> > and might cause bugs...
>> >   - can we remove the "tupleEmitted" flag? Maybe we can implement it
>> > without it (not sure though)
>> >
>> >
>> > I am also working on a new implementation of StormSpoutOutputWrapper. I
>> > will push it to my own repository once it is finished and let you know.
>> > It could replace the current implementation without the "nasty"
>> > buffering Queue (which I don't like). However, we need to discuss this
>> > alternative implementation first.
>> >
>> > Things I would like to push:
>> >
>> > I fixed the following tests (they were already fixed in my branch but
>> > not merged by Marton):
>> >  - StormBoltWrapperTest
>> >  - StormSpoutWrapperTest
>> >  - StormFiniteSpoutWrapperTest
>> >  - Added new test class InfiniteTestSpout
>> >
>> > I also stepped through the whole code, removed "unused" tags (which are
>> > not necessary for public methods), corrected a few spelling mistakes in
>> > comments, and made some other minor "improvements".
>> >
>> > Additionally, I "merged" my changes (after my rebase) that differ from
>> > Peter's changes. Peter and I discussed some of the rebase differences
>> > and I "merged" my and his changes (we had both already agreed on how to
>> > resolve the differences).
>> >
>> > If it is ok, I will push it directly into Marton's git repository.
>> >
>> >
>> >
>> > -Matthias
>> >
>>
>>
>
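The "not-emit penalty" loop proposed in the quoted mail doubles the sleep on every empty round without an upper bound, so after about 20 empty rounds a single sleep already exceeds 17 minutes (2^20 ms). Below is a sketch of the same idea with a capped backoff. The 64 ms cap, the Spout interface and pollWithBackoff() are assumptions for illustration, not part of the proposal; in this variant nextTuple() is tried before the first sleep, and the backoff restarts at 1 ms on every call because the sleep is a local variable.

```java
import java.util.LinkedList;
import java.util.Queue;

// Sketch of the "not-emit penalty" idea with a capped backoff. Spout is a
// hypothetical stand-in, not Storm's ISpout.
public class BackoffPoll {

    // Assumed upper bound on a single sleep (ms); the loop in the mail doubles without limit.
    static final long MAX_SLEEP_MS = 64;

    public interface Spout {
        void nextTuple(); // may or may not put something into the buffer
    }

    // Calls nextTuple() until the buffer is non-empty, sleeping between empty
    // attempts (1, 2, 4, ... ms, never more than MAX_SLEEP_MS).
    // Returns how many times nextTuple() was called (0 if the buffer was not empty).
    public static int pollWithBackoff(Spout spout, Queue<Object> buffer) throws InterruptedException {
        long sleep = 1;
        int calls = 0;
        while (buffer.isEmpty()) {
            spout.nextTuple();
            calls++;
            if (!buffer.isEmpty()) {
                break; // got a tuple, no need to sleep
            }
            Thread.sleep(sleep);
            sleep = Math.min(sleep * 2, MAX_SLEEP_MS);
        }
        return calls;
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<Object> buffer = new LinkedList<>();
        int[] invocations = {0};
        // Simulated spout that emits only on its 5th invocation.
        Spout spout = () -> {
            if (++invocations[0] == 5) {
                buffer.add("tuple");
            }
        };
        int calls = pollWithBackoff(spout, buffer);
        System.out.println(calls + " " + buffer.poll()); // prints 5 tuple
    }
}
```

A real wrapper would presumably also check its isRunning flag inside the loop so that cancel() is not delayed by a pending sleep.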

Re: Fwd: Discussion: Storm Comparability Layer

Posted by Aljoscha Krettek <al...@apache.org>.
The back-and-forth on the Source interface was unfortunate, yes.

In general, I think that we should not tinker with other
people's pull requests in semi-secrecy. Some small cosmetic fixes or
rewordings of the commit message are OK. But if the PR needs rework,
then this should be voiced in the PR discussion so that the person can
react. Correct me if I'm wrong, but Matthias seems very eager to work
on this and get it in good shape.

On Thu, Jun 4, 2015 at 9:35 AM, Márton Balassi <ba...@gmail.com> wrote:
> Yes, this is indeed a big change, but it was openly discussed multiple
> times here on the mailing list and in a number of PRs. I am pretty sure
> that we do not want to break the source interface any more, but there is
> still some open discussion on it. Let us keep an eye on PR 742 where it is
> currently happening.
>
> I have retriggered the travis build and will look into it if it fails again.
>
> On Wed, Jun 3, 2015 at 10:35 PM, Matthias J. Sax <
> mjsax@informatik.hu-berlin.de> wrote:
>
>> Thanks for the info.
>>
>> I am just a little bit "disappointed". The whole rewrite to the new
>> interface was unnecessary... We need to "revert" everything again...
>> I can also stop working on the new StormSpoutWrapper and
>> StormSpoutCollector implementation in this case...
>>
>> Let's see how it goes. But from my point of view, the simplest way might
>> even be to use the old state (including the clean-up commit from Peter
>> before his rebase) from my original pull request and work from there
>> after the changed interface is in master.
>>
>> Any other thoughts on this?
>>
>> (I guess the current reworking was a little too eager.)
>>
>> @Marton: Can you have a look at the travis build? It failed with a weird
>> compilation error. It builds locally. I cannot reproduce the error...
>> (Maybe just trigger the build again.)
>>
>>
>> -Matthias
>>
>>
>> On 06/03/2015 09:09 PM, Márton Balassi wrote:
>> > Thanks for the updates, Matthias.
>> >
>> > Both of your questions are now in a different context, because we have
>> > decided to go back to the run()/cancel() type of source interface, but
>> > with a slightly changed signature to enable "transactional" operator
>> > state checkpointing. You can check out the new source interface here
>> > [1], which is part of PR 755.
>> >
>> > We hope that PR 755 will be merged into master in the following days, as
>> > it is a release blocker, so you can plan with those interfaces. As for
>> > the FiniteSpoutWrapper, I think this makes your job easier, and you
>> > definitely need the isRunning flag and the cancel method.
>> >
>> > [1]
>> > https://github.com/StephanEwen/incubator-flink/blob/stream_sources/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
>> >
>> >
>> > On Wed, Jun 3, 2015 at 7:51 PM, Matthias J. Sax <
>> > mjsax@informatik.hu-berlin.de> wrote:
>> >
>> >> I just pushed my changes to Marton's "storm" branch.
>> >>
>> >> It is still open how to proceed with the following (please give feedback):
>> >>
>> >> StormSpoutWrapper:
>> >>   - do we still need "isRunning" and "cancel()"? The new API should make
>> >> them obsolete from my point of view.
>> >>  - I would avoid "busy wait" in "next()" and apply a "not-emit" penalty
>> >> within the while-loop:
>> >>
>> >>>       long sleep = 1;
>> >>>       while(!stormCollector.hasNext()) {
>> >>>               Thread.sleep(sleep);
>> >>>               sleep *= 2;
>> >>>               spout.nextTuple();
>> >>>       }
>> >>
>> >> StormFiniteSpoutWrapper:
>> >>   - remove member variable "isDefined" --> this is redundant information
>> >> and might cause bugs...
>> >>  - can we remove the "tupleEmitted" flag? Maybe we can implement it
>> >> without it (not sure though)
>> >>
>> >>
>> >> I am also working on a new implementation of StormSpoutWrapper and
>> >> StormSpoutCollector. I will push it to my own repository once it is
>> >> finished and let you know. It could replace the current implementation
>> >> without the "nasty" buffering Queue (which I don't like). However, we
>> >> need to discuss this alternative implementation first.
>> >>
>> >>
>> >> -Matthias

Re: Fwd: Discussion: Storm Comparability Layer

Posted by Márton Balassi <ba...@gmail.com>.
Yes, this is indeed a big change, but it was openly discussed multiple
times here on the mailing list and in a number of PRs. I am pretty sure
that we do not want to break the source interface any more, but there is
still some open discussion on it. Let us keep an eye on PR 742 where it is
currently happening.

I have retriggered the travis build and will look into it if it fails again.


Re: Fwd: Discussion: Storm Comparability Layer

Posted by "Matthias J. Sax" <mj...@informatik.hu-berlin.de>.
Thanks for the info.

I am just a little bit "disappointed". The whole rewrite to the new
interface was unnecessary... We need to "revert" everything again...
I can also stop working on the new StormSpoutWrapper and
StormSpoutCollector implementation in this case...

Let's see how it goes. But from my point of view, the simplest way might
even be to use the old state (including the clean-up commit from Peter
before his rebase) from my original pull request and work from there
after the changed interface is in master.

Any other thoughts on this?

(I guess the current reworking was a little too eager.)

@Marton: Can you have a look at the travis build? It failed with a weird
compilation error. It builds locally. I cannot reproduce the error...
(Maybe just trigger the build again.)


-Matthias





Re: Fwd: Discussion: Storm Comparability Layer

Posted by Márton Balassi <ba...@gmail.com>.
Thanks for the updates, Matthias.

Both of your questions are now in a different context, because we have decided
to go back to the run()/cancel() type of source interface, but with a slightly
changed signature to enable "transactional" operator state checkpointing.
You can check out the new source interface here [1], which is part of PR 755.

We hope that PR 755 will be merged into master in the following days, as it is
a release blocker, so you can plan with those interfaces. As for the
FiniteSpoutWrapper, I think this makes your job easier, and you definitely
need the isRunning flag and the cancel method.

[1]
https://github.com/StephanEwen/incubator-flink/blob/stream_sources/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
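To illustrate why a spout wrapper still needs an isRunning flag and a cancel() method under a run()/cancel() source interface, here is a minimal Java sketch. FiniteSpout, WrapperSource and the Consumer-based collector are hypothetical stand-ins; the actual source interface is the one linked in [1] and is not reproduced here. run() keeps driving the spout until the spout reports its end or another thread calls cancel().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the run()/cancel() source pattern driving a finite spout.
// FiniteSpout and WrapperSource are illustrative names, not Flink or Storm classes.
public class FiniteSpoutSourceSketch {

    // Stand-in for a finite Storm spout: emits through the given consumer
    // and reports when it has no more data.
    public interface FiniteSpout {
        void nextTuple(Consumer<String> emitter);
        boolean reachedEnd();
    }

    public static final class WrapperSource {
        private final FiniteSpout spout;
        // volatile because cancel() is usually invoked from another thread than run().
        private volatile boolean isRunning = true;

        public WrapperSource(FiniteSpout spout) {
            this.spout = spout;
        }

        // Drives the spout until it is exhausted or the source is cancelled.
        public void run(Consumer<String> collector) {
            while (isRunning && !spout.reachedEnd()) {
                spout.nextTuple(collector);
            }
        }

        public void cancel() {
            isRunning = false;
        }
    }

    public static void main(String[] args) {
        // A spout that emits three tuples and then reports its end.
        FiniteSpout threeTuples = new FiniteSpout() {
            private int count = 0;

            @Override
            public void nextTuple(Consumer<String> emitter) {
                emitter.accept("t" + count++);
            }

            @Override
            public boolean reachedEnd() {
                return count >= 3;
            }
        };
        List<String> out = new ArrayList<>();
        new WrapperSource(threeTuples).run(out::add);
        System.out.println(out); // prints [t0, t1, t2]
    }
}
```

The flag is volatile so that a cancel() call from another thread becomes visible to the loop in run(); with a finite spout the loop also ends on its own once reachedEnd() returns true.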



Re: Fwd: Discussion: Storm Comparability Layer

Posted by "Matthias J. Sax" <mj...@informatik.hu-berlin.de>.
I just pushed my changes to Marton's "storm" branch.

It is still open how to proceed with the following (please give feedback):

StormSpoutWrapper:
  - do we still need "isRunning" and "cancel()"? The new API should make
    them obsolete from my point of view.
  - I would avoid a "busy wait" in "next()" and apply a "not-emit" penalty
    within the while-loop:

>       long sleep = 1;
>       while(!stormCollector.hasNext()) {
>               Thread.sleep(sleep);
>               sleep *= 2;
>               spout.nextTuple();
>       }
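
The proposed "not-emit" penalty is an exponential backoff. Below is a self-contained Java sketch of the same idea; unlike the snippet above, it caps the sleep time, since doubling without bound means a spout that stays idle for a while would afterwards be polled very rarely. PollableSpout and BackoffPoller are hypothetical names (not Storm or Flink types), and the sleep function is injected so the logic can be tested without actually sleeping.

```java
import java.util.Queue;
import java.util.function.LongConsumer;

// Hypothetical minimal stand-in for a Storm spout: each nextTuple() call may
// or may not add a tuple to the shared buffer. Not Storm's ISpout interface.
interface PollableSpout {
    void nextTuple();
}

// Polls a spout until a tuple is buffered, sleeping 1, 2, 4, ... ms between
// unsuccessful attempts, capped at maxSleepMillis.
class BackoffPoller<T> {
    private final PollableSpout spout;
    private final Queue<T> buffer;
    private final long maxSleepMillis;
    private final LongConsumer sleeper; // injected, so tests need not really sleep

    BackoffPoller(PollableSpout spout, Queue<T> buffer, long maxSleepMillis, LongConsumer sleeper) {
        this.spout = spout;
        this.buffer = buffer;
        this.maxSleepMillis = maxSleepMillis;
        this.sleeper = sleeper;
    }

    // Returns the next buffered tuple, or null if the thread was interrupted
    // while waiting (the caller can treat that as cancellation).
    T next() {
        long sleep = 1;
        while (buffer.isEmpty()) {
            spout.nextTuple();
            if (buffer.isEmpty()) {
                if (Thread.currentThread().isInterrupted()) {
                    return null;
                }
                sleeper.accept(sleep);                       // "not-emit" penalty
                sleep = Math.min(sleep * 2, maxSleepMillis); // exponential, but bounded
            }
        }
        return buffer.poll();
    }

    // Real sleeper: Thread.sleep that restores the interrupt flag instead of
    // throwing, so next() notices the interrupt on its next iteration.
    static LongConsumer threadSleep() {
        return millis -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }
}
```

This variant calls nextTuple() before the first sleep, so a spout that has data ready pays no penalty at all; only consecutive empty calls increase the delay.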

StormFiniteSpoutWrapper:
  - remove member variable "isDefined" --> this is redundant information
    and might cause bugs...
  - can we remove the "tupleEmitted" flag? Maybe we can implement it
    without it (not sure, though)
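
On the "tupleEmitted" question: if the flag only records whether the last nextTuple() call produced output, that information can be read from the buffer itself. The sketch below assumes a hypothetical FiniteSpout interface with a reachedEnd() method and a hypothetical FiniteSpoutDriver helper; it is not the current StormFiniteSpoutWrapper code.

```java
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical finite spout: nextTuple() may add tuples to a shared buffer,
// reachedEnd() reports that no further tuples will be produced.
interface FiniteSpout {
    void nextTuple();

    boolean reachedEnd();
}

class FiniteSpoutDriver {
    // Calls nextTuple() until the spout reports its end and forwards every
    // buffered tuple. Whether a call emitted anything is visible from the
    // buffer, so no separate "tupleEmitted" flag is kept.
    // Assumes tuples are never null, since poll() uses null to mean "empty".
    static <T> void runToEnd(FiniteSpout spout, Queue<T> buffer, Consumer<T> out) {
        while (!spout.reachedEnd()) {
            spout.nextTuple();
            T tuple;
            while ((tuple = buffer.poll()) != null) {
                out.accept(tuple);
            }
        }
    }
}
```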


I am also working on a new implementation of StormSpoutWrapper and
StormSpoutCollector. I will push it into my own repository when it is
finished and let you know. It could replace the current implementation
without the "nasty" buffering Queue (which I don't like). However, we need
to discuss this alternative implementation first.


-Matthias


On 06/03/2015 03:32 PM, Szabó Péter wrote:
> ---------- Forwarded message ----------
> From: Szabó Péter <ne...@gmail.com>
> Date: 2015-06-03 15:31 GMT+02:00
> Subject: Re: Discussion: Storm Comparability Layer
> To: Márton Balassi <ba...@gmail.com>
> 
> 
> Hey, Matthias,
> 
> Of course, you can remove my last commit. I just wanted to remove the
> failing tests and some unnecessary comments. Please do the latter in
> your commit as well.
> 
> As for StormSpoutCollector, I used a Queue with a LinkedList implementation,
> because the list we keep is a queue in nature: we put records into it and
> remove the head from time to time. The collector implements Iterator
> because I wanted to use something like next() and hasNext() in the
> StormSpoutWrapper. I think emphasizing this iterator nature makes the code
> more readable.
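
The design described here can be sketched as a collector that keeps its buffer in a field typed as Queue (backed by a LinkedList) and exposes it through Iterator. BufferingSpoutCollector and emit() are hypothetical simplifications for illustration, not the actual StormSpoutCollector code.

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

// Hypothetical, simplified collector: the spout side calls emit(), the
// wrapper side drains records with hasNext()/next().
// Not the actual StormSpoutCollector code.
class BufferingSpoutCollector<T> implements Iterator<T> {
    // Typed as Queue so only FIFO operations are used;
    // LinkedList is just the backing implementation.
    private final Queue<T> buffer = new LinkedList<>();

    // Called on the spout side for every emitted record.
    void emit(T record) {
        buffer.add(record);
    }

    // true if at least one record is buffered right now
    @Override
    public boolean hasNext() {
        return !buffer.isEmpty();
    }

    // Removes and returns the oldest buffered record; Queue.remove() throws
    // NoSuchElementException when the buffer is empty, as Iterator requires.
    @Override
    public T next() {
        return buffer.remove();
    }
}
```

One trade-off relevant to this discussion: here hasNext() returning false only means "nothing buffered right now", not that the stream is exhausted as the usual Iterator contract implies, so the wrapper has to call nextTuple() again rather than stop.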
> 
> Peter
> 
> 2015-06-03 14:16 GMT+02:00 Márton Balassi <ba...@gmail.com>:
> 
>> Hey Matthias,
>>
>> We can undo Peter's commit if that helps you and have yours instead. You
>> can simply remove that commit in a rebase. Besides this, let us push to the
>> same branch while trying not to break the history; I will squash the commits
>> once again if it gets too bulky.
>>
>> I would like to bring the discussion to the mailing list, so the community
>> can see that you are actively working on this. Are you OK with reposting
>> this thread to the dev mailing list?
>>
>> On Wed, Jun 3, 2015 at 2:09 PM, Matthias J. Sax <
>> mjsax@informatik.hu-berlin.de> wrote:
>>
>>> Hi,
>>>
>>> I just saw that Peter pushed a new commit. It makes it hard for me to
>>> push my changes. Can we undo the last commit?
>>>
>>> If I get it right, it only removes StormFiniteSpoutWrapper and disables
>>> failing tests. Do we want to delete StormFiniteSpoutWrapper? I would
>>> rather keep it.
>>>
>>> -Matthias
>>>
>>> On 06/03/2015 01:58 PM, Matthias J. Sax wrote:
>>>> Hi,
>>>>
>>>> I have a few questions about the current status ("storm" branch from
>>>> Marton).
>>>>
>>>> StormSpoutCollector:
>>>>   - is there any specific advantage in using a Queue instead of a
>>>> LinkedList for the internal buffer?
>>>>   - Why are we implementing the Iterator interface and marking
>>>> flinkCollectionDelegates as private?
>>>>     -> I would rather drop the interface and make the variable "package
>>>> private" to access it directly (avoids "unnecessary" method calls)
>>>>
>>>> StormSpoutWrapper:
>>>>   - do we still need "isRunning" and "cancel()"? The new API should make
>>>> them obsolete from my point of view.
>>>>   - I would avoid "busy wait" in "next()" and apply a "not-emit" penalty
>>>> within the while-loop:
>>>>
>>>>>      long sleep = 1;
>>>>>      while(!stormCollector.hasNext()) {
>>>>>              Thread.sleep(sleep);
>>>>>              sleep *= 2;
>>>>>              spout.nextTuple();
>>>>>      }
>>>>
>>>> StormFiniteSpoutWrapper:
>>>>   - remove member variable "isDefined" --> this is redundant information
>>>> and might cause bugs...
>>>>   - can we remove the "tupleEmitted" flag? Maybe we can implement it
>>>> without it (not sure, though)
>>>>
>>>>
>>>> I am also working on a new implementation of StormSpoutOutputWrapper. I
>>>> will push it into my own repository when it is finished and let you know.
>>>> It could replace the current implementation without the "nasty" buffering
>>>> Queue (which I don't like). However, we need to discuss this alternative
>>>> implementation first.
>>>>
>>>> Things I would like to push:
>>>>
>>>> I fixed the following tests (they were already fixed in my branch but not
>>>> merged by Marton):
>>>>  - StormBoltWrapperTest
>>>>  - StormSpoutWrapperTest
>>>>  - StormFiniteSpoutWrapperTest
>>>>  - added a new test class, InfiniteTestSpout
>>>>
>>>> I also stepped through the whole code, removed "unused" tags (which are not
>>>> necessary for public methods), corrected a few spelling mistakes in
>>>> comments, and made some other minor "improvements".
>>>>
>>>> Additionally, I "merged" those of my changes (after my rebase) that differ
>>>> from Peter's changes. Peter and I discussed some of the rebase differences,
>>>> and I "merged" my changes and his (we had already agreed on how to resolve
>>>> the differences).
>>>>
>>>> If it is ok, I will push it directly into Marton's git repository.
>>>>
>>>>
>>>>
>>>> -Matthias
>>>>
>>>
>>>
>>
>