Posted to user@flink.apache.org by davis k <fo...@gmail.com> on 2016/08/15 20:14:27 UTC
Error joining with Python API
I've got an issue performing joins using the Python API in flink-1.1.1. With
this example code I get an NPE (below). However, the NPE disappears when the
filter is removed. Is there an error I'm making in this brief example, or is
this a Flink bug?
from flink.plan.Environment import get_environment
from flink.plan.Constants import WriteMode

env = get_environment()
env.set_parallelism(1)

input1 = env.from_elements("1|0", "1|2") \
    .map(lambda x: x.split("|"))

# The NPE disappears when this filter is removed.
input2 = env.from_elements("1|b") \
    .map(lambda x: x.split("|")) \
    .filter(lambda x: x[0] != "0")

joined = input1 \
    .join(input2) \
    .where(0) \
    .equal_to(0) \
    .write_text("output.txt", write_mode=WriteMode.OVERWRITE)

env.execute(local=True)
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:64)
    at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:59)
    at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:55)
    at org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.<init>(JoinOperator.java:850)
    at org.apache.flink.api.java.DataSet.join(DataSet.java:742)
    at org.apache.flink.python.api.PythonPlanBinder.createDefaultJoin(PythonPlanBinder.java:599)
    at org.apache.flink.python.api.PythonPlanBinder.createJoinOperation(PythonPlanBinder.java:591)
    at org.apache.flink.python.api.PythonPlanBinder.receiveOperations(PythonPlanBinder.java:360)
    at org.apache.flink.python.api.PythonPlanBinder.receivePlan(PythonPlanBinder.java:235)
    at org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:139)
    at org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:112)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
    ... 6 more
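For reference, the repro's data can be traced by hand. The sketch below is plain Python, not the Flink API: it mimics the map, filter and join(...).where(0).equal_to(0) steps on ordinary lists, and models each join match as a (left, right) pair (an assumption; the thread does not show what Flink actually writes to output.txt). Because "1" != "0", the filter drops nothing, so the expected join result is identical with or without it. That suggests the failure lies in building the plan rather than in processing records, which fits the trace: the NPE is thrown inside DataSet.join while PythonPlanBinder.receivePlan is still assembling the operations.

```python
# Plain-Python model of the repro's data flow; this is NOT the Flink API.

def parse(lines):
    # Like .map(lambda x: x.split("|")): each record becomes a list of fields.
    return [line.split("|") for line in lines]


def equi_join(left, right, left_key, right_key):
    # Nested-loop equi-join, like .join(...).where(left_key).equal_to(right_key);
    # each match is modeled as a (left, right) pair (an assumption).
    return [(l, r) for l in left for r in right if l[left_key] == r[right_key]]


input1 = parse(["1|0", "1|2"])
input2_unfiltered = parse(["1|b"])
# Like .filter(lambda x: x[0] != "0"): "1" != "0", so nothing is dropped.
input2 = [x for x in input2_unfiltered if x[0] != "0"]

with_filter = equi_join(input1, input2, 0, 0)
without_filter = equi_join(input1, input2_unfiltered, 0, 0)

print(with_filter)                    # [(['1', '0'], ['1', 'b']), (['1', '2'], ['1', 'b'])]
print(with_filter == without_filter)  # True
```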
Re: Error joining with Python API
Posted by davis k <fo...@gmail.com>.
Awesome, thanks Chesnay!
On Wed, Aug 17, 2016 at 2:58 AM, Chesnay Schepler <ch...@apache.org>
wrote:
> Found the issue, there was a missing tab in the chaining method...
Re: Error joining with Python API
Posted by Chesnay Schepler <ch...@apache.org>.
Found the issue, there was a missing tab in the chaining method...
On 16.08.2016 12:12, Chesnay Schepler wrote:
> looks like a bug, will look into it. :)
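Chesnay's reply only says the cause was "a missing tab in the chaining method"; the patch itself is not shown in the thread. The toy below is a hypothetical model, not Flink's code, of why such a slip could surface only when the filter is present. It assumes that consecutive one-input UDF operators (here map and filter) get fused ("chained") and that the join then looks up its inputs by id. If the step that re-points consumers of a fused operator is skipped (the kind of effect one mis-indented line can have in Python), the join's right input resolves to None, loosely analogous to the null input behind the NPE. All names (chain_plan, repoint, the op ids) are made up for illustration.

```python
# Hypothetical toy model, NOT Flink's Python API source. It only illustrates
# how a chaining step that fuses a filter into the preceding map must re-point
# the fused op's consumers, and what happens to a join when it does not.

CHAINABLE = {"map", "filter"}  # assumption: one-input UDF ops that can be fused


def chain_plan(ops, repoint=True):
    """Fuse chainable ops into a chainable parent; return surviving ops by id.

    repoint=False simulates a chaining step that forgets to re-point consumers
    of a fused op (the kind of slip one missing indentation level can cause).
    """
    survivors = {}
    fused_into = {}  # id of a fused op -> id of the op that absorbed it
    for op in ops:
        op = dict(op, inputs=list(op["inputs"]))  # copy; keep the caller's plan intact
        if repoint:
            op["inputs"] = [fused_into.get(i, i) for i in op["inputs"]]
        host = survivors.get(op["inputs"][0]) if len(op["inputs"]) == 1 else None
        if op["kind"] in CHAINABLE and host is not None and host["kind"] in CHAINABLE:
            fused_into[op["id"]] = host["id"]  # op now runs inside its host
        else:
            survivors[op["id"]] = op
    return survivors


def make_plan(with_filter):
    """Build a plan shaped like the repro: two sources, maps, optional filter, join."""
    ops = [
        {"id": "src1", "kind": "source", "inputs": []},
        {"id": "map1", "kind": "map", "inputs": ["src1"]},
        {"id": "src2", "kind": "source", "inputs": []},
        {"id": "map2", "kind": "map", "inputs": ["src2"]},
    ]
    right = "map2"
    if with_filter:
        ops.append({"id": "filter2", "kind": "filter", "inputs": ["map2"]})
        right = "filter2"
    ops.append({"id": "join", "kind": "join", "inputs": ["map1", right]})
    return ops


def join_input_ids(with_filter, repoint):
    """Resolve the join's inputs among surviving ops; a dangling id gives None."""
    survivors = chain_plan(make_plan(with_filter), repoint)
    inputs = [survivors.get(i) for i in survivors["join"]["inputs"]]
    return [op["id"] if op is not None else None for op in inputs]


for with_filter in (False, True):
    for repoint in (True, False):
        print(f"filter={with_filter} repoint={repoint}:",
              join_input_ids(with_filter, repoint))
```

Running it prints ['map1', 'map2'] for the first three combinations and ['map1', None] only for filter=True repoint=False, mirroring the report that the error appears only when the filter is present.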
Re: Error joining with Python API
Posted by Chesnay Schepler <ch...@apache.org>.
looks like a bug, will look into it. :)
On 16.08.2016 10:29, Ufuk Celebi wrote:
> I think that this is actually a bug in Flink. I'm cc'ing Chesnay who
> originally contributed the Python API. He can probably tell whether
> this is a bug in the Python API or Flink operator side of things. ;)
Re: Error joining with Python API
Posted by Ufuk Celebi <uc...@apache.org>.
I think that this is actually a bug in Flink. I'm cc'ing Chesnay who
originally contributed the Python API. He can probably tell whether
this is a bug in the Python API or Flink operator side of things. ;)
On Mon, Aug 15, 2016 at 10:14 PM, davis k <fo...@gmail.com> wrote:
> I've got an issue performing joins using the Python API in flink-1.1.1. With
> this example code I get an NPE (below). However, the NPE disappears when the
> filter is removed. Is there an error I'm making in this brief example, or is
> this a Flink bug?