Posted to dev@wayang.apache.org by Michalis Vargiamis <mi...@gmail.com> on 2023/05/03 12:43:34 UTC

Re: Flink Operator Tests

Hello!


Regarding the Flink operator tests, there is progress. First, I went 
through the already implemented Spark operator tests: some of them 
already had Flink counterparts, while others (like the 
BernoulliSampleOperator) did not. So far I have implemented the 
UnionAllOperator and SortOperator tests, which went very smoothly, as 
well as the MaterializedGroupByOperator test, for which I had to tweak 
the test code and also add the following line to the operator code:

.returns(this.getOutputType().getDataUnitType().getTypeClass());
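
As a rough illustration of why such a hint is needed: Java erases generic type parameters at runtime, so a framework that inspects a UDF's output reflectively only sees the raw class — exactly the situation in which Flink asks for an explicit `.returns(...)` type hint. A minimal, framework-free sketch (the `Pair` class here is a hypothetical stand-in for Wayang's Tuple2, not code from the Wayang repository):

```java
import java.util.function.Function;

public class ErasureDemo {
    // A generic pair, analogous in spirit to org.apache.wayang.basic.data.Tuple2.
    static class Pair<K, V> {
        final K key;
        final V value;
        Pair(K key, V value) { this.key = key; this.value = value; }
    }

    public static void main(String[] args) {
        // At compile time this function returns Pair<String, Integer>...
        Function<String, Pair<String, Integer>> f = s -> new Pair<>(s, s.length());
        Pair<String, Integer> p = f.apply("hello");

        // ...but at runtime the type parameters are erased: reflection only
        // recovers the raw class, with the fields typed as Object. This is
        // why a framework needs an explicit hint like .returns(SomeClass.class).
        System.out.println(p.getClass().getSimpleName()); // prints "Pair"
        System.out.println(p.value);                      // prints 5
    }
}
```

This is only meant to show the erasure effect; the actual type Flink needs is whatever `this.getOutputType().getDataUnitType().getTypeClass()` resolves to.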


Now, for the CoGroupOperator, GlobalReduceOperator, and 
MapPartitionsOperator, I get the following error for all of them, which 
has me somewhat stuck:


java.lang.IllegalArgumentException
        at org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown Source)
        at org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown Source)
        at org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown Source)
        at org.apache.flink.api.java.ClosureCleaner.getClassReader(ClosureCleaner.java:148)
        at org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.java:115)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:75)
        at org.apache.flink.api.java.DataSet.clean(DataSet.java:186)
        at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets$CoGroupOperatorSetsPredicate$CoGroupOperatorWithoutFunction.with(CoGroupOperator.java:622)
        at org.apache.wayang.flink.operators.FlinkCoGroupOperator.evaluate(FlinkCoGroupOperator.java:116)
        at org.apache.wayang.flink.operators.FlinkOperatorTestBase.evaluate(FlinkOperatorTestBase.java:75)
        at org.apache.wayang.flink.operators.FlinkCoGroupOperatorTest.testExecution(FlinkCoGroupOperatorTest.java:72)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
        at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:82)
        at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:73)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
        at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
        at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:132)
        at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
        at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
        at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:128)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
        at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
        at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)


Any help or feedback would be appreciated!


Thanks,

Michalis


On 27-Apr-23 8:45 PM, Jorge Arnulfo Quiané Ruiz wrote:
> Hi Michalis,
>
> Sure! It was more of a heads-up than anything else.
> Please don’t hesitate to pose your questions here or to call for a meeting if necessary :)
>
> Best,
> Jorge
>
>> On 27 Apr 2023, at 16.52, Michalis Vargiamis <mi...@gmail.com> wrote:
>>
>> Hi!
>>
>>
>> Thanks! Actually, I've lost quite some time reconfiguring my setup. I currently have a Windows laptop, so for Linux I initially tried the WSL that Windows provides, but it made the whole development/debugging process quite inefficient, so I switched to VMware, and then I lost some more time on other things. Anyway, I'd have to ask for a bit more time on this one.
>>
>>
>> Thank you,
>>
>> Michalis
>>
>>
>> On 27-Apr-23 10:41 AM, Jorge Arnulfo Quiané Ruiz wrote:
>>> Hi Michalis,
>>>
>>> Please let us know if you need help :)
>>>
>>> Best,
>>> Jorge
>>>
>>>> On 25 Apr 2023, at 14.07, Zoi Kaoudi <zk...@yahoo.gr.INVALID> wrote:
>>>>
>>>> Hi Michalis,
>>>> can you double check that you define the types of the Tuple2 output?
>>>>
>>>>
>>>> According to the error
>>>> "Return type PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0: GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]>"
>>>> it seems that the Tuple2 does not have specific types; its fields are just java.lang.Objects. Maybe that is the problem.
>>>>
>>>> Also, to give some context: if you look at the FlinkMaterializedGroupByOperator code, there are some utility functions we use to map the Wayang UDFs to the Flink (or Spark) UDFs. For example, the line
>>>> final KeySelector<Type, KeyType> keyExtractor = flinkExecutor.getCompiler().compileKeySelector(this.getKeyDescriptor());
>>>> converts the Wayang UDF keyDescriptor to a Flink UDF KeySelector. Maybe if you check the code of that class you can spot the problem?
>>>> Best
>>>> --
>>>> Zoi
>>>>
>>>>     On Tuesday, April 25, 2023 at 01:30:20 PM CEST, Michalis Vargiamis <mi...@gmail.com> wrote:
>>>>
>>>> Hello!
>>>>
>>>>
>>>> I've been working on the missing operator tests for Flink. I've
>>>> successfully done the SortOperator and the UnionAllOperator by looking at
>>>> the respective Spark operator tests and changing RddChannel to
>>>> DataSetChannel.
>>>>
>>>>
>>>> I'm having trouble with the tests for other operators though, for
>>>> example the FlinkMaterializedGroupByOperator. I tried starting with
>>>> SparkMaterializedGroupByOperatorTest and doing the same RddChannel to
>>>> DataSetChannel modifications as before, but I get the following error:
>>>>
>>>>
>>>> [ERROR]
>>>> org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution
>>>> Time elapsed: 1.911 s  <<< ERROR!
>>>> org.apache.flink.api.common.InvalidProgramException: Return type
>>>> PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0:
>>>> GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]>
>>>> of KeySelector class
>>>> org.apache.wayang.flink.compiler.KeySelectorFunction is not a valid key type
>>>>          at
>>>> org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution(FlinkMaterializedGroupByOperatorTest.java:50)
>>>>
>>>>
>>>> Digging into the operator code a bit more, the error happens at
>>>>
>>>> dataSetInput.groupBy(keyExtractor);
>>>>
>>>>
>>>> Any ideas on what should be changed?
>>>>
>>>>
>>>> Here is a permalink to the respective spark test
>>>> [https://github.com/apache/incubator-wayang/blob/6aad4eea8c91a52f2a41e79424491e6c2c5206af/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/operators/SparkMaterializedGroupByOperatorTest.java]
>>>>
>>>>
>>>> Thank you,
>>>>
>>>> Michalis Vargiamis
>>>>

Re: Flink Operator Tests

Posted by Zoi Kaoudi <zk...@yahoo.gr.INVALID>.
Thanks a lot, Michalis.
I just reviewed your PR; please take a look.
And let us know if you find any API discrepancies with Flink.
Best
--
Zoi

    On Wednesday, May 3, 2023 at 05:43:56 PM CEST, Michalis Vargiamis <mi...@gmail.com> wrote:
 
 I created the pull request here 
[https://github.com/apache/incubator-wayang/pull/316].

I can also verify that the ReduceBy and the Join operators throw the same 
exception I encountered! I'll check the documentation as you suggested.


Thanks,

Michalis


On 03-May-23 4:38 PM, Zoi Kaoudi wrote:
>  Hi Michalis,
>
> first, I suggest making a pull request with what you have that is working.
> Regarding the error, I have not encountered it before. I think there may be a bug in the Flink operator implementation. Browsing the Flink tests we currently have, it seems that ReduceBy and Join also throw some errors. Can you check whether the ReduceBy and Join operators throw the same exception as yours?
> If so, I suggest taking one of them and going to the Flink documentation for that operator. It may be that we are using an older Flink API.
> Best
> --
> Zoi


Re: Flink Operator Tests

Posted by Michalis Vargiamis <mi...@gmail.com>.
Hi there!


Turns out the problem was indeed the Flink version. As seen here

[https://nightlies.apache.org/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#java-11-support-flink-10725]

support for Java 11 starts with Flink 1.10, whereas the project was on 
Flink 1.7.1. After updating it, the error was gone and the tests ran 
smoothly. The tests I have added so far, based on their Spark 
counterparts, are:

CoGroupOperator
GlobalReduceOperator
MapPartitionsOperator
MaterializedGroupByOperator
SortOperator
UnionAllOperator


I also changed the Flink version in the wayang-tests-integration pom so 
that the same version is used throughout the project. The integration 
tests also run smoothly. I'm going to open a pull request with those 
changes. Do we need tests for other Flink operators as well?
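For context, aligning the Flink version usually comes down to a one-line change per POM. A sketch of the kind of edit involved; the property name and the exact version below are illustrative assumptions, not necessarily what the Wayang build uses:

```xml
<!-- Illustrative pom.xml excerpt: property name and exact Flink version
     are assumptions, not taken from the actual Wayang build. -->
<properties>
  <!-- Flink >= 1.10 is the first release with Java 11 support (FLINK-10725). -->
  <flink.version>1.10.0</flink.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
```

Defining the version once as a property in the parent POM keeps every module, including wayang-tests-integration, on the same Flink release.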


Note that the above link also mentions the Kryo error/warning we 
saw with Zoi on my last pull request; it says "These warnings are 
considered harmless and will be addressed in future Flink releases."


Thanks,

Michalis


On 03-May-23 4:38 PM, Zoi Kaoudi wrote:
>   Hi Michalis,
>
> First, I suggest making a pull request with what you have that is working.
> Regarding the error, I have not encountered it before. I think there may be a bug in the Flink operator implementation. Browsing the Flink tests we currently have, it seems that ReduceBy and Join also throw some errors. Can you check whether the ReduceBy and Join operators throw the same exception as the one you have?
> If so, I suggest taking one of them and checking the Flink documentation for that operator. It may be that we are using an older Flink API.
> Best
> --
> Zoi
>

Re: Flink Operator Tests

Posted by Zoi Kaoudi <zk...@yahoo.gr.INVALID>.
 Hi Michalis, 

First, I suggest making a pull request with what you have that is working.
Regarding the error, I have not encountered it before. I think there may be a bug in the Flink operator implementation. Browsing the Flink tests we currently have, it seems that ReduceBy and Join also throw some errors. Can you check whether the ReduceBy and Join operators throw the same exception as the one you have?
If so, I suggest taking one of them and checking the Flink documentation for that operator. It may be that we are using an older Flink API.
Best
--
Zoi

    On Wednesday 3 May 2023 at 02:43:44 PM CEST, Michalis Vargiamis <mi...@gmail.com> wrote:
 
 Hello!


Regarding the Flink operator tests, there is progress. First of all, I 
looked at the already implemented Spark operator tests, some of them 
were already implemented for Flink and some others did not exist for 
Flink (like the BernoulliSampleOperator). So what I have implemented so 
far are the UnionAllOperator and the SortOperator which went very 
smoothly, and also the MaterializedGroupByOperator for which I had to do 
some tweaks in the test code and also add the following line to the 
operator code

.returns(this.getOutputType().getDataUnitType().getTypeClass());
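As a stand-alone illustration of why such an explicit type hint can be needed: Java erases generic type parameters at runtime, so a framework that inspects a lambda reflectively only sees Object and cannot reconstruct the element types on its own. A minimal sketch in plain Java (illustrative only, not Wayang or Flink code):

```java
import java.lang.reflect.Method;
import java.util.function.Function;

public class ErasureDemo {

    /** Reflects the erased return type of a Function's apply method. */
    public static Class<?> erasedReturnType(Function<?, ?> f) throws NoSuchMethodException {
        Method apply = f.getClass().getMethod("apply", Object.class);
        return apply.getReturnType();
    }

    public static void main(String[] args) throws NoSuchMethodException {
        // The lambda maps String -> Integer, but its erased signature is
        // Object -> Object: a dataflow engine inspecting it at runtime learns
        // nothing about the element types unless told via a type hint.
        Function<String, Integer> length = s -> s.length();
        System.out.println(erasedReturnType(length)); // class java.lang.Object
    }
}
```

This is presumably why Flink falls back to GenericType<java.lang.Object> for the Tuple2 fields unless a hint like .returns(...) supplies the concrete types.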


Now, regarding the CoGroupOperator, GlobalReduceOperator, 
MapPartitionsOperator, I get the following error for all of them, that 
has got me kind of stuck:


java.lang.IllegalArgumentException
         at 
org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown 
Source)
         at 
org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown 
Source)
         at 
org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown 
Source)
         at 
org.apache.flink.api.java.ClosureCleaner.getClassReader(ClosureCleaner.java:148)
         at 
org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.java:115)
         at 
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:75)
         at org.apache.flink.api.java.DataSet.clean(DataSet.java:186)
         at 
org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets$CoGroupOperatorSetsPredicate$CoGroupOperatorWithoutFunction.with(CoGroupOperator.java:622)
         at 
org.apache.wayang.flink.operators.FlinkCoGroupOperator.evaluate(FlinkCoGroupOperator.java:116)
         at 
org.apache.wayang.flink.operators.FlinkOperatorTestBase.evaluate(FlinkOperatorTestBase.java:75)
         at 
org.apache.wayang.flink.operators.FlinkCoGroupOperatorTest.testExecution(FlinkCoGroupOperatorTest.java:72)
         at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
         at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
         at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
         at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
         at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
         at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
         at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
         at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
         at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
         at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
         at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
         at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
         at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
         at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
         at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
         at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
         at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
         at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
         at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
         at 
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
         at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
         at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
         at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
         at 
java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
         at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
         at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
         at 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
         at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
         at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
         at 
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
         at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:82)
         at 
org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:73)
         at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
         at 
org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
         at 
org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
         at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
         at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:132)
         at 
org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
         at 
org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
         at 
org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:128)
         at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
         at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
         at 
org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
         at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)


Any help or feedback would be appreciated!


Thanks,

Michalis


On 27-Apr-23 8:45 PM, Jorge Arnulfo Quiané Ruiz wrote:
> Hi Michalis,
>
> Sure! It was more of a heads-up than anything else.
> Please don’t hesitate to pose your questions here or to call for a meeting if necessary :)
>
> Best,
> Jorge
>
>> On 27 Apr 2023, at 16.52, Michalis Vargiamis <mi...@gmail.com> wrote:
>>
>> Hi!
>>
>>
>> Thanks! Actually I've lost quite some time reconfiguring my setup. I currently have a Windows laptop, so for Linux I initially tried the WSL that Windows provides, but it turned out to make the whole development/debugging process quite inefficient, so I switched to VMware, but then again I lost some time with some other stuff. Anyway, I'd have to ask for a bit more time on this one.
>>
>>
>> Thank you,
>>
>> Michalis
>>
>>
>> On 27-Apr-23 10:41 AM, Jorge Arnulfo Quiané Ruiz wrote:
>>> Hi Michalis,
>>>
>>> Please let us know if you need help :)
>>>
>>> Best,
>>> Jorge
>>>
>>>> On 25 Apr 2023, at 14.07, Zoi Kaoudi <zk...@yahoo.gr.INVALID> wrote:
>>>>
>>>> Hi Michalis,
>>>> can you double check that you define the types of the Tuple2 output?
>>>>
>>>>
>>>> According to the error
>>>> "Return type
>>>> PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0:
>>>> GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]>" it seems like the Tuple2 does not have specific types but they are java.lang.Objects. Maybe that could be the problem.
>>>>
>>>> Also to give some context, if you see in the FlinkMaterializedGroupByOperator code there are some utility functions we use to map the Wayang UDFs to the Flink (or Spark) UDFs. For example, the line:
>>>> final KeySelector<Type, KeyType> keyExtractor = flinkExecutor.getCompiler().compileKeySelector(this.getKeyDescriptor());
>>>> converts the Wayang UDF keyDescriptor to a Flink UDF KeySelector. Maybe if you check the code of this class you could spot the problem?
>>>> Best
>>>> --
>>>> Zoi
>>>>
>>>>    On Tuesday 25 April 2023 at 01:30:20 PM CEST, Michalis Vargiamis <mi...@gmail.com> wrote:
>>>>
>>>> Hello!
>>>>
>>>>
>>>> I've been working with the missing operator tests for Flink. I've
>>>> successfully done the SortOperator and the UnionAllOperator by seeing
>>>> the respective Spark operator tests and modifying RddChannel to
>>>> DataSetChannel.
>>>>
>>>>
>>>> I'm having trouble with the tests for other operators though, for
>>>> example the FlinkMaterializedGroupByOperator. I tried starting with
>>>> SparkMaterializedGroupByOperatorTest and doing the same RddChannel to
>>>> DataSetChannel modifications as before, but I get the following error:
>>>>
>>>>
>>>> [ERROR]
>>>> org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution
>>>> Time elapsed: 1.911 s  <<< ERROR!
>>>> org.apache.flink.api.common.InvalidProgramException: Return type
>>>> PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0:
>>>> GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]>
>>>> of KeySelector class
>>>> org.apache.wayang.flink.compiler.KeySelectorFunction is not a valid key type
>>>>          at
>>>> org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution(FlinkMaterializedGroupByOperatorTest.java:50)
>>>>
>>>>
>>>> Digging into the operator code a bit more, the error happens at
>>>>
>>>> dataSetInput.groupBy(keyExtractor);
>>>>
>>>>
>>>> Any ideas on what should be changed?
>>>>
>>>>
>>>> Here is a permalink to the respective spark test
>>>> [https://github.com/apache/incubator-wayang/blob/6aad4eea8c91a52f2a41e79424491e6c2c5206af/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/operators/SparkMaterializedGroupByOperatorTest.java]
>>>>
>>>>
>>>> Thank you,
>>>>
>>>> Michalis Vargiamis
>>>>