Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/08 00:28:05 UTC

[GitHub] [beam] yirutang opened a new pull request #17038: feat: Add e2e test for BigQuery Write API beam connector

yirutang opened a new pull request #17038:
URL: https://github.com/apache/beam/pull/17038




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] yirutang commented on pull request #17038: [BEAM-13747] Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
yirutang commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1072882092


   Run Java PreCommit





[GitHub] [beam] yirutang commented on pull request #17038: feat: Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
yirutang commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1062602070


   Run Java PreCommit





[GitHub] [beam] chamikaramj commented on pull request #17038: [BEAM-13747] Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1072914265









[GitHub] [beam] chamikaramj commented on pull request #17038: feat: Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1068421740


   Run PostCommit_Java_Dataflow





[GitHub] [beam] chamikaramj commented on pull request #17038: [BEAM-13747] Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1071533227


   Run Java PreCommit





[GitHub] [beam] yirutang commented on pull request #17038: feat: Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
yirutang commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1061488384


   R: @reuvenlax 





[GitHub] [beam] yirutang commented on pull request #17038: feat: Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
yirutang commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1061489606


   Run PostCommit_Java_Dataflow





[GitHub] [beam] chamikaramj commented on a change in pull request #17038: [BEAM-13747] Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #17038:
URL: https://github.com/apache/beam/pull/17038#discussion_r828338310



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link
+ * org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO#write(SerializableFunction)}. This test writes
+ * 30MB of data to BQ and verifies the written row count.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryIOStorageWriteIT {
+
+  private enum WriteMode {
+    EXACT_ONCE,
+    AT_LEAST_ONCE
+  };
+
+  private String project;
+  private static final String DATASET_ID = "big_query_storage";
+  private static final String TABLE_PREFIX = "storage_write_";
+
+  private BigQueryOptions bqOptions;
+  private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryStorageIOWriteIT");
+
+  private void setUpTestEnvironment(WriteMode writeMode) {
+    PipelineOptionsFactory.register(BigQueryOptions.class);
+    bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject(TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject());
+    bqOptions.setUseStorageWriteApi(true);
+    if (writeMode == WriteMode.AT_LEAST_ONCE) {
+      bqOptions.setUseStorageWriteApiAtLeastOnce(true);
+    }
+    bqOptions.setNumStorageWriteApiStreams(2);

Review comment:
       I think the options "setNumStorageWriteApiStreams" and "setStorageWriteApiTriggeringFrequencySec" are intended only for unbounded input (in streaming pipelines). @reuvenlax, should this have resulted in a validation failure?
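
The question above is whether streaming-only options on a bounded input should fail validation. A minimal sketch of what such a check could look like is below, in plain Java with no Beam dependency. The class `StorageWriteOptionsCheck`, its parameters, the "0 means not set" convention, and the rule itself are all hypothetical illustrations; nothing here is confirmed to be what Beam does.

```java
// Hypothetical, self-contained model; it does not use or reproduce Beam's API.
final class StorageWriteOptionsCheck {

  private StorageWriteOptionsCheck() {}

  /**
   * Rejects streaming-only settings when the input is bounded.
   *
   * @param boundedInput true for a bounded (batch) input
   * @param numStreams assumed convention: 0 means "not set"
   * @param triggeringFrequencySec assumed convention: 0 means "not set"
   */
  static void validate(boolean boundedInput, int numStreams, int triggeringFrequencySec) {
    if (numStreams < 0 || triggeringFrequencySec < 0) {
      throw new IllegalArgumentException("Option values must be non-negative");
    }
    if (boundedInput && (numStreams > 0 || triggeringFrequencySec > 0)) {
      throw new IllegalArgumentException(
          "numStreams and triggeringFrequencySec apply only to unbounded input");
    }
  }
}
```

Under this sketch, `validate(true, 2, 1)` would throw for a batch test that sets both options, while the same values with `boundedInput = false` would pass.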







[GitHub] [beam] yirutang commented on pull request #17038: [BEAM-13747] Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
yirutang commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1072882188


   Run PostCommit_Java_Dataflow





[GitHub] [beam] yirutang commented on pull request #17038: feat: Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
yirutang commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1061490449


   R: @chamikaramj 





[GitHub] [beam] yirutang commented on pull request #17038: feat: Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
yirutang commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1062056550


   Run PostCommit_Java_Dataflow





[GitHub] [beam] kennknowles commented on pull request #17038: [BEAM-13747] Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1075610472









[GitHub] [beam] chamikaramj commented on a change in pull request #17038: feat: Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #17038:
URL: https://github.com/apache/beam/pull/17038#discussion_r826637152



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link
+ * org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO#write(SerializableFunction)}. This test writes
+ * 30MB of data to BQ and verifies the written row count.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryIOStorageWriteIT {
+
+  private enum WriteMode {
+    EXACT_ONCE,
+    AT_LEAST_ONCE
+  };
+
+  private String project;
+  private static final String DATASET_ID = "big_query_storage";
+  private static final String TABLE_PREFIX = "storage_write_";
+
+  private BigQueryOptions bqOptions;
+  private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryStorageIOWriteIT");
+
+  private void setUpTestEnvironment(WriteMode writeMode) {
+    PipelineOptionsFactory.register(BigQueryOptions.class);
+    bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject(TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject());
+    if (writeMode == WriteMode.EXACT_ONCE) {
+      bqOptions.setUseStorageWriteApi(true);
+    } else {
+      bqOptions.setUseStorageWriteApiAtLeastOnce(true);
+    }
+    bqOptions.setNumStorageWriteApiStreams(2);
+    bqOptions.setStorageWriteApiTriggeringFrequencySec(1);
+    project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+  }
+
+  static class FillRowFn extends DoFn<Long, TableRow> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(new TableRow().set("number", c.element()).set("str", "aaaaaaaaaa"));
+    }
+  }
+
+  private void runBigQueryIOStorageWritePipeline(int rowCount, WriteMode writeMode) {
+    String tableName = TABLE_PREFIX + System.currentTimeMillis();
+    TableSchema schema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("number").setType("INTEGER"),
+                    new TableFieldSchema().setName("str").setType("STRING")));
+
+    Pipeline p = Pipeline.create(bqOptions);
+    p.apply("Input", GenerateSequence.from(0).to(rowCount))
+        .apply("GenerateMessage", ParDo.of(new FillRowFn()))
+        .apply(
+            "WriteToBQ",
+            BigQueryIO.writeTableRows()
+                .to(String.format("%s:%s.%s", project, DATASET_ID, tableName))

Review comment:
       Looking at the job executions, it seems the tests ran using BATCH_LOADS mode for some reason.
   https://pantheon.corp.google.com/dataflow/jobs/us-central1/2022-03-09_17_22_34-16871784130096288394;step=?project=apache-beam-testing&pageState=(%22dfTime%22:(%22l%22:%22dfJobMaxTime%22))
   
   https://pantheon.corp.google.com/dataflow/jobs/us-central1/2022-03-09_17_22_34-16871784130096288394?project=apache-beam-testing&pageState=(%22dfTime%22:(%22l%22:%22dfJobMaxTime%22))
   
   
   Can you try setting one of the following?
   
   "withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)" or "withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)".
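
The suggestion above relies on an explicitly chosen write method taking precedence over whatever the pipeline options imply. A simplified sketch of that precedence is below, in plain Java with no Beam dependency. `WriteMethod` and `WriteMethodResolver` are hypothetical names, and the fallback rules are assumptions for illustration only (including the assumption that the at-least-once flag has an effect only when the storage-write flag is also set); this is not Beam's source code.

```java
// Simplified, hypothetical model of write-method selection; not Beam's source code.
enum WriteMethod {
  DEFAULT,
  FILE_LOADS, // batch load jobs (what the review comment calls BATCH_LOADS)
  STREAMING_INSERTS,
  STORAGE_WRITE_API,
  STORAGE_API_AT_LEAST_ONCE
}

final class WriteMethodResolver {

  private WriteMethodResolver() {}

  static WriteMethod resolve(
      WriteMethod explicit,
      boolean useStorageWriteApi,
      boolean atLeastOnce,
      boolean boundedInput) {
    // An explicitly requested method wins over any option-based choice.
    if (explicit != WriteMethod.DEFAULT) {
      return explicit;
    }
    // Assumption: the at-least-once flag is consulted only under the storage-write flag.
    if (useStorageWriteApi) {
      return atLeastOnce
          ? WriteMethod.STORAGE_API_AT_LEAST_ONCE
          : WriteMethod.STORAGE_WRITE_API;
    }
    // Assumption: bounded input falls back to load jobs, unbounded to streaming inserts.
    return boundedInput ? WriteMethod.FILE_LOADS : WriteMethod.STREAMING_INSERTS;
  }
}
```

In this model, a bounded pipeline that sets only the at-least-once flag resolves to `FILE_LOADS`, whereas passing an explicit method returns that method regardless of the flags.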










[GitHub] [beam] chamikaramj merged pull request #17038: [BEAM-13747] Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
chamikaramj merged pull request #17038:
URL: https://github.com/apache/beam/pull/17038


   





[GitHub] [beam] chamikaramj commented on pull request #17038: [BEAM-13747] Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1072590369


   It seems the pre-commit failures are related to this PR.
   
   * What went wrong:
   14:56:10 Execution failed for task ':sdks:java:io:google-cloud-platform:checkstyleTest'.
   14:56:10 > Checkstyle rule violations were found. See the report at: file:///home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Phrase@2/src/sdks/java/io/google-cloud-platform/build/reports/checkstyle/test.html
   14:56:10   Checkstyle files with violations: 1
   14:56:10   Checkstyle violations by severity: [error:1]





[GitHub] [beam] yirutang commented on pull request #17038: feat: Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
yirutang commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1063516321


   Run PostCommit_Java_Dataflow





[GitHub] [beam] yirutang commented on pull request #17038: feat: Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
yirutang commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1061286702


   Run PostCommit_Java_Dataflow





[GitHub] [beam] yirutang commented on pull request #17038: feat: Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
yirutang commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1061487597


   Run PostCommit_Java_Dataflow





[GitHub] [beam] yirutang commented on pull request #17038: feat: Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
yirutang commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1062600158


   Run Java_PreCommit





[GitHub] [beam] yirutang commented on a change in pull request #17038: feat: Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
yirutang commented on a change in pull request #17038:
URL: https://github.com/apache/beam/pull/17038#discussion_r827366524



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link
+ * org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO#write(SerializableFunction)}. This test writes
+ * 30MB of data to BQ and verifies the written row count.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryIOStorageWriteIT {
+
+  private enum WriteMode {
+    EXACT_ONCE,
+    AT_LEAST_ONCE
+  };
+
+  private String project;
+  private static final String DATASET_ID = "big_query_storage";
+  private static final String TABLE_PREFIX = "storage_write_";
+
+  private BigQueryOptions bqOptions;
+  private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryStorageIOWriteIT");
+
+  private void setUpTestEnvironment(WriteMode writeMode) {
+    PipelineOptionsFactory.register(BigQueryOptions.class);
+    bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject(TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject());
+    if (writeMode == WriteMode.EXACT_ONCE) {
+      bqOptions.setUseStorageWriteApi(true);
+    } else {
+      bqOptions.setUseStorageWriteApiAtLeastOnce(true);
+    }
+    bqOptions.setNumStorageWriteApiStreams(2);
+    bqOptions.setStorageWriteApiTriggeringFrequencySec(1);
+    project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+  }
+
+  static class FillRowFn extends DoFn<Long, TableRow> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(new TableRow().set("number", c.element()).set("str", "aaaaaaaaaa"));
+    }
+  }
+
+  private void runBigQueryIOStorageWritePipeline(int rowCount, WriteMode writeMode) {
+    String tableName = TABLE_PREFIX + System.currentTimeMillis();
+    TableSchema schema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("number").setType("INTEGER"),
+                    new TableFieldSchema().setName("str").setType("STRING")));
+
+    Pipeline p = Pipeline.create(bqOptions);
+    p.apply("Input", GenerateSequence.from(0).to(rowCount))
+        .apply("GenerateMessage", ParDo.of(new FillRowFn()))
+        .apply(
+            "WriteToBQ",
+            BigQueryIO.writeTableRows()
+                .to(String.format("%s:%s.%s", project, DATASET_ID, tableName))

Review comment:
       I thought that setting them at line 67 would make it use the Write API?

Review comment:
       nvm fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] yirutang commented on pull request #17038: feat: Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
yirutang commented on pull request #17038:
URL: https://github.com/apache/beam/pull/17038#issuecomment-1063172579


   Run Java PreCommit





[GitHub] [beam] yirutang commented on a change in pull request #17038: feat: Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
yirutang commented on a change in pull request #17038:
URL: https://github.com/apache/beam/pull/17038#discussion_r827367829



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link
+ * org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO#write(SerializableFunction)}. This test writes
+ * 30MB of data to BQ and verifies the written row count.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryIOStorageWriteIT {
+
+  private enum WriteMode {
+    EXACT_ONCE,
+    AT_LEAST_ONCE
+  };
+
+  private String project;
+  private static final String DATASET_ID = "big_query_storage";
+  private static final String TABLE_PREFIX = "storage_write_";
+
+  private BigQueryOptions bqOptions;
+  private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryStorageIOWriteIT");
+
+  private void setUpTestEnvironment(WriteMode writeMode) {
+    PipelineOptionsFactory.register(BigQueryOptions.class);
+    bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject(TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject());
+    if (writeMode == WriteMode.EXACT_ONCE) {
+      bqOptions.setUseStorageWriteApi(true);
+    } else {
+      bqOptions.setUseStorageWriteApiAtLeastOnce(true);
+    }
+    bqOptions.setNumStorageWriteApiStreams(2);
+    bqOptions.setStorageWriteApiTriggeringFrequencySec(1);
+    project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+  }
+
+  static class FillRowFn extends DoFn<Long, TableRow> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(new TableRow().set("number", c.element()).set("str", "aaaaaaaaaa"));
+    }
+  }
+
+  private void runBigQueryIOStorageWritePipeline(int rowCount, WriteMode writeMode) {
+    String tableName = TABLE_PREFIX + System.currentTimeMillis();
+    TableSchema schema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("number").setType("INTEGER"),
+                    new TableFieldSchema().setName("str").setType("STRING")));
+
+    Pipeline p = Pipeline.create(bqOptions);
+    p.apply("Input", GenerateSequence.from(0).to(rowCount))
+        .apply("GenerateMessage", ParDo.of(new FillRowFn()))
+        .apply(
+            "WriteToBQ",
+            BigQueryIO.writeTableRows()
+                .to(String.format("%s:%s.%s", project, DATASET_ID, tableName))

Review comment:
       nvm fixed, according to: https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2584
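
For reference, the `.to(...)` call in the hunk above builds its destination with `String.format("%s:%s.%s", project, DATASET_ID, tableName)`, i.e. a `project:dataset.table` string. A minimal plain-Java sketch of that formatting step follows. The class name `TableSpecSketch`, the helper `tableSpec`, and the project ID `my-project` are hypothetical and not part of the PR, and nothing here calls Beam or BigQuery.

```java
public class TableSpecSketch {
  // Same values as the constants in the quoted test class.
  static final String DATASET_ID = "big_query_storage";
  static final String TABLE_PREFIX = "storage_write_";

  // Hypothetical helper: joins the parts as project:dataset.table.
  static String tableSpec(String project, String dataset, String table) {
    return String.format("%s:%s.%s", project, dataset, table);
  }

  public static void main(String[] args) {
    // The timestamp suffix mirrors how the test derives a fresh table name per run.
    String tableName = TABLE_PREFIX + System.currentTimeMillis();
    // "my-project" is a placeholder project ID, assumed for illustration only.
    System.out.println(tableSpec("my-project", DATASET_ID, tableName));
  }
}
```

Keeping the formatting in one small helper makes it easy to check the separator characters (`:` between project and dataset, `.` between dataset and table) without running a pipeline.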







[GitHub] [beam] chamikaramj commented on a change in pull request #17038: [BEAM-13747] Add e2e test for BigQuery Write API beam connector

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #17038:
URL: https://github.com/apache/beam/pull/17038#discussion_r830423492



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageWriteIT.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.bigquery.model.QueryResponse;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link
+ * org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO#write(SerializableFunction)}. This test writes
+ * 30MB of data to BQ and verifies the written row count.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryIOStorageWriteIT {
+
+  private enum WriteMode {
+    EXACT_ONCE,
+    AT_LEAST_ONCE
+  };
+
+  private String project;
+  private static final String DATASET_ID = "big_query_storage";
+  private static final String TABLE_PREFIX = "storage_write_";
+
+  private BigQueryOptions bqOptions;
+  private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryStorageIOWriteIT");
+
+  private void setUpTestEnvironment(WriteMode writeMode) {
+    PipelineOptionsFactory.register(BigQueryOptions.class);
+    bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject(TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject());
+    bqOptions.setUseStorageWriteApi(true);
+    if (writeMode == WriteMode.AT_LEAST_ONCE) {
+      bqOptions.setUseStorageWriteApiAtLeastOnce(true);
+    }
+    bqOptions.setNumStorageWriteApiStreams(2);

Review comment:
       Let's address this separately.



