Posted to issues@flink.apache.org by "Dian Fu (JIRA)" <ji...@apache.org> on 2019/07/04 01:14:00 UTC

[jira] [Commented] (FLINK-13085) unable to run python test locally

    [ https://issues.apache.org/jira/browse/FLINK-13085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878255#comment-16878255 ] 

Dian Fu commented on FLINK-13085:
---------------------------------

[~phoenixjiangnan] The exception occurs because the interface of the Java CsvTableSource has changed, so you need to rebuild the Java package with "mvn clean package -DskipTests" before executing "./dev/lint-python.sh".
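
As a rough sketch of that order of operations, the two commands could be wrapped as below. The helper names (run, rebuild_and_lint) and the DRY_RUN switch are hypothetical and only for illustration; the flink-python directory is an assumption inferred from the paths in the reported log, and the checkout root is passed in as an argument.

```shell
# Hypothetical helper: print the command when DRY_RUN=1, otherwise run it.
run() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "+ $*"
  else
    "$@"
  fi
}

# Hypothetical helper: rebuild the Java artifacts first, then run the
# Python lint/test script, so the Python tests see the current Java classes.
# $1 is the root of the Flink source checkout.
rebuild_and_lint() {
  root="$1"
  # Step 1: rebuild the Java package from the repository root, skipping Java tests.
  (cd "$root" && run mvn clean package -DskipTests) &&
  # Step 2: run the Python checks (assumed to live under flink-python).
  (cd "$root/flink-python" && run ./dev/lint-python.sh)
}
```

Calling `DRY_RUN=1 rebuild_and_lint /path/to/flink` only prints the two commands in order; without DRY_RUN it runs them, and the lint step is skipped if the Maven build fails.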

> unable to run python test locally
> ---------------------------------
>
>                 Key: FLINK-13085
>                 URL: https://issues.apache.org/jira/browse/FLINK-13085
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>            Reporter: Bowen Li
>            Assignee: sunjincheng
>            Priority: Major
>             Fix For: 1.9.0
>
>
> Ran ./dev/lint-python.sh and got:
> {code:java}
> =================================== FAILURES ===================================
> __________________ ExecutionConfigTests.test_equals_and_hash ___________________
> self = <pyflink.common.tests.test_execution_config.ExecutionConfigTests testMethod=test_equals_and_hash>
>     def test_equals_and_hash(self):
>         config1 = ExecutionEnvironment.get_execution_environment().get_config()
>         config2 = ExecutionEnvironment.get_execution_environment().get_config()
>         self.assertEqual(config1, config2)
>         self.assertEqual(hash(config1), hash(config2))
>         config1.set_parallelism(12)
>         self.assertNotEqual(config1, config2)
> >       self.assertNotEqual(hash(config1), hash(config2))
> E       AssertionError: -1960065877 == -1960065877
> pyflink/common/tests/test_execution_config.py:293: AssertionError
> ______________ ExecutionEnvironmentTests.test_get_execution_plan _______________
> self = <pyflink.dataset.tests.test_execution_environment.ExecutionEnvironmentTests testMethod=test_get_execution_plan>
>     def test_get_execution_plan(self):
>         tmp_dir = tempfile.gettempdir()
>         source_path = os.path.join(tmp_dir + '/streaming.csv')
>         tmp_csv = os.path.join(tmp_dir + '/streaming2.csv')
>         field_names = ["a", "b", "c"]
>         field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
>         t_env = BatchTableEnvironment.create(self.env)
>         csv_source = CsvTableSource(source_path, field_names, field_types)
>         t_env.register_table_source("Orders", csv_source)
>         t_env.register_table_sink(
>             "Results",
>             CsvTableSink(field_names, field_types, tmp_csv))
> >       t_env.scan("Orders").insert_into("Results")
> pyflink/dataset/tests/test_execution_environment.py:111:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> pyflink/table/table.py:583: in insert_into
>     self._j_table.insertInto(table_path, j_table_path)
> .tox/py27/lib/python2.7/site-packages/py4j/java_gateway.py:1286: in __call__
>     answer, self.gateway_client, self.target_id, self.name)
> pyflink/util/exceptions.py:139: in deco
>     return f(*a, **kw)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> answer = 'xro290'
> gateway_client = <py4j.java_gateway.GatewayClient object at 0x10bd139d0>
> target_id = 'o288', name = 'insertInto'
>     def get_return_value(answer, gateway_client, target_id=None, name=None):
>         """Converts an answer received from the Java gateway into a Python object.
>         For example, string representation of integers are converted to Python
>         integer, string representation of objects are converted to JavaObject
>         instances, etc.
>         :param answer: the string returned by the Java gateway
>         :param gateway_client: the gateway client used to communicate with the Java
>             Gateway. Only necessary if the answer is a reference (e.g., object,
>             list, map)
>         :param target_id: the name of the object from which the answer comes from
>             (e.g., *object1* in `object1.hello()`). Optional.
>         :param name: the name of the member from which the answer comes from
>             (e.g., *hello* in `object1.hello()`). Optional.
>         """
>         if is_error(answer)[0]:
>             if len(answer) > 1:
>                 type = answer[1]
>                 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>                 if answer[1] == REFERENCE_TYPE:
>                     raise Py4JJavaError(
>                         "An error occurred while calling {0}{1}{2}.\n".
> >                       format(target_id, ".", name), value)
> E                   Py4JJavaError: An error occurred while calling o288.insertInto.
> E                   : java.lang.NullPointerException
> E                   	at org.apache.flink.api.common.io.FileOutputFormat.setWriteMode(FileOutputFormat.java:146)
> E                   	at org.apache.flink.api.java.DataSet.writeAsText(DataSet.java:1510)
> E                   	at org.apache.flink.table.sinks.CsvTableSink.emitDataSet(CsvTableSink.scala:76)
> E                   	at org.apache.flink.table.api.internal.BatchTableEnvImpl.writeToSink(BatchTableEnvImpl.scala:128)
> E                   	at org.apache.flink.table.api.internal.TableEnvImpl.insertInto(TableEnvImpl.scala:494)
> E                   	at org.apache.flink.table.api.internal.TableEnvImpl.insertInto(TableEnvImpl.scala:458)
> E                   	at org.apache.flink.table.api.internal.TableEnvImpl.insertInto(TableEnvImpl.scala:465)
> E                   	at org.apache.flink.table.api.internal.BatchTableEnvImpl.insertInto(BatchTableEnvImpl.scala:57)
> E                   	at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:408)
> E                   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> E                   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> E                   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> E                   	at java.lang.reflect.Method.invoke(Method.java:498)
> E                   	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> E                   	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> E                   	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> E                   	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> E                   	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> E                   	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> E                   	at java.lang.Thread.run(Thread.java:748)
> .tox/py27/lib/python2.7/site-packages/py4j/protocol.py:328: Py4JJavaError
> ___________ StreamExecutionEnvironmentTests.test_get_execution_plan ____________
> self = <pyflink.datastream.tests.test_stream_execution_environment.StreamExecutionEnvironmentTests testMethod=test_get_execution_plan>
>     def test_get_execution_plan(self):
>         tmp_dir = tempfile.gettempdir()
>         source_path = os.path.join(tmp_dir + '/streaming.csv')
>         tmp_csv = os.path.join(tmp_dir + '/streaming2.csv')
>         field_names = ["a", "b", "c"]
>         field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()]
>         t_env = StreamTableEnvironment.create(self.env)
>         csv_source = CsvTableSource(source_path, field_names, field_types)
>         t_env.register_table_source("Orders", csv_source)
>         t_env.register_table_sink(
>             "Results",
>             CsvTableSink(field_names, field_types, tmp_csv))
> >       t_env.scan("Orders").insert_into("Results")
> pyflink/datastream/tests/test_stream_execution_environment.py:188:
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> pyflink/table/table.py:583: in insert_into
>     self._j_table.insertInto(table_path, j_table_path)
> .tox/py27/lib/python2.7/site-packages/py4j/java_gateway.py:1286: in __call__
>     answer, self.gateway_client, self.target_id, self.name)
> pyflink/util/exceptions.py:139: in deco
>     return f(*a, **kw)
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> answer = 'xro653'
> gateway_client = <py4j.java_gateway.GatewayClient object at 0x10bd139d0>
> target_id = 'o651', name = 'insertInto'
>     def get_return_value(answer, gateway_client, target_id=None, name=None):
>         """Converts an answer received from the Java gateway into a Python object.
>         For example, string representation of integers are converted to Python
>         integer, string representation of objects are converted to JavaObject
>         instances, etc.
>         :param answer: the string returned by the Java gateway
>         :param gateway_client: the gateway client used to communicate with the Java
>             Gateway. Only necessary if the answer is a reference (e.g., object,
>             list, map)
>         :param target_id: the name of the object from which the answer comes from
>             (e.g., *object1* in `object1.hello()`). Optional.
>         :param name: the name of the member from which the answer comes from
>             (e.g., *hello* in `object1.hello()`). Optional.
>         """
>         if is_error(answer)[0]:
>             if len(answer) > 1:
>                 type = answer[1]
>                 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>                 if answer[1] == REFERENCE_TYPE:
>                     raise Py4JJavaError(
>                         "An error occurred while calling {0}{1}{2}.\n".
> >                       format(target_id, ".", name), value)
> E                   Py4JJavaError: An error occurred while calling o651.insertInto.
> E                   : java.lang.NullPointerException
> E                   	at org.apache.flink.api.common.io.FileOutputFormat.setWriteMode(FileOutputFormat.java:146)
> E                   	at org.apache.flink.streaming.api.datastream.DataStream.writeAsText(DataStream.java:1044)
> E                   	at org.apache.flink.table.sinks.CsvTableSink.emitDataStream(CsvTableSink.scala:95)
> E                   	at org.apache.flink.table.sinks.StreamTableSink.consumeDataStream(StreamTableSink.java:48)
> E                   	at org.apache.flink.table.planner.StreamPlanner.writeToAppendSink(StreamPlanner.scala:322)
> E                   	at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$writeToSink(StreamPlanner.scala:263)
> E                   	at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:164)
> E                   	at org.apache.flink.table.planner.StreamPlanner$$anonfun$2.apply(StreamPlanner.scala:162)
> E                   	at scala.Option.map(Option.scala:146)
> E                   	at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:162)
> E                   	at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:124)
> E                   	at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:124)
> E                   	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> E                   	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> E                   	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> E                   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> E                   	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> E                   	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> E                   	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> E                   	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> E                   	at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:124)
> E                   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:279)
> E                   	at org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:289)
> E                   	at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:408)
> E                   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> E                   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> E                   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> E                   	at java.lang.reflect.Method.invoke(Method.java:498)
> E                   	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> E                   	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> E                   	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> E                   	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> E                   	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> E                   	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> E                   	at java.lang.Thread.run(Thread.java:748)
> .tox/py27/lib/python2.7/site-packages/py4j/protocol.py:328: Py4JJavaError
> ==================== 3 failed, 332 passed in 29.68 seconds =====================
> ERROR: InvocationError for command /Users/bowen.li/Desktop/git/flink/flink-python/.tox/py27/bin/pytest (exited with code 1)
> py33 recreate: /Users/bowen.li/Desktop/git/flink/flink-python/.tox/py33
> py33 installdeps: pytest
> ....
> ============== 3 failed, 332 passed, 5 warnings in 22.78 seconds ===============
> ERROR: InvocationError for command /Users/bowen.li/Desktop/git/flink/flink-python/.tox/py37/bin/pytest (exited with code 1)
> ___________________________________ summary ____________________________________
> ERROR:   py27: commands failed
> ERROR:   py33: commands failed
> ERROR:   py34: commands failed
> ERROR:   py35: commands failed
> ERROR:   py36: commands failed
> ERROR:   py37: commands failed
> =============tox checks... [FAILED]=============
> {code}


