You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/04/04 14:12:00 UTC

[jira] [Work logged] (BEAM-13945) Update BQ connector to support new JSON type

     [ https://issues.apache.org/jira/browse/BEAM-13945?focusedWorklogId=752280&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-752280 ]

ASF GitHub Bot logged work on BEAM-13945:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Apr/22 14:11
            Start Date: 04/Apr/22 14:11
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on code in PR #17209:
URL: https://github.com/apache/beam/pull/17209#discussion_r841770762


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOJSONIT.java:
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@RunWith(JUnit4.class)
+public class BigQueryIOJSONIT {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryIOJSONIT.class);
+
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
+  @Rule
+  public transient TestPipeline p_write = TestPipeline.create();
+
+  private BigQueryIOJSONOptions options;
+
+  private static String project;
+
+  private static final String DATASET_ID = "bq_jsontype_test_nodelete";
+
+  private static final String JSON_TYPE_TABLE_NAME = "json_data";
+
+  private static String JSON_TABLE_DESTINATION;
+
+  private static final TableSchema JSON_TYPE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(ImmutableList.of(
+              new TableFieldSchema().setName("country_code").setType("STRING"),
+              new TableFieldSchema().setName("country").setType("JSON")
+          ));
+
+  public static final String STORAGE_WRITE_TEST_TABLE = "storagewrite_test"
+      + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32);
+
+  private static final Map<String, String> JSON_TYPE_DATA = generateCountryData(false);
+
+  // Convert PCollection of TableRows to a PCollection of KV JSON string pairs
+  static class TableRowToJSONStringFn extends DoFn<TableRow, KV<String, String>> {
+    @ProcessElement
+    public void processElement(@Element TableRow row, OutputReceiver<KV<String, String>> out){
+      String country_code = row.get("country_code").toString();
+      String country = row.get("country").toString();
+
+      out.output(KV.of(country_code, country));
+    }
+  }
+
+  // Compare PCollection input with expected results.
+  static class CompareJSON implements SerializableFunction<Iterable<KV<String, String>>, Void> {
+    Map<String, String> expected;
+    public CompareJSON(Map<String, String> expected){
+      this.expected = expected;
+    }
+
+    @Override
+    public Void apply(Iterable<KV<String, String>> input) throws RuntimeException {
+      int counter = 0;
+
+      // Iterate through input list and convert each String to JsonElement
+      // Compare with expected result JsonElements
+      for(KV<String, String> actual: input){
+        String key = actual.getKey();
+
+        if(!expected.containsKey(key)){
+          throw new NoSuchElementException(String.format(
+              "Unexpected key '%s' found in input but does not exist in expected results.", key));
+        }
+        String jsonStringActual = actual.getValue();
+        JsonElement jsonActual = JsonParser.parseString(jsonStringActual);
+
+        String jsonStringExpected = expected.get(key);
+        JsonElement jsonExpected = JsonParser.parseString(jsonStringExpected);
+
+        assertEquals(jsonExpected, jsonActual);
+        counter += 1;
+      }
+      if(counter != expected.size()){
+        throw new RuntimeException(String.format(
+            "Expected %d elements but got %d elements.", expected.size(), counter));
+      }
+      return null;

Review Comment:
   Nit: can we drop the return type and this `return` statement?



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOJSONIT.java:
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@RunWith(JUnit4.class)
+public class BigQueryIOJSONIT {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryIOJSONIT.class);
+
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
+  @Rule
+  public transient TestPipeline p_write = TestPipeline.create();
+
+  private BigQueryIOJSONOptions options;
+
+  private static String project;
+
+  private static final String DATASET_ID = "bq_jsontype_test_nodelete";
+
+  private static final String JSON_TYPE_TABLE_NAME = "json_data";
+
+  private static String JSON_TABLE_DESTINATION;
+
+  private static final TableSchema JSON_TYPE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(ImmutableList.of(
+              new TableFieldSchema().setName("country_code").setType("STRING"),

Review Comment:
   Can we include other types in the test, for example other primitives, an array type, and a struct type?



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOJSONIT.java:
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.

Review Comment:
   I think we had issues running BQ tests with DirectRunner, so you might have to enable this only for Dataflow test suites. You can do this by disabling the test at the following location.
   
   https://github.com/apache/beam/blob/8d5ca41992b3f4fda75e678fa5d517e5333bbb8a/sdks/java/io/google-cloud-platform/build.gradle#L184
   
   The test will then be skipped by "beam_PostCommit_Java" but will be picked up by the "beam_PostCommit_Java_Examples_Dataflow*" test suites.



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOJSONIT.java:
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@RunWith(JUnit4.class)
+public class BigQueryIOJSONIT {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryIOJSONIT.class);
+
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
+  @Rule
+  public transient TestPipeline p_write = TestPipeline.create();
+
+  private BigQueryIOJSONOptions options;
+
+  private static String project;
+
+  private static final String DATASET_ID = "bq_jsontype_test_nodelete";
+
+  private static final String JSON_TYPE_TABLE_NAME = "json_data";
+
+  private static String JSON_TABLE_DESTINATION;
+
+  private static final TableSchema JSON_TYPE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(ImmutableList.of(
+              new TableFieldSchema().setName("country_code").setType("STRING"),
+              new TableFieldSchema().setName("country").setType("JSON")
+          ));
+
+  public static final String STORAGE_WRITE_TEST_TABLE = "storagewrite_test"
+      + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32);
+
+  private static final Map<String, String> JSON_TYPE_DATA = generateCountryData(false);
+
+  // Convert PCollection of TableRows to a PCollection of KV JSON string pairs
+  static class TableRowToJSONStringFn extends DoFn<TableRow, KV<String, String>> {
+    @ProcessElement
+    public void processElement(@Element TableRow row, OutputReceiver<KV<String, String>> out){
+      String country_code = row.get("country_code").toString();
+      String country = row.get("country").toString();
+
+      out.output(KV.of(country_code, country));
+    }
+  }
+
+  // Compare PCollection input with expected results.
+  static class CompareJSON implements SerializableFunction<Iterable<KV<String, String>>, Void> {
+    Map<String, String> expected;
+    public CompareJSON(Map<String, String> expected){
+      this.expected = expected;
+    }
+
+    @Override
+    public Void apply(Iterable<KV<String, String>> input) throws RuntimeException {
+      int counter = 0;
+
+      // Iterate through input list and convert each String to JsonElement
+      // Compare with expected result JsonElements
+      for(KV<String, String> actual: input){
+        String key = actual.getKey();
+
+        if(!expected.containsKey(key)){
+          throw new NoSuchElementException(String.format(
+              "Unexpected key '%s' found in input but does not exist in expected results.", key));
+        }
+        String jsonStringActual = actual.getValue();
+        JsonElement jsonActual = JsonParser.parseString(jsonStringActual);
+
+        String jsonStringExpected = expected.get(key);
+        JsonElement jsonExpected = JsonParser.parseString(jsonStringExpected);
+
+        assertEquals(jsonExpected, jsonActual);
+        counter += 1;
+      }
+      if(counter != expected.size()){
+        throw new RuntimeException(String.format(
+            "Expected %d elements but got %d elements.", expected.size(), counter));
+      }
+      return null;
+    }
+  }
+
+  public void runTestWrite(BigQueryIOJSONOptions options){
+    List<TableRow> rowsToWrite = new ArrayList<>();
+    for(Map.Entry<String, String> element: JSON_TYPE_DATA.entrySet()){
+      rowsToWrite.add(new TableRow()
+          .set("country_code", element.getKey())
+          .set("country", element.getValue()));
+    }
+
+    p_write
+        .apply("Create Elements", Create.of(rowsToWrite))
+        .apply("Write To BigQuery",
+            BigQueryIO.writeTableRows()
+                .to(options.getOutput())
+                .withSchema(JSON_TYPE_TABLE_SCHEMA)
+                .withCreateDisposition(options.getCreateDisposition())
+                .withMethod(options.getWriteMethod()));
+    p_write.run().waitUntilFinish();
+
+    options.setReadMethod(TypedRead.Method.EXPORT);
+    readAndValidateRows(options, JSON_TYPE_DATA);
+  }
+
+  // reads TableRows from BigQuery and validates JSON Strings
+  // expectedJsonResults Strings must be in valid json format
+  public void readAndValidateRows(BigQueryIOJSONOptions options, Map<String, String> expectedResults){
+    TypedRead<TableRow> bigqueryIO =
+        BigQueryIO.readTableRows().withMethod(options.getReadMethod());
+
+    // read from input query or from table
+    if(!options.getQuery().isEmpty()) {
+      bigqueryIO = bigqueryIO.fromQuery(options.getQuery()).usingStandardSql();
+    } else {
+      bigqueryIO = bigqueryIO.from(options.getInput());
+    }
+
+    PCollection<KV<String, String>> jsonKVPairs = p
+        .apply("Read rows", bigqueryIO)
+        .apply("Convert to KV JSON Strings", ParDo.of(new TableRowToJSONStringFn()));
+
+    PAssert.that(jsonKVPairs).satisfies(new CompareJSON(expectedResults));
+
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testDirectRead() throws Exception {
+    LOG.info("Testing DIRECT_READ read method with JSON data");
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOJSONOptions.class);
+    options.setReadMethod(TypedRead.Method.DIRECT_READ);
+    options.setInput(JSON_TABLE_DESTINATION);
+
+    readAndValidateRows(options, JSON_TYPE_DATA);
+  }
+
+  @Test
+  public void testExportRead() throws Exception {
+    LOG.info("Testing EXPORT read method with JSON data");
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOJSONOptions.class);
+    options.setReadMethod(TypedRead.Method.EXPORT);
+    options.setInput(JSON_TABLE_DESTINATION);
+
+    readAndValidateRows(options, JSON_TYPE_DATA);
+  }
+
+  @Test
+  public void testQueryRead() throws Exception {
+    LOG.info("Testing querying JSON data with DIRECT_READ read method");
+
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOJSONOptions.class);
+    options.setReadMethod(TypedRead.Method.DIRECT_READ);
+    options.setQuery(
+        String.format("SELECT country_code, country.cities AS country FROM "
+            + "`%s.%s.%s`", project, DATASET_ID, JSON_TYPE_TABLE_NAME));
+
+    // get nested json objects from static data
+    Map<String, String> expected = generateCountryData(true);
+
+    readAndValidateRows(options, expected);
+  }
+
+  @Test
+  public void testStorageWrite() throws Exception{
+    LOG.info("Testing writing JSON data with Storage API");
+
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOJSONOptions.class);
+    options.setWriteMethod(Write.Method.STORAGE_WRITE_API);
+
+    String storage_destination = String.format("%s:%s.%s", project, DATASET_ID, STORAGE_WRITE_TEST_TABLE);
+    options.setOutput(storage_destination);
+    options.setInput(storage_destination);
+
+    runTestWrite(options);
+  }
+
+  @Test
+  public void testLegacyStreamingWrite() throws Exception{

Review Comment:
   Let's document why this would not work for BATCH_LOADS, for example in the BigQueryIO Javadoc and/or on the following page.
   
   https://beam.apache.org/documentation/io/built-in/google-bigquery/





Issue Time Tracking
-------------------

    Worklog Id:     (was: 752280)
    Time Spent: 40m  (was: 0.5h)

> Update BQ connector to support new JSON type
> --------------------------------------------
>
>                 Key: BEAM-13945
>                 URL: https://issues.apache.org/jira/browse/BEAM-13945
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp
>            Reporter: Chamikara Madhusanka Jayalath
>            Assignee: Ahmed Abualsaud
>            Priority: P2
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> BQ has a new JSON type that is defined here: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type
> We should update the Beam BQ Java and Python connectors to support this type for the various read methods (export jobs, storage API) and write methods (load jobs, streaming inserts, storage API).
> We should also add integration tests that exercise reading from and writing to BQ tables with columns that have the JSON type.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)