Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/12 22:47:26 UTC

[GitHub] [beam] ahmedabu98 commented on a diff in pull request #17209: [BEAM-13945] add json type support for java bigquery connector

ahmedabu98 commented on code in PR #17209:
URL: https://github.com/apache/beam/pull/17209#discussion_r848929455


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOJSONIT.java:
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+public class BigQueryIOJSONIT {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryIOJSONIT.class);
+
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
+  @Rule
+  public transient TestPipeline p_write = TestPipeline.create();
+
+  private BigQueryIOJSONOptions options;
+
+  private static String project;
+
+  private static final String DATASET_ID = "bq_jsontype_test_nodelete";
+
+  private static final String JSON_TYPE_TABLE_NAME = "json_data";
+
+  private static String JSON_TABLE_DESTINATION;
+
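+  // Test table schema: a STRING country_code column and a JSON-typed country column.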
+  private static final TableSchema JSON_TYPE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(ImmutableList.of(
+              new TableFieldSchema().setName("country_code").setType("STRING"),
+              new TableFieldSchema().setName("country").setType("JSON")
+          ));
+
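+  // Unique table name per run (timestamp + random suffix) so concurrent test runs don't collide.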
+  public static final String STORAGE_WRITE_TEST_TABLE = "storagewrite_test"
+      + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32);
+
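+  // Expected data: country_code keys mapped to country JSON strings.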
+  private static final Map<String, String> JSON_TYPE_DATA = generateCountryData(false);
+
+  // Converts each TableRow into a KV of (country_code, country JSON string).
+  static class TableRowToJSONStringFn extends DoFn<TableRow, KV<String, String>> {
+    @ProcessElement
+    public void processElement(@Element TableRow row, OutputReceiver<KV<String, String>> out) {
+      String countryCode = row.get("country_code").toString();
+      String country = row.get("country").toString();
+
+      out.output(KV.of(countryCode, country));
+    }
+  }
+
+  // Compares PCollection contents with the expected results, parsing both sides as JSON.
+  static class CompareJSON implements SerializableFunction<Iterable<KV<String, String>>, Void> {
+    Map<String, String> expected;
+    public CompareJSON(Map<String, String> expected) {
+      this.expected = expected;
+    }
+
+    @Override
+    public Void apply(Iterable<KV<String, String>> input) throws RuntimeException {
+      int counter = 0;
+
+      // Parse each actual value and its expected counterpart as JSON and compare.
+      for (KV<String, String> actual : input) {
+        String key = actual.getKey();
+
+        if (!expected.containsKey(key)) {
+          throw new NoSuchElementException(String.format(
+              "Key '%s' found in input but not in expected results.", key));
+        }
+        String jsonStringActual = actual.getValue();
+        JsonElement jsonActual = JsonParser.parseString(jsonStringActual);
+
+        String jsonStringExpected = expected.get(key);
+        JsonElement jsonExpected = JsonParser.parseString(jsonStringExpected);
+
+        assertEquals(jsonExpected, jsonActual);
+        counter += 1;
+      }
+      if (counter != expected.size()) {
+        throw new RuntimeException(String.format(
+            "Expected %d elements but got %d elements.", expected.size(), counter));
+      }
+      return null;
+    }
+  }
+
+  public void runTestWrite(BigQueryIOJSONOptions options) {
+    List<TableRow> rowsToWrite = new ArrayList<>();
+    for (Map.Entry<String, String> element : JSON_TYPE_DATA.entrySet()) {
+      rowsToWrite.add(new TableRow()
+          .set("country_code", element.getKey())
+          .set("country", element.getValue()));
+    }
+
+    p_write
+        .apply("Create Elements", Create.of(rowsToWrite))
+        .apply("Write To BigQuery",
+            BigQueryIO.writeTableRows()
+                .to(options.getOutput())
+                .withSchema(JSON_TYPE_TABLE_SCHEMA)
+                .withCreateDisposition(options.getCreateDisposition())
+                .withMethod(options.getWriteMethod()));
+    p_write.run().waitUntilFinish();
+
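+    // Read the table back and verify the JSON values round-tripped intact.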
+    options.setReadMethod(TypedRead.Method.EXPORT);
+    readAndValidateRows(options, JSON_TYPE_DATA);
+  }
+
+  // Reads TableRows from BigQuery and validates the JSON strings.
+  // Values in expectedResults must be valid JSON.
+  public void readAndValidateRows(
+      BigQueryIOJSONOptions options, Map<String, String> expectedResults) {
+    TypedRead<TableRow> bigqueryIO =
+        BigQueryIO.readTableRows().withMethod(options.getReadMethod());
+
+    // read from input query or from table
+    if (!options.getQuery().isEmpty()) {
+      bigqueryIO = bigqueryIO.fromQuery(options.getQuery()).usingStandardSql();
+    } else {
+      bigqueryIO = bigqueryIO.from(options.getInput());
+    }
+
+    PCollection<KV<String, String>> jsonKVPairs = p
+        .apply("Read rows", bigqueryIO)
+        .apply("Convert to KV JSON Strings", ParDo.of(new TableRowToJSONStringFn()));
+
+    PAssert.that(jsonKVPairs).satisfies(new CompareJSON(expectedResults));
+
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testDirectRead() throws Exception {
+    LOG.info("Testing DIRECT_READ read method with JSON data");
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOJSONOptions.class);
+    options.setReadMethod(TypedRead.Method.DIRECT_READ);
+    options.setInput(JSON_TABLE_DESTINATION);
+
+    readAndValidateRows(options, JSON_TYPE_DATA);
+  }
+
+  @Test
+  public void testExportRead() throws Exception {
+    LOG.info("Testing EXPORT read method with JSON data");
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOJSONOptions.class);
+    options.setReadMethod(TypedRead.Method.EXPORT);
+    options.setInput(JSON_TABLE_DESTINATION);
+
+    readAndValidateRows(options, JSON_TYPE_DATA);
+  }
+
+  @Test
+  public void testQueryRead() throws Exception {
+    LOG.info("Testing querying JSON data with DIRECT_READ read method");
+
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOJSONOptions.class);
+    options.setReadMethod(TypedRead.Method.DIRECT_READ);
+    options.setQuery(
+        String.format("SELECT country_code, country.cities AS country FROM "
+            + "`%s.%s.%s`", project, DATASET_ID, JSON_TYPE_TABLE_NAME));
+
+    // get nested json objects from static data
+    Map<String, String> expected = generateCountryData(true);
+
+    readAndValidateRows(options, expected);
+  }
+
+  @Test
+  public void testStorageWrite() throws Exception {
+    LOG.info("Testing writing JSON data with Storage API");
+
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOJSONOptions.class);
+    options.setWriteMethod(Write.Method.STORAGE_WRITE_API);
+
+    String storageDestination =
+        String.format("%s:%s.%s", project, DATASET_ID, STORAGE_WRITE_TEST_TABLE);
+    options.setOutput(storageDestination);
+    options.setInput(storageDestination);
+
+    runTestWrite(options);
+  }
+
+  @Test
+  public void testLegacyStreamingWrite() throws Exception {

Review Comment:
   Yeah, we should document it somewhere; I think the docs are a better place. I can include it in both the Java and Python docs since it affects both.
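
   For reference, a minimal sketch of what such a doc snippet could look like on the Java side, mirroring the test in this PR (the `pipeline` variable and the project/dataset/table names are placeholders):

   ```java
   // Sketch of writing a JSON-typed column with BigQueryIO; names are placeholders.
   TableSchema schema =
       new TableSchema()
           .setFields(
               ImmutableList.of(
                   new TableFieldSchema().setName("id").setType("STRING"),
                   new TableFieldSchema().setName("payload").setType("JSON")));

   pipeline
       .apply(
           Create.of(
               new TableRow()
                   .set("id", "row1")
                   // JSON values are passed as Strings containing valid JSON.
                   .set("payload", "{\"name\": \"Apache Beam\", \"open_source\": true}")))
       .apply(
           BigQueryIO.writeTableRows()
               .to("my-project:my_dataset.my_table")
               .withSchema(schema)
               .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));
   ```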



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org