You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Brian Hulette (Jira)" <ji...@apache.org> on 2021/10/15 16:48:00 UTC

[jira] [Updated] (BEAM-12804) Issue with SQLTransform in python 3.7.11

     [ https://issues.apache.org/jira/browse/BEAM-12804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brian Hulette updated BEAM-12804:
---------------------------------
    Description: 
[JavaJarServer](https://github.com/apache/beam/blob/fdf06361cd609335dc6c9763fb09f4e6b3e29e36/sdks/python/apache_beam/utils/subprocess_server.py#L151) should prefer the java binary from $JAVA_HOME if set.

h2. Original Report:

Using a SQLTransform results in an error.

 

python --version

Python 3.7.11

 

pip list | grep apache-beam

*apache-beam*        2.31.0

 

The following code works if removing the SQLTransform:

```
import itertools
import csv
import io

import apache_beam as beam
from apache_beam.dataframe.io import read_csv
from apache_beam.transforms.sql import SqlTransform


def parse_csv(val):
    def lower_headers(iterator):
        return itertools.chain([next(iterator).lower()], iterator)
    return csv.DictReader(lower_headers(io.TextIOWrapper(val.open())))


class BeamTransformBuilder():

    def build(self, pipeline):
        practices = (
            pipeline
            | beam.io.fileio.MatchFiles("data.csv")
            | beam.io.fileio.ReadMatches()
            | beam.Reshuffle()
            | beam.FlatMap(parse_csv)
            | beam.Map(lambda x: beam.Row(id="test-id"))
            | SqlTransform("""
            SELECT
            id
            FROM PCOLLECTION""")
        )
        print("should print stuff")
        practices | beam.Map(print)


def main():
    builder = BeamTransformBuilder()
    with beam.Pipeline('DirectRunner') as p:
        builder.build(p)


if __name__ == '__main__':
    main()
```
 

error message:

```
    main()
    main()
  File "./lib/transforms/care_site.py", line 38, in main
    builder.build(p)
  File "./lib/transforms/care_site.py", line 29, in build
    FROM PCOLLECTION""")
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/pvalue.py", line 136, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/pipeline.py", line 694, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 185, in apply
    return m(transform, input, options)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform
    return transform.expand(input)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 295, in expand
    response = service.Expand(request)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = ""
    debug_error_string = "{"created":"@1629934400.533958000","description":"Error received from peer ipv6:[::1]:50780","file":"src/core/lib/surface/call.cc","file_line":1070,"grpc_message":"","grpc_status":2}"
```

  was:
Using a SQLTransform results in an error.

 

python --version

Python 3.7.11

 

pip list | grep apache-beam

*apache-beam*        2.31.0

 

The following code works if removing the SQLTransform:

```
import itertools
import csv
import io

import apache_beam as beam
from apache_beam.dataframe.io import read_csv
from apache_beam.transforms.sql import SqlTransform


def parse_csv(val):
    def lower_headers(iterator):
        return itertools.chain([next(iterator).lower()], iterator)
    return csv.DictReader(lower_headers(io.TextIOWrapper(val.open())))


class BeamTransformBuilder():

    def build(self, pipeline):
        practices = (
            pipeline
            | beam.io.fileio.MatchFiles("data.csv")
            | beam.io.fileio.ReadMatches()
            | beam.Reshuffle()
            | beam.FlatMap(parse_csv)
            | beam.Map(lambda x: beam.Row(id="test-id"))
            | SqlTransform("""
            SELECT
            id
            FROM PCOLLECTION""")
        )
        print("should print stuff")
        practices | beam.Map(print)


def main():
    builder = BeamTransformBuilder()
    with beam.Pipeline('DirectRunner') as p:
        builder.build(p)


if __name__ == '__main__':
    main()
```
 

error message:

```
    main()
    main()
  File "./lib/transforms/care_site.py", line 38, in main
    builder.build(p)
  File "./lib/transforms/care_site.py", line 29, in build
    FROM PCOLLECTION""")
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/pvalue.py", line 136, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/pipeline.py", line 694, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 185, in apply
    return m(transform, input, options)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform
    return transform.expand(input)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 295, in expand
    response = service.Expand(request)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = ""
    debug_error_string = "{"created":"@1629934400.533958000","description":"Error received from peer ipv6:[::1]:50780","file":"src/core/lib/surface/call.cc","file_line":1070,"grpc_message":"","grpc_status":2}"
```


> Issue with SQLTransform in python 3.7.11
> ----------------------------------------
>
>                 Key: BEAM-12804
>                 URL: https://issues.apache.org/jira/browse/BEAM-12804
>             Project: Beam
>          Issue Type: Bug
>          Components: cross-language, sdk-py-core
>            Reporter: sean teeling
>            Priority: P2
>
> [JavaJarServer](https://github.com/apache/beam/blob/fdf06361cd609335dc6c9763fb09f4e6b3e29e36/sdks/python/apache_beam/utils/subprocess_server.py#L151) should prefer the java binary from $JAVA_HOME if set.
> h2. Original Report:
> Using a SQLTransform results in an error.
>  
> python --version
> Python 3.7.11
>  
> pip list | grep apache-beam
> *apache-beam*        2.31.0
>  
> The following code works if removing the SQLTransform:
> ```
> import itertools
> import csv
> import io
> import apache_beam as beam
> from apache_beam.dataframe.io import read_csv
> from apache_beam.transforms.sql import SqlTransform
> def parse_csv(val):
>     def lower_headers(iterator):
>         return itertools.chain([next(iterator).lower()], iterator)
>     return csv.DictReader(lower_headers(io.TextIOWrapper(val.open())))
> class BeamTransformBuilder():
>     def build(self, pipeline):
>         practices = (
>             pipeline
>             | beam.io.fileio.MatchFiles("data.csv")
>             | beam.io.fileio.ReadMatches()
>             | beam.Reshuffle()
>             | beam.FlatMap(parse_csv)
>             | beam.Map(lambda x: beam.Row(id="test-id"))
>             | SqlTransform("""
>             SELECT
>             id
>             FROM PCOLLECTION""")
>         )
>         print("should print stuff")
>         practices | beam.Map(print)
> def main():
>     builder = BeamTransformBuilder()
>     with beam.Pipeline('DirectRunner') as p:
>         builder.build(p)
> if __name__ == '__main__':
>     main()
> ```
>  
> error message:
> ```
>     main()
>     main()
>   File "./lib/transforms/care_site.py", line 38, in main
>     builder.build(p)
>   File "./lib/transforms/care_site.py", line 29, in build
>     FROM PCOLLECTION""")
>   File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/pvalue.py", line 136, in __or__
>     return self.pipeline.apply(ptransform, self)
>   File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/pipeline.py", line 694, in apply
>     pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
>   File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 185, in apply
>     return m(transform, input, options)
>   File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform
>     return transform.expand(input)
>   File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 295, in expand
>     response = service.Expand(request)
>   File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/grpc/_channel.py", line 946, in __call__
>     return _end_unary_response_blocking(state, call, False, None)
>   File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
>     raise _InactiveRpcError(state)
> grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
>     status = StatusCode.UNKNOWN
>     details = ""
>     debug_error_string = "{"created":"@1629934400.533958000","description":"Error received from peer ipv6:[::1]:50780","file":"src/core/lib/surface/call.cc","file_line":1070,"grpc_message":"","grpc_status":2}"
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)