Posted to github@beam.apache.org by "ahmedabu98 (via GitHub)" <gi...@apache.org> on 2023/03/01 23:26:59 UTC

[GitHub] [beam] ahmedabu98 opened a new pull request, #25685: BigQuery Storage Write API wrapper for Python SDK

ahmedabu98 opened a new pull request, #25685:
URL: https://github.com/apache/beam/pull/25685

   Implements a wrapper for the Python SDK that uses Java's [Storage API SchemaTransform](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java) to write to BigQuery.
   
   Fixes #21961
   
   Also adds support for more easily creating cross-language tests for a particular expansion service (e.g. GCP, Kinesis).
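
The wrapper's docstring, quoted later in this thread, documents a table ID of the form `'PROJECT:DATASET.TABLE'` and a fixed set of create and write dispositions. A minimal sketch of checking those arguments in plain Python before they reach the transform might look like the following. The helper names `parse_table_id` and `check_dispositions` are hypothetical and not part of Beam, the parsing is deliberately simplified, and treating an empty string as "use the default" is an assumption based only on the `""` defaults in the constructor signature.

```python
from typing import NamedTuple

# Allowed values as listed in the docstring quoted in this PR.
CREATE_DISPOSITIONS = {"CREATE_IF_NEEDED", "CREATE_NEVER"}
WRITE_DISPOSITIONS = {"WRITE_TRUNCATE", "WRITE_APPEND", "WRITE_EMPTY"}


class TableRef(NamedTuple):
    project: str
    dataset: str
    table: str


def parse_table_id(table_id: str) -> TableRef:
    """Split 'PROJECT:DATASET.TABLE' into its parts (hypothetical helper)."""
    # Split on the last ':' so a project ID that itself contains ':' survives.
    project, colon, rest = table_id.rpartition(":")
    dataset, dot, table = rest.partition(".")
    if not (colon and dot and project and dataset and table):
        raise ValueError(f"Expected 'PROJECT:DATASET.TABLE', got {table_id!r}")
    return TableRef(project, dataset, table)


def check_dispositions(create_disposition: str = "",
                       write_disposition: str = "") -> None:
    """Reject values outside the documented sets.

    Assumption: an empty string means "leave the choice to the default".
    """
    if create_disposition and create_disposition not in CREATE_DISPOSITIONS:
        raise ValueError(f"Unknown create_disposition: {create_disposition!r}")
    if write_disposition and write_disposition not in WRITE_DISPOSITIONS:
        raise ValueError(f"Unknown write_disposition: {write_disposition!r}")
```

Failing early in Python gives a shorter error than one raised during cross-language expansion, but whether the PR itself validates these values is not shown in this thread.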


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451038262

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1484216430

   Run XVR_JavaUsingPython_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464645526

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125780795


##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2353,6 +2395,98 @@ class BeamModulePlugin implements Plugin<Project> {
       }
     }
 
+    /** ***********************************************************************************************/
+    // Method to create the createCrossLanguageUsingJavaExpansionTask.
+    // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
+    // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
+    project.ext.createCrossLanguageUsingJavaExpansionTask = {
+      // This task won't work if the python build file doesn't exist.
+      if (!project.project(":sdks:python").buildFile.exists()) {
+        System.err.println 'Python build file not found. Skipping createCrossLanguageUsingJavaExpansionTask.'
+        return
+      }
+      def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()
+
+      project.evaluationDependsOn(":sdks:python")
+      project.evaluationDependsOn(config.expansionProjectPath)
+      project.evaluationDependsOn(":runners:core-construction-java")
+      project.evaluationDependsOn(":sdks:java:extensions:python")
+
+      // Setting up args to launch the expansion service
+      def envDir = project.project(":sdks:python").envdir
+      def pythonDir = project.project(":sdks:python").projectDir
+      def javaExpansionPort = getRandomPort()
+      def expansionJar = project.project(config.expansionProjectPath).buildTestExpansionServiceJar.archivePath
+      def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath()
+      def expansionServiceOpts = [
+        "group_id": project.name,
+        "java_expansion_service_jar": expansionJar,
+        "java_port": javaExpansionPort,
+        "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
+      ]
+      def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
+      def javaContainerSuffix
+      if (JavaVersion.current() == JavaVersion.VERSION_1_8) {

Review Comment:
   Sounds good, will set up a property in that class.





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125802640


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
     return self.attributes[key].__get__(self, WriteResult)
 
 
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+  """Writes data to BigQuery using Storage API."""
+  URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+  FAILED_ROWS = "failed_rows"
+  FAILED_ROWS_WITH_ERRORS = "failed_rows_with_errors"
+
+  def __init__(
+      self,
+      table,
+      create_disposition="",
+      write_disposition="",
+      triggering_frequency=0,
+      use_at_least_once=False,
+      expansion_service=None):
+    """Initialize a StorageWriteToBigQuery transform.
+
+    :param table:
+      Fully-qualified table ID specified as ``'PROJECT:DATASET.TABLE'``.
+    :param create_disposition:
+      String specifying the strategy to take when the table doesn't
+      exist. Possible values are:
+      * ``'CREATE_IF_NEEDED'``: create if does not exist.
+      * ``'CREATE_NEVER'``: fail the write if does not exist.
+    :param write_disposition:
+      String specifying the strategy to take when the table already
+      contains data. Possible values are:
+      * ``'WRITE_TRUNCATE'``: delete existing rows.
+      * ``'WRITE_APPEND'``: add to existing rows.
+      * ``'WRITE_EMPTY'``: fail the write if table not empty.
+    :param triggering_frequency:
+      The time in seconds between write commits. Should only be specified
+      for streaming pipelines. Defaults to 5 seconds.
+    :param use_at_least_once:
+      Use at-least-once semantics. Is cheaper and provides lower latency,
+      but will potentially duplicate records.
+    :param expansion_service:
+      The address (host:port) of the expansion service. If no expansion
+      service is provided, will attempt to run the default GCP expansion
+      service.
+    """
+    super().__init__()
+    self._table = table
+    self._create_disposition = create_disposition
+    self._write_disposition = write_disposition
+    self._triggering_frequency = triggering_frequency
+    self._use_at_least_once = use_at_least_once
+    self._expansion_service = (
+        expansion_service or _default_io_expansion_service())
+    self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+        self._expansion_service, self.URN)
+
+  def expand(self, input):
+    opts = input.pipeline.options.view_as(StandardOptions)
+    # TODO(https://github.com/apache/beam/issues/21307): Add support for
+    # OnWindowExpiration to more runners. Storage Write API requires
+    # `beam:requirement:pardo:on_window_expiration:v1` when unbounded
+    available_runners = ['DataflowRunner', 'TestDataflowRunner']

Review Comment:
   This is meant to be for streaming use cases; `StorageApiWritesShardedRecords` has an [`OnWindowExpiration`](https://github.com/apache/beam/blob/4da602517292adcd9ffcd7cc0acb8b0c1155aa02/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java#L763) method, which is only supported by DataflowRunner. Trying to run this with another runner will result in:
   ```
   ValueError: Unable to run pipeline with requirement: beam:requirement:pardo:on_window_expiration:v1
   ```
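
The quoted hunk stops right after `available_runners`, so what `expand` does with that list is not visible here. As a hedged sketch of the idea in the comment above, a guard could refuse streaming writes on runners outside that list and surface the missing requirement up front. The function name `check_runner_for_storage_write` and its parameters are hypothetical, and allowing bounded (batch) writes on any runner is an assumption drawn from the "when unbounded" wording in the TODO.

```python
# Runner names copied from the `available_runners` list in the quoted diff.
STREAMING_CAPABLE_RUNNERS = ("DataflowRunner", "TestDataflowRunner")

# Requirement URN as it appears in the error message quoted above.
ON_WINDOW_EXPIRATION = "beam:requirement:pardo:on_window_expiration:v1"


def check_runner_for_storage_write(runner_name: str, streaming: bool) -> None:
    """Raise early if a streaming write targets an unsupported runner.

    Hypothetical helper: bounded input is assumed not to need the
    OnWindowExpiration requirement, so it is allowed on any runner.
    """
    if streaming and runner_name not in STREAMING_CAPABLE_RUNNERS:
        raise ValueError(
            f"Streaming Storage Write API writes need {ON_WINDOW_EXPIRATION}, "
            f"which {runner_name} is not known to support. "
            f"Supported runners: {', '.join(STREAMING_CAPABLE_RUNNERS)}")
```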





[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464713457

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466282833

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1481801177

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485648796

   Run SQL_Java11 PreCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485648138

   Run SQL PreCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485736582

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485685452

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485228244

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 closed pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 closed pull request #25685: BigQuery Storage Write API wrapper for Python SDK
URL: https://github.com/apache/beam/pull/25685




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468315654

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468637048

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451041971

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468650240

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468980715

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485647843

   Run Java_GCP_IO_Direct PreCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464645329

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125780995


##########
sdks/java/io/google-cloud-platform/expansion-service/build.gradle:
##########
@@ -45,3 +46,24 @@ task runExpansionService (type: JavaExec) {
     classpath = sourceSets.test.runtimeClasspath
     args = [project.findProperty("constructionService.port") ?: "8097"]
 }
+
+task buildTestExpansionServiceJar(type: ShadowJar) {

Review Comment:
   Sure, will try this out.



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java:
##########
@@ -102,53 +120,61 @@ public void testInvalidConfig() {
 
   public PCollectionRowTuple runWithConfig(
       BigQueryStorageWriteApiSchemaTransformConfiguration config) {
+    return runWithConfig(config, ROWS);
+  }
+
+  public PCollectionRowTuple runWithConfig(
+      BigQueryStorageWriteApiSchemaTransformConfiguration config, List<Row> inputRows) {
     BigQueryStorageWriteApiSchemaTransformProvider provider =
         new BigQueryStorageWriteApiSchemaTransformProvider();
 
     BigQueryStorageWriteApiPCollectionRowTupleTransform writeRowTupleTransform =
         (BigQueryStorageWriteApiPCollectionRowTupleTransform)
             provider.from(config).buildTransform();
 
-    List<Row> testRows =
-        Arrays.asList(
-            Row.withSchema(SCHEMA)
-                .withFieldValue("name", "a")
-                .withFieldValue("number", 1L)
-                .withFieldValue("dt", LocalDateTime.parse("2000-01-01T00:00:00"))
-                .build(),
-            Row.withSchema(SCHEMA)
-                .withFieldValue("name", "b")
-                .withFieldValue("number", 2L)
-                .withFieldValue("dt", LocalDateTime.parse("2000-01-02T00:00:00"))
-                .build(),
-            Row.withSchema(SCHEMA)
-                .withFieldValue("name", "c")
-                .withFieldValue("number", 3L)
-                .withFieldValue("dt", LocalDateTime.parse("2000-01-03T00:00:00"))
-                .build());
-
     writeRowTupleTransform.setBigQueryServices(fakeBigQueryServices);
     String tag = provider.inputCollectionNames().get(0);
 
-    PCollection<Row> rows = p.apply(Create.of(testRows).withRowSchema(SCHEMA));
+    PCollection<Row> rows = p.apply(Create.of(inputRows).withRowSchema(SCHEMA));
 
     PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows);
     PCollectionRowTuple result = input.apply(writeRowTupleTransform);
 
     return result;
   }
 
+  public Boolean rowsEquals(List<Row> expectedRows, List<TableRow> actualRows) {
+    if (expectedRows.size() != actualRows.size()) {
+      return false;
+    }
+    for (int i = 0; i < expectedRows.size(); i++) {
+      // Actual rows may come back disordered. For each TableRow, find its "number" column value

Review Comment:
   Yup
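
The comment in the quoted `rowsEquals` hunk notes that actual rows may come back out of order and are matched through their "number" column. The rest of the Java method is not shown in this thread, so the following is only a Python sketch of that matching idea, not a port of the PR's code. It assumes rows are plain dicts, that every row has the key column, and that the key is unique within each list; the name `rows_equal` is hypothetical.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def rows_equal(expected_rows: List[Row], actual_rows: List[Row],
               key: str = "number") -> bool:
    """Compare two row lists while ignoring order, matching rows on `key`."""
    if len(expected_rows) != len(actual_rows):
        return False
    # Index the actual rows by their key so each lookup is O(1).
    actual_by_key = {row[key]: row for row in actual_rows}
    # Duplicate keys would make the matching ambiguous, so treat as unequal.
    if len(actual_by_key) != len(actual_rows):
        return False
    return all(actual_by_key.get(row[key]) == row for row in expected_rows)
```

Indexing by key avoids sorting and keeps the comparison linear in the number of rows, at the cost of requiring a unique key column.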





[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464742773

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451038333

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451093225

   Run Python_Transforms PreCommit




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125781060


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java:
##########
@@ -102,53 +120,61 @@ public void testInvalidConfig() {
 
   public PCollectionRowTuple runWithConfig(
       BigQueryStorageWriteApiSchemaTransformConfiguration config) {
+    return runWithConfig(config, ROWS);
+  }
+
+  public PCollectionRowTuple runWithConfig(
+      BigQueryStorageWriteApiSchemaTransformConfiguration config, List<Row> inputRows) {
     BigQueryStorageWriteApiSchemaTransformProvider provider =
         new BigQueryStorageWriteApiSchemaTransformProvider();
 
     BigQueryStorageWriteApiPCollectionRowTupleTransform writeRowTupleTransform =
         (BigQueryStorageWriteApiPCollectionRowTupleTransform)
             provider.from(config).buildTransform();
 
-    List<Row> testRows =
-        Arrays.asList(
-            Row.withSchema(SCHEMA)
-                .withFieldValue("name", "a")
-                .withFieldValue("number", 1L)
-                .withFieldValue("dt", LocalDateTime.parse("2000-01-01T00:00:00"))
-                .build(),
-            Row.withSchema(SCHEMA)
-                .withFieldValue("name", "b")
-                .withFieldValue("number", 2L)
-                .withFieldValue("dt", LocalDateTime.parse("2000-01-02T00:00:00"))
-                .build(),
-            Row.withSchema(SCHEMA)
-                .withFieldValue("name", "c")
-                .withFieldValue("number", 3L)
-                .withFieldValue("dt", LocalDateTime.parse("2000-01-03T00:00:00"))
-                .build());
-
     writeRowTupleTransform.setBigQueryServices(fakeBigQueryServices);
     String tag = provider.inputCollectionNames().get(0);
 
-    PCollection<Row> rows = p.apply(Create.of(testRows).withRowSchema(SCHEMA));
+    PCollection<Row> rows = p.apply(Create.of(inputRows).withRowSchema(SCHEMA));
 
     PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows);
     PCollectionRowTuple result = input.apply(writeRowTupleTransform);
 
     return result;
   }
 
+  public Boolean rowsEquals(List<Row> expectedRows, List<TableRow> actualRows) {
+    if (expectedRows.size() != actualRows.size()) {
+      return false;
+    }
+    for (int i = 0; i < expectedRows.size(); i++) {
+      // Actual rows may come back disordered. For each TableRow, find its "number" column value

Review Comment:
   Yup, changing to that





[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464718335

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466648872

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468568650

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1484224164

   Run XVR_PythonUsingJava_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485229698

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] codecov[bot] commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451082652

   # [Codecov](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#25685](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c2ebf68) into [master](https://codecov.io/gh/apache/beam/commit/e1491a637db0389eed5089c63aabc0dd9a97b249?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e1491a6) will **decrease** coverage by `0.01%`.
   > The diff coverage is `54.05%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #25685      +/-   ##
   ==========================================
   - Coverage   72.81%   72.81%   -0.01%     
   ==========================================
     Files         775      775              
     Lines      102928   102965      +37     
   ==========================================
   + Hits        74948    74972      +24     
   - Misses      26526    26539      +13     
     Partials     1454     1454              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `81.95% <54.05%> (-0.01%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `70.35% <36.00%> (-0.99%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/external.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9leHRlcm5hbC5weQ==) | `80.51% <91.66%> (+1.77%)` | :arrow_up: |
   | [.../python/apache\_beam/testing/test\_stream\_service.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbV9zZXJ2aWNlLnB5) | `88.09% <0.00%> (-4.77%)` | :arrow_down: |
   | [...che\_beam/runners/interactive/interactive\_runner.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9ydW5uZXIucHk=) | `90.50% <0.00%> (-1.27%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `94.10% <0.00%> (-0.36%)` | :arrow_down: |
   | [...s/python/apache\_beam/testing/synthetic\_pipeline.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9zeW50aGV0aWNfcGlwZWxpbmUucHk=) | `82.04% <0.00%> (-0.25%)` | :arrow_down: |
   | [sdks/python/apache\_beam/runners/common.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9jb21tb24ucHk=) | `88.44% <0.00%> (+0.11%)` | :arrow_up: |
   | [...on/apache\_beam/runners/dataflow/dataflow\_runner.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kYXRhZmxvdy9kYXRhZmxvd19ydW5uZXIucHk=) | `81.88% <0.00%> (+0.14%)` | :arrow_up: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.46% <0.00%> (+0.15%)` | :arrow_up: |
   | [sdks/python/apache\_beam/transforms/combiners.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb21iaW5lcnMucHk=) | `93.43% <0.00%> (+0.38%)` | :arrow_up: |
   | ... and [2 more](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   




[GitHub] [beam] chamikaramj commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1124960350


##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2353,6 +2395,98 @@ class BeamModulePlugin implements Plugin<Project> {
       }
     }
 
+    /** ***********************************************************************************************/
+    // Method to create the createCrossLanguageUsingJavaExpansionTask.
+    // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
+    // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
+    project.ext.createCrossLanguageUsingJavaExpansionTask = {
+      // This task won't work if the python build file doesn't exist.
+      if (!project.project(":sdks:python").buildFile.exists()) {
+        System.err.println 'Python build file not found. Skipping createCrossLanguageUsingJavaExpansionTask.'
+        return
+      }
+      def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()
+
+      project.evaluationDependsOn(":sdks:python")
+      project.evaluationDependsOn(config.expansionProjectPath)
+      project.evaluationDependsOn(":runners:core-construction-java")
+      project.evaluationDependsOn(":sdks:java:extensions:python")
+
+      // Setting up args to launch the expansion service
+      def envDir = project.project(":sdks:python").envdir
+      def pythonDir = project.project(":sdks:python").projectDir
+      def javaExpansionPort = getRandomPort()
+      def expansionJar = project.project(config.expansionProjectPath).buildTestExpansionServiceJar.archivePath
+      def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath()
+      def expansionServiceOpts = [
+        "group_id": project.name,
+        "java_expansion_service_jar": expansionJar,
+        "java_port": javaExpansionPort,
+        "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
+      ]
+      def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
+      def javaContainerSuffix
+      if (JavaVersion.current() == JavaVersion.VERSION_1_8) {

Review Comment:
   We should get the supported versions from JavaTestProperties.SUPPORTED_VERSIONS instead of hardcoding them here.



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
     return self.attributes[key].__get__(self, WriteResult)
 
 
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):

Review Comment:
   Can we better integrate this transform into the existing BigQuery sink API by adding a WriteMethod?
   
   https://github.com/apache/beam/blob/39cab94361bc44e8b48257385d37d8978ab7fc02/sdks/python/apache_beam/io/gcp/bigquery.py#L1753



##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2353,6 +2395,98 @@ class BeamModulePlugin implements Plugin<Project> {
       }
     }
 
+    /** ***********************************************************************************************/
+    // Method to create the createCrossLanguageUsingJavaExpansionTask.
+    // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
+    // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
+    project.ext.createCrossLanguageUsingJavaExpansionTask = {
+      // This task won't work if the python build file doesn't exist.
+      if (!project.project(":sdks:python").buildFile.exists()) {
+        System.err.println 'Python build file not found. Skipping createCrossLanguageUsingJavaExpansionTask.'
+        return
+      }
+      def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()
+
+      project.evaluationDependsOn(":sdks:python")
+      project.evaluationDependsOn(config.expansionProjectPath)
+      project.evaluationDependsOn(":runners:core-construction-java")
+      project.evaluationDependsOn(":sdks:java:extensions:python")
+
+      // Setting up args to launch the expansion service
+      def envDir = project.project(":sdks:python").envdir
+      def pythonDir = project.project(":sdks:python").projectDir
+      def javaExpansionPort = getRandomPort()

Review Comment:
   There could be a race between port acquisition here and the expansion service startup later, which could result in failures, especially when we start running a large number of cross-language test suites.
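
The race described above can be pictured with a minimal Python sketch. This is not the Gradle code under review: `pick_free_port` and `start_with_retry` are hypothetical helpers, and retrying on `OSError` is only one possible mitigation, assumed here for illustration. The port is known to be free only while the probing socket is open, so anything that binds later may find it taken.

```python
import socket


def pick_free_port(host="127.0.0.1"):
    """Ask the OS for a currently free port, then release it.

    The port is only known to be free at the moment of the call; another
    process may claim it before a service binds to it.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))  # port 0 lets the OS choose a free port
        return sock.getsockname()[1]


def start_with_retry(start_service, attempts=5):
    """Pick a port and start a service on it, retrying if the port was taken.

    start_service is a caller-supplied callable (hypothetical) that takes a
    port and raises OSError when it cannot bind to it.
    """
    last_error = None
    for _ in range(attempts):
        port = pick_free_port()
        try:
            return port, start_service(port)
        except OSError as error:
            # The port was probably claimed between probing and startup.
            last_error = error
    raise RuntimeError(
        "no usable port after %d attempts" % attempts) from last_error
```

Retrying narrows the window but does not remove it. Letting the service itself bind to port 0 and report the chosen port would avoid the gap entirely, if the service supports that.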



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java:
##########
@@ -102,53 +120,61 @@ public void testInvalidConfig() {
 
   public PCollectionRowTuple runWithConfig(
       BigQueryStorageWriteApiSchemaTransformConfiguration config) {
+    return runWithConfig(config, ROWS);
+  }
+
+  public PCollectionRowTuple runWithConfig(
+      BigQueryStorageWriteApiSchemaTransformConfiguration config, List<Row> inputRows) {
     BigQueryStorageWriteApiSchemaTransformProvider provider =
         new BigQueryStorageWriteApiSchemaTransformProvider();
 
     BigQueryStorageWriteApiPCollectionRowTupleTransform writeRowTupleTransform =
         (BigQueryStorageWriteApiPCollectionRowTupleTransform)
             provider.from(config).buildTransform();
 
-    List<Row> testRows =
-        Arrays.asList(
-            Row.withSchema(SCHEMA)
-                .withFieldValue("name", "a")
-                .withFieldValue("number", 1L)
-                .withFieldValue("dt", LocalDateTime.parse("2000-01-01T00:00:00"))
-                .build(),
-            Row.withSchema(SCHEMA)
-                .withFieldValue("name", "b")
-                .withFieldValue("number", 2L)
-                .withFieldValue("dt", LocalDateTime.parse("2000-01-02T00:00:00"))
-                .build(),
-            Row.withSchema(SCHEMA)
-                .withFieldValue("name", "c")
-                .withFieldValue("number", 3L)
-                .withFieldValue("dt", LocalDateTime.parse("2000-01-03T00:00:00"))
-                .build());
-
     writeRowTupleTransform.setBigQueryServices(fakeBigQueryServices);
     String tag = provider.inputCollectionNames().get(0);
 
-    PCollection<Row> rows = p.apply(Create.of(testRows).withRowSchema(SCHEMA));
+    PCollection<Row> rows = p.apply(Create.of(inputRows).withRowSchema(SCHEMA));
 
     PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows);
     PCollectionRowTuple result = input.apply(writeRowTupleTransform);
 
     return result;
   }
 
+  public Boolean rowsEquals(List<Row> expectedRows, List<TableRow> actualRows) {
+    if (expectedRows.size() != actualRows.size()) {
+      return false;
+    }
+    for (int i = 0; i < expectedRows.size(); i++) {
+      // Actual rows may come back disordered. For each TableRow, find its "number" column value

Review Comment:
   Do you mean "out of order"?
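
For readers following along in Python, the idea behind `rowsEquals` (comparing rows that may come back out of order by looking each one up via its "number" column) can be sketched as below. This is an illustration only, not a port of the Java test helper: `rows_equal` is a hypothetical name, rows are modelled as plain dicts, and the key column is assumed to be present and unique in every row.

```python
def rows_equal(expected_rows, actual_rows, key="number"):
    """Compare two lists of dict rows while ignoring their order.

    Assumes every row has the key column and that its values are unique.
    """
    if len(expected_rows) != len(actual_rows):
        return False
    # Index the actual rows by the key column for order-independent lookup.
    actual_by_key = {row[key]: row for row in actual_rows}
    if len(actual_by_key) != len(actual_rows):
        return False  # duplicate key values; the uniqueness assumption failed
    return all(actual_by_key.get(row[key]) == row for row in expected_rows)
```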



##########
sdks/python/test-suites/xlang/build.gradle:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// This is a base file to set up cross language tests for different runners
+import org.apache.beam.gradle.BeamModulePlugin
+import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTaskCommon
+project.evaluationDependsOn(":sdks:python")
+
+// Set up cross language tests
+def envDir = project.project(":sdks:python").envdir
+def jobPort = BeamModulePlugin.getRandomPort()
+def tmpDir = System.getenv("TMPDIR") ?: System.getenv("WORKSPACE") ?: "/tmp"
+def pidFile = "${tmpDir}/local_job_service_main-${jobPort}.pid"
+
+def setupTask = project.tasks.register("fnApiJobServerSetup", Exec) {
+  dependsOn ':sdks:python:installGcpTest'
+
+  executable 'sh'
+  args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --job_port ${jobPort} --pid_file ${pidFile} --background --stdout_file ${tmpDir}/beam-fnapi-job-server.log"
+}
+
+def cleanupTask = project.tasks.register("fnApiJobServerCleanup", Exec) {
+  executable 'sh'
+  args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --pid_file ${pidFile} --stop"
+}
+
+// List of objects representing task metadata to create cross-language tasks from.
+// Each object contains the minimum relevant metadata.
+def xlangTasks = []
+
+// ******** Java GCP expansion service ********
+def gcpExpansionProject = project.project(':sdks:java:io:google-cloud-platform:expansion-service')

Review Comment:
   It seems we are hardcoding "google-cloud-platform:expansion-service" for all Python x-lang tests.
   
   This should be specific to GCP tests instead.



##########
sdks/java/io/google-cloud-platform/expansion-service/build.gradle:
##########
@@ -45,3 +46,24 @@ task runExpansionService (type: JavaExec) {
     classpath = sourceSets.test.runtimeClasspath
     args = [project.findProperty("constructionService.port") ?: "8097"]
 }
+
+task buildTestExpansionServiceJar(type: ShadowJar) {

Review Comment:
   Is this needed?
   We already build and release a Google Cloud Platform jar: https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-google-cloud-platform-expansion-service/2.45.0
   
   Can we just build the jar using the _sdks:java:io:google-cloud-platform:expansion-service:shadowJar_ target and use that?



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -302,6 +302,35 @@ def chain_after(result):
 result.destination_copy_jobid_pairs <--> result['destination_copy_jobid_pairs']
 ```
 
+Writing with Storage Write API using Cross Language
+---------------------------------------------------
+After starting up an expansion service that contains the Java implementation

Review Comment:
   Users should be able to just use the default expansion service, right? Starting up a custom expansion service is a more advanced use case, so for clarity it probably should not be mentioned here.



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
     return self.attributes[key].__get__(self, WriteResult)
 
 
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+  """Writes data to BigQuery using Storage API."""
+  URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+  FAILED_ROWS = "failed_rows"
+  FAILED_ROWS_WITH_ERRORS = "failed_rows_with_errors"
+
+  def __init__(
+      self,
+      table,
+      create_disposition="",

Review Comment:
   Please make sure default values are set to valid values. Many users may try to use the BQ sink without overriding the optional arguments.
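
A minimal sketch of what "valid defaults" could look like, assuming the disposition strings listed in the docstring of this diff. The helper `resolve_dispositions` is hypothetical, and the choice of `CREATE_IF_NEEDED` and `WRITE_APPEND` as defaults is an assumption made for illustration, not necessarily what the PR ends up using.

```python
# Disposition strings taken from the docstring in the diff under review.
CREATE_DISPOSITIONS = ("CREATE_IF_NEEDED", "CREATE_NEVER")
WRITE_DISPOSITIONS = ("WRITE_TRUNCATE", "WRITE_APPEND", "WRITE_EMPTY")


def resolve_dispositions(
    create_disposition="CREATE_IF_NEEDED",  # assumed default
    write_disposition="WRITE_APPEND"):  # assumed default
    """Return the dispositions, rejecting values outside the known sets."""
    if create_disposition not in CREATE_DISPOSITIONS:
        raise ValueError(
            "create_disposition must be one of %s, got %r" %
            (CREATE_DISPOSITIONS, create_disposition))
    if write_disposition not in WRITE_DISPOSITIONS:
        raise ValueError(
            "write_disposition must be one of %s, got %r" %
            (WRITE_DISPOSITIONS, write_disposition))
    return create_disposition, write_disposition
```

With defaults like these, a caller who passes only the table gets a working configuration, while an empty string is rejected early instead of reaching the Java side.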



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
     return self.attributes[key].__get__(self, WriteResult)
 
 
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+  """Writes data to BigQuery using Storage API."""
+  URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+  FAILED_ROWS = "failed_rows"
+  FAILED_ROWS_WITH_ERRORS = "failed_rows_with_errors"
+
+  def __init__(
+      self,
+      table,
+      create_disposition="",
+      write_disposition="",
+      triggering_frequency=0,
+      use_at_least_once=False,
+      expansion_service=None):
+    """Initialize a StorageWriteToBigQuery transform.
+
+    :param table:
+      Fully-qualified table ID specified as ``'PROJECT:DATASET.TABLE'``.
+    :param create_disposition:
+      String specifying the strategy to take when the table doesn't
+      exist. Possible values are:
+      * ``'CREATE_IF_NEEDED'``: create if does not exist.
+      * ``'CREATE_NEVER'``: fail the write if does not exist.
+    :param write_disposition:
+      String specifying the strategy to take when the table already
+      contains data. Possible values are:
+      * ``'WRITE_TRUNCATE'``: delete existing rows.
+      * ``'WRITE_APPEND'``: add to existing rows.
+      * ``'WRITE_EMPTY'``: fail the write if table not empty.
+    :param triggering_frequency:
+      The time in seconds between write commits. Should only be specified
+      for streaming pipelines. Defaults to 5 seconds.
+    :param use_at_least_once:
+      Use at-least-once semantics. Is cheaper and provides lower latency,
+      but will potentially duplicate records.
+    :param expansion_service:
+      The address (host:port) of the expansion service. If no expansion
+      service is provided, will attempt to run the default GCP expansion
+      service.
+    """
+    super().__init__()
+    self._table = table
+    self._create_disposition = create_disposition
+    self._write_disposition = write_disposition
+    self._triggering_frequency = triggering_frequency
+    self._use_at_least_once = use_at_least_once
+    self._expansion_service = (
+        expansion_service or _default_io_expansion_service())
+    self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+        self._expansion_service, self.URN)
+
+  def expand(self, input):
+    opts = input.pipeline.options.view_as(StandardOptions)
+    # TODO(https://github.com/apache/beam/issues/21307): Add support for
+    # OnWindowExpiration to more runners. Storage Write API requires
+    # `beam:requirement:pardo:on_window_expiration:v1` when unbounded
+    available_runners = ['DataflowRunner', 'TestDataflowRunner']

Review Comment:
   I don't see the Java implementation limited to Dataflow this way. Why do we want to limit only the Python x-lang version?





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125786706


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
     return self.attributes[key].__get__(self, WriteResult)
 
 
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):

Review Comment:
   Yup, good idea





[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468722840

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466244263

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468375360

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485530342

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485773882

   Run Java PreCommit




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1146333450


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -103,6 +107,19 @@
 _DATASET_PATTERN = r'\w{1,1024}'
 _TABLE_PATTERN = r'[\p{L}\p{M}\p{N}\p{Pc}\p{Pd}\p{Zs}$]{1,1024}'
 
+BIGQUERY_TYPE_TO_PYTHON_TYPE = {
+    "STRING": str,
+    "BOOL": bool,
+    "BOOLEAN": bool,
+    "BYTES": bytes,
+    "INT64": np.int64,
+    "INTEGER": np.int64,
+    "FLOAT64": np.float64,
+    "FLOAT": np.float64,
+    "NUMERIC": decimal.Decimal,
+    "TIMESTAMP": apache_beam.utils.timestamp.Timestamp,

Review Comment:
   Issue: https://github.com/apache/beam/issues/25946





[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1481801308

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1146282748


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -103,6 +107,19 @@
 _DATASET_PATTERN = r'\w{1,1024}'
 _TABLE_PATTERN = r'[\p{L}\p{M}\p{N}\p{Pc}\p{Pd}\p{Zs}$]{1,1024}'
 
+BIGQUERY_TYPE_TO_PYTHON_TYPE = {
+    "STRING": str,
+    "BOOL": bool,
+    "BOOLEAN": bool,
+    "BYTES": bytes,
+    "INT64": np.int64,
+    "INTEGER": np.int64,
+    "FLOAT64": np.float64,
+    "FLOAT": np.float64,
+    "NUMERIC": decimal.Decimal,
+    "TIMESTAMP": apache_beam.utils.timestamp.Timestamp,

Review Comment:
   Yup, I only included the BQ types that were supported by Beam types. I'll create a tracking issue for supporting other types. As it stands today, these are the only types users would be able to write with the Storage Write xlang connector.
   The other BQ types can be represented as strings, so a possible workaround would be to add a new configuration field for the BQ schema and pass it to Java. In the Java SchemaTransform we can use the input schema instead of `.useBeamSchema()`. 
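
To make the current limitation concrete, here is a small standard-library-only sketch of a lookup that fails loudly for BigQuery types outside the supported set. It mirrors only the entries of `BIGQUERY_TYPE_TO_PYTHON_TYPE` that map to built-in or stdlib types; the NumPy and Beam `Timestamp` entries are left out to keep the example self-contained. `python_type_for` is a hypothetical helper, not part of the PR.

```python
import decimal

# Subset of the mapping in the diff, restricted to stdlib types.
SUPPORTED_TYPES = {
    "STRING": str,
    "BOOL": bool,
    "BOOLEAN": bool,
    "BYTES": bytes,
    "NUMERIC": decimal.Decimal,
}


def python_type_for(bq_type):
    """Return the Python type for a BigQuery type name, or raise ValueError."""
    try:
        return SUPPORTED_TYPES[bq_type.upper()]
    except KeyError:
        raise ValueError(
            "BigQuery type %r is not supported; supported types: %s" %
            (bq_type, sorted(SUPPORTED_TYPES))) from None
```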





[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485530463

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485685316

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485774013

   Run Java_GCP_IO_Direct PreCommit




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1132879510


##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2353,6 +2395,98 @@ class BeamModulePlugin implements Plugin<Project> {
       }
     }
 
+    /** ***********************************************************************************************/
+    // Method to create the createCrossLanguageUsingJavaExpansionTask.
+    // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
+    // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
+    project.ext.createCrossLanguageUsingJavaExpansionTask = {
+      // This task won't work if the python build file doesn't exist.
+      if (!project.project(":sdks:python").buildFile.exists()) {
+        System.err.println 'Python build file not found. Skipping createCrossLanguageUsingJavaExpansionTask.'
+        return
+      }
+      def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()
+
+      project.evaluationDependsOn(":sdks:python")
+      project.evaluationDependsOn(config.expansionProjectPath)
+      project.evaluationDependsOn(":runners:core-construction-java")
+      project.evaluationDependsOn(":sdks:java:extensions:python")
+
+      // Setting up args to launch the expansion service
+      def envDir = project.project(":sdks:python").envdir
+      def pythonDir = project.project(":sdks:python").projectDir
+      def javaExpansionPort = getRandomPort()
+      def expansionJar = project.project(config.expansionProjectPath).buildTestExpansionServiceJar.archivePath
+      def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath()
+      def expansionServiceOpts = [
+        "group_id": project.name,
+        "java_expansion_service_jar": expansionJar,
+        "java_port": javaExpansionPort,
+        "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
+      ]
+      def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
+      def javaContainerSuffix
+      if (JavaVersion.current() == JavaVersion.VERSION_1_8) {

Review Comment:
   Not sure how I can access JavaTestProperties from BeamModulePlugin. Should I create a new property in `gradle.properties`?





[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464657745

   Run Python_PVR_Flink PreCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468612685

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468850915

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1469002126

   R: @chamikaramj 
   
   Tests are passing now, ready for another review.




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1469070854

   Run Java_Examples_Dataflow PreCommit




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125787745


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
     return self.attributes[key].__get__(self, WriteResult)
 
 
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+  """Writes data to BigQuery using Storage API."""
+  URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+  FAILED_ROWS = "failed_rows"
+  FAILED_ROWS_WITH_ERRORS = "failed_rows_with_errors"
+
+  def __init__(
+      self,
+      table,
+      create_disposition="",

Review Comment:
   These defaults are already handled in the Java SchemaTransform implementation (e.g. an empty string for create disposition means no disposition will be explicitly set, so the default will be CREATE_IF_NEEDED). I can include the defaults here too (i.e. `create_disposition="CREATE_IF_NEEDED"`) to make it clearer what they are.
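
To illustrate the empty-string convention described above, here is a minimal Python sketch. It is not the Java SchemaTransform's code: `resolve_create_disposition` and both constants are hypothetical names, and the only behavior taken from this thread is that an unset (empty) value ends up as CREATE_IF_NEEDED.

```python
# Hypothetical illustration only; these names are not part of Beam's API.
DEFAULT_CREATE_DISPOSITION = "CREATE_IF_NEEDED"
VALID_CREATE_DISPOSITIONS = {"CREATE_IF_NEEDED", "CREATE_NEVER"}


def resolve_create_disposition(value=""):
  """Returns the effective create disposition for a possibly-empty value."""
  if not value:
    # An empty string (or None) means "not explicitly set": use the default.
    return DEFAULT_CREATE_DISPOSITION
  normalized = value.upper()
  if normalized not in VALID_CREATE_DISPOSITIONS:
    raise ValueError("Unknown create disposition: %r" % value)
  return normalized
```

Spelling the default out in the Python signature, as proposed, would make the same information visible to callers without reading the Java side.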





[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451042351

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468980787

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464713674

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464731431

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464718440

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466648517

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466893064

   > Seems like tests couldn't find the expansion service ?
   
   Yeah it's only seeing the default value (see [debug println](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Dataflow_PR/21/console#:~:text=JAVA%20EXPANSION%20PORT%3A%20%2D1)) that I set [here](https://github.com/apache/beam/pull/25685/files#diff-0435a83a413ec063bf7e682cadcd56776cd18fc878f197cc99a65fc231ef2047R2418). The setup task overwrites that value and launches an expansion service as expected, but the python task doesn't read the updated value. I'm not sure how scoping works between these tasks.




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485737051

   Run SQL_Java17 PreCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485647530

   Run Java PreCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464731471

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] chamikaramj commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1137821416


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -1514,6 +1531,42 @@ def process(self, element, *side_inputs):
     yield (self.destination(element, *side_inputs), element)
 
 
+def beam_row_from_dict(row: dict, schema):

Review Comment:
   Let's make sure this util is extensively covered by unit tests for the various supported types, etc.
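
One possible shape for such coverage is a table-driven test with one case per type. The sketch below does not test the real `beam_row_from_dict`, whose body and schema format are not shown in this thread. `convert_row`, `TYPE_CONVERTERS`, and the `(name, type)` schema pairs are hypothetical stand-ins, and built-in `int`/`float` replace the NumPy types so the example needs only the standard library.

```python
import decimal
import unittest

# Hypothetical stand-in for the utility under review; not Beam's implementation.
TYPE_CONVERTERS = {
    "STRING": str,
    "BOOLEAN": bool,
    "BYTES": bytes,
    "INTEGER": int,
    "FLOAT": float,
    "NUMERIC": decimal.Decimal,
}


def convert_row(row, schema):
  """Converts each value in `row` using the converter for its schema type."""
  return {
      name: TYPE_CONVERTERS[type_name](row[name])
      for name, type_name in schema
  }


class ConvertRowTest(unittest.TestCase):
  # One (type name, raw input, expected output) case per supported type.
  CASES = [
      ("STRING", "a", "a"),
      ("BOOLEAN", True, True),
      ("BYTES", b"ab", b"ab"),
      ("INTEGER", 7, 7),
      ("FLOAT", 1.5, 1.5),
      ("NUMERIC", "1.25", decimal.Decimal("1.25")),
  ]

  def test_each_supported_type(self):
    for type_name, raw, expected in self.CASES:
      # subTest reports every failing type separately instead of stopping
      # at the first failure.
      with self.subTest(type_name=type_name):
        result = convert_row({"f": raw}, [("f", type_name)])
        self.assertEqual(result, {"f": expected})
        self.assertIsInstance(result["f"], type(expected))
```

Adding a type to the mapping then only requires adding one row to `CASES`.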



##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -103,6 +107,19 @@
 _DATASET_PATTERN = r'\w{1,1024}'
 _TABLE_PATTERN = r'[\p{L}\p{M}\p{N}\p{Pc}\p{Pd}\p{Zs}$]{1,1024}'
 
+BIGQUERY_TYPE_TO_PYTHON_TYPE = {
+    "STRING": str,
+    "BOOL": bool,
+    "BOOLEAN": bool,
+    "BYTES": bytes,
+    "INT64": np.int64,
+    "INTEGER": np.int64,
+    "FLOAT64": np.float64,
+    "FLOAT": np.float64,
+    "NUMERIC": decimal.Decimal,
+    "TIMESTAMP": apache_beam.utils.timestamp.Timestamp,

Review Comment:
   Seems like this doesn't cover all BQ types? For example, Geography types [1]. We should at least add a TODO to fix this.
   
   [1] https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#geography_type
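
Until the missing types are added, one option is to make the gap fail loudly at lookup time. The following is a sketch under assumptions, not code from this PR: `python_type_for` is a hypothetical helper, the mapping is a subset of the one in the diff, and built-in `int`/`float` stand in for `np.int64`/`np.float64` to keep it dependency-free.

```python
import decimal

# Subset of the mapping in the diff; int/float stand in for the NumPy types.
# TODO: cover the remaining BigQuery types (e.g. GEOGRAPHY).
BIGQUERY_TYPE_TO_PYTHON_TYPE = {
    "STRING": str,
    "BOOL": bool,
    "BOOLEAN": bool,
    "BYTES": bytes,
    "INT64": int,
    "INTEGER": int,
    "FLOAT64": float,
    "FLOAT": float,
    "NUMERIC": decimal.Decimal,
}


def python_type_for(bq_type):
  """Hypothetical helper: maps a BigQuery type name to a Python type."""
  try:
    return BIGQUERY_TYPE_TO_PYTHON_TYPE[bq_type.upper()]
  except KeyError:
    # Name the unsupported type and list what is supported, rather than
    # surfacing a bare KeyError.
    raise ValueError(
        "BigQuery type %r is not supported yet; supported types: %s" %
        (bq_type, ", ".join(sorted(BIGQUERY_TYPE_TO_PYTHON_TYPE)))) from None
```

With this, a schema containing a GEOGRAPHY column would raise a `ValueError` that names the type.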



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
     return self.attributes[key].__get__(self, WriteResult)
 
 
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):

Review Comment:
   Let's also hide this transform from the public API by renaming to "_StorageWriteToBigQuery".



##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2307,6 +2397,94 @@ def __getitem__(self, key):
     return self.attributes[key].__get__(self, WriteResult)
 
 
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+  """Writes data to BigQuery using Storage API.
+
+  Experimental; no backwards compatibility guarantees.
+  """
+  URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+  FAILED_ROWS = "FailedRows"
+  FAILED_ROWS_WITH_ERRORS = "FailedRowsWithErrors"
+
+  def __init__(
+      self,
+      table,
+      create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
+      write_disposition=BigQueryDisposition.WRITE_APPEND,
+      triggering_frequency=0,
+      use_at_least_once=False,
+      expansion_service=None):
+    """Initialize a StorageWriteToBigQuery transform.
+
+    :param table:
+      Fully-qualified table ID specified as ``'PROJECT:DATASET.TABLE'``.
+    :param create_disposition:
+      String specifying the strategy to take when the table doesn't
+      exist. Possible values are:
+      * ``'CREATE_IF_NEEDED'``: create if does not exist.
+      * ``'CREATE_NEVER'``: fail the write if does not exist.
+    :param write_disposition:
+      String specifying the strategy to take when the table already
+      contains data. Possible values are:
+      * ``'WRITE_TRUNCATE'``: delete existing rows.
+      * ``'WRITE_APPEND'``: add to existing rows.
+      * ``'WRITE_EMPTY'``: fail the write if table not empty.
+    :param triggering_frequency:
+      The time in seconds between write commits. Should only be specified
+      for streaming pipelines. Defaults to 5 seconds.
+    :param use_at_least_once:
+      Use at-least-once semantics. Is cheaper and provides lower latency,
+      but will potentially duplicate records.
+    :param expansion_service:
+      The address (host:port) of the expansion service. If no expansion
+      service is provided, will attempt to run the default GCP expansion
+      service.
+    """
+    super().__init__()
+    self._table = table
+    self._create_disposition = create_disposition
+    self._write_disposition = write_disposition
+    self._triggering_frequency = triggering_frequency
+    self._use_at_least_once = use_at_least_once
+    self._expansion_service = (
+        expansion_service or _default_io_expansion_service())
+    self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+        self._expansion_service, self.URN)
+
+  def expand(self, input):
+    opts = input.pipeline.options.view_as(StandardOptions)
+    # TODO(https://github.com/apache/beam/issues/21307): Add support for
+    # OnWindowExpiration to more runners. Storage Write API requires
+    # `beam:requirement:pardo:on_window_expiration:v1` when unbounded
+    streaming_runners = ['DataflowRunner', 'TestDataflowRunner']

Review Comment:
   I don't think we should add a runner check like this to I/O transforms. In Beam code we try to keep transform and runner logic separate, so even if there's an issue, someone could fix it elsewhere and the transforms will just work.
   
   Also, any idea what's broken for other runners in this case (for the sink) ?



##########
sdks/python/test-suites/xlang/build.gradle:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// This is a base file to set up cross language tests for different runners
+import org.apache.beam.gradle.BeamModulePlugin
+import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTaskCommon
+project.evaluationDependsOn(":sdks:python")
+
+// Set up cross language tests
+def envDir = project.project(":sdks:python").envdir
+def jobPort = BeamModulePlugin.getRandomPort()
+def tmpDir = System.getenv("TMPDIR") ?: System.getenv("WORKSPACE") ?: "/tmp"
+def pidFile = "${tmpDir}/local_job_service_main-${jobPort}.pid"
+
+def setupTask = project.tasks.register("fnApiJobServerSetup", Exec) {
+  dependsOn ':sdks:python:installGcpTest'
+
+  executable 'sh'
+  args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --job_port ${jobPort} --pid_file ${pidFile} --background --stdout_file ${tmpDir}/beam-fnapi-job-server.log"
+}
+
+def cleanupTask = project.tasks.register("fnApiJobServerCleanup", Exec) {
+  executable 'sh'
+  args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --pid_file ${pidFile} --stop"
+}
+
+// List of objects representing task metadata to create cross-language tasks from.
+// Each object contains the minimum relevant metadata.
+def xlangTasks = []
+
+// ******** Java GCP expansion service ********
+def gcpExpansionProject = project.project(':sdks:java:io:google-cloud-platform:expansion-service')

Review Comment:
   I see. Probably expand the comment above to clarify that.



##########
sdks/python/test-suites/xlang/build.gradle:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// This is a base file to set up cross language tests for different runners
+import org.apache.beam.gradle.BeamModulePlugin
+import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTaskCommon
+project.evaluationDependsOn(":sdks:python")
+
+// Set up cross language tests
+def envDir = project.project(":sdks:python").envdir
+def jobPort = BeamModulePlugin.getRandomPort()
+def tmpDir = System.getenv("TMPDIR") ?: System.getenv("WORKSPACE") ?: "/tmp"
+def pidFile = "${tmpDir}/local_job_service_main-${jobPort}.pid"
+
+def setupTask = project.tasks.register("fnApiJobServerSetup", Exec) {
+  dependsOn ':sdks:python:installGcpTest'
+
+  executable 'sh'
+  args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --job_port ${jobPort} --pid_file ${pidFile} --background --stdout_file ${tmpDir}/beam-fnapi-job-server.log"
+}
+
+def cleanupTask = project.tasks.register("fnApiJobServerCleanup", Exec) {
+  executable 'sh'
+  args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --pid_file ${pidFile} --stop"
+}
+
+// List of objects representing task metadata to create cross-language tasks from.
+// Each object contains the minimum relevant metadata.
+def xlangTasks = []
+
+// ******** Java GCP expansion service ********
+def gcpExpansionProject = project.project(':sdks:java:io:google-cloud-platform:expansion-service')
+// Properties that are common across runners.
+// Used to launch the expansion service, collect the right tests, and cleanup afterwards
+def gcpXlangCommon = new CrossLanguageTaskCommon().tap {
+    name = "gcpCrossLanguage"
+    expansionProjectPath = gcpExpansionProject.getPath()
+    collectMarker = "uses_gcp_java_expansion_service"
+    startJobServer = setupTask
+    cleanupJobServer = cleanupTask
+}
+xlangTasks.add(gcpXlangCommon)
+
+// ******** Java _____ expansion service ********

Review Comment:
   Remove commented code ?



##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2353,6 +2395,98 @@ class BeamModulePlugin implements Plugin<Project> {
       }
     }
 
+    /** ***********************************************************************************************/
+    // Method to create the createCrossLanguageUsingJavaExpansionTask.
+    // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
+    // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
+    project.ext.createCrossLanguageUsingJavaExpansionTask = {
+      // This task won't work if the python build file doesn't exist.
+      if (!project.project(":sdks:python").buildFile.exists()) {
+        System.err.println 'Python build file not found. Skipping createCrossLanguageUsingJavaExpansionTask.'
+        return
+      }
+      def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()
+
+      project.evaluationDependsOn(":sdks:python")
+      project.evaluationDependsOn(config.expansionProjectPath)
+      project.evaluationDependsOn(":runners:core-construction-java")
+      project.evaluationDependsOn(":sdks:java:extensions:python")
+
+      // Setting up args to launch the expansion service
+      def envDir = project.project(":sdks:python").envdir
+      def pythonDir = project.project(":sdks:python").projectDir
+      def javaExpansionPort = getRandomPort()
+      def expansionJar = project.project(config.expansionProjectPath).buildTestExpansionServiceJar.archivePath
+      def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath()
+      def expansionServiceOpts = [
+        "group_id": project.name,
+        "java_expansion_service_jar": expansionJar,
+        "java_port": javaExpansionPort,
+        "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
+      ]
+      def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
+      def javaContainerSuffix
+      if (JavaVersion.current() == JavaVersion.VERSION_1_8) {

Review Comment:
   Ah, yeah, these are configs for Jenkins vs Gradle. Let's keep as is for now. 





[GitHub] [beam] chamikaramj commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466880379

   Seems like tests couldn't find the expansion service ?
   
   https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Dataflow_PR/21/testReport/junit/apache_beam.io.gcp.bigquery_write_it_test/BigQueryXlangStorageWriteIT/test_storage_write_beam_rows/




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468437202

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1146260144


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2307,6 +2397,94 @@ def __getitem__(self, key):
     return self.attributes[key].__get__(self, WriteResult)
 
 
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+  """Writes data to BigQuery using Storage API.
+
+  Experimental; no backwards compatibility guarantees.
+  """
+  URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+  FAILED_ROWS = "FailedRows"
+  FAILED_ROWS_WITH_ERRORS = "FailedRowsWithErrors"
+
+  def __init__(
+      self,
+      table,
+      create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
+      write_disposition=BigQueryDisposition.WRITE_APPEND,
+      triggering_frequency=0,
+      use_at_least_once=False,
+      expansion_service=None):
+    """Initialize a StorageWriteToBigQuery transform.
+
+    :param table:
+      Fully-qualified table ID specified as ``'PROJECT:DATASET.TABLE'``.
+    :param create_disposition:
+      String specifying the strategy to take when the table doesn't
+      exist. Possible values are:
+      * ``'CREATE_IF_NEEDED'``: create if does not exist.
+      * ``'CREATE_NEVER'``: fail the write if does not exist.
+    :param write_disposition:
+      String specifying the strategy to take when the table already
+      contains data. Possible values are:
+      * ``'WRITE_TRUNCATE'``: delete existing rows.
+      * ``'WRITE_APPEND'``: add to existing rows.
+      * ``'WRITE_EMPTY'``: fail the write if table not empty.
+    :param triggering_frequency:
+      The time in seconds between write commits. Should only be specified
+      for streaming pipelines. Defaults to 5 seconds.
+    :param use_at_least_once:
+      Use at-least-once semantics. Is cheaper and provides lower latency,
+      but will potentially duplicate records.
+    :param expansion_service:
+      The address (host:port) of the expansion service. If no expansion
+      service is provided, will attempt to run the default GCP expansion
+      service.
+    """
+    super().__init__()
+    self._table = table
+    self._create_disposition = create_disposition
+    self._write_disposition = write_disposition
+    self._triggering_frequency = triggering_frequency
+    self._use_at_least_once = use_at_least_once
+    self._expansion_service = (
+        expansion_service or _default_io_expansion_service())
+    self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+        self._expansion_service, self.URN)
+
+  def expand(self, input):
+    opts = input.pipeline.options.view_as(StandardOptions)
+    # TODO(https://github.com/apache/beam/issues/21307): Add support for
+    # OnWindowExpiration to more runners. Storage Write API requires
+    # `beam:requirement:pardo:on_window_expiration:v1` when unbounded
+    streaming_runners = ['DataflowRunner', 'TestDataflowRunner']

Review Comment:
   Sure SGTM
   
   > Also, any idea what's broken for other runners in this case (for the sink) ?
   
   When I asked about the `beam:requirement:pardo:on_window_expiration:v1` requirement, Luke said he didn't know of any other runner currently supporting it (besides DataflowRunner)
   
   





[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451177320

   Run Java_Kafka_IO_Direct PreCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451026901

   R: @chamikaramj 




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125804087


##########
sdks/python/test-suites/xlang/build.gradle:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// This is a base file to set up cross language tests for different runners
+import org.apache.beam.gradle.BeamModulePlugin
+import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTaskCommon
+project.evaluationDependsOn(":sdks:python")
+
+// Set up cross language tests
+def envDir = project.project(":sdks:python").envdir
+def jobPort = BeamModulePlugin.getRandomPort()
+def tmpDir = System.getenv("TMPDIR") ?: System.getenv("WORKSPACE") ?: "/tmp"
+def pidFile = "${tmpDir}/local_job_service_main-${jobPort}.pid"
+
+def setupTask = project.tasks.register("fnApiJobServerSetup", Exec) {
+  dependsOn ':sdks:python:installGcpTest'
+
+  executable 'sh'
+  args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --job_port ${jobPort} --pid_file ${pidFile} --background --stdout_file ${tmpDir}/beam-fnapi-job-server.log"
+}
+
+def cleanupTask = project.tasks.register("fnApiJobServerCleanup", Exec) {
+  executable 'sh'
+  args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --pid_file ${pidFile} --stop"
+}
+
+// List of objects representing task metadata to create cross-language tasks from.
+// Each object contains the minimum relevant metadata.
+def xlangTasks = []
+
+// ******** Java GCP expansion service ********
+def gcpExpansionProject = project.project(':sdks:java:io:google-cloud-platform:expansion-service')

Review Comment:
   This is just for the GCP tests. Tests with other expansion services should make a new `CrossLanguageTaskCommon` object (which would have a different `expansionProjectPath`) and add that to the `xlangTasks` list. 





[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468315421

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1148659343


##########
sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java:
##########
@@ -70,10 +70,10 @@ public class ExpansionServiceSchemaTransformProviderTest {
 
   private static final Schema TEST_SCHEMATRANSFORM_CONFIG_SCHEMA =
       Schema.of(
-          Field.of("str1", FieldType.STRING),
-          Field.of("str2", FieldType.STRING),
           Field.of("int1", FieldType.INT32),
-          Field.of("int2", FieldType.INT32));
+          Field.of("int2", FieldType.INT32),
+          Field.of("str1", FieldType.STRING),
+          Field.of("str2", FieldType.STRING));

Review Comment:
   The schema needs to be in alphabetical order because the tests use `TypedSchemaTransformProvider`, which returns a config schema whose fields are in alphabetical order. A recent commit that fixes encoding positions makes this required; otherwise we end up with an encoding error.
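
To illustrate why field order matters once values are encoded by position, here is a minimal sketch in plain Python. It is a toy model under stated assumptions, not Beam's actual coder: the helpers `alphabetical`, `encode_positional`, and `decode_positional` are hypothetical names invented for this example. In the toy model a mismatch silently misplaces values, whereas the comment above reports an encoding error in the real tests.

```python
# Toy model (hypothetical helpers, not Beam APIs): values are written and
# read purely by position, so writer and reader must agree on field order.

def alphabetical(fields):
    """Return the (name, type) pairs sorted by field name."""
    return sorted(fields, key=lambda field: field[0])

def encode_positional(schema, row):
    """Emit the row's values in the order the schema lists its fields."""
    return [row[name] for name, _ in schema]

def decode_positional(schema, values):
    """Assign values back to field names by position."""
    return {name: value for (name, _), value in zip(schema, values)}

# Field order as originally declared in the test.
declared = [
    ("str1", "STRING"),
    ("str2", "STRING"),
    ("int1", "INT32"),
    ("int2", "INT32"),
]
# Field order assumed to come back from the provider (alphabetical).
provider_schema = alphabetical(declared)

row = {"str1": "a", "str2": "b", "int1": 1, "int2": 2}

# Writer uses the declared order, reader uses the alphabetical order.
mismatched = decode_positional(
    provider_schema, encode_positional(declared, row))
# Both sides use the alphabetical order.
matched = decode_positional(
    provider_schema, encode_positional(alphabetical(declared), row))
```

Declaring the test schema in alphabetical order, as the diff does, corresponds to the `matched` case, where both sides agree on positions.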





[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485303813

   Run Python_Runners PreCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485650225

   Run SQL_Java17 PreCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485736827

   Run Java_Pulsar_IO_Direct PreCommit




[GitHub] [beam] chamikaramj merged pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj merged PR #25685:
URL: https://github.com/apache/beam/pull/25685




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125786377


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -302,6 +302,35 @@ def chain_after(result):
 result.destination_copy_jobid_pairs <--> result['destination_copy_jobid_pairs']
 ```
 
+Writing with Storage Write API using Cross Language
+---------------------------------------------------
+After starting up an expansion service that contains the Java implementation

Review Comment:
   Agreed





[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1132907027


##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2353,6 +2395,98 @@ class BeamModulePlugin implements Plugin<Project> {
       }
     }
 
+    /** ***********************************************************************************************/
+    // Method to create the createCrossLanguageUsingJavaExpansionTask.
+    // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
+    // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
+    project.ext.createCrossLanguageUsingJavaExpansionTask = {
+      // This task won't work if the python build file doesn't exist.
+      if (!project.project(":sdks:python").buildFile.exists()) {
+        System.err.println 'Python build file not found. Skipping createCrossLanguageUsingJavaExpansionTask.'
+        return
+      }
+      def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()
+
+      project.evaluationDependsOn(":sdks:python")
+      project.evaluationDependsOn(config.expansionProjectPath)
+      project.evaluationDependsOn(":runners:core-construction-java")
+      project.evaluationDependsOn(":sdks:java:extensions:python")
+
+      // Setting up args to launch the expansion service
+      def envDir = project.project(":sdks:python").envdir
+      def pythonDir = project.project(":sdks:python").projectDir
+      def javaExpansionPort = getRandomPort()

Review Comment:
   Thanks for catching that. I moved port acquisition to right before the expansion service launch.
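
The idea of acquiring the port as late as possible can be sketched in Python. This is only an illustration of the general pattern, not the Groovy code in `BeamModulePlugin`: `get_free_port` and `launch_command` are hypothetical names, and the `java -jar <jar> <port>` command line is an assumed shape for launching a service on a given port.

```python
import socket

def get_free_port():
    """Ask the OS for a currently unused TCP port on localhost."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
        return sock.getsockname()[1]

def launch_command(jar_path):
    """Build a (hypothetical) launch command for a service jar.

    The port is acquired as the last step before the command is built,
    which shortens the window in which another process could claim it.
    """
    port = get_free_port()
    return ["java", "-jar", jar_path, str(port)]

command = launch_command("expansion-service.jar")
```

The socket is closed before the service starts, so a race is still possible. Picking the port right before launch only narrows the window rather than eliminating it.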





[GitHub] [beam] github-actions[bot] commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451028011

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485229360

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485649680

   Run Spotless PreCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485736911

   Run Python_PVR_Flink PreCommit




[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1146260144


##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2307,6 +2397,94 @@ def __getitem__(self, key):
     return self.attributes[key].__get__(self, WriteResult)
 
 
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+  """Writes data to BigQuery using Storage API.
+
+  Experimental; no backwards compatibility guarantees.
+  """
+  URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+  FAILED_ROWS = "FailedRows"
+  FAILED_ROWS_WITH_ERRORS = "FailedRowsWithErrors"
+
+  def __init__(
+      self,
+      table,
+      create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
+      write_disposition=BigQueryDisposition.WRITE_APPEND,
+      triggering_frequency=0,
+      use_at_least_once=False,
+      expansion_service=None):
+    """Initialize a StorageWriteToBigQuery transform.
+
+    :param table:
+      Fully-qualified table ID specified as ``'PROJECT:DATASET.TABLE'``.
+    :param create_disposition:
+      String specifying the strategy to take when the table doesn't
+      exist. Possible values are:
+      * ``'CREATE_IF_NEEDED'``: create if does not exist.
+      * ``'CREATE_NEVER'``: fail the write if does not exist.
+    :param write_disposition:
+      String specifying the strategy to take when the table already
+      contains data. Possible values are:
+      * ``'WRITE_TRUNCATE'``: delete existing rows.
+      * ``'WRITE_APPEND'``: add to existing rows.
+      * ``'WRITE_EMPTY'``: fail the write if table not empty.
+    :param triggering_frequency:
+      The time in seconds between write commits. Should only be specified
+      for streaming pipelines. Defaults to 5 seconds.
+    :param use_at_least_once:
+      Use at-least-once semantics. Is cheaper and provides lower latency,
+      but will potentially duplicate records.
+    :param expansion_service:
+      The address (host:port) of the expansion service. If no expansion
+      service is provided, will attempt to run the default GCP expansion
+      service.
+    """
+    super().__init__()
+    self._table = table
+    self._create_disposition = create_disposition
+    self._write_disposition = write_disposition
+    self._triggering_frequency = triggering_frequency
+    self._use_at_least_once = use_at_least_once
+    self._expansion_service = (
+        expansion_service or _default_io_expansion_service())
+    self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+        self._expansion_service, self.URN)
+
+  def expand(self, input):
+    opts = input.pipeline.options.view_as(StandardOptions)
+    # TODO(https://github.com/apache/beam/issues/21307): Add support for
+    # OnWindowExpiration to more runners. Storage Write API requires
+    # `beam:requirement:pardo:on_window_expiration:v1` when unbounded
+    streaming_runners = ['DataflowRunner', 'TestDataflowRunner']

Review Comment:
   Sure SGTM
   
   > Also, any idea what's broken for other runners in this case (for the sink) ?
   
   When I asked about the `beam:requirement:pardo:on_window_expiration:v1` requirement, Luke said he didn't know of any other runner currently supporting it (besides DataflowRunner)
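
A guard of the kind this restriction implies might look like the sketch below. It is a standalone illustration, not the body of `expand` in the PR (the quoted diff stops at the `streaming_runners` list): the function name `check_streaming_support`, the exception type, and the error message are assumptions, and the runner is passed in as a plain string rather than read from pipeline options.

```python
# Runner names taken from the `streaming_runners` list in the quoted diff.
SUPPORTED_STREAMING_RUNNERS = ("DataflowRunner", "TestDataflowRunner")

def check_streaming_support(runner, streaming):
    """Reject streaming writes on runners outside the supported list.

    Hypothetical helper: batch pipelines pass on any runner, while
    streaming pipelines pass only on the runners listed above.
    """
    if streaming and runner not in SUPPORTED_STREAMING_RUNNERS:
        raise NotImplementedError(
            "Streaming writes are not supported for %s in this sketch." %
            runner)

# Batch on any runner and streaming on Dataflow both pass silently.
check_streaming_support("DirectRunner", streaming=False)
check_streaming_support("DataflowRunner", streaming=True)
```

Failing at pipeline construction time keeps the error close to the user's code instead of surfacing later as a missing-requirement failure on the runner.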
   
   





[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1481801678

   Run Whitespace PreCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468759178

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468825445

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466282261

   Run Python_Xlang_Gcp_Direct PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466649236

   Run Python_Xlang_Gcp_Dataflow PostCommit




[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK

Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466244136

   Run Python_Xlang_Gcp_Dataflow PostCommit

