Posted to github@beam.apache.org by "ahmedabu98 (via GitHub)" <gi...@apache.org> on 2023/03/01 23:26:59 UTC
[GitHub] [beam] ahmedabu98 opened a new pull request, #25685: BigQuery Storage Write API wrapper for Python SDK
ahmedabu98 opened a new pull request, #25685:
URL: https://github.com/apache/beam/pull/25685
Implements a wrapper for the Python SDK that uses Java's [Storage API SchemaTransform](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java) to write to BigQuery.
Fixes #21961
Also adds support for more easily creating cross-language tests for a particular expansion service (e.g. GCP, Kinesis).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451038262
Run Python_Xlang_Gcp_Dataflow PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1484216430
Run XVR_JavaUsingPython_Dataflow PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464645526
Run Python_Xlang_Gcp_Dataflow PostCommit
--
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125780795
##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2353,6 +2395,98 @@ class BeamModulePlugin implements Plugin<Project> {
}
}
+ /** ***********************************************************************************************/
+ // Method to create the createCrossLanguageUsingJavaExpansionTask.
+ // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
+ // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
+ project.ext.createCrossLanguageUsingJavaExpansionTask = {
+ // This task won't work if the python build file doesn't exist.
+ if (!project.project(":sdks:python").buildFile.exists()) {
+ System.err.println 'Python build file not found. Skipping createCrossLanguageUsingJavaExpansionTask.'
+ return
+ }
+ def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()
+
+ project.evaluationDependsOn(":sdks:python")
+ project.evaluationDependsOn(config.expansionProjectPath)
+ project.evaluationDependsOn(":runners:core-construction-java")
+ project.evaluationDependsOn(":sdks:java:extensions:python")
+
+ // Setting up args to launch the expansion service
+ def envDir = project.project(":sdks:python").envdir
+ def pythonDir = project.project(":sdks:python").projectDir
+ def javaExpansionPort = getRandomPort()
+ def expansionJar = project.project(config.expansionProjectPath).buildTestExpansionServiceJar.archivePath
+ def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath()
+ def expansionServiceOpts = [
+ "group_id": project.name,
+ "java_expansion_service_jar": expansionJar,
+ "java_port": javaExpansionPort,
+ "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
+ ]
+ def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
+ def javaContainerSuffix
+ if (JavaVersion.current() == JavaVersion.VERSION_1_8) {
Review Comment:
Sounds good, will set up a property in that class.
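The task above picks a free port with `getRandomPort()` and flattens the `expansionServiceOpts` map into one argument string with `mapToArgString`. A rough Python sketch of those two steps follows, assuming the port is found by binding to port 0 and that each option becomes a `--key=value` flag; the real Groovy helpers may format arguments differently, and `find_free_port` and `to_arg_string` are hypothetical names.

```python
import socket


def find_free_port():
    """Ask the OS for an unused TCP port by binding to port 0.

    The port is released when the socket closes, so another process could
    still claim it before the expansion service starts listening.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("localhost", 0))
        return sock.getsockname()[1]


def to_arg_string(opts):
    """Flatten an options dict into space-separated --key=value flags.

    Hypothetical formatting; values are converted with str().
    """
    return " ".join(f"--{key}={value}" for key, value in opts.items())


# Hypothetical values mirroring the keys of expansionServiceOpts.
expansion_service_opts = {
    "group_id": "example-project",
    "java_expansion_service_jar": "/tmp/expansion-service.jar",
    "java_port": find_free_port(),
}
print(to_arg_string(expansion_service_opts))
```

Picking the port at configuration time keeps parallel test tasks from colliding on a fixed port, at the cost of a small race window between choosing the port and binding it.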
--
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125802640
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
return self.attributes[key].__get__(self, WriteResult)
+def _default_io_expansion_service(append_args=None):
+ return BeamJarExpansionService(
+ 'sdks:java:io:google-cloud-platform:expansion-service:build',
+ append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+ """Writes data to BigQuery using Storage API."""
+ URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+ FAILED_ROWS = "failed_rows"
+ FAILED_ROWS_WITH_ERRORS = "failed_rows_with_errors"
+
+ def __init__(
+ self,
+ table,
+ create_disposition="",
+ write_disposition="",
+ triggering_frequency=0,
+ use_at_least_once=False,
+ expansion_service=None):
+ """Initialize a StorageWriteToBigQuery transform.
+
+ :param table:
+ Fully-qualified table ID specified as ``'PROJECT:DATASET.TABLE'``.
+ :param create_disposition:
+ String specifying the strategy to take when the table doesn't
+ exist. Possible values are:
+ * ``'CREATE_IF_NEEDED'``: create the table if it does not exist.
+ * ``'CREATE_NEVER'``: fail the write if the table does not exist.
+ :param write_disposition:
+ String specifying the strategy to take when the table already
+ contains data. Possible values are:
+ * ``'WRITE_TRUNCATE'``: delete existing rows.
+ * ``'WRITE_APPEND'``: add to existing rows.
+ * ``'WRITE_EMPTY'``: fail the write if the table is not empty.
+ :param triggering_frequency:
+ The time in seconds between write commits. Should only be specified
+ for streaming pipelines. Defaults to 5 seconds.
+ :param use_at_least_once:
+ Use at-least-once semantics. This is cheaper and provides lower
+ latency, but may duplicate records.
+ :param expansion_service:
+ The address (host:port) of the expansion service. If no expansion
+ service is provided, will attempt to run the default GCP expansion
+ service.
+ """
+ super().__init__()
+ self._table = table
+ self._create_disposition = create_disposition
+ self._write_disposition = write_disposition
+ self._triggering_frequency = triggering_frequency
+ self._use_at_least_once = use_at_least_once
+ self._expansion_service = (
+ expansion_service or _default_io_expansion_service())
+ self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+ self._expansion_service, self.URN)
+
+ def expand(self, input):
+ opts = input.pipeline.options.view_as(StandardOptions)
+ # TODO(https://github.com/apache/beam/issues/21307): Add support for
+ # OnWindowExpiration to more runners. Storage Write API requires
+ # `beam:requirement:pardo:on_window_expiration:v1` when unbounded
+ available_runners = ['DataflowRunner', 'TestDataflowRunner']
Review Comment:
This is meant for streaming use cases: `StorageApiWritesShardedRecords` has an [`OnWindowExpiration`](https://github.com/apache/beam/blob/4da602517292adcd9ffcd7cc0acb8b0c1155aa02/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java#L763) method, which is only supported by DataflowRunner. Running this with another runner results in:
```
ValueError: Unable to run pipeline with requirement: beam:requirement:pardo:on_window_expiration:v1
```
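The runner restriction described above can be checked before the pipeline is submitted. A minimal sketch, assuming (as the TODO in the diff suggests) that the check only matters for unbounded input; `check_storage_write_runner` is a hypothetical helper, not the PR's actual `expand()` logic, which is cut off in the quoted diff.

```python
# Runners listed in the diff above as able to satisfy
# beam:requirement:pardo:on_window_expiration:v1.
AVAILABLE_RUNNERS = ("DataflowRunner", "TestDataflowRunner")


def check_storage_write_runner(runner_name, is_unbounded):
    """Raise ValueError early if an unbounded write targets an unsupported runner.

    Hypothetical helper: assumes bounded (batch) input is unaffected.
    """
    if is_unbounded and runner_name not in AVAILABLE_RUNNERS:
        raise ValueError(
            f"Runner {runner_name!r} cannot run Storage Write API on unbounded "
            "input: it requires beam:requirement:pardo:on_window_expiration:v1. "
            f"Supported runners: {', '.join(AVAILABLE_RUNNERS)}.")


# Example calls: both return None without raising.
check_storage_write_runner("DataflowRunner", is_unbounded=True)
check_storage_write_runner("DirectRunner", is_unbounded=False)
```

Failing at construction time would surface the problem with a more specific message than the runner's generic `Unable to run pipeline with requirement` error.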
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464713457
Run Python_Xlang_Gcp_Dataflow PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466282833
Run Python_Xlang_Gcp_Dataflow PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1481801177
Run Python_Xlang_Gcp_Direct PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485648796
Run SQL_Java11 PreCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485648138
Run SQL PreCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485736582
Run Python_Xlang_Gcp_Dataflow PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485685452
Run Python_Xlang_Gcp_Direct PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485228244
Run Python_Xlang_Gcp_Dataflow PostCommit
--
[GitHub] [beam] ahmedabu98 closed pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 closed pull request #25685: BigQuery Storage Write API wrapper for Python SDK
URL: https://github.com/apache/beam/pull/25685
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468315654
Run Python_Xlang_Gcp_Direct PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468637048
Run Python_Xlang_Gcp_Direct PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451041971
Run Python_Xlang_Gcp_Direct PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468650240
Run Python_Xlang_Gcp_Direct PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468980715
Run Python_Xlang_Gcp_Direct PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485647843
Run Java_GCP_IO_Direct PreCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464645329
Run Python_Xlang_Gcp_Direct PostCommit
--
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125780995
##########
sdks/java/io/google-cloud-platform/expansion-service/build.gradle:
##########
@@ -45,3 +46,24 @@ task runExpansionService (type: JavaExec) {
classpath = sourceSets.test.runtimeClasspath
args = [project.findProperty("constructionService.port") ?: "8097"]
}
+
+task buildTestExpansionServiceJar(type: ShadowJar) {
Review Comment:
Sure, will try this out.
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java:
##########
@@ -102,53 +120,61 @@ public void testInvalidConfig() {
public PCollectionRowTuple runWithConfig(
BigQueryStorageWriteApiSchemaTransformConfiguration config) {
+ return runWithConfig(config, ROWS);
+ }
+
+ public PCollectionRowTuple runWithConfig(
+ BigQueryStorageWriteApiSchemaTransformConfiguration config, List<Row> inputRows) {
BigQueryStorageWriteApiSchemaTransformProvider provider =
new BigQueryStorageWriteApiSchemaTransformProvider();
BigQueryStorageWriteApiPCollectionRowTupleTransform writeRowTupleTransform =
(BigQueryStorageWriteApiPCollectionRowTupleTransform)
provider.from(config).buildTransform();
- List<Row> testRows =
- Arrays.asList(
- Row.withSchema(SCHEMA)
- .withFieldValue("name", "a")
- .withFieldValue("number", 1L)
- .withFieldValue("dt", LocalDateTime.parse("2000-01-01T00:00:00"))
- .build(),
- Row.withSchema(SCHEMA)
- .withFieldValue("name", "b")
- .withFieldValue("number", 2L)
- .withFieldValue("dt", LocalDateTime.parse("2000-01-02T00:00:00"))
- .build(),
- Row.withSchema(SCHEMA)
- .withFieldValue("name", "c")
- .withFieldValue("number", 3L)
- .withFieldValue("dt", LocalDateTime.parse("2000-01-03T00:00:00"))
- .build());
-
writeRowTupleTransform.setBigQueryServices(fakeBigQueryServices);
String tag = provider.inputCollectionNames().get(0);
- PCollection<Row> rows = p.apply(Create.of(testRows).withRowSchema(SCHEMA));
+ PCollection<Row> rows = p.apply(Create.of(inputRows).withRowSchema(SCHEMA));
PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows);
PCollectionRowTuple result = input.apply(writeRowTupleTransform);
return result;
}
+ public Boolean rowsEquals(List<Row> expectedRows, List<TableRow> actualRows) {
+ if (expectedRows.size() != actualRows.size()) {
+ return false;
+ }
+ for (int i = 0; i < expectedRows.size(); i++) {
+ // Actual rows may come back disordered. For each TableRow, find its "number" column value
Review Comment:
Yup
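The `rowsEquals` helper compares rows that may come back out of order by matching each one on its `"number"` column. The same idea as a short Python sketch, with plain dicts standing in for Beam `Row` and BigQuery `TableRow` objects; this is an illustrative assumption, `rows_equal` is a hypothetical name, and it assumes `"number"` is unique within each list. A real comparison may also need to convert column types before comparing.

```python
def rows_equal(expected_rows, actual_rows, key="number"):
    """Return True if both lists hold the same rows, ignoring order.

    Each row is a dict matched by its `key` column, which is assumed to be
    unique within each list; duplicate keys make the lists compare unequal.
    """
    if len(expected_rows) != len(actual_rows):
        return False
    actual_by_key = {row[key]: row for row in actual_rows}
    expected_keys = {row[key] for row in expected_rows}
    if len(actual_by_key) != len(actual_rows) or len(expected_keys) != len(expected_rows):
        return False  # duplicate keys: rows cannot be matched one-to-one
    # With unique keys and equal lengths, every expected row must find an
    # identical actual row under the same key.
    return all(actual_by_key.get(row[key]) == row for row in expected_rows)
```

Indexing the actual rows by key makes the comparison linear in the number of rows instead of searching the whole list for each expected row.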
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464742773
Run Python_Xlang_Gcp_Dataflow PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451038333
Run Python_Xlang_Gcp_Direct PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451093225
Run Python_Transforms PreCommit
--
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125781060
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java:
##########
@@ -102,53 +120,61 @@ public void testInvalidConfig() {
public PCollectionRowTuple runWithConfig(
BigQueryStorageWriteApiSchemaTransformConfiguration config) {
+ return runWithConfig(config, ROWS);
+ }
+
+ public PCollectionRowTuple runWithConfig(
+ BigQueryStorageWriteApiSchemaTransformConfiguration config, List<Row> inputRows) {
BigQueryStorageWriteApiSchemaTransformProvider provider =
new BigQueryStorageWriteApiSchemaTransformProvider();
BigQueryStorageWriteApiPCollectionRowTupleTransform writeRowTupleTransform =
(BigQueryStorageWriteApiPCollectionRowTupleTransform)
provider.from(config).buildTransform();
- List<Row> testRows =
- Arrays.asList(
- Row.withSchema(SCHEMA)
- .withFieldValue("name", "a")
- .withFieldValue("number", 1L)
- .withFieldValue("dt", LocalDateTime.parse("2000-01-01T00:00:00"))
- .build(),
- Row.withSchema(SCHEMA)
- .withFieldValue("name", "b")
- .withFieldValue("number", 2L)
- .withFieldValue("dt", LocalDateTime.parse("2000-01-02T00:00:00"))
- .build(),
- Row.withSchema(SCHEMA)
- .withFieldValue("name", "c")
- .withFieldValue("number", 3L)
- .withFieldValue("dt", LocalDateTime.parse("2000-01-03T00:00:00"))
- .build());
-
writeRowTupleTransform.setBigQueryServices(fakeBigQueryServices);
String tag = provider.inputCollectionNames().get(0);
- PCollection<Row> rows = p.apply(Create.of(testRows).withRowSchema(SCHEMA));
+ PCollection<Row> rows = p.apply(Create.of(inputRows).withRowSchema(SCHEMA));
PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows);
PCollectionRowTuple result = input.apply(writeRowTupleTransform);
return result;
}
+ public Boolean rowsEquals(List<Row> expectedRows, List<TableRow> actualRows) {
+ if (expectedRows.size() != actualRows.size()) {
+ return false;
+ }
+ for (int i = 0; i < expectedRows.size(); i++) {
+ // Actual rows may come back disordered. For each TableRow, find its "number" column value
Review Comment:
Yup, changing to that
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464718335
Run Python_Xlang_Gcp_Dataflow PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466648872
Run Python_Xlang_Gcp_Direct PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468568650
Run Python_Xlang_Gcp_Direct PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1484224164
Run XVR_PythonUsingJava_Dataflow PostCommit
--
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485229698
Run Python_Xlang_Gcp_Direct PostCommit
--
[GitHub] [beam] codecov[bot] commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451082652
# [Codecov](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#25685](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c2ebf68) into [master](https://codecov.io/gh/apache/beam/commit/e1491a637db0389eed5089c63aabc0dd9a97b249?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e1491a6) will **decrease** coverage by `0.01%`.
> The diff coverage is `54.05%`.
```diff
@@ Coverage Diff @@
## master #25685 +/- ##
==========================================
- Coverage 72.81% 72.81% -0.01%
==========================================
Files 775 775
Lines 102928 102965 +37
==========================================
+ Hits 74948 74972 +24
- Misses 26526 26539 +13
Partials 1454 1454
```
| Flag | Coverage Δ | |
|---|---|---|
| python | `81.95% <54.05%> (-0.01%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `70.35% <36.00%> (-0.99%)` | :arrow_down: |
| [sdks/python/apache\_beam/transforms/external.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9leHRlcm5hbC5weQ==) | `80.51% <91.66%> (+1.77%)` | :arrow_up: |
| [.../python/apache\_beam/testing/test\_stream\_service.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy90ZXN0X3N0cmVhbV9zZXJ2aWNlLnB5) | `88.09% <0.00%> (-4.77%)` | :arrow_down: |
| [...che\_beam/runners/interactive/interactive\_runner.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9ydW5uZXIucHk=) | `90.50% <0.00%> (-1.27%)` | :arrow_down: |
| [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `94.10% <0.00%> (-0.36%)` | :arrow_down: |
| [...s/python/apache\_beam/testing/synthetic\_pipeline.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdGVzdGluZy9zeW50aGV0aWNfcGlwZWxpbmUucHk=) | `82.04% <0.00%> (-0.25%)` | :arrow_down: |
| [sdks/python/apache\_beam/runners/common.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9jb21tb24ucHk=) | `88.44% <0.00%> (+0.11%)` | :arrow_up: |
| [...on/apache\_beam/runners/dataflow/dataflow\_runner.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kYXRhZmxvdy9kYXRhZmxvd19ydW5uZXIucHk=) | `81.88% <0.00%> (+0.14%)` | :arrow_up: |
| [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.46% <0.00%> (+0.15%)` | :arrow_up: |
| [sdks/python/apache\_beam/transforms/combiners.py](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb21iaW5lcnMucHk=) | `93.43% <0.00%> (+0.38%)` | :arrow_up: |
| ... and [2 more](https://codecov.io/gh/apache/beam/pull/25685?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
[GitHub] [beam] chamikaramj commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1124960350
##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2353,6 +2395,98 @@ class BeamModulePlugin implements Plugin<Project> {
}
}
+ /** ***********************************************************************************************/
+ // Method to create the createCrossLanguageUsingJavaExpansionTask.
+ // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
+ // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
+ project.ext.createCrossLanguageUsingJavaExpansionTask = {
+ // This task won't work if the python build file doesn't exist.
+ if (!project.project(":sdks:python").buildFile.exists()) {
+ System.err.println 'Python build file not found. Skipping createCrossLanguageUsingJavaExpansionTask.'
+ return
+ }
+ def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()
+
+ project.evaluationDependsOn(":sdks:python")
+ project.evaluationDependsOn(config.expansionProjectPath)
+ project.evaluationDependsOn(":runners:core-construction-java")
+ project.evaluationDependsOn(":sdks:java:extensions:python")
+
+ // Setting up args to launch the expansion service
+ def envDir = project.project(":sdks:python").envdir
+ def pythonDir = project.project(":sdks:python").projectDir
+ def javaExpansionPort = getRandomPort()
+ def expansionJar = project.project(config.expansionProjectPath).buildTestExpansionServiceJar.archivePath
+ def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath()
+ def expansionServiceOpts = [
+ "group_id": project.name,
+ "java_expansion_service_jar": expansionJar,
+ "java_port": javaExpansionPort,
+ "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
+ ]
+ def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
+ def javaContainerSuffix
+ if (JavaVersion.current() == JavaVersion.VERSION_1_8) {
Review Comment:
We should get the supported versions from JavaTestProperties.SUPPORTED_VERSIONS instead of hardcoding them here.
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
return self.attributes[key].__get__(self, WriteResult)
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
Review Comment:
Can we integrate this transform more closely with the existing BigQuery sink API by adding a new WriteMethod?
https://github.com/apache/beam/blob/39cab94361bc44e8b48257385d37d8978ab7fc02/sdks/python/apache_beam/io/gcp/bigquery.py#L1753
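One way to picture this suggestion is a single public sink that dispatches on a write-method enum, with the Storage Write API added as one more option. The sketch below is hypothetical: `Method`, `resolve_method`, and `expand_label` are illustrative names, not Beam's API. The DEFAULT resolution (streaming inserts for streaming pipelines, file loads for batch) is an assumption about the existing sink's behavior.

```python
import enum


class Method(enum.Enum):
    # Hypothetical stand-in for the sink's write-method enum.
    DEFAULT = "DEFAULT"
    STREAMING_INSERTS = "STREAMING_INSERTS"
    FILE_LOADS = "FILE_LOADS"
    STORAGE_WRITE_API = "STORAGE_WRITE_API"


def resolve_method(method: Method, is_streaming: bool) -> Method:
    """Resolve DEFAULT to a concrete method (assumed: inserts for streaming,
    file loads for batch). Explicit choices pass through unchanged."""
    if method is Method.DEFAULT:
        return Method.STREAMING_INSERTS if is_streaming else Method.FILE_LOADS
    return method


def expand_label(method: Method, is_streaming: bool) -> str:
    """Return a label for the write path one public sink would expand to."""
    resolved = resolve_method(method, is_streaming)
    labels = {
        Method.STREAMING_INSERTS: "streaming inserts (Python)",
        Method.FILE_LOADS: "file loads (Python)",
        # The new option would delegate to the Java SchemaTransform via xlang.
        Method.STORAGE_WRITE_API: "storage write (Java via expansion service)",
    }
    return labels[resolved]
```

With this shape, users would pick the Storage Write API through the same entry point they already use, rather than importing a separate `StorageWriteToBigQuery` class.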
##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2353,6 +2395,98 @@ class BeamModulePlugin implements Plugin<Project> {
}
}
+ /** ***********************************************************************************************/
+ // Method to create the createCrossLanguageUsingJavaExpansionTask.
+ // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
+ // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
+ project.ext.createCrossLanguageUsingJavaExpansionTask = {
+ // This task won't work if the python build file doesn't exist.
+ if (!project.project(":sdks:python").buildFile.exists()) {
+ System.err.println 'Python build file not found. Skipping createCrossLanguageUsingJavaExpansionTask.'
+ return
+ }
+ def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()
+
+ project.evaluationDependsOn(":sdks:python")
+ project.evaluationDependsOn(config.expansionProjectPath)
+ project.evaluationDependsOn(":runners:core-construction-java")
+ project.evaluationDependsOn(":sdks:java:extensions:python")
+
+ // Setting up args to launch the expansion service
+ def envDir = project.project(":sdks:python").envdir
+ def pythonDir = project.project(":sdks:python").projectDir
+ def javaExpansionPort = getRandomPort()
Review Comment:
There could be a race between acquiring the port here and starting the expansion service later, which could cause failures, especially once we run a large number of cross-language test suites.
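A common mitigation for this kind of race is to treat a failed bind as retryable and try again with a fresh port. Here is a minimal sketch. It assumes the service launcher can be wrapped in a callback that raises `OSError` when the port is already taken. `pick_port`, `start_with_retry`, and the callback are hypothetical and are not part of the Beam build.

```python
import socket
from typing import Callable, Optional


def pick_port() -> int:
    """Ask the OS for a port that is free right now.

    The socket is closed on return, so another process can take the port
    before the caller uses it. This is the race described above.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def start_with_retry(start: Callable[[int], None], attempts: int = 5) -> int:
    """Call the hypothetical `start(port)` with fresh ports until one works.

    `start` is expected to raise OSError (e.g. EADDRINUSE) if the port was
    taken in the meantime. Returns the port that was used successfully.
    """
    last_error: Optional[OSError] = None
    for _ in range(attempts):
        port = pick_port()
        try:
            start(port)
            return port
        except OSError as err:
            last_error = err
    raise RuntimeError(f"could not start after {attempts} attempts") from last_error
```

An alternative that avoids the race entirely is to let the service bind port 0 itself and report the port it received. That approach, however, requires the launcher to expose the chosen port.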
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java:
##########
@@ -102,53 +120,61 @@ public void testInvalidConfig() {
public PCollectionRowTuple runWithConfig(
BigQueryStorageWriteApiSchemaTransformConfiguration config) {
+ return runWithConfig(config, ROWS);
+ }
+
+ public PCollectionRowTuple runWithConfig(
+ BigQueryStorageWriteApiSchemaTransformConfiguration config, List<Row> inputRows) {
BigQueryStorageWriteApiSchemaTransformProvider provider =
new BigQueryStorageWriteApiSchemaTransformProvider();
BigQueryStorageWriteApiPCollectionRowTupleTransform writeRowTupleTransform =
(BigQueryStorageWriteApiPCollectionRowTupleTransform)
provider.from(config).buildTransform();
- List<Row> testRows =
- Arrays.asList(
- Row.withSchema(SCHEMA)
- .withFieldValue("name", "a")
- .withFieldValue("number", 1L)
- .withFieldValue("dt", LocalDateTime.parse("2000-01-01T00:00:00"))
- .build(),
- Row.withSchema(SCHEMA)
- .withFieldValue("name", "b")
- .withFieldValue("number", 2L)
- .withFieldValue("dt", LocalDateTime.parse("2000-01-02T00:00:00"))
- .build(),
- Row.withSchema(SCHEMA)
- .withFieldValue("name", "c")
- .withFieldValue("number", 3L)
- .withFieldValue("dt", LocalDateTime.parse("2000-01-03T00:00:00"))
- .build());
-
writeRowTupleTransform.setBigQueryServices(fakeBigQueryServices);
String tag = provider.inputCollectionNames().get(0);
- PCollection<Row> rows = p.apply(Create.of(testRows).withRowSchema(SCHEMA));
+ PCollection<Row> rows = p.apply(Create.of(inputRows).withRowSchema(SCHEMA));
PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows);
PCollectionRowTuple result = input.apply(writeRowTupleTransform);
return result;
}
+ public Boolean rowsEquals(List<Row> expectedRows, List<TableRow> actualRows) {
+ if (expectedRows.size() != actualRows.size()) {
+ return false;
+ }
+ for (int i = 0; i < expectedRows.size(); i++) {
+ // Actual rows may come back disordered. For each TableRow, find its "number" column value
Review Comment:
Do you mean "out of order"?
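The order-insensitive check this Java helper is aiming for can be sketched in Python as follows. The sketch assumes that rows are plain dicts whose values are already directly comparable, and that the "number" column is unique, as it is in the test data. `rows_equal_ignoring_order` is a hypothetical name.

```python
from typing import Any, Dict, List


def rows_equal_ignoring_order(
        expected: List[Dict[str, Any]],
        actual: List[Dict[str, Any]],
        key: str = "number") -> bool:
    """Return True if both lists hold the same rows, in any order.

    Rows are matched on `key`, which is assumed to be unique within each list.
    """
    if len(expected) != len(actual):
        return False
    expected_by_key = {row[key]: row for row in expected}
    actual_by_key = {row[key]: row for row in actual}
    if len(expected_by_key) != len(expected) or len(actual_by_key) != len(actual):
        # Duplicate keys: rows cannot be matched unambiguously.
        return False
    # Dict equality compares both the key sets and the matched rows.
    return expected_by_key == actual_by_key
```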
##########
sdks/python/test-suites/xlang/build.gradle:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// This is a base file to set up cross language tests for different runners
+import org.apache.beam.gradle.BeamModulePlugin
+import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTaskCommon
+project.evaluationDependsOn(":sdks:python")
+
+// Set up cross language tests
+def envDir = project.project(":sdks:python").envdir
+def jobPort = BeamModulePlugin.getRandomPort()
+def tmpDir = System.getenv("TMPDIR") ?: System.getenv("WORKSPACE") ?: "/tmp"
+def pidFile = "${tmpDir}/local_job_service_main-${jobPort}.pid"
+
+def setupTask = project.tasks.register("fnApiJobServerSetup", Exec) {
+ dependsOn ':sdks:python:installGcpTest'
+
+ executable 'sh'
+ args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --job_port ${jobPort} --pid_file ${pidFile} --background --stdout_file ${tmpDir}/beam-fnapi-job-server.log"
+}
+
+def cleanupTask = project.tasks.register("fnApiJobServerCleanup", Exec) {
+ executable 'sh'
+ args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --pid_file ${pidFile} --stop"
+}
+
+// List of objects representing task metadata to create cross-language tasks from.
+// Each object contains the minimum relevant metadata.
+def xlangTasks = []
+
+// ******** Java GCP expansion service ********
+def gcpExpansionProject = project.project(':sdks:java:io:google-cloud-platform:expansion-service')
Review Comment:
It looks like we are hardcoding "google-cloud-platform:expansion-service" for all Python x-lang tests?
This should be specific to the GCP tests instead.
##########
sdks/java/io/google-cloud-platform/expansion-service/build.gradle:
##########
@@ -45,3 +46,24 @@ task runExpansionService (type: JavaExec) {
classpath = sourceSets.test.runtimeClasspath
args = [project.findProperty("constructionService.port") ?: "8097"]
}
+
+task buildTestExpansionServiceJar(type: ShadowJar) {
Review Comment:
Is this needed?
We already build and release a Google Cloud Platform expansion service jar: https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-google-cloud-platform-expansion-service/2.45.0
So can we just build the jar using the _sdks:java:io:google-cloud-platform:expansion-service:shadowJar_ target and use that?
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -302,6 +302,35 @@ def chain_after(result):
result.destination_copy_jobid_pairs <--> result['destination_copy_jobid_pairs']
```
+Writing with Storage Write API using Cross Language
+---------------------------------------------------
+After starting up an expansion service that contains the Java implementation
Review Comment:
Users should be able to just use the default expansion service, right? Starting up a custom expansion service is a more advanced use case, so it should probably be left out here for clarity.
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
return self.attributes[key].__get__(self, WriteResult)
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+  """Writes data to BigQuery using Storage API."""
+  URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+  FAILED_ROWS = "failed_rows"
+  FAILED_ROWS_WITH_ERRORS = "failed_rows_with_errors"
+
+  def __init__(
+      self,
+      table,
+      create_disposition="",
Review Comment:
Please make sure default values are set to valid values. Many users may try to use the BQ sink without overriding the optional arguments.
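Here is a sketch of what valid defaults combined with early validation could look like. `StorageWriteConfig` is a hypothetical holder, not the PR's code. The defaults `CREATE_IF_NEEDED` and `WRITE_APPEND` come from the values listed in the docstring; choosing them as defaults is an assumption. Under this scheme, an empty string, as in the current signature, would be rejected at construction time.

```python
from dataclasses import dataclass

VALID_CREATE = ("CREATE_IF_NEEDED", "CREATE_NEVER")
VALID_WRITE = ("WRITE_APPEND", "WRITE_TRUNCATE", "WRITE_EMPTY")


@dataclass(frozen=True)
class StorageWriteConfig:
    """Hypothetical config holder in which every default is itself valid."""
    table: str
    create_disposition: str = "CREATE_IF_NEEDED"
    write_disposition: str = "WRITE_APPEND"
    use_at_least_once: bool = False

    def __post_init__(self):
        # Fail fast when the transform is constructed, not at job submission.
        if self.create_disposition not in VALID_CREATE:
            raise ValueError(
                f"invalid create_disposition: {self.create_disposition!r}")
        if self.write_disposition not in VALID_WRITE:
            raise ValueError(
                f"invalid write_disposition: {self.write_disposition!r}")
```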
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
return self.attributes[key].__get__(self, WriteResult)
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+  """Writes data to BigQuery using Storage API."""
+  URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+  FAILED_ROWS = "failed_rows"
+  FAILED_ROWS_WITH_ERRORS = "failed_rows_with_errors"
+
+  def __init__(
+      self,
+      table,
+      create_disposition="",
+      write_disposition="",
+      triggering_frequency=0,
+      use_at_least_once=False,
+      expansion_service=None):
+    """Initialize a StorageWriteToBigQuery transform.
+
+    :param table:
+      Fully-qualified table ID specified as ``'PROJECT:DATASET.TABLE'``.
+    :param create_disposition:
+      String specifying the strategy to take when the table doesn't
+      exist. Possible values are:
+      * ``'CREATE_IF_NEEDED'``: create if does not exist.
+      * ``'CREATE_NEVER'``: fail the write if does not exist.
+    :param write_disposition:
+      String specifying the strategy to take when the table already
+      contains data. Possible values are:
+      * ``'WRITE_TRUNCATE'``: delete existing rows.
+      * ``'WRITE_APPEND'``: add to existing rows.
+      * ``'WRITE_EMPTY'``: fail the write if table not empty.
+    :param triggering_frequency:
+      The time in seconds between write commits. Should only be specified
+      for streaming pipelines. Defaults to 5 seconds.
+    :param use_at_least_once:
+      Use at-least-once semantics. Is cheaper and provides lower latency,
+      but will potentially duplicate records.
+    :param expansion_service:
+      The address (host:port) of the expansion service. If no expansion
+      service is provided, will attempt to run the default GCP expansion
+      service.
+    """
+    super().__init__()
+    self._table = table
+    self._create_disposition = create_disposition
+    self._write_disposition = write_disposition
+    self._triggering_frequency = triggering_frequency
+    self._use_at_least_once = use_at_least_once
+    self._expansion_service = (
+        expansion_service or _default_io_expansion_service())
+    self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+        self._expansion_service, self.URN)
+
+  def expand(self, input):
+    opts = input.pipeline.options.view_as(StandardOptions)
+    # TODO(https://github.com/apache/beam/issues/21307): Add support for
+    # OnWindowExpiration to more runners. Storage Write API requires
+    # `beam:requirement:pardo:on_window_expiration:v1` when unbounded
+    available_runners = ['DataflowRunner', 'TestDataflowRunner']
Review Comment:
I don't see the Java implementation limited to Dataflow this way. Why do we want to limit only the Python x-lang version?
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125786706
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
return self.attributes[key].__get__(self, WriteResult)
+def _default_io_expansion_service(append_args=None):
+  return BeamJarExpansionService(
+      'sdks:java:io:google-cloud-platform:expansion-service:build',
+      append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
Review Comment:
Yup, good idea
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468722840
Run Python_Xlang_Gcp_Direct PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466244263
Run Python_Xlang_Gcp_Direct PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468375360
Run Python_Xlang_Gcp_Direct PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485530342
Run Python_Xlang_Gcp_Dataflow PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485773882
Run Java PreCommit
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1146333450
##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -103,6 +107,19 @@
_DATASET_PATTERN = r'\w{1,1024}'
_TABLE_PATTERN = r'[\p{L}\p{M}\p{N}\p{Pc}\p{Pd}\p{Zs}$]{1,1024}'
+BIGQUERY_TYPE_TO_PYTHON_TYPE = {
+ "STRING": str,
+ "BOOL": bool,
+ "BOOLEAN": bool,
+ "BYTES": bytes,
+ "INT64": np.int64,
+ "INTEGER": np.int64,
+ "FLOAT64": np.float64,
+ "FLOAT": np.float64,
+ "NUMERIC": decimal.Decimal,
+ "TIMESTAMP": apache_beam.utils.timestamp.Timestamp,
Review Comment:
Issue: https://github.com/apache/beam/issues/25946
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1481801308
Run Python_Xlang_Gcp_Dataflow PostCommit
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1146282748
##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -103,6 +107,19 @@
_DATASET_PATTERN = r'\w{1,1024}'
_TABLE_PATTERN = r'[\p{L}\p{M}\p{N}\p{Pc}\p{Pd}\p{Zs}$]{1,1024}'
+BIGQUERY_TYPE_TO_PYTHON_TYPE = {
+ "STRING": str,
+ "BOOL": bool,
+ "BOOLEAN": bool,
+ "BYTES": bytes,
+ "INT64": np.int64,
+ "INTEGER": np.int64,
+ "FLOAT64": np.float64,
+ "FLOAT": np.float64,
+ "NUMERIC": decimal.Decimal,
+ "TIMESTAMP": apache_beam.utils.timestamp.Timestamp,
Review Comment:
Yup, I only included the BQ types that have corresponding Beam types. I'll create a tracking issue for supporting the other types. As it stands, these are the only types users can write with the Storage Write xlang connector.
The other BQ types can be represented as strings, so a possible workaround would be to add a new configuration field for the BQ schema and pass it to Java. The Java SchemaTransform could then use that schema instead of `.useBeamSchema()`.
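The type mapping and the string fallback discussed here can be sketched as follows. This is a hypothetical helper, not the PR's code. To keep it dependency-free, builtin `int`/`float` and `datetime.datetime` stand in for `np.int64`/`np.float64` and Beam's `Timestamp`. The `unsupported_as_string` flag only illustrates the proposed workaround.

```python
import datetime
import decimal
from typing import Dict, List, Tuple

# Subset of the mapping in the diff, with dependency-free stand-in types.
BQ_TO_PY = {
    "STRING": str,
    "BOOL": bool, "BOOLEAN": bool,
    "BYTES": bytes,
    "INT64": int, "INTEGER": int,
    "FLOAT64": float, "FLOAT": float,
    "NUMERIC": decimal.Decimal,
    "TIMESTAMP": datetime.datetime,
}


def to_python_schema(bq_fields: List[Tuple[str, str]],
                     unsupported_as_string: bool = False) -> Dict[str, type]:
    """Map (name, BigQuery type) pairs to Python types.

    Unsupported types (for example GEOGRAPHY or JSON) raise by default. With
    the hypothetical `unsupported_as_string` flag they fall back to `str`.
    """
    schema = {}
    for name, bq_type in bq_fields:
        py_type = BQ_TO_PY.get(bq_type.upper())
        if py_type is None:
            if not unsupported_as_string:
                raise TypeError(
                    f"field {name!r}: BigQuery type {bq_type} is not supported")
            py_type = str
        schema[name] = py_type
    return schema
```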
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485530463
Run Python_Xlang_Gcp_Direct PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485685316
Run Python_Xlang_Gcp_Direct PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485774013
Run Java_GCP_IO_Direct PreCommit
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1132879510
##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2353,6 +2395,98 @@ class BeamModulePlugin implements Plugin<Project> {
}
}
+ /** ***********************************************************************************************/
+ // Method to create the createCrossLanguageUsingJavaExpansionTask.
+ // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
+ // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
+ project.ext.createCrossLanguageUsingJavaExpansionTask = {
+ // This task won't work if the python build file doesn't exist.
+ if (!project.project(":sdks:python").buildFile.exists()) {
+ System.err.println 'Python build file not found. Skipping createCrossLanguageUsingJavaExpansionTask.'
+ return
+ }
+ def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()
+
+ project.evaluationDependsOn(":sdks:python")
+ project.evaluationDependsOn(config.expansionProjectPath)
+ project.evaluationDependsOn(":runners:core-construction-java")
+ project.evaluationDependsOn(":sdks:java:extensions:python")
+
+ // Setting up args to launch the expansion service
+ def envDir = project.project(":sdks:python").envdir
+ def pythonDir = project.project(":sdks:python").projectDir
+ def javaExpansionPort = getRandomPort()
+ def expansionJar = project.project(config.expansionProjectPath).buildTestExpansionServiceJar.archivePath
+ def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath()
+ def expansionServiceOpts = [
+ "group_id": project.name,
+ "java_expansion_service_jar": expansionJar,
+ "java_port": javaExpansionPort,
+ "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
+ ]
+ def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
+ def javaContainerSuffix
+ if (JavaVersion.current() == JavaVersion.VERSION_1_8) {
Review Comment:
Not sure how I can access JavaTestProperties from BeamModulePlugin. Should I create a new property in `gradle.properties`?
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464657745
Run Python_PVR_Flink PreCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468612685
Run Python_Xlang_Gcp_Direct PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468850915
Run Python_Xlang_Gcp_Dataflow PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1469002126
R: @chamikaramj
Tests are passing now, ready for another review.
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1469070854
Run Java_Examples_Dataflow PreCommit
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125787745
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
return self.attributes[key].__get__(self, WriteResult)
+def _default_io_expansion_service(append_args=None):
+ return BeamJarExpansionService(
+ 'sdks:java:io:google-cloud-platform:expansion-service:build',
+ append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+ """Writes data to BigQuery using Storage API."""
+ URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+ FAILED_ROWS = "failed_rows"
+ FAILED_ROWS_WITH_ERRORS = "failed_rows_with_errors"
+
+ def __init__(
+ self,
+ table,
+ create_disposition="",
Review Comment:
These defaults are already handled in the Java SchemaTransform implementation (e.g., an empty string for create disposition means no disposition is explicitly set, so the default, CREATE_IF_NEEDED, applies). I can include the defaults here too (i.e., `create_disposition="CREATE_IF_NEEDED"`) to make them clearer.
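A minimal Python sketch of the trade-off described above. The `build_config` and `effective_config` helpers and the field names `createDisposition`/`writeDisposition` are hypothetical, not the PR's code, and the WRITE_APPEND default is assumed from the later revision of the diff. Leaving a field as an empty string omits it so the Java side applies its own default; spelling the default out in Python yields the same effective value while documenting it at the call site.

```python
# Hypothetical stand-in for the defaults applied on the Java side.
JAVA_SIDE_DEFAULTS = {
    "createDisposition": "CREATE_IF_NEEDED",
    "writeDisposition": "WRITE_APPEND",
}


def build_config(table, create_disposition="", write_disposition=""):
    """Builds a config dict, omitting fields left as empty strings."""
    config = {"table": table}
    if create_disposition:
        config["createDisposition"] = create_disposition
    if write_disposition:
        config["writeDisposition"] = write_disposition
    return config


def effective_config(config):
    """Simulates the Java side filling in defaults for omitted fields."""
    resolved = dict(JAVA_SIDE_DEFAULTS)
    resolved.update(config)
    return resolved


implicit = build_config("project:dataset.table")
explicit = build_config(
    "project:dataset.table",
    create_disposition="CREATE_IF_NEEDED",
    write_disposition="WRITE_APPEND")
```

Both configs resolve to the same values, so the choice between them is about readability rather than behavior.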
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451042351
Run Python_Xlang_Gcp_Dataflow PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468980787
Run Python_Xlang_Gcp_Dataflow PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464713674
Run Python_Xlang_Gcp_Direct PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464731431
Run Python_Xlang_Gcp_Direct PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464718440
Run Python_Xlang_Gcp_Direct PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466648517
Run Python_Xlang_Gcp_Dataflow PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466893064
> Seems like tests couldn't find the expansion service?
Yeah, it's only seeing the default value of -1 (see [debug println](https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Dataflow_PR/21/console#:~:text=JAVA%20EXPANSION%20PORT%3A%20%2D1)) that I set [here](https://github.com/apache/beam/pull/25685/files#diff-0435a83a413ec063bf7e682cadcd56776cd18fc878f197cc99a65fc231ef2047R2418). The setup task overwrites that value and launches an expansion service as expected, but the Python test task doesn't read the updated value. I'm not sure how scoping works between these tasks.
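One plausible explanation, offered as an assumption rather than a diagnosis: Gradle configures tasks (including the arguments for the Python test task) before any task executes, so a value the setup task assigns at execution time is invisible to arguments that were computed eagerly. The Python sketch below only models that ordering; `SharedState`, `configure_eagerly`, `configure_lazily`, and `setup_task` are hypothetical names. In Gradle itself, the usual remedy is to defer the read, for example by computing the arguments inside a `doFirst` block or via a provider.

```python
class SharedState:
    """Hypothetical holder for the port, mirroring the -1 default."""

    def __init__(self):
        self.java_expansion_port = -1


def configure_eagerly(state):
    # Reads the port immediately, i.e. during "configuration".
    args = ["--expansion_port", str(state.java_expansion_port)]
    return lambda: args


def configure_lazily(state):
    # Defers the read until the task actually "executes".
    return lambda: ["--expansion_port", str(state.java_expansion_port)]


def setup_task(state):
    # Stands in for the setup task that starts the expansion service.
    state.java_expansion_port = 12345


state = SharedState()
eager_task = configure_eagerly(state)  # configuration phase
lazy_task = configure_lazily(state)
setup_task(state)                      # execution phase begins
print(eager_task())  # ['--expansion_port', '-1']
print(lazy_task())   # ['--expansion_port', '12345']
```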
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485737051
Run SQL_Java17 PreCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485647530
Run Java PreCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1464731471
Run Python_Xlang_Gcp_Dataflow PostCommit
[GitHub] [beam] chamikaramj commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1137821416
##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -1514,6 +1531,42 @@ def process(self, element, *side_inputs):
yield (self.destination(element, *side_inputs), element)
+def beam_row_from_dict(row: dict, schema):
Review Comment:
Let's make sure this util is extensively covered by unit tests for various types, etc.
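A minimal sketch of the type-by-type coverage suggested above, using `unittest` subtests so each BigQuery type is reported separately. The converter `row_from_dict` and its `_TYPE_MAP` are hypothetical, stdlib-only stand-ins (the PR's `beam_row_from_dict` builds Beam rows and uses NumPy types), so only the testing pattern carries over.

```python
import decimal
import unittest

# Hypothetical stdlib-only subset of a BigQuery-to-Python type mapping.
_TYPE_MAP = {
    "STRING": str,
    "BOOL": bool,
    "BYTES": bytes,
    "INT64": int,
    "FLOAT64": float,
    "NUMERIC": decimal.Decimal,
}


def row_from_dict(row, schema):
    """Converts each field named in `schema` to its mapped Python type."""
    return {name: _TYPE_MAP[bq_type](row[name]) for name, bq_type in schema}


class RowFromDictTest(unittest.TestCase):
    # (BigQuery type, raw input value, expected converted value)
    CASES = [
        ("STRING", "abc", "abc"),
        ("BOOL", True, True),
        ("BYTES", b"\x01", b"\x01"),
        ("INT64", "42", 42),
        ("FLOAT64", "1.5", 1.5),
        ("NUMERIC", "1.10", decimal.Decimal("1.10")),
    ]

    def test_each_type(self):
        for bq_type, raw, expected in self.CASES:
            with self.subTest(bq_type=bq_type):
                result = row_from_dict({"f": raw}, [("f", bq_type)])
                self.assertEqual(result["f"], expected)
                self.assertIsInstance(result["f"], type(expected))
```

Run it with `python -m unittest`. A table like this also makes it easy to add edge cases that naive conversion gets wrong, such as `bool("false")` evaluating to `True`.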
##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -103,6 +107,19 @@
_DATASET_PATTERN = r'\w{1,1024}'
_TABLE_PATTERN = r'[\p{L}\p{M}\p{N}\p{Pc}\p{Pd}\p{Zs}$]{1,1024}'
+BIGQUERY_TYPE_TO_PYTHON_TYPE = {
+ "STRING": str,
+ "BOOL": bool,
+ "BOOLEAN": bool,
+ "BYTES": bytes,
+ "INT64": np.int64,
+ "INTEGER": np.int64,
+ "FLOAT64": np.float64,
+ "FLOAT": np.float64,
+ "NUMERIC": decimal.Decimal,
+ "TIMESTAMP": apache_beam.utils.timestamp.Timestamp,
Review Comment:
Seems like this doesn't cover all BQ types? For example, Geography types [1]. We should at least add a TODO to fix this.
[1] https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#geography_type
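One way to act on this, sketched with a hypothetical helper `python_type_for`: look types up explicitly and raise a descriptive error (with a TODO) for BigQuery types the mapping does not cover yet, such as `GEOGRAPHY`, rather than failing with a bare `KeyError`. The mapping below is an illustrative stdlib-only subset, not the PR's `BIGQUERY_TYPE_TO_PYTHON_TYPE`.

```python
import decimal

# Illustrative subset; the PR's mapping uses NumPy and Beam timestamp types.
_BQ_TO_PY = {
    "STRING": str,
    "BOOL": bool,
    "BOOLEAN": bool,
    "BYTES": bytes,
    "INT64": int,
    "INTEGER": int,
    "FLOAT64": float,
    "FLOAT": float,
    "NUMERIC": decimal.Decimal,
}


def python_type_for(bq_type):
    """Returns the Python type for a BigQuery type name, or raises."""
    try:
        return _BQ_TO_PY[bq_type.upper()]
    except KeyError:
        # TODO: add support for the remaining BigQuery types (e.g. GEOGRAPHY).
        raise ValueError(
            f"Unsupported BigQuery type {bq_type!r}; supported types: "
            f"{sorted(_BQ_TO_PY)}") from None
```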
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2300,6 +2332,91 @@ def __getitem__(self, key):
return self.attributes[key].__get__(self, WriteResult)
+def _default_io_expansion_service(append_args=None):
+ return BeamJarExpansionService(
+ 'sdks:java:io:google-cloud-platform:expansion-service:build',
+ append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
Review Comment:
Let's also hide this transform from the public API by renaming it to "_StorageWriteToBigQuery".
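For context on what the rename does: by Python convention a leading underscore marks a name as non-public, and `from module import *` skips such names when the module defines no `__all__`. The sketch below demonstrates this with a throwaway in-memory module (`fake_bigquery` is a made-up name). The name remains reachable through explicit attribute access, so the underscore signals intent rather than enforcing privacy.

```python
import sys
import types

# Build a throwaway module in memory so the example is self-contained.
fake_bigquery = types.ModuleType("fake_bigquery")
exec(
    "class WriteToBigQuery:\n"
    "    pass\n"
    "class _StorageWriteToBigQuery:\n"
    "    pass\n",
    fake_bigquery.__dict__)
sys.modules["fake_bigquery"] = fake_bigquery

namespace = {}
exec("from fake_bigquery import *", namespace)

print("WriteToBigQuery" in namespace)          # True
print("_StorageWriteToBigQuery" in namespace)  # False
# Still accessible explicitly; the underscore is only a convention.
print(hasattr(fake_bigquery, "_StorageWriteToBigQuery"))  # True
```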
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2307,6 +2397,94 @@ def __getitem__(self, key):
return self.attributes[key].__get__(self, WriteResult)
+def _default_io_expansion_service(append_args=None):
+ return BeamJarExpansionService(
+ 'sdks:java:io:google-cloud-platform:expansion-service:build',
+ append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+ """Writes data to BigQuery using Storage API.
+
+ Experimental; no backwards compatibility guarantees.
+ """
+ URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+ FAILED_ROWS = "FailedRows"
+ FAILED_ROWS_WITH_ERRORS = "FailedRowsWithErrors"
+
+ def __init__(
+ self,
+ table,
+ create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
+ write_disposition=BigQueryDisposition.WRITE_APPEND,
+ triggering_frequency=0,
+ use_at_least_once=False,
+ expansion_service=None):
+ """Initialize a StorageWriteToBigQuery transform.
+
+ :param table:
+ Fully-qualified table ID specified as ``'PROJECT:DATASET.TABLE'``.
+ :param create_disposition:
+ String specifying the strategy to take when the table doesn't
+ exist. Possible values are:
+ * ``'CREATE_IF_NEEDED'``: create if does not exist.
+ * ``'CREATE_NEVER'``: fail the write if does not exist.
+ :param write_disposition:
+ String specifying the strategy to take when the table already
+ contains data. Possible values are:
+ * ``'WRITE_TRUNCATE'``: delete existing rows.
+ * ``'WRITE_APPEND'``: add to existing rows.
+ * ``'WRITE_EMPTY'``: fail the write if table not empty.
+ :param triggering_frequency:
+ The time in seconds between write commits. Should only be specified
+ for streaming pipelines. Defaults to 5 seconds.
+ :param use_at_least_once:
+ Use at-least-once semantics. Is cheaper and provides lower latency,
+ but will potentially duplicate records.
+ :param expansion_service:
+ The address (host:port) of the expansion service. If no expansion
+ service is provided, will attempt to run the default GCP expansion
+ service.
+ """
+ super().__init__()
+ self._table = table
+ self._create_disposition = create_disposition
+ self._write_disposition = write_disposition
+ self._triggering_frequency = triggering_frequency
+ self._use_at_least_once = use_at_least_once
+ self._expansion_service = (
+ expansion_service or _default_io_expansion_service())
+ self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+ self._expansion_service, self.URN)
+
+ def expand(self, input):
+ opts = input.pipeline.options.view_as(StandardOptions)
+ # TODO(https://github.com/apache/beam/issues/21307): Add support for
+ # OnWindowExpiration to more runners. Storage Write API requires
+ # `beam:requirement:pardo:on_window_expiration:v1` when unbounded
+ streaming_runners = ['DataflowRunner', 'TestDataflowRunner']
Review Comment:
I don't think we should add a runner check like this to I/O transforms. In Beam code we try to keep transform and runner logic separate, so that even if there's an issue, someone could fix it elsewhere and transforms will just work.
Also, any idea what's broken for other runners in this case (for the sink)?
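A hedged sketch of the separation suggested above: the transform declares what it needs, and the runner decides whether it can satisfy it, so no runner names are hard-coded in the I/O. Beam's portable pipeline representation carries a set of requirement URNs (such as `beam:requirement:pardo:on_window_expiration:v1`) for runners to check. `HypotheticalRunner`, `UnsupportedRequirementError`, and `storage_write_requirements` are made-up names that only model that division of responsibility.

```python
ON_WINDOW_EXPIRATION = "beam:requirement:pardo:on_window_expiration:v1"


class UnsupportedRequirementError(Exception):
    pass


class HypotheticalRunner:
    def __init__(self, name, supported_requirements):
        self.name = name
        self.supported_requirements = frozenset(supported_requirements)

    def check(self, pipeline_requirements):
        """Rejects pipelines whose requirements this runner cannot meet."""
        missing = set(pipeline_requirements) - self.supported_requirements
        if missing:
            raise UnsupportedRequirementError(
                f"{self.name} does not support: {sorted(missing)}")


def storage_write_requirements(unbounded):
    """The transform only declares needs; it never inspects the runner."""
    return [ON_WINDOW_EXPIRATION] if unbounded else []


supports_it = HypotheticalRunner("RunnerA", [ON_WINDOW_EXPIRATION])
lacks_it = HypotheticalRunner("RunnerB", [])

supports_it.check(storage_write_requirements(unbounded=True))  # passes
lacks_it.check(storage_write_requirements(unbounded=False))    # passes
```

If a runner later adds support, only its own supported set changes; the transform code stays untouched.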
##########
sdks/python/test-suites/xlang/build.gradle:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// This is a base file to set up cross language tests for different runners
+import org.apache.beam.gradle.BeamModulePlugin
+import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTaskCommon
+project.evaluationDependsOn(":sdks:python")
+
+// Set up cross language tests
+def envDir = project.project(":sdks:python").envdir
+def jobPort = BeamModulePlugin.getRandomPort()
+def tmpDir = System.getenv("TMPDIR") ?: System.getenv("WORKSPACE") ?: "/tmp"
+def pidFile = "${tmpDir}/local_job_service_main-${jobPort}.pid"
+
+def setupTask = project.tasks.register("fnApiJobServerSetup", Exec) {
+ dependsOn ':sdks:python:installGcpTest'
+
+ executable 'sh'
+ args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --job_port ${jobPort} --pid_file ${pidFile} --background --stdout_file ${tmpDir}/beam-fnapi-job-server.log"
+}
+
+def cleanupTask = project.tasks.register("fnApiJobServerCleanup", Exec) {
+ executable 'sh'
+ args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --pid_file ${pidFile} --stop"
+}
+
+// List of objects representing task metadata to create cross-language tasks from.
+// Each object contains the minimum relevant metadata.
+def xlangTasks = []
+
+// ******** Java GCP expansion service ********
+def gcpExpansionProject = project.project(':sdks:java:io:google-cloud-platform:expansion-service')
Review Comment:
I see. Perhaps expand the comment above to clarify that.
##########
sdks/python/test-suites/xlang/build.gradle:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// This is a base file to set up cross language tests for different runners
+import org.apache.beam.gradle.BeamModulePlugin
+import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTaskCommon
+project.evaluationDependsOn(":sdks:python")
+
+// Set up cross language tests
+def envDir = project.project(":sdks:python").envdir
+def jobPort = BeamModulePlugin.getRandomPort()
+def tmpDir = System.getenv("TMPDIR") ?: System.getenv("WORKSPACE") ?: "/tmp"
+def pidFile = "${tmpDir}/local_job_service_main-${jobPort}.pid"
+
+def setupTask = project.tasks.register("fnApiJobServerSetup", Exec) {
+ dependsOn ':sdks:python:installGcpTest'
+
+ executable 'sh'
+ args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --job_port ${jobPort} --pid_file ${pidFile} --background --stdout_file ${tmpDir}/beam-fnapi-job-server.log"
+}
+
+def cleanupTask = project.tasks.register("fnApiJobServerCleanup", Exec) {
+ executable 'sh'
+ args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --pid_file ${pidFile} --stop"
+}
+
+// List of objects representing task metadata to create cross-language tasks from.
+// Each object contains the minimum relevant metadata.
+def xlangTasks = []
+
+// ******** Java GCP expansion service ********
+def gcpExpansionProject = project.project(':sdks:java:io:google-cloud-platform:expansion-service')
+// Properties that are common across runners.
+// Used to launch the expansion service, collect the right tests, and cleanup afterwards
+def gcpXlangCommon = new CrossLanguageTaskCommon().tap {
+ name = "gcpCrossLanguage"
+ expansionProjectPath = gcpExpansionProject.getPath()
+ collectMarker = "uses_gcp_java_expansion_service"
+ startJobServer = setupTask
+ cleanupJobServer = cleanupTask
+}
+xlangTasks.add(gcpXlangCommon)
+
+// ******** Java _____ expansion service ********
Review Comment:
Remove commented-out code?
##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2353,6 +2395,98 @@ class BeamModulePlugin implements Plugin<Project> {
}
}
+ /** ***********************************************************************************************/
+ // Method to create the createCrossLanguageUsingJavaExpansionTask.
+ // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
+ // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
+ project.ext.createCrossLanguageUsingJavaExpansionTask = {
+ // This task won't work if the python build file doesn't exist.
+ if (!project.project(":sdks:python").buildFile.exists()) {
+ System.err.println 'Python build file not found. Skipping createCrossLanguageUsingJavaExpansionTask.'
+ return
+ }
+ def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()
+
+ project.evaluationDependsOn(":sdks:python")
+ project.evaluationDependsOn(config.expansionProjectPath)
+ project.evaluationDependsOn(":runners:core-construction-java")
+ project.evaluationDependsOn(":sdks:java:extensions:python")
+
+ // Setting up args to launch the expansion service
+ def envDir = project.project(":sdks:python").envdir
+ def pythonDir = project.project(":sdks:python").projectDir
+ def javaExpansionPort = getRandomPort()
+ def expansionJar = project.project(config.expansionProjectPath).buildTestExpansionServiceJar.archivePath
+ def javaClassLookupAllowlistFile = project.project(config.expansionProjectPath).projectDir.getPath()
+ def expansionServiceOpts = [
+ "group_id": project.name,
+ "java_expansion_service_jar": expansionJar,
+ "java_port": javaExpansionPort,
+ "java_expansion_service_allowlist_file": javaClassLookupAllowlistFile,
+ ]
+ def serviceArgs = project.project(':sdks:python').mapToArgString(expansionServiceOpts)
+ def javaContainerSuffix
+ if (JavaVersion.current() == JavaVersion.VERSION_1_8) {
Review Comment:
Ah, yeah, these are configs for Jenkins vs Gradle. Let's keep as is for now.
[GitHub] [beam] chamikaramj commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466880379
Seems like tests couldn't find the expansion service?
https://ci-beam.apache.org/job/beam_PostCommit_Python_Xlang_Gcp_Dataflow_PR/21/testReport/junit/apache_beam.io.gcp.bigquery_write_it_test/BigQueryXlangStorageWriteIT/test_storage_write_beam_rows/
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468437202
Run Python_Xlang_Gcp_Direct PostCommit
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1146260144
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2307,6 +2397,94 @@ def __getitem__(self, key):
return self.attributes[key].__get__(self, WriteResult)
+def _default_io_expansion_service(append_args=None):
+ return BeamJarExpansionService(
+ 'sdks:java:io:google-cloud-platform:expansion-service:build',
+ append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+ """Writes data to BigQuery using Storage API.
+
+ Experimental; no backwards compatibility guarantees.
+ """
+ URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+ FAILED_ROWS = "FailedRows"
+ FAILED_ROWS_WITH_ERRORS = "FailedRowsWithErrors"
+
+ def __init__(
+ self,
+ table,
+ create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
+ write_disposition=BigQueryDisposition.WRITE_APPEND,
+ triggering_frequency=0,
+ use_at_least_once=False,
+ expansion_service=None):
+ """Initialize a StorageWriteToBigQuery transform.
+
+ :param table:
+ Fully-qualified table ID specified as ``'PROJECT:DATASET.TABLE'``.
+ :param create_disposition:
+ String specifying the strategy to take when the table doesn't
+ exist. Possible values are:
+ * ``'CREATE_IF_NEEDED'``: create if does not exist.
+ * ``'CREATE_NEVER'``: fail the write if does not exist.
+ :param write_disposition:
+ String specifying the strategy to take when the table already
+ contains data. Possible values are:
+ * ``'WRITE_TRUNCATE'``: delete existing rows.
+ * ``'WRITE_APPEND'``: add to existing rows.
+ * ``'WRITE_EMPTY'``: fail the write if table not empty.
+ :param triggering_frequency:
+ The time in seconds between write commits. Should only be specified
+ for streaming pipelines. Defaults to 5 seconds.
+ :param use_at_least_once:
+ Use at-least-once semantics. Is cheaper and provides lower latency,
+ but will potentially duplicate records.
+ :param expansion_service:
+ The address (host:port) of the expansion service. If no expansion
+ service is provided, will attempt to run the default GCP expansion
+ service.
+ """
+ super().__init__()
+ self._table = table
+ self._create_disposition = create_disposition
+ self._write_disposition = write_disposition
+ self._triggering_frequency = triggering_frequency
+ self._use_at_least_once = use_at_least_once
+ self._expansion_service = (
+ expansion_service or _default_io_expansion_service())
+ self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+ self._expansion_service, self.URN)
+
+ def expand(self, input):
+ opts = input.pipeline.options.view_as(StandardOptions)
+ # TODO(https://github.com/apache/beam/issues/21307): Add support for
+ # OnWindowExpiration to more runners. Storage Write API requires
+ # `beam:requirement:pardo:on_window_expiration:v1` when unbounded
+ streaming_runners = ['DataflowRunner', 'TestDataflowRunner']
Review Comment:
Sure, SGTM.
> Also, any idea what's broken for other runners in this case (for the sink)?
When I asked about the `beam:requirement:pardo:on_window_expiration:v1` requirement, Luke said he didn't know of any runner other than DataflowRunner that currently supports it.
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451177320
Run Java_Kafka_IO_Direct PreCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451026901
R: @chamikaramj
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125804087
##########
sdks/python/test-suites/xlang/build.gradle:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// This is a base file to set up cross language tests for different runners
+import org.apache.beam.gradle.BeamModulePlugin
+import static org.apache.beam.gradle.BeamModulePlugin.CrossLanguageTaskCommon
+project.evaluationDependsOn(":sdks:python")
+
+// Set up cross language tests
+def envDir = project.project(":sdks:python").envdir
+def jobPort = BeamModulePlugin.getRandomPort()
+def tmpDir = System.getenv("TMPDIR") ?: System.getenv("WORKSPACE") ?: "/tmp"
+def pidFile = "${tmpDir}/local_job_service_main-${jobPort}.pid"
+
+def setupTask = project.tasks.register("fnApiJobServerSetup", Exec) {
+ dependsOn ':sdks:python:installGcpTest'
+
+ executable 'sh'
+ args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --job_port ${jobPort} --pid_file ${pidFile} --background --stdout_file ${tmpDir}/beam-fnapi-job-server.log"
+}
+
+def cleanupTask = project.tasks.register("fnApiJobServerCleanup", Exec) {
+ executable 'sh'
+ args '-c', ". ${envDir}/bin/activate && python -m apache_beam.runners.portability.local_job_service_main --pid_file ${pidFile} --stop"
+}
+
+// List of objects representing task metadata to create cross-language tasks from.
+// Each object contains the minimum relevant metadata.
+def xlangTasks = []
+
+// ******** Java GCP expansion service ********
+def gcpExpansionProject = project.project(':sdks:java:io:google-cloud-platform:expansion-service')
Review Comment:
This section is only for the GCP tests. Tests that use other expansion services should create a new `CrossLanguageTaskCommon` object (with a different `expansionProjectPath`) and add it to the `xlangTasks` list.
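For readers less familiar with Gradle, here is a rough Python analogue of what the base file sets up: one shared local job server, plus one metadata entry per expansion service. The start/stop strings reproduce the `sh -c` commands of the `fnApiJobServerSetup` and `fnApiJobServerCleanup` tasks above. `XlangTaskSpec`, `index_tasks`, and the Kinesis entry are hypothetical names chosen for illustration; they are not Beam's `CrossLanguageTaskCommon` or its actual fields.

```python
import shlex
from dataclasses import dataclass
from typing import Dict, List, Mapping, Tuple

JOB_SERVICE = "apache_beam.runners.portability.local_job_service_main"


def resolve_tmp_dir(env: Mapping[str, str]) -> str:
    # Same fallback chain as the Gradle file: TMPDIR, then WORKSPACE, then /tmp.
    return env.get("TMPDIR") or env.get("WORKSPACE") or "/tmp"


def job_server_commands(env_dir: str, job_port: int,
                        tmp_dir: str) -> Tuple[str, str]:
    """Return (start, stop) strings to pass to `sh -c`."""
    pid_file = shlex.quote(f"{tmp_dir}/local_job_service_main-{job_port}.pid")
    log_file = shlex.quote(f"{tmp_dir}/beam-fnapi-job-server.log")
    activate = ". " + shlex.quote(f"{env_dir}/bin/activate")
    start = (f"{activate} && python -m {JOB_SERVICE} --job_port {job_port} "
             f"--pid_file {pid_file} --background --stdout_file {log_file}")
    stop = f"{activate} && python -m {JOB_SERVICE} --pid_file {pid_file} --stop"
    return start, stop


@dataclass(frozen=True)
class XlangTaskSpec:
    # Hypothetical stand-in for CrossLanguageTaskCommon; only the idea of an
    # expansion project path comes from the review comment.
    name: str
    expansion_project_path: str


def index_tasks(specs: List[XlangTaskSpec]) -> Dict[str, str]:
    """Map task name -> expansion project path, rejecting duplicate names."""
    tasks: Dict[str, str] = {}
    for spec in specs:
        if spec.name in tasks:
            raise ValueError(f"duplicate task name: {spec.name}")
        tasks[spec.name] = spec.expansion_project_path
    return tasks


xlang_tasks = [
    XlangTaskSpec("gcpCrossLanguage",
                  ":sdks:java:io:google-cloud-platform:expansion-service"),
    # Hypothetical second entry: another expansion service gets its own spec.
    XlangTaskSpec("kinesisCrossLanguage",
                  ":sdks:java:io:kinesis:expansion-service"),
]
```

Unlike the Gradle version, this sketch shell-quotes the paths, so a temporary directory containing spaces would not break the command.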
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468315421
Run Python_Xlang_Gcp_Dataflow PostCommit
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1148659343
##########
sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java:
##########
@@ -70,10 +70,10 @@ public class ExpansionServiceSchemaTransformProviderTest {
private static final Schema TEST_SCHEMATRANSFORM_CONFIG_SCHEMA =
Schema.of(
- Field.of("str1", FieldType.STRING),
- Field.of("str2", FieldType.STRING),
Field.of("int1", FieldType.INT32),
- Field.of("int2", FieldType.INT32));
+ Field.of("int2", FieldType.INT32),
+ Field.of("str1", FieldType.STRING),
+ Field.of("str2", FieldType.STRING));
Review Comment:
The schema needs to be in alphabetical order because the tests use `TypedSchemaTransformProvider`, which returns a config schema whose fields are in alphabetical order. A recent commit that fixes encoding positions makes this ordering required; otherwise we end up with an encoding error.
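To see why the declared order matters, consider a toy model of positional encoding: values are written in one schema's field order and read back by position under another. This is an assumption-laden illustration, not Beam's `RowCoder` (which tracks encoding positions explicitly); `encode`, `decode`, and `roundtrip_ok` are hypothetical helpers. `SORTED` stands in for the alphabetical order the comment attributes to `TypedSchemaTransformProvider`.

```python
from typing import Any, Dict, List, Sequence, Tuple

Field = Tuple[str, type]

# Field order from the original test config schema.
DECLARED: List[Field] = [("str1", str), ("str2", str), ("int1", int), ("int2", int)]
# Order reported by a provider that sorts config fields by name.
SORTED: List[Field] = sorted(DECLARED, key=lambda f: f[0])


def encode(row: Dict[str, Any], schema: Sequence[Field]) -> List[Any]:
    """Toy positional encoding: emit values in schema field order."""
    return [row[name] for name, _ in schema]


def decode(values: List[Any], schema: Sequence[Field]) -> Dict[str, Any]:
    """Toy positional decoding: assign the i-th value to the i-th field."""
    row = {}
    for (name, field_type), value in zip(schema, values):
        if not isinstance(value, field_type):
            raise TypeError(f"{name}: expected {field_type.__name__}, "
                            f"got {type(value).__name__}")
        row[name] = value
    return row


def roundtrip_ok(row: Dict[str, Any], writer: Sequence[Field],
                 reader: Sequence[Field]) -> bool:
    """True if a row written with one field order decodes under another."""
    try:
        return decode(encode(row, writer), reader) == row
    except TypeError:
        return False
```

Writing with `DECLARED` and reading with `SORTED` puts the string `"a"` where `int1` is expected, which is the kind of mismatch the reordered test schema avoids.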
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485303813
Run Python_Runners PreCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485650225
Run SQL_Java17 PreCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485736827
Run Java_Pulsar_IO_Direct PreCommit
[GitHub] [beam] chamikaramj merged pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "chamikaramj (via GitHub)" <gi...@apache.org>.
chamikaramj merged PR #25685:
URL: https://github.com/apache/beam/pull/25685
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1125786377
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -302,6 +302,35 @@ def chain_after(result):
result.destination_copy_jobid_pairs <--> result['destination_copy_jobid_pairs']
```
+Writing with Storage Write API using Cross Language
+---------------------------------------------------
+After starting up an expansion service that contains the Java implementation
Review Comment:
Agreed
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1132907027
##########
buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy:
##########
@@ -2353,6 +2395,98 @@ class BeamModulePlugin implements Plugin<Project> {
}
}
+ /** ***********************************************************************************************/
+ // Method to create the createCrossLanguageUsingJavaExpansionTask.
+ // The method takes CrossLanguageUsingJavaExpansionConfiguration as parameter.
+ // This method creates a task that runs Python SDK pipeline tests that use Java transforms via an input expansion service
+ project.ext.createCrossLanguageUsingJavaExpansionTask = {
+ // This task won't work if the python build file doesn't exist.
+ if (!project.project(":sdks:python").buildFile.exists()) {
+ System.err.println 'Python build file not found. Skipping createCrossLanguageUsingJavaExpansionTask.'
+ return
+ }
+ def config = it ? it as CrossLanguageUsingJavaExpansionConfiguration : new CrossLanguageUsingJavaExpansionConfiguration()
+
+ project.evaluationDependsOn(":sdks:python")
+ project.evaluationDependsOn(config.expansionProjectPath)
+ project.evaluationDependsOn(":runners:core-construction-java")
+ project.evaluationDependsOn(":sdks:java:extensions:python")
+
+ // Setting up args to launch the expansion service
+ def envDir = project.project(":sdks:python").envdir
+ def pythonDir = project.project(":sdks:python").projectDir
+ def javaExpansionPort = getRandomPort()
Review Comment:
Thanks for catching that. I moved the port acquisition to right before the expansion service launch.
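The reason to acquire the port late is a race: a "free" port is only free at the moment it is checked. A minimal Python sketch of the usual technique, binding to port 0 and reading back the port the OS chose, is below; we assume, without having checked, that `getRandomPort()` works along similar lines. `expansion_service_command` is hypothetical, and it assumes the expansion service jar takes the port as its first argument.

```python
import socket
from typing import List


def pick_free_port(host: str = "127.0.0.1") -> int:
    """Return a TCP port the OS reports as unused right now.

    The socket is closed before returning, so another process may claim the
    port before the service binds it; keep this call close to the launch.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))  # port 0 asks the OS to pick an unused port
        return sock.getsockname()[1]


def expansion_service_command(jar_path: str) -> List[str]:
    # Hypothetical launch command; the port is acquired immediately before
    # the command is built, shrinking the window in which it can be stolen.
    port = pick_free_port()
    return ["java", "-jar", jar_path, str(port)]
```

The window cannot be closed entirely this way; it can only be made short, which is what moving the call next to the launch achieves.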
[GitHub] [beam] github-actions[bot] commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1451028011
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485229360
Run Python_Xlang_Gcp_Direct PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485649680
Run Spotless PreCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1485736911
Run Python_PVR_Flink PreCommit
[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on code in PR #25685:
URL: https://github.com/apache/beam/pull/25685#discussion_r1146260144
##########
sdks/python/apache_beam/io/gcp/bigquery.py:
##########
@@ -2307,6 +2397,94 @@ def __getitem__(self, key):
return self.attributes[key].__get__(self, WriteResult)
+def _default_io_expansion_service(append_args=None):
+ return BeamJarExpansionService(
+ 'sdks:java:io:google-cloud-platform:expansion-service:build',
+ append_args=append_args)
+
+
+class StorageWriteToBigQuery(PTransform):
+ """Writes data to BigQuery using Storage API.
+
+ Experimental; no backwards compatibility guarantees.
+ """
+ URN = "beam:schematransform:org.apache.beam:bigquery_storage_write:v1"
+ FAILED_ROWS = "FailedRows"
+ FAILED_ROWS_WITH_ERRORS = "FailedRowsWithErrors"
+
+ def __init__(
+ self,
+ table,
+ create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
+ write_disposition=BigQueryDisposition.WRITE_APPEND,
+ triggering_frequency=0,
+ use_at_least_once=False,
+ expansion_service=None):
+ """Initialize a StorageWriteToBigQuery transform.
+
+ :param table:
+ Fully-qualified table ID specified as ``'PROJECT:DATASET.TABLE'``.
+ :param create_disposition:
+ String specifying the strategy to take when the table doesn't
+ exist. Possible values are:
+ * ``'CREATE_IF_NEEDED'``: create the table if it does not exist.
+ * ``'CREATE_NEVER'``: fail the write if the table does not exist.
+ :param write_disposition:
+ String specifying the strategy to take when the table already
+ contains data. Possible values are:
+ * ``'WRITE_TRUNCATE'``: delete existing rows.
+ * ``'WRITE_APPEND'``: add to existing rows.
+ * ``'WRITE_EMPTY'``: fail the write if the table is not empty.
+ :param triggering_frequency:
+ The time in seconds between write commits. Should only be specified
+ for streaming pipelines. Defaults to 5 seconds.
+ :param use_at_least_once:
+ Use at-least-once semantics. This is cheaper and provides lower
+ latency, but may write duplicate records.
+ :param expansion_service:
+ The address (host:port) of the expansion service. If no expansion
+ service is provided, an attempt is made to start the default GCP
+ expansion service.
+ """
+ super().__init__()
+ self._table = table
+ self._create_disposition = create_disposition
+ self._write_disposition = write_disposition
+ self._triggering_frequency = triggering_frequency
+ self._use_at_least_once = use_at_least_once
+ self._expansion_service = (
+ expansion_service or _default_io_expansion_service())
+ self.schematransform_config = SchemaAwareExternalTransform.discover_config(
+ self._expansion_service, self.URN)
+
+ def expand(self, input):
+ opts = input.pipeline.options.view_as(StandardOptions)
+ # TODO(https://github.com/apache/beam/issues/21307): Add support for
+ # OnWindowExpiration to more runners. Storage Write API requires
+ # `beam:requirement:pardo:on_window_expiration:v1` when unbounded
+ streaming_runners = ['DataflowRunner', 'TestDataflowRunner']
Review Comment:
Sure, SGTM.
> Also, any idea what's broken for other runners in this case (for the sink) ?
When I asked about the `beam:requirement:pardo:on_window_expiration:v1` requirement, Luke said he didn't know of any runner besides DataflowRunner that currently supports it.
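The `expand` body is cut off in this excerpt, so the exact check is not shown. A plausible shape, sketched in plain Python under the assumption that it keys off the `streaming` option and the runner name string, is a guard that allows bounded writes everywhere but rejects unbounded writes on runners not known to support the OnWindowExpiration requirement. `supports_storage_write` and `check_storage_write_runner` are hypothetical names, not Beam APIs.

```python
from typing import Collection

# Runners that, per the discussion above, support
# beam:requirement:pardo:on_window_expiration:v1.
STREAMING_RUNNERS = ("DataflowRunner", "TestDataflowRunner")


def supports_storage_write(runner_name: str, streaming: bool,
                           supported: Collection[str] = STREAMING_RUNNERS) -> bool:
    """Batch writes are allowed on any runner; streaming ones only on
    runners listed in `supported`."""
    return not streaming or runner_name in supported


def check_storage_write_runner(runner_name: str, streaming: bool) -> None:
    # Fail at pipeline construction time rather than at job submission.
    if not supports_storage_write(runner_name, streaming):
        raise NotImplementedError(
            f"Streaming writes with the Storage Write API are not supported "
            f"on {runner_name}; supported runners: "
            f"{', '.join(STREAMING_RUNNERS)}")
```

Keeping the allow-list in one tuple means that adding a runner, once it implements OnWindowExpiration (the TODO linked in the code), is a one-line change.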
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1481801678
Run Whitespace PreCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468759178
Run Python_Xlang_Gcp_Direct PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1468825445
Run Python_Xlang_Gcp_Direct PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466282261
Run Python_Xlang_Gcp_Direct PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466649236
Run Python_Xlang_Gcp_Dataflow PostCommit
[GitHub] [beam] ahmedabu98 commented on pull request #25685: BigQuery Storage Write API wrapper for Python SDK
Posted by "ahmedabu98 (via GitHub)" <gi...@apache.org>.
ahmedabu98 commented on PR #25685:
URL: https://github.com/apache/beam/pull/25685#issuecomment-1466244136
Run Python_Xlang_Gcp_Dataflow PostCommit