Posted to commits@beam.apache.org by ch...@apache.org on 2022/04/21 17:41:13 UTC
[beam] branch master updated: [BEAM-13945] add json type support for java bigquery connector (#17209)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a64572ea68d [BEAM-13945] add json type support for java bigquery connector (#17209)
a64572ea68d is described below
commit a64572ea68d8f33cfccc894f8b204ece93e20fb6
Author: Ahmed Abualsaud <65...@users.noreply.github.com>
AuthorDate: Thu Apr 21 13:41:03 2022 -0400
[BEAM-13945] add json type support for java bigquery connector (#17209)
* added json support to BigQueryAvroUtils for DIRECT_READ
* added json integration test class
* added options and static data for test
* test for reading json data from bq and validating with DIRECT_READ
* read test for EXPORT read
* testing query read using json field access operator
* cleaning up a few things
* added storage api write test
* added streaming inserts write test
* changed class name (so gradle test command can catch it). disabled running test with DirectRunner
* changed test data but tests still same
* changed test data structure to make testing easier
* helper methods and more tests to include bigquery structs, arrays
* added test data for query test
* cleanup
* updated write tests to fit json in structs, arrays
* schema for new test data. cleanup
* spotlessJavaCheck fix
* fixes and cleanups
* noted in the docs that FILE_LOADS doesn't support the JSON type
* added error message if user tries to write JSON data using FILE_LOADS
* fixed typo
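
Not part of the patch itself, but for orientation: a minimal sketch of the read path this change enables (project, dataset, and table names below are placeholders, not from this commit). A JSON-typed column arrives in each TableRow as its string representation:

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class ReadJsonColumnExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        // Read a table containing a JSON column over the BigQuery Storage Read
        // API (DIRECT_READ), which this commit teaches to handle the JSON type.
        PCollection<TableRow> rows =
            p.apply(
                BigQueryIO.readTableRows()
                    .from("my-project:my_dataset.json_table") // placeholder table
                    .withMethod(TypedRead.Method.DIRECT_READ));
        p.run().waitUntilFinish();
      }
    }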
---
sdks/java/io/google-cloud-platform/build.gradle | 1 +
.../sdk/io/gcp/bigquery/BigQueryAvroUtils.java | 2 +
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 12 +-
.../beam/sdk/io/gcp/bigquery/BigQueryIOJsonIT.java | 674 +++++++++++++++++++++
sdks/python/apache_beam/io/gcp/bigquery.py | 2 +
5 files changed, 690 insertions(+), 1 deletion(-)
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 6548ab4bbb5..e89a8bf350e 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -189,6 +189,7 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
exclude '**/BigQueryIOStorageReadTableRowIT.class'
exclude '**/BigQueryIOStorageWriteIT.class'
exclude '**/BigQueryToTableIT.class'
+ exclude '**/BigQueryIOJsonIT.class'
maxParallelForks 4
classpath = sourceSets.test.runtimeClasspath
testClassesDirs = sourceSets.test.output.classesDirs
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
index f465fd9823b..220489d7cbc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -91,6 +91,7 @@ class BigQueryAvroUtils {
.put("DATETIME", Type.STRING)
.put("TIME", Type.STRING)
.put("TIME", Type.LONG)
+ .put("JSON", Type.STRING)
.build();
/**
@@ -303,6 +304,7 @@ class BigQueryAvroUtils {
case "STRING":
case "DATETIME":
case "GEOGRAPHY":
+ case "JSON":
// Avro will use a CharSequence to represent String objects, but it may not always use
// java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
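
To make the two hunks above concrete: the first admits Avro STRING as the representation of a BigQuery JSON column, and the second normalizes whatever CharSequence Avro supplies into a java.lang.String for the TableRow. A standalone sketch (the sample value is hypothetical):

    import org.apache.avro.util.Utf8;

    public class JsonAvroMappingSketch {
      public static void main(String[] args) {
        // Avro often uses org.apache.avro.util.Utf8 rather than java.lang.String,
        // so a JSON column's value may be any CharSequence implementation.
        Object v = new Utf8("{\"name\": \"Sydney\", \"population\": 5367206}");
        if (!(v instanceof CharSequence)) {
          throw new IllegalStateException("Expected CharSequence, got " + v.getClass());
        }
        // As in the case branch above, the value is converted with toString()
        // before being placed in the TableRow cell.
        String cell = v.toString();
        System.out.println(cell);
      }
    }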
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 06d27e15d27..54f485a9d71 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1789,7 +1789,9 @@ public class BigQueryIO {
* of load jobs allowed per day, so be careful not to set the triggering frequency too
* high. For more information, see <a
* href="https://cloud.google.com/bigquery/docs/loading-data-cloud-storage">Loading Data from
- * Cloud Storage</a>.
+ * Cloud Storage</a>. Note: Load jobs currently do not support BigQuery's <a
+ * href="https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type">
+ * JSON data type</a>.
*/
FILE_LOADS,
@@ -2912,6 +2914,14 @@ public class BigQueryIO {
"useAvroLogicalTypes can only be set with Avro output.");
}
+ if (getJsonSchema() != null && getJsonSchema().isAccessible()) {
+ // Batch load jobs currently support JSON data insertion only with CSV files
+ checkArgument(
+ !getJsonSchema().get().contains("JSON"),
+ "Found JSON type in TableSchema. JSON data insertion is currently "
+ + "not supported with batch loads.");
+ }
+
BatchLoads<DestinationT, T> batchLoads =
new BatchLoads<>(
getWriteDisposition(),
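
The effect of the new check: a Write configured with FILE_LOADS against a schema that contains a JSON field now fails fast at pipeline construction. A sketch of a supported configuration, mirroring the integration test added below (the destination table is a placeholder):

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

    public class WriteJsonColumnExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        TableSchema schema =
            new TableSchema()
                .setFields(
                    ImmutableList.of(
                        new TableFieldSchema().setName("id").setType("STRING"),
                        new TableFieldSchema().setName("payload").setType("JSON")));
        TableRow row = new TableRow().set("id", "a").set("payload", "{\"k\": 1}");
        p.apply(Create.of(row))
            .apply(
                "Write To BigQuery",
                BigQueryIO.writeTableRows()
                    .to("my-project:my_dataset.json_table") // placeholder destination
                    .withSchema(schema)
                    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                    // FILE_LOADS would be rejected by the check above; use the
                    // Storage Write API (or STREAMING_INSERTS) for JSON columns.
                    .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));
        p.run().waitUntilFinish();
      }
    }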
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOJsonIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOJsonIT.java
new file mode 100644
index 00000000000..213b6b1d7f8
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOJsonIT.java
@@ -0,0 +1,674 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/* Integration test for reading and writing JSON data to/from BigQuery */
+@RunWith(JUnit4.class)
+public class BigQueryIOJsonIT {
+ private static final Logger LOG = LoggerFactory.getLogger(BigQueryIOJsonIT.class);
+
+ @Rule public final transient TestPipeline p = TestPipeline.create();
+
+ @Rule public final transient TestPipeline pWrite = TestPipeline.create();
+
+ private BigQueryIOJsonOptions options;
+
+ private static final String project = "apache-beam-testing";
+ private static final String DATASET_ID = "bq_jsontype_test_nodelete";
+ private static final String JSON_TABLE_NAME = "json_data";
+
+ private static final String JSON_TABLE_DESTINATION =
+ String.format("%s:%s.%s", project, DATASET_ID, JSON_TABLE_NAME);
+
+ private static final TableSchema JSON_TYPE_TABLE_SCHEMA =
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("country_code").setType("STRING"),
+ new TableFieldSchema().setName("country").setType("JSON"),
+ new TableFieldSchema()
+ .setName("stats")
+ .setType("STRUCT")
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("gdp_per_capita").setType("JSON"),
+ new TableFieldSchema().setName("co2_emissions").setType("JSON"))),
+ new TableFieldSchema()
+ .setName("cities")
+ .setType("STRUCT")
+ .setMode("REPEATED")
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("city_name").setType("STRING"),
+ new TableFieldSchema().setName("city").setType("JSON"))),
+ new TableFieldSchema().setName("landmarks").setType("JSON").setMode("REPEATED")));
+
+ private static final List<TableRow> JSON_QUERY_TEST_DATA =
+ Arrays.asList(
+ new TableRow()
+ .set("country_code", "usa")
+ .set("past_leader", "\"George W. Bush\"")
+ .set("gdp", "58559.675")
+ .set("city_name", "\"Los Angeles\"")
+ .set("landmark_name", "\"Golden Gate Bridge\""),
+ new TableRow()
+ .set("country_code", "aus")
+ .set("past_leader", "\"Kevin Rudd\"")
+ .set("gdp", "58043.581")
+ .set("city_name", "\"Melbourne\"")
+ .set("landmark_name", "\"Great Barrier Reef\""),
+ new TableRow()
+ .set("country_code", "special")
+ .set("past_leader", "\"!@#$%^&*()_+\"")
+ .set("gdp", "421.7")
+ .set("city_name", "\"Bikini Bottom\"")
+ .set("landmark_name", "\"Willy Wonka's Factory\""));
+
+ public static final String STORAGE_WRITE_TEST_TABLE =
+ "storagewrite_test" + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32);
+
+ public static final String STREAMING_TEST_TABLE =
+ "streaming_test" + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32);
+
+ private static final Map<String, Map<String, Object>> JSON_TEST_DATA = generateCountryData();
+
+ // Make KV Json String pairs out of "country" column
+ static class CountryToKVJsonString extends DoFn<TableRow, KV<String, String>> {
+ @ProcessElement
+ public void processElement(@Element TableRow row, OutputReceiver<KV<String, String>> out) {
+ String countryCode = row.get("country_code").toString();
+ String country = row.get("country").toString();
+
+ out.output(KV.of(countryCode, country));
+ }
+ }
+
+ // Make KV Json String pairs out of "cities" column
+ static class CitiesToKVJsonString extends DoFn<TableRow, KV<String, String>> {
+ @ProcessElement
+ public void processElement(@Element TableRow row, OutputReceiver<KV<String, String>> out) {
+ String countryCode = row.get("country_code").toString();
+ ArrayList<Map<String, String>> cities = (ArrayList<Map<String, String>>) row.get("cities");
+
+ for (Map<String, String> city : cities) {
+ String key = countryCode + "_" + city.get("city_name");
+ String value = city.get("city");
+
+ out.output(KV.of(key, value));
+ }
+ }
+ }
+
+ // Make KV Json String pairs out of "stats" column
+ static class StatsToKVJsonString extends DoFn<TableRow, KV<String, String>> {
+ @ProcessElement
+ public void processElement(@Element TableRow row, OutputReceiver<KV<String, String>> out) {
+ String countryCode = row.get("country_code").toString();
+ Map<String, String> map = (Map<String, String>) row.get("stats");
+
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ String key = countryCode + "_" + entry.getKey();
+ String value = entry.getValue();
+
+ out.output(KV.of(key, value));
+ }
+ }
+ }
+
+ // Make KV Json String pairs out of "landmarks" column
+ static class LandmarksToKVJsonString extends DoFn<TableRow, KV<String, String>> {
+ @ProcessElement
+ public void processElement(@Element TableRow row, OutputReceiver<KV<String, String>> out) {
+ String countryCode = row.get("country_code").toString();
+ ArrayList<String> landmarks = (ArrayList<String>) row.get("landmarks");
+
+ for (int i = 0; i < landmarks.size(); i++) {
+ String key = countryCode + "_" + i;
+ String value = landmarks.get(i);
+
+ out.output(KV.of(key, value));
+ }
+ }
+ }
+
+ // Compare PCollection input with expected results.
+ static class CompareJsonStrings
+ implements SerializableFunction<Iterable<KV<String, String>>, Void> {
+ Map<String, String> expected;
+
+ public CompareJsonStrings(Map<String, String> expected) {
+ this.expected = expected;
+ }
+
+ @Override
+ public Void apply(Iterable<KV<String, String>> input) throws RuntimeException {
+ int counter = 0;
+
+ // Compare by converting strings to JsonElements
+ for (KV<String, String> actual : input) {
+ String key = actual.getKey();
+
+ if (!expected.containsKey(key)) {
+ throw new NoSuchElementException(
+ String.format(
+ "Unexpected key '%s' found in input but does not exist in expected results.",
+ key));
+ }
+ String jsonStringActual = actual.getValue();
+ JsonElement jsonActual = JsonParser.parseString(jsonStringActual);
+
+ String jsonStringExpected = expected.get(key);
+ JsonElement jsonExpected = JsonParser.parseString(jsonStringExpected);
+
+ assertEquals(jsonExpected, jsonActual);
+ counter += 1;
+ }
+ if (counter != expected.size()) {
+ throw new RuntimeException(
+ String.format("Expected %d elements but got %d elements.", expected.size(), counter));
+ }
+ return null;
+ }
+ }
+
+ // Writes with given write method then reads back and validates with original test data.
+ public void runTestWrite(BigQueryIOJsonOptions options) {
+ List<String> countries = Arrays.asList("usa", "aus", "special");
+ List<TableRow> rowsToWrite = new ArrayList<>();
+ for (Map.Entry<String, Map<String, Object>> element : JSON_TEST_DATA.entrySet()) {
+ if (!countries.contains(element.getKey())) {
+ continue;
+ }
+
+ TableRow row =
+ new TableRow()
+ .set("country_code", element.getKey())
+ .set("country", element.getValue().get("country"))
+ .set("stats", element.getValue().get("stats"))
+ .set("cities", element.getValue().get("cities"))
+ .set("landmarks", element.getValue().get("landmarks"));
+
+ rowsToWrite.add(row);
+ }
+
+ pWrite
+ .apply("Create Elements", Create.of(rowsToWrite))
+ .apply(
+ "Write To BigQuery",
+ BigQueryIO.writeTableRows()
+ .to(options.getOutput())
+ .withSchema(JSON_TYPE_TABLE_SCHEMA)
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withMethod(options.getWriteMethod()));
+ pWrite.run().waitUntilFinish();
+
+ readAndValidateRows(options);
+ }
+
+ public static Map<String, String> getTestData(String column) {
+ Map<String, String> testData =
+ JSON_TEST_DATA.get(column).entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> (String) e.getValue()));
+ return testData;
+ }
+
+ // Read from BigQuery and compare with local test data.
+ public void readAndValidateRows(BigQueryIOJsonOptions options) {
+ TypedRead<TableRow> bigqueryIO = BigQueryIO.readTableRows().withMethod(options.getReadMethod());
+
+ // read from input query or from table
+ if (!options.getInputQuery().isEmpty()) {
+ bigqueryIO = bigqueryIO.fromQuery(options.getInputQuery()).usingStandardSql();
+ } else {
+ bigqueryIO = bigqueryIO.from(options.getInputTable());
+ }
+
+ PCollection<TableRow> jsonRows = p.apply("Read rows", bigqueryIO);
+
+ if (!options.getInputQuery().isEmpty()) {
+ PAssert.that(jsonRows).containsInAnyOrder(JSON_QUERY_TEST_DATA);
+ p.run().waitUntilFinish();
+ return;
+ }
+
+ // Testing countries (straight json)
+ PCollection<KV<String, String>> countries =
+ jsonRows.apply(
+ "Convert countries to KV JSON Strings", ParDo.of(new CountryToKVJsonString()));
+
+ PAssert.that(countries).satisfies(new CompareJsonStrings(getTestData("countries")));
+
+ // Testing stats (json in struct)
+ PCollection<KV<String, String>> stats =
+ jsonRows.apply("Convert stats to KV JSON Strings", ParDo.of(new StatsToKVJsonString()));
+
+ PAssert.that(stats).satisfies(new CompareJsonStrings(getTestData("stats")));
+
+ // Testing cities (json in array of structs)
+ PCollection<KV<String, String>> cities =
+ jsonRows.apply("Convert cities to KV JSON Strings", ParDo.of(new CitiesToKVJsonString()));
+
+ PAssert.that(cities).satisfies(new CompareJsonStrings(getTestData("cities")));
+
+ // Testing landmarks (json in array)
+ PCollection<KV<String, String>> landmarks =
+ jsonRows.apply(
+ "Convert landmarks to KV JSON Strings", ParDo.of(new LandmarksToKVJsonString()));
+
+ PAssert.that(landmarks).satisfies(new CompareJsonStrings(getTestData("landmarks")));
+
+ p.run().waitUntilFinish();
+ }
+
+ @Test
+ public void testDirectRead() throws Exception {
+ LOG.info("Testing DIRECT_READ read method with JSON data");
+ options = TestPipeline.testingPipelineOptions().as(BigQueryIOJsonOptions.class);
+ options.setReadMethod(TypedRead.Method.DIRECT_READ);
+ options.setInputTable(JSON_TABLE_DESTINATION);
+
+ readAndValidateRows(options);
+ }
+
+ @Test
+ public void testExportRead() throws Exception {
+ LOG.info("Testing EXPORT read method with JSON data");
+ options = TestPipeline.testingPipelineOptions().as(BigQueryIOJsonOptions.class);
+ options.setReadMethod(TypedRead.Method.EXPORT);
+ options.setInputTable(JSON_TABLE_DESTINATION);
+
+ readAndValidateRows(options);
+ }
+
+ @Test
+ public void testQueryRead() throws Exception {
+ LOG.info("Testing querying JSON data with DIRECT_READ read method");
+ options = TestPipeline.testingPipelineOptions().as(BigQueryIOJsonOptions.class);
+ options.setReadMethod(TypedRead.Method.DIRECT_READ);
+ options.setInputQuery(
+ String.format(
+ "SELECT "
+ + "country_code, "
+ + "country.past_leaders[2] AS past_leader, "
+ + "stats.gdp_per_capita[\"gdp_per_capita\"] AS gdp, "
+ + "cities[OFFSET(1)].city.name AS city_name, "
+ + "landmarks[OFFSET(1)][\"name\"] AS landmark_name FROM "
+ + "`%s.%s.%s`",
+ project, DATASET_ID, JSON_TABLE_NAME));
+
+ readAndValidateRows(options);
+ }
+
+ @Test
+ public void testStorageWrite() throws Exception {
+ LOG.info("Testing writing JSON data with Storage API");
+
+ options = TestPipeline.testingPipelineOptions().as(BigQueryIOJsonOptions.class);
+ options.setWriteMethod(Write.Method.STORAGE_WRITE_API);
+
+ String storageDestination =
+ String.format("%s:%s.%s", project, DATASET_ID, STORAGE_WRITE_TEST_TABLE);
+ options.setOutput(storageDestination);
+ options.setInputTable(storageDestination);
+
+ runTestWrite(options);
+ }
+
+ @Test
+ public void testLegacyStreamingWrite() throws Exception {
+ options = TestPipeline.testingPipelineOptions().as(BigQueryIOJsonOptions.class);
+ options.setWriteMethod(Write.Method.STREAMING_INSERTS);
+
+ String streamingDestination =
+ String.format("%s:%s.%s", project, DATASET_ID, STREAMING_TEST_TABLE);
+ options.setOutput(streamingDestination);
+ options.setInputTable(streamingDestination);
+
+ runTestWrite(options);
+ }
+
+ @BeforeClass
+ public static void setupTestEnvironment() throws Exception {
+ PipelineOptionsFactory.register(BigQueryIOJsonOptions.class);
+ }
+
+ public interface BigQueryIOJsonOptions extends TestPipelineOptions {
+ @Description("Table to read from, specified as <project_id>:<dataset_id>.<table_id>")
+ @Validation.Required
+ String getInputTable();
+
+ void setInputTable(String value);
+
+ @Description("Query used to read from BigQuery")
+ @Default.String("")
+ String getInputQuery();
+
+ void setInputQuery(String query);
+
+ @Description("Read method used to read from BigQuery")
+ @Default.Enum("EXPORT")
+ TypedRead.Method getReadMethod();
+
+ void setReadMethod(TypedRead.Method value);
+
+ @Description(
+ "BigQuery table to write to, specified as "
+ + "<project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
+ @Validation.Required
+ String getOutput();
+
+ void setOutput(String value);
+
+ @Description("Write method used to write to BigQuery")
+ @Default.Enum("STORAGE_WRITE_API")
+ BigQueryIO.Write.Method getWriteMethod();
+
+ void setWriteMethod(BigQueryIO.Write.Method value);
+ }
+
+ private static Map<String, Map<String, Object>> generateCountryData() {
+ // Data from World Bank as of 2020
+ JSONObject usa = new JSONObject();
+
+ JSONObject nyc = new JSONObject();
+ nyc.put("name", "New York City");
+ nyc.put("state", "NY");
+ nyc.put("population", 8622357);
+ JSONObject la = new JSONObject();
+ la.put("name", "Los Angeles");
+ la.put("state", "CA");
+ la.put("population", 4085014);
+ JSONObject chicago = new JSONObject();
+ chicago.put("name", "Chicago");
+ chicago.put("state", "IL");
+ chicago.put("population", 2670406);
+
+ JSONObject usaCities = new JSONObject();
+ usaCities.put("nyc", nyc);
+ usaCities.put("la", la);
+ usaCities.put("chicago", chicago);
+
+ JSONArray usaLeaders = new JSONArray();
+ usaLeaders.put("Donald Trump");
+ usaLeaders.put("Barack Obama");
+ usaLeaders.put("George W. Bush");
+ usaLeaders.put("Bill Clinton");
+
+ usa.put("name", "United States of America");
+ usa.put("population", 329484123);
+ usa.put("cities", usaCities);
+ usa.put("past_leaders", usaLeaders);
+ usa.put("in_northern_hemisphere", true);
+
+ JSONObject aus = new JSONObject();
+
+ JSONObject sydney = new JSONObject();
+ sydney.put("name", "Sydney");
+ sydney.put("state", "New South Wales");
+ sydney.put("population", 5367206);
+
+ JSONObject melbourne = new JSONObject();
+ melbourne.put("name", "Melbourne");
+ melbourne.put("state", "Victoria");
+ melbourne.put("population", 5159211);
+
+ JSONObject brisbane = new JSONObject();
+ brisbane.put("name", "Brisbane");
+ brisbane.put("state", "Queensland");
+ brisbane.put("population", 2560720);
+
+ JSONObject ausCities = new JSONObject();
+ ausCities.put("sydney", sydney);
+ ausCities.put("melbourne", melbourne);
+ ausCities.put("brisbane", brisbane);
+
+ JSONArray ausLeaders = new JSONArray();
+ ausLeaders.put("Malcolm Turnbull");
+ ausLeaders.put("Tony Abbot");
+ ausLeaders.put("Kevin Rudd");
+
+ aus.put("name", "Australia");
+ aus.put("population", 25687041);
+ aus.put("cities", ausCities);
+ aus.put("past_leaders", ausLeaders);
+ aus.put("in_northern_hemisphere", false);
+
+ JSONObject special = new JSONObject();
+
+ JSONObject specialCities = new JSONObject();
+
+ JSONObject basingse = new JSONObject();
+ basingse.put("name", "Ba Sing Se");
+ basingse.put("state", "The Earth Kingdom");
+ basingse.put("population", 200000);
+
+ JSONObject bikinibottom = new JSONObject();
+ bikinibottom.put("name", "Bikini Bottom");
+ bikinibottom.put("state", "The Pacific Ocean");
+ bikinibottom.put("population", 50000);
+
+ specialCities.put("basingse", basingse);
+ specialCities.put("bikinibottom", bikinibottom);
+
+ JSONArray specialArr = new JSONArray();
+
+ specialArr.put("1");
+ specialArr.put("2");
+ specialArr.put("!@#$%^&*()_+");
+
+ special.put(
+ "name",
+ "newline\n, form\f, tab\t, \"quotes\", \\backslash\\, backspace\b, \u0000_hex_\u0f0f");
+ special.put("population", -123456789);
+ special.put("cities", specialCities);
+ special.put("past_leaders", specialArr);
+ special.put("in_northern_hemisphere", true);
+
+ // usa landmarks
+ JSONObject statueOfLiberty = new JSONObject();
+ statueOfLiberty.put("name", "Statue of Liberty");
+ statueOfLiberty.put("cool rating", JSONObject.NULL);
+ JSONObject goldenGateBridge = new JSONObject();
+ goldenGateBridge.put("name", "Golden Gate Bridge");
+ goldenGateBridge.put("cool rating", "very cool");
+ JSONObject grandCanyon = new JSONObject();
+ grandCanyon.put("name", "Grand Canyon");
+ grandCanyon.put("cool rating", "very very cool");
+
+ // australia landmarks
+ JSONObject operaHouse = new JSONObject();
+ operaHouse.put("name", "Sydney Opera House");
+ operaHouse.put("cool rating", "amazing");
+ JSONObject greatBarrierReef = new JSONObject();
+ greatBarrierReef.put("name", "Great Barrier Reef");
+ greatBarrierReef.put("cool rating", JSONObject.NULL);
+
+ // special landmarks
+ JSONObject hogwarts = new JSONObject();
+ hogwarts.put("name", "Hogwarts School of WitchCraft and Wizardry");
+ hogwarts.put("cool rating", "magical");
+ JSONObject willyWonka = new JSONObject();
+ willyWonka.put("name", "Willy Wonka's Factory");
+ willyWonka.put("cool rating", JSONObject.NULL);
+ JSONObject rivendell = new JSONObject();
+ rivendell.put("name", "Rivendell");
+ rivendell.put("cool rating", "precious");
+
+ // stats
+ JSONObject usGdp = new JSONObject();
+ usGdp.put("gdp_per_capita", 58559.675);
+ usGdp.put("currency", "constant 2015 US$");
+ JSONObject usCo2 = new JSONObject();
+ usCo2.put("co2 emissions", 15.241);
+ usCo2.put("measurement", "metric tons per capita");
+ usCo2.put("year", 2018);
+
+ JSONObject ausGdp = new JSONObject();
+ ausGdp.put("gdp_per_capita", 58043.581);
+ ausGdp.put("currency", "constant 2015 US$");
+ JSONObject ausCo2 = new JSONObject();
+ ausCo2.put("co2 emissions", 15.476);
+ ausCo2.put("measurement", "metric tons per capita");
+ ausCo2.put("year", 2018);
+
+ JSONObject specialGdp = new JSONObject();
+ specialGdp.put("gdp_per_capita", 421.70);
+ specialGdp.put("currency", "constant 200 BC gold");
+ JSONObject specialCo2 = new JSONObject();
+ specialCo2.put("co2 emissions", -10.79);
+ specialCo2.put("measurement", "metric tons per capita");
+ specialCo2.put("year", 2018);
+
+ Map<String, Map<String, Object>> data = new HashMap<>();
+ // Nested maps for testing
+ data.put(
+ "countries",
+ ImmutableMap.of(
+ "usa", usa.toString(),
+ "aus", aus.toString(),
+ "special", special.toString()));
+ data.put(
+ "cities",
+ new HashMap<>(
+ ImmutableMap.<String, Object>builder()
+ .put("usa_nyc", nyc.toString())
+ .put("usa_la", la.toString())
+ .put("usa_chicago", chicago.toString())
+ .put("aus_sydney", sydney.toString())
+ .put("aus_melbourne", melbourne.toString())
+ .put("aus_brisbane", brisbane.toString())
+ .put("special_basingse", basingse.toString())
+ .put("special_bikinibottom", bikinibottom.toString())
+ .build()));
+ data.put(
+ "landmarks",
+ new HashMap<>(
+ ImmutableMap.<String, Object>builder()
+ .put("usa_0", statueOfLiberty.toString())
+ .put("usa_1", goldenGateBridge.toString())
+ .put("usa_2", grandCanyon.toString())
+ .put("aus_0", operaHouse.toString())
+ .put("aus_1", greatBarrierReef.toString())
+ .put("special_0", hogwarts.toString())
+ .put("special_1", willyWonka.toString())
+ .put("special_2", rivendell.toString())
+ .build()));
+ data.put(
+ "stats",
+ new HashMap<>(
+ ImmutableMap.<String, Object>builder()
+ .put("usa_gdp_per_capita", usGdp.toString())
+ .put("usa_co2_emissions", usCo2.toString())
+ .put("aus_gdp_per_capita", ausGdp.toString())
+ .put("aus_co2_emissions", ausCo2.toString())
+ .put("special_gdp_per_capita", specialGdp.toString())
+ .put("special_co2_emissions", specialCo2.toString())
+ .build()));
+ // Nested maps for writing to BigQuery
+ data.put(
+ "usa",
+ ImmutableMap.of(
+ "country", usa.toString(),
+ "cities",
+ Arrays.asList(
+ ImmutableMap.of("city_name", "nyc", "city", nyc.toString()),
+ ImmutableMap.of("city_name", "la", "city", la.toString()),
+ ImmutableMap.of("city_name", "chicago", "city", chicago.toString())),
+ "landmarks",
+ Arrays.asList(
+ statueOfLiberty.toString(),
+ goldenGateBridge.toString(),
+ grandCanyon.toString()),
+ "stats",
+ ImmutableMap.of(
+ "gdp_per_capita", usGdp.toString(),
+ "co2_emissions", usCo2.toString())));
+ data.put(
+ "aus",
+ ImmutableMap.of(
+ "country", aus.toString(),
+ "cities",
+ Arrays.asList(
+ ImmutableMap.of("city_name", "sydney", "city", sydney.toString()),
+ ImmutableMap.of("city_name", "melbourne", "city", melbourne.toString()),
+ ImmutableMap.of("city_name", "brisbane", "city", brisbane.toString())),
+ "landmarks", Arrays.asList(operaHouse.toString(), greatBarrierReef.toString()),
+ "stats",
+ ImmutableMap.of(
+ "gdp_per_capita", ausGdp.toString(),
+ "co2_emissions", ausCo2.toString())));
+ data.put(
+ "special",
+ ImmutableMap.of(
+ "country", special.toString(),
+ "cities",
+ Arrays.asList(
+ ImmutableMap.of("city_name", "basingse", "city", basingse.toString()),
+ ImmutableMap.of("city_name", "bikinibottom", "city", bikinibottom.toString())),
+ "landmarks",
+ Arrays.asList(hogwarts.toString(), willyWonka.toString(), rivendell.toString()),
+ "stats",
+ ImmutableMap.of(
+ "gdp_per_capita", specialGdp.toString(),
+ "co2_emissions", specialCo2.toString())));
+ return data;
+ }
+}
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index f443d24950f..fa47eb9db01 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2016,6 +2016,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
data to BigQuery: https://cloud.google.com/bigquery/docs/loading-data.
DEFAULT will use STREAMING_INSERTS on Streaming pipelines and
FILE_LOADS on Batch pipelines.
+ Note: FILE_LOADS currently does not support BigQuery's JSON data type:
+ https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type
insert_retry_strategy: The strategy to use when retrying streaming inserts
into BigQuery. Options are shown in bigquery_tools.RetryStrategy attrs.
Default is to retry always. This means that whenever there are rows