Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/01 16:24:19 UTC

[GitHub] [beam] chamikaramj commented on a change in pull request #12023: [BEAM-10135] Add Python wrapper for Jdbc Write external transform

chamikaramj commented on a change in pull request #12023:
URL: https://github.com/apache/beam/pull/12023#discussion_r448474601



##########
File path: sdks/python/apache_beam/io/external/jdbc.py
##########
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+  PTransforms for supporting Jdbc in Python pipelines. These transforms do not
+  run a Jdbc client in Python. Instead, they expand to ExternalTransforms
+  which the Expansion Service resolves to the Java SDK's JdbcIO.
+
+  Note: To use these transforms, you need to start a Java Expansion Service.
+  Please refer to the portability documentation on how to do that. Flink users
+  can use the built-in Expansion Service of the Flink Runner's Job Server. The
+  expansion service address has to be provided when instantiating the
+  transforms.
+
+  If you start Flink's Job Server, the expansion service will be started on
+  port 8097. This is also the configured default for this transform. For a

Review comment:
       We updated Kafka to build or download the IO expansion jar by default; you can probably follow the same model here. I updated the documentation for Kafka recently: https://github.com/apache/beam/pull/11928/files
   
   BTW one downside with a single IO expansion jar is that the size of the jar increases linearly with the number of cross-language wrappers that we have. This will increase the pipeline setup time for users since a larger jar has to be staged. What is the size increase of the IO expansion jar due to this wrapper?
   
   cc: @robertwb 
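
   For reference, a minimal sketch of the Kafka-style wiring (the constructor
   body below is a sketch, not the PR's code; the names come from the diff
   above, and the `expansion_service` keyword argument is an assumption):

       # Sketch: fall back to the bundled IO expansion jar when no
       # expansion service address is passed in.
       def __init__(self, driver_class_name, jdbc_url, username, password,
                    statement, expansion_service=None):
         super(WriteToJdbc, self).__init__(
             self.URN,
             NamedTupleBasedPayloadBuilder(
                 WriteToJdbcSchema(
                     driver_class_name=driver_class_name,
                     jdbc_url=jdbc_url,
                     username=username,
                     password=password,
                     connection_properties=None,
                     connection_init_sqls=None,
                     statement=statement)),
             # BeamJarExpansionService builds or downloads the jar for the
             # given Gradle target on demand, so no manually started
             # expansion service is needed in the default case.
             expansion_service or default_io_expansion_service())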

##########
File path: sdks/python/apache_beam/io/external/jdbc.py
##########
@@ -0,0 +1,135 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+  PTransforms for supporting Jdbc in Python pipelines. These transforms do not
+  run a Jdbc client in Python. Instead, they expand to ExternalTransforms
+  which the Expansion Service resolves to the Java SDK's JdbcIO.
+
+  Note: To use these transforms, you need to start a Java Expansion Service.
+  Please refer to the portability documentation on how to do that. Flink users
+  can use the built-in Expansion Service of the Flink Runner's Job Server. The
+  expansion service address has to be provided when instantiating the
+  transforms.
+
+  If you start Flink's Job Server, the expansion service will be started on
+  port 8097. This is also the configured default for this transform. For a
+  different address, please set the expansion_service parameter.
+
+  For more information see:
+  - https://beam.apache.org/documentation/runners/flink/
+  - https://beam.apache.org/roadmap/portability/
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import typing
+
+from past.builtins import unicode
+
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+
+__all__ = ['WriteToJdbc']

Review comment:
       Just curious. Were you able to successfully test this with portable Flink/Spark and Dataflow?
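
   For what it's worth, a hypothetical smoke test against portable Flink might
   look like the following (the JDBC URL, credentials, target table, and the
   LOOPBACK environment are all placeholders, not settings from this PR):

       import typing

       import apache_beam as beam
       from apache_beam import coders
       from apache_beam.io.external.jdbc import WriteToJdbc
       from apache_beam.options.pipeline_options import PipelineOptions
       from past.builtins import unicode

       ExampleRow = typing.NamedTuple(
           'ExampleRow', [('id', int), ('name', unicode)])
       coders.registry.register_coder(ExampleRow, coders.RowCoder)

       # Assumes a local portable Flink runner and a reachable Postgres
       # instance; all connection details below are placeholders.
       with beam.Pipeline(options=PipelineOptions(
           ['--runner=FlinkRunner', '--environment_type=LOOPBACK'])) as p:
         _ = (
             p
             | beam.Create([ExampleRow(1, 'abc')]).with_output_types(
                 ExampleRow)
             | WriteToJdbc(
                 driver_class_name='org.postgresql.Driver',
                 jdbc_url='jdbc:postgresql://localhost:5432/example',
                 username='postgres',
                 password='postgres',
                 statement='INSERT INTO example_table VALUES(?, ?)'))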

##########
File path: sdks/python/apache_beam/io/external/jdbc.py
##########
@@ -0,0 +1,135 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+  PTransforms for supporting Jdbc in Python pipelines. These transforms do not
+  run a Jdbc client in Python. Instead, they expand to ExternalTransforms
+  which the Expansion Service resolves to the Java SDK's JdbcIO.
+
+  Note: To use these transforms, you need to start a Java Expansion Service.
+  Please refer to the portability documentation on how to do that. Flink users
+  can use the built-in Expansion Service of the Flink Runner's Job Server. The
+  expansion service address has to be provided when instantiating the
+  transforms.
+
+  If you start Flink's Job Server, the expansion service will be started on
+  port 8097. This is also the configured default for this transform. For a
+  different address, please set the expansion_service parameter.
+
+  For more information see:
+  - https://beam.apache.org/documentation/runners/flink/
+  - https://beam.apache.org/roadmap/portability/
+"""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import typing
+
+from past.builtins import unicode
+
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import ExternalTransform
+from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
+
+__all__ = ['WriteToJdbc']
+
+
+def default_io_expansion_service():
+  return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
+
+
+WriteToJdbcSchema = typing.NamedTuple(
+    'WriteToJdbcSchema',
+    [
+        ('driver_class_name', unicode),
+        ('jdbc_url', unicode),
+        ('username', unicode),
+        ('password', unicode),
+        ('connection_properties', typing.Optional[unicode]),
+        ('connection_init_sqls', typing.Optional[typing.List[unicode]]),
+        ('statement', unicode),
+    ],
+)
+
+
+class WriteToJdbc(ExternalTransform):
+  """
+  An external PTransform which writes Rows to the specified database.
+
+  This transform receives Rows defined as NamedTuple type and registered in
+  the coders registry to use RowCoder, e.g.
+
+    import typing
+    from apache_beam import coders
+
+    ExampleRow = typing.NamedTuple(
+      "ExampleRow",
+      [
+        ("id", int),
+        ("name", unicode),
+        ("budget", float),
+      ],
+    )
+    coders.registry.register_coder(ExampleRow, coders.RowCoder)
+
+  Experimental; no backwards compatibility guarantees.  It requires special
+  preparation of the Java SDK.  See BEAM-7870.
+  """
+
+  URN = 'beam:external:java:jdbc:write:v1'
+
+  def __init__(
+      self,
+      driver_class_name,
+      jdbc_url,
+      username,
+      password,
+      statement,
+      **kwargs):
+    """
+    Initializes a write operation to Jdbc.
+
+    :param driver_class_name: name of the jdbc driver class

Review comment:
       Should these parameters be specified as strings? It would probably be good to clarify that here.
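
   If the answer is yes, one possible wording for the docstring (the example
   driver class and url here are illustrative, not part of the PR):

       :param driver_class_name: fully qualified name of the jdbc driver
           class, as a string, e.g. 'org.postgresql.Driver'
       :param jdbc_url: full jdbc url of the target database, as a string,
           e.g. 'jdbc:postgresql://localhost:5432/example'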




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org