Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/16 21:41:01 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #12023: [BEAM-10135] Add Python wrapper for Jdbc Write external transform

TheNeuralBit commented on a change in pull request #12023:
URL: https://github.com/apache/beam/pull/12023#discussion_r441149710



##########
File path: sdks/python/apache_beam/io/external/jdbc.py
##########
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+  PTransforms for supporting Jdbc in Python pipelines. These transforms do not
+  run a Jdbc client in Python. Instead, they expand to ExternalTransforms
+  which the Expansion Service resolves to the Java SDK's JdbcIO.
+
+  Note: To use these transforms, you need to start a Java Expansion Service.
+  Please refer to the portability documentation on how to do that. Flink users
+  can use the built-in Expansion Service of the Flink Runner's Job Server. The
+  expansion service address has to be provided when instantiating the
+  transforms.
+
+  If you start Flink's Job Server, the expansion service will be started on
+  port 8097. This is also the configured default for this transform. For a

Review comment:
       It looks to me like the default is to start an expansion service with `sdks:java:io:expansion-service:shadowJar`, not to use port 8097. Is this out of date?
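
    For context: rather than a fixed port, the wrapper's default presumably
    boils down to something like the sketch below (assuming a Beam version
    that ships `BeamJarExpansionService`; `default_io_expansion_service` is
    an illustrative name, not necessarily the real one):

```python
from apache_beam.transforms.external import BeamJarExpansionService

def default_io_expansion_service():
  # Starts (and caches) an expansion service backed by the shaded jar
  # built by the named gradle target; it binds to a free port rather
  # than a fixed one such as 8097.
  return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
```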

##########
File path: sdks/python/apache_beam/io/external/jdbc.py
##########
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+  PTransforms for supporting Jdbc in Python pipelines. These transforms do not
+  run a Jdbc client in Python. Instead, they expand to ExternalTransforms
+  which the Expansion Service resolves to the Java SDK's JdbcIO.
+
+  Note: To use these transforms, you need to start a Java Expansion Service.
+  Please refer to the portability documentation on how to do that. Flink users
+  can use the built-in Expansion Service of the Flink Runner's Job Server. The
+  expansion service address has to be provided when instantiating the
+  transforms.

Review comment:
       I don't think we should indicate this is an ExternalTransform in the module docstring; it should be transparent to the user (I received similar guidance for the xlang SqlTransform: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/sql.py).
   
    This is useful information, though. Maybe it could be moved to the docstring for WriteToJdbc.__init__ to document the `expansion_service` param?
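
    For example, a sketch of what that could look like (parameter list
    abridged; only `expansion_service` is the point here):

```python
from apache_beam.transforms.external import ExternalTransform

class WriteToJdbc(ExternalTransform):  # sketch; real signature abridged
  def __init__(self, statement, expansion_service=None):
    """Initializes a write operation to Jdbc.

    :param statement: sql statement to be executed.
    :param expansion_service: address (host:port) of an already-running
      Java expansion service. Flink users can use the built-in expansion
      service of the Flink Runner's Job Server. If omitted, a default
      expansion service may be started automatically.
    """
```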

##########
File path: sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
##########
@@ -0,0 +1,165 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import typing
+import unittest
+
+from past.builtins import unicode
+
+import apache_beam as beam
+from apache_beam import coders
+from apache_beam.io.external.jdbc import WriteToJdbc
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  import psycopg2
+except ImportError:
+  psycopg2 = None  # module-level unittest.skip() is a no-op; guard tests with skipIf
+  # pylint: enable=wrong-import-order, wrong-import-position
+
+JdbcTestRow = typing.NamedTuple(
+    "JdbcTestRow",
+    [
+        ("f_id", int),
+        ("f_real", float),
+        ("f_string", unicode),
+    ],
+)
+
+coders.registry.register_coder(JdbcTestRow, coders.RowCoder)
+
+ROW_COUNT = 10
+
+
+class JdbcExternalTransformTest(unittest.TestCase):
+  """Tests that exercise the cross-language JdbcIO Transform (implemented in java).
+
+  To run with the local expansion service and flink job server you need to build it,
+  e.g. via command:
+  ./gradlew :sdks:java:io:expansion-service:shadowJar
+  ./gradlew :runners:flink:1.10:job-server:shadowJar
+  and have flink1.10 master cluster running
+
+  If on development branch, it may be necessary to build java sdk docker image and tag
+  it with the latest released version.
+  ./gradlew :sdks:java:container:docker
+  docker tag apache/beam_java_sdk:x.xx.x.dev apache/beam_java_sdk:y.yy.y.dev

Review comment:
       I don't think this `docker tag` step should be necessary. At least I didn't need to do it when testing SqlTransform.

##########
File path: sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
##########
@@ -0,0 +1,165 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import typing
+import unittest
+
+from past.builtins import unicode
+
+import apache_beam as beam
+from apache_beam import coders
+from apache_beam.io.external.jdbc import WriteToJdbc
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  import psycopg2
+except ImportError:
+  psycopg2 = None  # module-level unittest.skip() is a no-op; guard tests with skipIf
+  # pylint: enable=wrong-import-order, wrong-import-position
+
+JdbcTestRow = typing.NamedTuple(
+    "JdbcTestRow",
+    [
+        ("f_id", int),
+        ("f_real", float),
+        ("f_string", unicode),
+    ],
+)
+
+coders.registry.register_coder(JdbcTestRow, coders.RowCoder)
+
+ROW_COUNT = 10
+
+
+class JdbcExternalTransformTest(unittest.TestCase):
+  """Tests that exercise the cross-language JdbcIO Transform (implemented in java).
+
+  To run with the local expansion service and flink job server you need to build it,
+  e.g. via command:
+  ./gradlew :sdks:java:io:expansion-service:shadowJar
+  ./gradlew :runners:flink:1.10:job-server:shadowJar
+  and have flink1.10 master cluster running
+
+  If on development branch, it may be necessary to build java sdk docker image and tag
+  it with the latest released version.
+  ./gradlew :sdks:java:container:docker
+  docker tag apache/beam_java_sdk:x.xx.x.dev apache/beam_java_sdk:y.yy.y.dev
+
+  python setup.py nosetests \
+  --tests=apache_beam.io.external.xlang_jdbcio_it_test:JdbcExternalTransformTest.test_read_and_write \
+  --test-pipeline-options="
+    --driver_class_name=org.postgresql.Driver
+    --jdbc_url=jdbc:postgresql://localhost:5432/postgres

Review comment:
       This would require the user to set up an instance of postgres with these parameters, right? I think you should have this test create the postgres instance it needs. I bet there are python libraries that can help with that, e.g. https://pypi.org/project/pytest-postgresql/
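
    For example, a sketch with the testcontainers library (one alternative to
    pytest-postgresql; assumes a local Docker daemon is available):

```python
import unittest

from testcontainers.postgres import PostgresContainer

class JdbcExternalTransformTest(unittest.TestCase):
  @classmethod
  def setUpClass(cls):
    # Spins up a throwaway postgres in Docker; testcontainers defaults
    # to user/password 'test'/'test' and database 'test'.
    cls.postgres = PostgresContainer('postgres:12.3')
    cls.postgres.start()
    host = cls.postgres.get_container_host_ip()
    port = cls.postgres.get_exposed_port(5432)
    cls.jdbc_url = 'jdbc:postgresql://{}:{}/test'.format(host, port)

  @classmethod
  def tearDownClass(cls):
    cls.postgres.stop()
```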

##########
File path: sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
##########
@@ -0,0 +1,165 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import typing
+import unittest
+
+from past.builtins import unicode
+
+import apache_beam as beam
+from apache_beam import coders
+from apache_beam.io.external.jdbc import WriteToJdbc
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  import psycopg2
+except ImportError:
+  psycopg2 = None  # module-level unittest.skip() is a no-op; guard tests with skipIf
+  # pylint: enable=wrong-import-order, wrong-import-position
+
+JdbcTestRow = typing.NamedTuple(
+    "JdbcTestRow",
+    [
+        ("f_id", int),
+        ("f_real", float),
+        ("f_string", unicode),
+    ],
+)
+
+coders.registry.register_coder(JdbcTestRow, coders.RowCoder)
+
+ROW_COUNT = 10
+
+
+class JdbcExternalTransformTest(unittest.TestCase):
+  """Tests that exercise the cross-language JdbcIO Transform (implemented in java).
+
+  To run with the local expansion service and flink job server you need to build it,
+  e.g. via command:
+  ./gradlew :sdks:java:io:expansion-service:shadowJar
+  ./gradlew :runners:flink:1.10:job-server:shadowJar
+  and have flink1.10 master cluster running
+
+  If on development branch, it may be necessary to build java sdk docker image and tag
+  it with the latest released version.
+  ./gradlew :sdks:java:container:docker
+  docker tag apache/beam_java_sdk:x.xx.x.dev apache/beam_java_sdk:y.yy.y.dev
+
+  python setup.py nosetests \
+  --tests=apache_beam.io.external.xlang_jdbcio_it_test:JdbcExternalTransformTest.test_read_and_write \
+  --test-pipeline-options="
+    --driver_class_name=org.postgresql.Driver
+    --jdbc_url=jdbc:postgresql://localhost:5432/postgres
+    --username=postgres
+    --password=postgres
+    --host=localhost
+    --port=5432
+    --database=postgres"
+  """
+  def __init__(self, *args, **kwargs):
+    super(JdbcExternalTransformTest, self).__init__(*args, **kwargs)
+
+    pipeline = TestPipeline()
+    #  Options for the external jdbc write transform
+    self.driver_class_name = pipeline.get_option('driver_class_name')
+    self.jdbc_url = pipeline.get_option('jdbc_url')
+    self.username = pipeline.get_option('username')
+    self.password = pipeline.get_option('password')
+
+    #  Required for table initialization and drop
+    self.host = pipeline.get_option('host')
+    self.port = pipeline.get_option('port')
+    self.database = pipeline.get_option('database')
+
+    #  The actual pipeline options
+    self.pipeline_options = {
+        'runner': 'FlinkRunner',
+        'flink_version': '1.10',
+        'flink_master': 'localhost:8081',

Review comment:
       If you specify FlinkRunner without specifying a flink_master, a Flink instance should be started for you.
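
    i.e. the options could shrink to something like this (sketch):

```python
pipeline_options = {
    'runner': 'FlinkRunner',
    'flink_version': '1.10',
    # no flink_master: the runner brings up a local Flink for the test
}
```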




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org