You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by subash basnet <ya...@gmail.com> on 2016/05/07 17:16:20 UTC

Unable to understand datastream error message

Hello all,

I am getting the below error on execute of StreamExecutionEnvironment.


*Caused by: java.lang.IllegalStateException: Iteration
FeedbackTransformation{id=15, name='Feedback',
outputType=PojoType<wikiedits.StockAnalysis$Centroid, fields = [id: String,
pt: BasicArrayTypeInfo<Double>]>, parallelism=4} does not have any feedback
edges.*
The run method inside the thread class of DataStreamUtils handles this
exception:
@Override
public void run(){
try {
stream.getExecutionEnvironment().execute();
} catch (Exception e) {
throw new RuntimeException("Exception in execute()", e);
}
}

I am not able to understand what to infer from this error message so that I
could solve it.


Best Regards,
Subash Basnet

Re: Unable to understand datastream error message

Posted by Aljoscha Krettek <al...@apache.org>.
Could you please post your code.

On Sat, 7 May 2016 at 19:16 subash basnet <ya...@gmail.com> wrote:

> Hello all,
>
> I am getting the below error on execute of StreamExecutionEnvironment.
>
>
> *Caused by: java.lang.IllegalStateException: Iteration
> FeedbackTransformation{id=15, name='Feedback',
> outputType=PojoType<wikiedits.StockAnalysis$Centroid, fields = [id: String,
> pt: BasicArrayTypeInfo<Double>]>, parallelism=4} does not have any feedback
> edges.*
> The run method inside the thread class of DataStreamUtils handles this
> exception:
> @Override
> public void run(){
> try {
> stream.getExecutionEnvironment().execute();
> } catch (Exception e) {
> throw new RuntimeException("Exception in execute()", e);
> }
> }
>
> I am not able to understand what to infer from this error message so that
> I could solve it.
>
>
> Best Regards,
> Subash Basnet
>

Re: Unable to understand datastream error message

Posted by Aljoscha Krettek <al...@apache.org>.
There you have your explanation. A loop actually has to be a loop for it to
work in Flink.

On Sat, 14 May 2016 at 16:35 subash basnet <ya...@gmail.com> wrote:

> Hello,
>
> I had to use,
> private static IterativeStream<Centroid> *loop*;
> loop as global variable because I cannot broadcast it like that of DataSet
> API in DataStream API.
>
> I tried to use *closewith * like that of DataSet as below in DataStream
> but it gives me exception:
> DataStream<Centroid> finalCentroids = *loop*.closeWith(newCentroids);
>
>
> Exception in thread "main" java.lang.UnsupportedOperationException: *Cannot
> close an iteration with a feedback DataStream that does not originate from
> said iteration.*
> at
> org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75)
> at wikiedits.StockAnalysis.main(StockAnalysis.java:64)
>
>
> Best Regards,
> Subash Basnet
>
>
> On Sat, May 14, 2016 at 4:26 PM, subash basnet <ya...@gmail.com> wrote:
>
>> Hello Aljoscha,
>>
>> Below is the shorted version of StockAnalysis class which is a datastream
>> adapation of the *KMeans.java* dataset code.
>>
>> public class StockAnalysis{
>>     public static void main(String args[]){
>>        DataStream<Centroid> centroids = newCentroidDataStream.map(new
>> TupleCentroidConverter());
>>   *loop* = centroids.iterate(10);
>>   DataStream<Centroid> newCentroids = points.map(new
>> SelectNearestCenter()).map(new CountAppender()).keyBy(0)
>> .reduce(new CentroidAccumulator()).map(new CentroidAverager());
>>       public static final class SelectNearestCenter extends
>> RichMapFunction<Point, Tuple2<String, Point>> {
>> private Collection<Centroid> centroids;
>> @Override
>> public void open(Configuration parameters) throws Exception {
>> Iterator<Centroid> iter = DataStreamUtils.collect(*loop*);
>> this.*centroids* = Lists.newArrayList(iter);
>> }
>>                @Override
>> public Tuple2<String, Point> map(Point p) throws Exception {
>>                      for (Centroid centroid : *centroids*) {
>>                      }...................
>>                 }
>>      }
>>    }
>>
>> }
>>
>>
>> On Sun, May 8, 2016 at 7:10 AM, Aljoscha Krettek <al...@apache.org>
>> wrote:
>>
>>> [image: Boxbe] <https://www.boxbe.com/overview> This message is
>>> eligible for Automatic Cleanup! (aljoscha@apache.org) Add cleanup rule
>>> <https://www.boxbe.com/popup?url=https%3A%2F%2Fwww.boxbe.com%2Fcleanup%3Ftoken%3DgC%252BkGO7SbhS%252FoALSDkW8dBXumMPYysp%252B5FWf%252FX8whAVqYWiJqTOpC2fjBOdm%252BrZr6ZTM6BmqH1lYr8kUEWi3BxO7RFl%252FqJC2kUoaP4Q2L98wc9thjH6dY6QYn7ZQ6hN0GCi5xDFMhOo%253D%26key%3DNwIHY0Ppe%252BKHFaaQd88hYlg52OwTtztNKydGoopQE7I%253D&tc_serial=25330148286&tc_rand=1488102128&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
>>> | More info
>>> <http://blog.boxbe.com/general/boxbe-automatic-cleanup?tc_serial=25330148286&tc_rand=1488102128&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
>>>
>>> Could you please post your code.
>>>
>>> On Sat, 7 May 2016 at 19:16 subash basnet <ya...@gmail.com> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I am getting the below error on execute of StreamExecutionEnvironment.
>>>>
>>>>
>>>> *Caused by: java.lang.IllegalStateException: Iteration
>>>> FeedbackTransformation{id=15, name='Feedback',
>>>> outputType=PojoType<wikiedits.StockAnalysis$Centroid, fields = [id: String,
>>>> pt: BasicArrayTypeInfo<Double>]>, parallelism=4} does not have any feedback
>>>> edges.*
>>>> The run method inside the thread class of DataStreamUtils handles this
>>>> exception:
>>>> @Override
>>>> public void run(){
>>>> try {
>>>> stream.getExecutionEnvironment().execute();
>>>> } catch (Exception e) {
>>>> throw new RuntimeException("Exception in execute()", e);
>>>> }
>>>> }
>>>>
>>>> I am not able to understand what to infer from this error message so
>>>> that I could solve it.
>>>>
>>>>
>>>> Best Regards,
>>>> Subash Basnet
>>>>
>>>
>>>
>>
>

Re: Unable to understand datastream error message

Posted by subash basnet <ya...@gmail.com>.
Hello Aljoscha,

For *DataSet: *
IterativeDataSet<Centroid> *loop* = centroids.iterate(numIterations);
DataSet<Centroid> *newCentroids* = points.map(new SelectNearestCenter()).
*withBroadcastSet*(*loop*, "*centroids*")
.map(new CountAppender()).groupBy(0).reduce(new CentroidAccumulator())
.map(new CentroidAverager());
// feed new centroids back into next iteration
DataSet<Centroid> finalCentroids = *loop*.closeWith(*newCentroids*);


It's working fine, now if I do the same operation in *DataStream *as below
*: *
IterativeDataStream<Centroid>*loop* = centroids.iterate(numIterations);
DataStream<Centroid> *newCentroids* = points.map(new
SelectNearestCenter()).map(new CountAppender()).keyBy(0)
.reduce(new CentroidAccumulator()).map(new CentroidAverager());
DataSet<Centroid> finalCentroids = *loop*.closeWith(*newCentroids*);

I get the following exception as already mentioned in earlier thread:
Exception in thread "main" java.lang.UnsupportedOperationException: *Cannot
close an iteration with a feedback DataStream that does not originate from
said iteration.*
at
org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75)
at
wikiedits.StockAnalysisKMeansOutlierDetection.main(StockAnalysisKMeansOutlierDetection.java:98)

Could you please suggest me where I am wrong here?


Best Regards,
Subash Basnet


On Mon, May 16, 2016 at 6:11 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> [image: Boxbe] <https://www.boxbe.com/overview> This message is eligible
> for Automatic Cleanup! (aljoscha@apache.org) Add cleanup rule
> <https://www.boxbe.com/popup?url=https%3A%2F%2Fwww.boxbe.com%2Fcleanup%3Ftoken%3DyDJIsi0%252BlOrq0WaCT1DQ5avMUJUkSXJki5zTdOMFzGLCT9QEN6CODvxxs2LdU6pmNOjPbJji3cC2zJc3b%252Bfu3rfrxOGeNPyMFpYFPOxTmS1v%252BlZg%252FqDiVL9xQPMAVc9q4JdxgkBuvk8%253D%26key%3DScLPbPXy3TSkZu%252Bu2GXdktVXu45EsoEFtCeUNP6bChg%253D&tc_serial=25425051479&tc_rand=1401352945&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
> | More info
> <http://blog.boxbe.com/general/boxbe-automatic-cleanup?tc_serial=25425051479&tc_rand=1401352945&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
>
> There you have your explanation. A loop actually has to be a loop for it
> to work in Flink.
>
> On Sat, 14 May 2016 at 16:35 subash basnet <ya...@gmail.com> wrote:
>
>> Hello,
>>
>> I had to use,
>> private static IterativeStream<Centroid> *loop*;
>> loop as global variable because I cannot broadcast it like that of
>> DataSet API in DataStream API.
>>
>> I tried to use *closewith * like that of DataSet as below in DataStream
>> but it gives me exception:
>> DataStream<Centroid> finalCentroids = *loop*.closeWith(newCentroids);
>>
>>
>> Exception in thread "main" java.lang.UnsupportedOperationException: *Cannot
>> close an iteration with a feedback DataStream that does not originate from
>> said iteration.*
>> at
>> org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75)
>> at wikiedits.StockAnalysis.main(StockAnalysis.java:64)
>>
>>
>> Best Regards,
>> Subash Basnet
>>
>>
>> On Sat, May 14, 2016 at 4:26 PM, subash basnet <ya...@gmail.com>
>> wrote:
>>
>>> Hello Aljoscha,
>>>
>>> Below is the shorted version of StockAnalysis class which is a
>>> datastream adapation of the *KMeans.java* dataset code.
>>>
>>> public class StockAnalysis{
>>>     public static void main(String args[]){
>>>        DataStream<Centroid> centroids = newCentroidDataStream.map(new
>>> TupleCentroidConverter());
>>>   *loop* = centroids.iterate(10);
>>>   DataStream<Centroid> newCentroids = points.map(new
>>> SelectNearestCenter()).map(new CountAppender()).keyBy(0)
>>> .reduce(new CentroidAccumulator()).map(new CentroidAverager());
>>>       public static final class SelectNearestCenter extends
>>> RichMapFunction<Point, Tuple2<String, Point>> {
>>> private Collection<Centroid> centroids;
>>> @Override
>>> public void open(Configuration parameters) throws Exception {
>>> Iterator<Centroid> iter = DataStreamUtils.collect(*loop*);
>>> this.*centroids* = Lists.newArrayList(iter);
>>> }
>>>                @Override
>>> public Tuple2<String, Point> map(Point p) throws Exception {
>>>                      for (Centroid centroid : *centroids*) {
>>>                      }...................
>>>                 }
>>>      }
>>>    }
>>>
>>> }
>>>
>>>
>>> On Sun, May 8, 2016 at 7:10 AM, Aljoscha Krettek <al...@apache.org>
>>> wrote:
>>>
>>>> [image: Boxbe] <https://www.boxbe.com/overview> This message is
>>>> eligible for Automatic Cleanup! (aljoscha@apache.org) Add cleanup rule
>>>> <https://www.boxbe.com/popup?url=https%3A%2F%2Fwww.boxbe.com%2Fcleanup%3Ftoken%3DgC%252BkGO7SbhS%252FoALSDkW8dBXumMPYysp%252B5FWf%252FX8whAVqYWiJqTOpC2fjBOdm%252BrZr6ZTM6BmqH1lYr8kUEWi3BxO7RFl%252FqJC2kUoaP4Q2L98wc9thjH6dY6QYn7ZQ6hN0GCi5xDFMhOo%253D%26key%3DNwIHY0Ppe%252BKHFaaQd88hYlg52OwTtztNKydGoopQE7I%253D&tc_serial=25330148286&tc_rand=1488102128&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
>>>> | More info
>>>> <http://blog.boxbe.com/general/boxbe-automatic-cleanup?tc_serial=25330148286&tc_rand=1488102128&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
>>>>
>>>> Could you please post your code.
>>>>
>>>> On Sat, 7 May 2016 at 19:16 subash basnet <ya...@gmail.com> wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> I am getting the below error on execute of StreamExecutionEnvironment.
>>>>>
>>>>>
>>>>> *Caused by: java.lang.IllegalStateException: Iteration
>>>>> FeedbackTransformation{id=15, name='Feedback',
>>>>> outputType=PojoType<wikiedits.StockAnalysis$Centroid, fields = [id: String,
>>>>> pt: BasicArrayTypeInfo<Double>]>, parallelism=4} does not have any feedback
>>>>> edges.*
>>>>> The run method inside the thread class of DataStreamUtils handles this
>>>>> exception:
>>>>> @Override
>>>>> public void run(){
>>>>> try {
>>>>> stream.getExecutionEnvironment().execute();
>>>>> } catch (Exception e) {
>>>>> throw new RuntimeException("Exception in execute()", e);
>>>>> }
>>>>> }
>>>>>
>>>>> I am not able to understand what to infer from this error message so
>>>>> that I could solve it.
>>>>>
>>>>>
>>>>> Best Regards,
>>>>> Subash Basnet
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Unable to understand datastream error message

Posted by subash basnet <ya...@gmail.com>.
Hello,

I had to use,
private static IterativeStream<Centroid> *loop*;
loop as global variable because I cannot broadcast it like that of DataSet
API in DataStream API.

I tried to use *closewith * like that of DataSet as below in DataStream but
it gives me exception:
DataStream<Centroid> finalCentroids = *loop*.closeWith(newCentroids);


Exception in thread "main" java.lang.UnsupportedOperationException: *Cannot
close an iteration with a feedback DataStream that does not originate from
said iteration.*
at
org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75)
at wikiedits.StockAnalysis.main(StockAnalysis.java:64)


Best Regards,
Subash Basnet


On Sat, May 14, 2016 at 4:26 PM, subash basnet <ya...@gmail.com> wrote:

> Hello Aljoscha,
>
> Below is the shorted version of StockAnalysis class which is a datastream
> adapation of the *KMeans.java* dataset code.
>
> public class StockAnalysis{
>     public static void main(String args[]){
>        DataStream<Centroid> centroids = newCentroidDataStream.map(new
> TupleCentroidConverter());
>   *loop* = centroids.iterate(10);
>   DataStream<Centroid> newCentroids = points.map(new
> SelectNearestCenter()).map(new CountAppender()).keyBy(0)
> .reduce(new CentroidAccumulator()).map(new CentroidAverager());
>       public static final class SelectNearestCenter extends
> RichMapFunction<Point, Tuple2<String, Point>> {
> private Collection<Centroid> centroids;
> @Override
> public void open(Configuration parameters) throws Exception {
> Iterator<Centroid> iter = DataStreamUtils.collect(*loop*);
> this.*centroids* = Lists.newArrayList(iter);
> }
>                @Override
> public Tuple2<String, Point> map(Point p) throws Exception {
>                      for (Centroid centroid : *centroids*) {
>                      }...................
>                 }
>      }
>    }
>
> }
>
>
> On Sun, May 8, 2016 at 7:10 AM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> [image: Boxbe] <https://www.boxbe.com/overview> This message is eligible
>> for Automatic Cleanup! (aljoscha@apache.org) Add cleanup rule
>> <https://www.boxbe.com/popup?url=https%3A%2F%2Fwww.boxbe.com%2Fcleanup%3Ftoken%3DgC%252BkGO7SbhS%252FoALSDkW8dBXumMPYysp%252B5FWf%252FX8whAVqYWiJqTOpC2fjBOdm%252BrZr6ZTM6BmqH1lYr8kUEWi3BxO7RFl%252FqJC2kUoaP4Q2L98wc9thjH6dY6QYn7ZQ6hN0GCi5xDFMhOo%253D%26key%3DNwIHY0Ppe%252BKHFaaQd88hYlg52OwTtztNKydGoopQE7I%253D&tc_serial=25330148286&tc_rand=1488102128&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
>> | More info
>> <http://blog.boxbe.com/general/boxbe-automatic-cleanup?tc_serial=25330148286&tc_rand=1488102128&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
>>
>> Could you please post your code.
>>
>> On Sat, 7 May 2016 at 19:16 subash basnet <ya...@gmail.com> wrote:
>>
>>> Hello all,
>>>
>>> I am getting the below error on execute of StreamExecutionEnvironment.
>>>
>>>
>>> *Caused by: java.lang.IllegalStateException: Iteration
>>> FeedbackTransformation{id=15, name='Feedback',
>>> outputType=PojoType<wikiedits.StockAnalysis$Centroid, fields = [id: String,
>>> pt: BasicArrayTypeInfo<Double>]>, parallelism=4} does not have any feedback
>>> edges.*
>>> The run method inside the thread class of DataStreamUtils handles this
>>> exception:
>>> @Override
>>> public void run(){
>>> try {
>>> stream.getExecutionEnvironment().execute();
>>> } catch (Exception e) {
>>> throw new RuntimeException("Exception in execute()", e);
>>> }
>>> }
>>>
>>> I am not able to understand what to infer from this error message so
>>> that I could solve it.
>>>
>>>
>>> Best Regards,
>>> Subash Basnet
>>>
>>
>>
>

Re: Unable to understand datastream error message

Posted by subash basnet <ya...@gmail.com>.
Hello Aljoscha,

Below is the shorted version of StockAnalysis class which is a datastream
adapation of the *KMeans.java* dataset code.

public class StockAnalysis{
    public static void main(String args[]){
       DataStream<Centroid> centroids = newCentroidDataStream.map(new
TupleCentroidConverter());
  *loop* = centroids.iterate(10);
  DataStream<Centroid> newCentroids = points.map(new
SelectNearestCenter()).map(new CountAppender()).keyBy(0)
.reduce(new CentroidAccumulator()).map(new CentroidAverager());
      public static final class SelectNearestCenter extends
RichMapFunction<Point, Tuple2<String, Point>> {
private Collection<Centroid> centroids;
@Override
public void open(Configuration parameters) throws Exception {
Iterator<Centroid> iter = DataStreamUtils.collect(*loop*);
this.*centroids* = Lists.newArrayList(iter);
}
               @Override
public Tuple2<String, Point> map(Point p) throws Exception {
                     for (Centroid centroid : *centroids*) {
                     }...................
                }
     }
   }

}


On Sun, May 8, 2016 at 7:10 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> [image: Boxbe] <https://www.boxbe.com/overview> This message is eligible
> for Automatic Cleanup! (aljoscha@apache.org) Add cleanup rule
> <https://www.boxbe.com/popup?url=https%3A%2F%2Fwww.boxbe.com%2Fcleanup%3Ftoken%3DgC%252BkGO7SbhS%252FoALSDkW8dBXumMPYysp%252B5FWf%252FX8whAVqYWiJqTOpC2fjBOdm%252BrZr6ZTM6BmqH1lYr8kUEWi3BxO7RFl%252FqJC2kUoaP4Q2L98wc9thjH6dY6QYn7ZQ6hN0GCi5xDFMhOo%253D%26key%3DNwIHY0Ppe%252BKHFaaQd88hYlg52OwTtztNKydGoopQE7I%253D&tc_serial=25330148286&tc_rand=1488102128&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
> | More info
> <http://blog.boxbe.com/general/boxbe-automatic-cleanup?tc_serial=25330148286&tc_rand=1488102128&utm_source=stf&utm_medium=email&utm_campaign=ANNO_CLEANUP_ADD&utm_content=001>
>
> Could you please post your code.
>
> On Sat, 7 May 2016 at 19:16 subash basnet <ya...@gmail.com> wrote:
>
>> Hello all,
>>
>> I am getting the below error on execute of StreamExecutionEnvironment.
>>
>>
>> *Caused by: java.lang.IllegalStateException: Iteration
>> FeedbackTransformation{id=15, name='Feedback',
>> outputType=PojoType<wikiedits.StockAnalysis$Centroid, fields = [id: String,
>> pt: BasicArrayTypeInfo<Double>]>, parallelism=4} does not have any feedback
>> edges.*
>> The run method inside the thread class of DataStreamUtils handles this
>> exception:
>> @Override
>> public void run(){
>> try {
>> stream.getExecutionEnvironment().execute();
>> } catch (Exception e) {
>> throw new RuntimeException("Exception in execute()", e);
>> }
>> }
>>
>> I am not able to understand what to infer from this error message so that
>> I could solve it.
>>
>>
>> Best Regards,
>> Subash Basnet
>>
>
>