Posted to user@flink.apache.org by davis k <fo...@gmail.com> on 2016/08/15 20:14:27 UTC

Error joining with Python API

I've got an issue performing joins using the Python API in flink-1.1.1. With
this example code I get an NPE (below). However, the NPE disappears when the
filter is removed. Is there an error I'm making in this brief example, or is
this a Flink bug?

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import WriteMode

    env = get_environment()
    env.set_parallelism(1)

    input1 = env.from_elements("1|0", "1|2") \
        .map(lambda x: x.split("|"))

    # The NPE disappears when this filter is removed.
    input2 = env.from_elements("1|b") \
        .map(lambda x: x.split("|")) \
        .filter(lambda x: x[0] != "0")

    # Equi-join on field 0 of both inputs, result written to output.txt.
    joined = input1 \
        .join(input2) \
        .where(0) \
        .equal_to(0) \
        .write_text("output.txt", write_mode=WriteMode.OVERWRITE)

    env.execute(local=True)

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.lang.NullPointerException
        at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:64)
        at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:59)
        at org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.<init>(JoinOperatorSetsBase.java:55)
        at org.apache.flink.api.java.operators.JoinOperator$JoinOperatorSets.<init>(JoinOperator.java:850)
        at org.apache.flink.api.java.DataSet.join(DataSet.java:742)
        at org.apache.flink.python.api.PythonPlanBinder.createDefaultJoin(PythonPlanBinder.java:599)
        at org.apache.flink.python.api.PythonPlanBinder.createJoinOperation(PythonPlanBinder.java:591)
        at org.apache.flink.python.api.PythonPlanBinder.receiveOperations(PythonPlanBinder.java:360)
        at org.apache.flink.python.api.PythonPlanBinder.receivePlan(PythonPlanBinder.java:235)
        at org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:139)
        at org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:112)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
        ... 6 more
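
To make the expected behaviour concrete, here is a plain-Python sketch (no Flink involved) of what the reproducer's join should emit. It assumes that a Python API join with no join function or projection yields one `(left, right)` pair per matching pair of records; `split_records` and `simulate_default_join` are hypothetical helpers written only for this illustration.

```python
# Plain-Python model of the reproducer's data flow; no Flink involved.
# Assumption: a join with no join function/projection emits one
# (left, right) pair for every pair of records whose key fields match.


def split_records(lines):
    """Equivalent of .map(lambda x: x.split("|"))."""
    return [line.split("|") for line in lines]


def simulate_default_join(left, right, left_key, right_key):
    """Hypothetical nested-loop equi-join on one field of each input."""
    return [
        (lrec, rrec)
        for lrec in left
        for rrec in right
        if lrec[left_key] == rrec[right_key]
    ]


input1 = split_records(["1|0", "1|2"])
# Equivalent of .filter(lambda x: x[0] != "0"); it keeps the only record.
input2 = [x for x in split_records(["1|b"]) if x[0] != "0"]

joined = simulate_default_join(input1, input2, 0, 0)
for pair in joined:
    print(pair)
# (['1', '0'], ['1', 'b'])
# (['1', '2'], ['1', 'b'])
```

Since the filter keeps the only record of `input2`, the expected pairs are the same with or without it. The stack trace also shows the NPE being thrown from `DataSet.join` while `PythonPlanBinder.receivePlan` is still assembling the plan, so no records have been processed at that point.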

Re: Error joining with Python API

Posted by davis k <fo...@gmail.com>.
Awesome, thanks Chesnay!

On Wed, Aug 17, 2016 at 2:58 AM, Chesnay Schepler <ch...@apache.org>
wrote:

> Found the issue, there was a missing tab in the chaining method...
>
>
> On 16.08.2016 12:12, Chesnay Schepler wrote:
>
>> looks like a bug, will look into it. :)
>>
>> On 16.08.2016 10:29, Ufuk Celebi wrote:
>>
>>> I think that this is actually a bug in Flink. I'm cc'ing Chesnay, who
>>> originally contributed the Python API. He can probably tell whether
>>> this is a bug in the Python API or on the Flink operator side of things. ;)

Re: Error joining with Python API

Posted by Chesnay Schepler <ch...@apache.org>.
Found the issue, there was a missing tab in the chaining method...

On 16.08.2016 12:12, Chesnay Schepler wrote:
> looks like a bug, will look into it. :)
>
> On 16.08.2016 10:29, Ufuk Celebi wrote:
>> I think that this is actually a bug in Flink. I'm cc'ing Chesnay, who
>> originally contributed the Python API. He can probably tell whether
>> this is a bug in the Python API or on the Flink operator side of things. ;)

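Chesnay's diagnosis, a missing tab in the chaining method, describes a class of bug that Python does not catch: dedenting a line by one level still parses, it just moves the statement into a different block. The toy model below is a hypothetical illustration, not Flink's actual code. It assumes that consecutive `map`/`filter` operations are chained so that only the chain head gets a dataset, and every name in it (`chain_correct`, `chain_missing_tab`, `join_input`, the op ids) is invented. In this model, the one-line dedent leaves the chained filter pointing at a dataset that was never created, so the join's input lookup returns `None`, while the same plan without the filter is unaffected, mirroring the symptom reported in this thread.

```python
# Hypothetical toy model, NOT Flink's actual chaining code. Assumption:
# consecutive map/filter ops are chained into one operator, so only the
# chain head gets a real dataset and chained ops must point at the head.
CHAINABLE = {"map", "filter"}


def chain_correct(ops):
    """ops: (op_id, op_type, parent_id) tuples in plan order."""
    created, source_of, types = [], {}, {}
    for op_id, op_type, parent_id in ops:
        types[op_id] = op_type
        head = source_of.get(parent_id)
        if op_type in CHAINABLE and types.get(head) in CHAINABLE:
            source_of[op_id] = head  # chained: consumers read the head's dataset
        else:
            created.append(op_id)
            source_of[op_id] = op_id  # not chained: gets its own dataset
    return created, source_of


def chain_missing_tab(ops):
    """Same routine, but the last else-line lost one level of indentation."""
    created, source_of, types = [], {}, {}
    for op_id, op_type, parent_id in ops:
        types[op_id] = op_type
        head = source_of.get(parent_id)
        if op_type in CHAINABLE and types.get(head) in CHAINABLE:
            source_of[op_id] = head
        else:
            created.append(op_id)
        source_of[op_id] = op_id  # still valid Python, but now runs for chained ops too
    return created, source_of


def join_input(ops, input_id, chain):
    """Look up the dataset a join would read for input_id (None ~ the NPE)."""
    created, source_of = chain(ops)
    datasets = {op_id: "DataSet<" + op_id + ">" for op_id in created}
    return datasets.get(source_of[input_id])


WITH_FILTER = [
    ("src2", "source", None),
    ("map2", "map", "src2"),
    ("filter2", "filter", "map2"),
]
WITHOUT_FILTER = WITH_FILTER[:2]

print(join_input(WITH_FILTER, "filter2", chain_correct))      # DataSet<map2>
print(join_input(WITH_FILTER, "filter2", chain_missing_tab))  # None
print(join_input(WITHOUT_FILTER, "map2", chain_missing_tab))  # DataSet<map2>
```
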

Re: Error joining with Python API

Posted by Chesnay Schepler <ch...@apache.org>.
looks like a bug, will look into it. :)

On 16.08.2016 10:29, Ufuk Celebi wrote:
> I think that this is actually a bug in Flink. I'm cc'ing Chesnay, who
> originally contributed the Python API. He can probably tell whether
> this is a bug in the Python API or on the Flink operator side of things. ;)


Re: Error joining with Python API

Posted by Ufuk Celebi <uc...@apache.org>.
I think that this is actually a bug in Flink. I'm cc'ing Chesnay, who
originally contributed the Python API. He can probably tell whether
this is a bug in the Python API or on the Flink operator side of things. ;)
