Posted to github@beam.apache.org by "chamikaramj (via GitHub)" <gi...@apache.org> on 2023/03/03 22:45:35 UTC

[GitHub] [beam] chamikaramj commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

chamikaramj commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1124960350


##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2353,6 +2395,98 @@ class BeamModulePlugin implements Plugin<Project> {
       }
     }
 
+    /** ***********************************************************************************************/
+    // Method to create the createCrossLanguageUsingJavaExpansionTask.
+    // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
+    // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
+    project.ext.createCrossLanguageUsingJavaExpansionTask = {
+      // This task won't work if the python build file doesn't exist.
+      if (!project.project(":sdks:python").buildFile.exists()) {
+        System.err.println 'Python build file not found. Skipping createCrossLanguageUsingJavaExpansionTask.'
+        return
+      }
+      def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()
+
+      project.evaluationDependsOn(":sdks:python")
+      project.evaluationDependsOn(config.expansionProjectPath)
+      project.evaluationDependsOn(":runners:core-construction-java")
+      project.evaluationDependsOn(":sdks:java:extensions:python")
+
+      // Setting up args to launch the expansion service
+      def envDir = project.project(":sdks:python").envdir
+      def pythonDir = project.project(":sdks:python").projectDir
+      def javaExpansionPort = getRandomPort()
+      def expansionJar = project.project(config.expansionProjectPath).buildTestExpansionServiceJar.archivePath
+      def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath()
+      def expansionServiceOpts = [
+        "group_id": project.name,
+        "java_expansion_service_jar": expansionJar,
+        "java_port": javaExpansionPort,
+        "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
+      ]
+      def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
+      def javaContainerSuffix
+      if (JavaVersion.current() == JavaVersion.VERSION_1_8) {

Review Comment:
   We should get supported versions from JavaTestProperties.SUPPORTED_VERSIONS instead of hardcoding them here.



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
     return self.attributes[key].__get__(self, WriteResult)
 
 
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):

Review Comment:
   Can we better integrate this transform into the existing bigquery sink API by adding a WriteMethod?
   
   https://github.com/apache/beam/blob/39cab94361bc44e8b48257385d37d8978ab7fc02/sdks/python/apache_beam/io/gcp/bigquery.py#L1753
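   
   A minimal sketch of that integration (the existing Python sink already exposes a Method enum; STORAGE_WRITE_API here is the proposed addition, not something in the PR):
   
       class WriteToBigQuery(PTransform):
         class Method(object):
           DEFAULT = 'DEFAULT'
           STREAMING_INSERTS = 'STREAMING_INSERTS'
           FILE_LOADS = 'FILE_LOADS'
           STORAGE_WRITE_API = 'STORAGE_WRITE_API'  # proposed addition
   
   so that, hypothetically, callers would write:
   
       rows | WriteToBigQuery(
           table='project:dataset.table',
           method=WriteToBigQuery.Method.STORAGE_WRITE_API)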



##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2353,6 +2395,98 @@ class BeamModulePlugin implements Plugin<Project> {
       }
     }
 
+    /** ***********************************************************************************************/
+    // Method to create the createCrossLanguageUsingJavaExpansionTask.
+    // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
+    // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
+    project.ext.createCrossLanguageUsingJavaExpansionTask = {
+      // This task won't work if the python build file doesn't exist.
+      if (!project.project(":sdks:python").buildFile.exists()) {
+        System.err.println 'Python build file not found. Skipping createCrossLanguageUsingJavaExpansionTask.'
+        return
+      }
+      def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()
+
+      project.evaluationDependsOn(":sdks:python")
+      project.evaluationDependsOn(config.expansionProjectPath)
+      project.evaluationDependsOn(":runners:core-construction-java")
+      project.evaluationDependsOn(":sdks:java:extensions:python")
+
+      // Setting up args to launch the expansion service
+      def envDir = project.project(":sdks:python").envdir
+      def pythonDir = project.project(":sdks:python").projectDir
+      def javaExpansionPort = getRandomPort()

Review Comment:
   There could be a race between port acquisition here and the expansion service startup later, which could result in failures, especially when we start running a large number of cross-language test suites.
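   
   For illustration, a getRandomPort-style helper (a Python sketch of the pattern, not the actual Groovy code) binds port 0, reads the assigned port, and releases it; the port is only guaranteed free at that instant:
   
       import socket
   
       def get_random_port():
         with socket.socket() as s:
           # The OS hands out a currently-free port at bind time.
           s.bind(('localhost', 0))
           return s.getsockname()[1]
       # By the time the expansion service binds this port, another
       # concurrent suite may already have claimed it.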



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java:
##########
@@ -102,53 +120,61 @@ public void testInvalidConfig() {
 
   public PCollectionRowTuple runWithConfig(
       BigQueryStorageWriteApiSchemaTransformConfiguration config) {
+    return runWithConfig(config, ROWS);
+  }
+
+  public PCollectionRowTuple runWithConfig(
+      BigQueryStorageWriteApiSchemaTransformConfiguration config, List<Row> inputRows) {
     BigQueryStorageWriteApiSchemaTransformProvider provider =
         new BigQueryStorageWriteApiSchemaTransformProvider();
 
     BigQueryStorageWriteApiPCollectionRowTupleTransform writeRowTupleTransform =
         (BigQueryStorageWriteApiPCollectionRowTupleTransform)
             provider.from(config).buildTransform();
 
-    List<Row> testRows =
-        Arrays.asList(
-            Row.withSchema(SCHEMA)
-                .withFieldValue("name", "a")
-                .withFieldValue("number", 1L)
-                .withFieldValue("dt", LocalDateTime.parse("2000-01-01T00:00:00"))
-                .build(),
-            Row.withSchema(SCHEMA)
-                .withFieldValue("name", "b")
-                .withFieldValue("number", 2L)
-                .withFieldValue("dt", LocalDateTime.parse("2000-01-02T00:00:00"))
-                .build(),
-            Row.withSchema(SCHEMA)
-                .withFieldValue("name", "c")
-                .withFieldValue("number", 3L)
-                .withFieldValue("dt", LocalDateTime.parse("2000-01-03T00:00:00"))
-                .build());
-
     writeRowTupleTransform.setBigQueryServices(fakeBigQueryServices);
     String tag = provider.inputCollectionNames().get(0);
 
-    PCollection<Row> rows = p.apply(Create.of(testRows).withRowSchema(SCHEMA));
+    PCollection<Row> rows = p.apply(Create.of(inputRows).withRowSchema(SCHEMA));
 
     PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows);
     PCollectionRowTuple result = input.apply(writeRowTupleTransform);
 
     return result;
   }
 
+  public Boolean rowsEquals(List<Row> expectedRows, List<TableRow> actualRows) {
+    if (expectedRows.size() != actualRows.size()) {
+      return false;
+    }
+    for (int i = 0; i < expectedRows.size(); i++) {
+      // Actual rows may come back disordered. For each TableRow, find its "number" column value

Review Comment:
   You mean "out of order"?



##########
sdks/python/test-suites/xlang/build.gradle:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// This is a base file to set up cross language tests for different runners
+import org.apache.beam.gradle.BeamModulePlugin
+import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTaskCommon
+project.evaluationDependsOn(":sdks:python")
+
+// Set up cross language tests
+def envDir = project.project(":sdks:python").envdir
+def jobPort = BeamModulePlugin.getRandomPort()
+def tmpDir = System.getenv("TMPDIR") ?: System.getenv("WORKSPACE") ?: "/tmp"
+def pidFile = "${tmpDir}/local_job_service_main-${jobPort}.pid"
+
+def setupTask = project.tasks.register("fnApiJobServerSetup", Exec) {
+  dependsOn ':sdks:python:installGcpTest'
+
+  executable 'sh'
+  args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --job_port ${jobPort} --pid_file ${pidFile} --background --stdout_file ${tmpDir}/beam-fnapi-job-server.log"
+}
+
+def cleanupTask = project.tasks.register("fnApiJobServerCleanup", Exec) {
+  executable 'sh'
+  args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --pid_file ${pidFile} --stop"
+}
+
+// List of objects representing task metadata to create cross-language tasks from.
+// Each object contains the minimum relevant metadata.
+def xlangTasks = []
+
+// ******** Java GCP expansion service ********
+def gcpExpansionProject = project.project(':sdks:java:io:google-cloud-platform:expansion-service')

Review Comment:
   Seems like we are hardcoding the "google-cloud-platform:expansion-service" into all Python x-lang tests?
   
   This should be specific to GCP tests instead.



##########
sdks/java/io/google-cloud-platform/expansion-service/build.gradle:
##########
@@ -45,3 +46,24 @@ task runExpansionService (type: JavaExec) {
     classpath = sourceSets.test.runtimeClasspath
     args = [project.findProperty("constructionService.port") ?: "8097"]
 }
+
+task buildTestExpansionServiceJar(type: ShadowJar) {

Review Comment:
   Is this needed?
   We already build and release a Google Cloud Platform jar: https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-google-cloud-platform-expansion-service/2.45.0
   
   So can we just build the jar using the _sdks:java:io:google-cloud-platform:expansion-service:shadowJar_ target and use that?
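   
   On the Python side, a sketch of what that would look like (assuming BeamJarExpansionService accepts the shadowJar Gradle target the same way it accepts the build target in this PR):
   
       from apache_beam.transforms.external import BeamJarExpansionService
   
       # Reuse the released expansion-service artifact's target instead of
       # a test-only jar.
       expansion_service = BeamJarExpansionService(
           'sdks:java:io:google-cloud-platform:expansion-service:shadowJar')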



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -302,6 +302,35 @@ def chain_after(result):
 result.destination_copy_jobid_pairs <--> result['destination_copy_jobid_pairs']
 ```
 
+Writing with Storage Write API using Cross Language
+---------------------------------------------------
+After starting up an expansion service that contains the Java implementation

Review Comment:
   Users should be able to just use the default expansion service, right? Starting up a custom expansion service is a more advanced use case that probably should not be mentioned here, for clarity.
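   
   That is, the common path should be as simple as this sketch (transform and defaults as proposed in this PR):
   
       import apache_beam as beam
   
       # No expansion_service argument: the wrapper falls back to
       # _default_io_expansion_service() and starts the bundled GCP
       # expansion service automatically.
       with beam.Pipeline() as p:
         _ = (
             p
             | beam.Create([beam.Row(name='a', number=1)])
             | StorageWriteToBigQuery(table='project:dataset.table'))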



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
     return self.attributes[key].__get__(self, WriteResult)
 
 
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+  """Writes data to BigQuery using Storage API."""
+  URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+  FAILED_ROWS = "failed_rows"
+  FAILED_ROWS_WITH_ERRORS = "failed_rows_with_errors"
+
+  def __init__(
+      self,
+      table,
+      create_disposition="",

Review Comment:
   Please make sure the default values are valid. Many users may try to use the BQ sink without overriding the optional arguments.
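   
   For example, the dispositions could default to values already documented in the docstring below (the exact choices are the author's; this is just a sketch):
   
       def __init__(
           self,
           table,
           create_disposition='CREATE_IF_NEEDED',  # valid default, not ''
           write_disposition='WRITE_APPEND',       # valid default, not ''
           triggering_frequency=0,
           use_at_least_once=False,
           expansion_service=None):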



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
     return self.attributes[key].__get__(self, WriteResult)
 
 
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+  """Writes data to BigQuery using Storage API."""
+  URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+  FAILED_ROWS = "failed_rows"
+  FAILED_ROWS_WITH_ERRORS = "failed_rows_with_errors"
+
+  def __init__(
+      self,
+      table,
+      create_disposition="",
+      write_disposition="",
+      triggering_frequency=0,
+      use_at_least_once=False,
+      expansion_service=None):
+    """Initialize a StorageWriteToBigQuery transform.
+
+    :param table:
+      Fully-qualified table ID specified as ``'PROJECT:DATASET.TABLE'``.
+    :param create_disposition:
+      String specifying the strategy to take when the table doesn't
+      exist. Possible values are:
+      * ``'CREATE_IF_NEEDED'``: create if does not exist.
+      * ``'CREATE_NEVER'``: fail the write if does not exist.
+    :param write_disposition:
+      String specifying the strategy to take when the table already
+      contains data. Possible values are:
+      * ``'WRITE_TRUNCATE'``: delete existing rows.
+      * ``'WRITE_APPEND'``: add to existing rows.
+      * ``'WRITE_EMPTY'``: fail the write if table not empty.
+    :param triggering_frequency:
+      The time in seconds between write commits. Should only be specified
+      for streaming pipelines. Defaults to 5 seconds.
+    :param use_at_least_once:
+      Use at-least-once semantics. Is cheaper and provides lower latency,
+      but will potentially duplicate records.
+    :param expansion_service:
+      The address (host:port) of the expansion service. If no expansion
+      service is provided, will attempt to run the default GCP expansion
+      service.
+    """
+    super().__init__()
+    self._table = table
+    self._create_disposition = create_disposition
+    self._write_disposition = write_disposition
+    self._triggering_frequency = triggering_frequency
+    self._use_at_least_once = use_at_least_once
+    self._expansion_service = (
+        expansion_service or _default_io_expansion_service())
+    self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+        self._expansion_service, self.URN)
+
+  def expand(self, input):
+    opts = input.pipeline.options.view_as(StandardOptions)
+    # TODO(https://github.com/apache/beam/issues/21307): Add support for
+    # OnWindowExpiration to more runners. Storage Write API requires
+    # `beam:requirement:pardo:on_window_expiration:v1` when unbounded
+    available_runners = ['DataflowRunner', 'TestDataflowRunner']

Review Comment:
   I don't see the Java implementation limited to Dataflow this way. Why would we limit only the Python x-lang version?
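   
   If a guard is still wanted, one softer alternative (hypothetical, not from this PR) is to warn on unbounded input instead of rejecting runners by name:
   
       import logging
   
       from apache_beam.options.pipeline_options import StandardOptions
   
       opts = input.pipeline.options.view_as(StandardOptions)
       if opts.streaming:
         logging.warning(
             'Storage Write API on unbounded input requires runner support '
             'for OnWindowExpiration; verify %s provides it.', opts.runner)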



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org