You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/07/06 23:31:32 UTC
[beam] branch master updated: [BEAM-10317] Java - Update BigQueryIO
to tag BigQuery Jobs with the Dataflow Job ID
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0dede89 [BEAM-10317] Java - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
new 96bf514 Merge pull request #12083 from [BEAM-10317] Java - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
0dede89 is described below
commit 0dede89a89e6b1b8c5a62b7c5d30759841badbe0
Author: Alex Amato <aj...@google.com>
AuthorDate: Wed Jun 24 17:35:19 2020 -0700
[BEAM-10317] Java - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
---
.../beam/examples/cookbook/BigQueryTornadoes.java | 46 ++++++++++---
.../examples/cookbook/BigQueryTornadoesIT.java | 27 ++++++++
.../sdk/extensions/gcp/util/GceMetadataUtil.java | 69 +++++++++++++++++++
.../sdk/io/gcp/bigquery/BigQueryIOMetadata.java | 78 ++++++++++++++++++++++
.../sdk/io/gcp/bigquery/BigQueryQuerySource.java | 3 +
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 33 ++++++---
.../gcp/bigquery/BigQueryStorageQuerySource.java | 3 +
.../gcp/bigquery/BigQueryStorageTableSource.java | 2 +
.../sdk/io/gcp/bigquery/BigQueryTableSource.java | 3 +
.../beam/sdk/io/gcp/bigquery/WriteRename.java | 5 +-
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 8 +++
.../io/gcp/bigquery/BigQueryIOMetadataTest.java | 72 ++++++++++++++++++++
12 files changed, 329 insertions(+), 20 deletions(-)
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index b4b775e..3d70ab1 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -139,6 +139,12 @@ public class BigQueryTornadoes {
void setInput(String value);
+    /**
+     * Standard-SQL query whose results are used as the pipeline input. When this is non-empty it
+     * takes precedence over {@code --input}; the empty default means "read the --input table".
+     */
+    @Description("SQL query to read from. If non-empty, it is used instead of --input.")
+    @Default.String("")
+    String getInputQuery();
+
+    void setInputQuery(String value);
+
@Description("Mode to use when reading from BigQuery")
@Default.Enum("EXPORT")
TypedRead.Method getReadMethod();
@@ -167,20 +173,38 @@ public class BigQueryTornadoes {
switch (options.getReadMethod()) {
case DIRECT_READ:
- rowsFromBigQuery =
- p.apply(
- BigQueryIO.readTableRows()
- .from(options.getInput())
- .withMethod(Method.DIRECT_READ)
- .withSelectedFields(Lists.newArrayList("month", "tornado")));
+ if (!options.getInputQuery().isEmpty()) {
+ rowsFromBigQuery =
+ p.apply(
+ BigQueryIO.readTableRows()
+ .fromQuery(options.getInputQuery())
+ .usingStandardSql()
+ .withMethod(Method.DIRECT_READ));
+ } else {
+ rowsFromBigQuery =
+ p.apply(
+ BigQueryIO.readTableRows()
+ .from(options.getInput())
+ .withMethod(Method.DIRECT_READ)
+ .withSelectedFields(Lists.newArrayList("month", "tornado")));
+ }
break;
default:
- rowsFromBigQuery =
- p.apply(
- BigQueryIO.readTableRows()
- .from(options.getInput())
- .withMethod(options.getReadMethod()));
+ if (!options.getInputQuery().isEmpty()) {
+ rowsFromBigQuery =
+ p.apply(
+ BigQueryIO.readTableRows()
+ .fromQuery(options.getInputQuery())
+ .usingStandardSql()
+ .withMethod(options.getReadMethod()));
+ } else {
+ rowsFromBigQuery =
+ p.apply(
+ BigQueryIO.readTableRows()
+ .from(options.getInput())
+ .withMethod(options.getReadMethod()));
+ }
break;
}
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
index 0825478..ea10fb1 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
@@ -96,4 +96,31 @@ public class BigQueryTornadoesIT {
runE2EBigQueryTornadoesTest(options);
}
+
+  /** Runs the tornadoes pipeline end to end, reading a standard-SQL query via EXPORT. */
+  @Test
+  public void testE2EBigQueryTornadoesWithExportUsingQuery() throws Exception {
+    BigQueryTornadoesITOptions options =
+        TestPipeline.testingPipelineOptions().as(BigQueryTornadoesITOptions.class);
+    options.setReadMethod(Method.EXPORT);
+    String tableName = "monthly_tornadoes_" + System.currentTimeMillis();
+    options.setOutput(String.format("%s.%s", "BigQueryTornadoesIT", tableName));
+    options.setInputQuery("SELECT * FROM `clouddataflow-readonly.samples.weather_stations`");
+    runE2EBigQueryTornadoesTest(options);
+  }
+
+  /** Runs the tornadoes pipeline end to end, reading a standard-SQL query via DIRECT_READ. */
+  @Test
+  public void testE2eBigQueryTornadoesWithStorageApiUsingQuery() throws Exception {
+    BigQueryTornadoesITOptions options =
+        TestPipeline.testingPipelineOptions().as(BigQueryTornadoesITOptions.class);
+    options.setReadMethod(Method.DIRECT_READ);
+    String tableName = "monthly_tornadoes_storage_" + System.currentTimeMillis();
+    options.setOutput(String.format("%s.%s", "BigQueryTornadoesIT", tableName));
+    options.setInputQuery("SELECT * FROM `clouddataflow-readonly.samples.weather_stations`");
+    runE2EBigQueryTornadoesTest(options);
+  }
}
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
new file mode 100644
index 0000000..6736844
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.Charset;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+
+/**
+ * Best-effort helpers for reading values from the Google Compute Engine (GCE) metadata server.
+ *
+ * <p>Lookups never throw: if the server cannot be reached, or does not answer with HTTP 200 and a
+ * body, an empty string is returned. This may simply mean the code is not running on a Dataflow
+ * worker VM.
+ */
+public class GceMetadataUtil {
+  private static final String BASE_METADATA_URL = "http://metadata/computeMetadata/v1/";
+
+  /** Timeout for establishing the connection and for waiting on response data. */
+  private static final int TIMEOUT_MILLIS = 5000;
+
+  /**
+   * Fetches a value from the metadata server.
+   *
+   * @param key path relative to {@code http://metadata/computeMetadata/v1/}
+   * @return the response body, or an empty string if the value could not be fetched
+   */
+  static String fetchMetadata(String key) {
+    final HttpParams httpParams = new BasicHttpParams();
+    HttpConnectionParams.setConnectionTimeout(httpParams, TIMEOUT_MILLIS);
+    // Also bound the read: with only a connect timeout, a server that accepts the connection but
+    // never responds would block the caller indefinitely.
+    HttpConnectionParams.setSoTimeout(httpParams, TIMEOUT_MILLIS);
+    HttpClient client = new DefaultHttpClient(httpParams);
+    HttpGet request = new HttpGet(BASE_METADATA_URL + key);
+    request.setHeader("Metadata-Flavor", "Google");
+
+    try {
+      HttpResponse response = client.execute(request);
+      if (response.getStatusLine().getStatusCode() != 200 || response.getEntity() == null) {
+        // May mean it's running on a non DataflowRunner, in which case it's perfectly normal.
+        return "";
+      }
+      InputStream in = response.getEntity().getContent();
+      // Decode explicitly rather than depending on the JVM's platform-default charset.
+      try (final Reader reader = new InputStreamReader(in, Charset.forName("UTF-8"))) {
+        return CharStreams.toString(reader);
+      }
+    } catch (IOException ignored) {
+      // May mean it's running on a non DataflowRunner, in which case it's perfectly normal.
+      return "";
+    } finally {
+      // A new client is created for every call; shut it down so its connections are released.
+      client.getConnectionManager().shutdown();
+    }
+  }
+
+  /** Fetches a custom instance attribute stored under {@code instance/attributes/}. */
+  private static String fetchCustomGceMetadata(String customMetadataKey) {
+    return GceMetadataUtil.fetchMetadata("instance/attributes/" + customMetadataKey);
+  }
+
+  /**
+   * Returns the value of the {@code job_id} instance attribute, or an empty string if it could not
+   * be fetched (e.g. when not running on a Dataflow worker VM).
+   */
+  public static String fetchDataflowJobId() {
+    return GceMetadataUtil.fetchCustomGceMetadata("job_id");
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java
new file mode 100644
index 0000000..e2d058d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.extensions.gcp.util.GceMetadataUtil;
+
+/** Metadata class for BigQueryIO, i.e. values to attach to BigQuery jobs as job labels. */
+final class BigQueryIOMetadata {
+
+  /**
+   * One to 63 lowercase ASCII letters, digits, underscores or hyphens. Always applied with {@link
+   * Matcher#matches()} so that the entire value must conform.
+   */
+  private static final Pattern VALID_CLOUD_LABEL_PATTERN = Pattern.compile("[a-z0-9_-]{1,63}");
+
+  /** Value for the {@code beam_job_id} label, or null when no valid job ID is known. */
+  private final String beamJobId;
+
+  private BigQueryIOMetadata(String beamJobId) {
+    this.beamJobId = beamJobId;
+  }
+
+  /**
+   * Creates a BigQueryIOMetadata. This will request metadata properly based on which runner is
+   * being used.
+   *
+   * <p>Currently this asks the GCE metadata server for a Dataflow job ID (a blocking network
+   * call). A non-empty answer means the process runs on a Dataflow GCE VM; the ID is kept only if
+   * {@link #isValidCloudLabel} accepts it.
+   */
+  public static BigQueryIOMetadata create() {
+    String dataflowJobId = GceMetadataUtil.fetchDataflowJobId();
+    // A job ID is only returned by GCE metadata when running on a Dataflow GCE VM.
+    boolean isDataflowRunner = dataflowJobId != null && !dataflowJobId.isEmpty();
+
+    String beamJobId = null;
+    if (isDataflowRunner && isValidCloudLabel(dataflowJobId)) {
+      beamJobId = dataflowJobId;
+    }
+    return new BigQueryIOMetadata(beamJobId);
+  }
+
+  /**
+   * Adds the labels derived from this metadata to {@code jobLabels}, never overriding a label key
+   * that is already present.
+   *
+   * @param jobLabels mutable label map; it is modified in place
+   * @return {@code jobLabels}, for convenient chaining
+   */
+  public Map<String, String> addAdditionalJobLabels(Map<String, String> jobLabels) {
+    if (this.beamJobId != null && !jobLabels.containsKey("beam_job_id")) {
+      jobLabels.put("beam_job_id", this.beamJobId);
+    }
+    return jobLabels;
+  }
+
+  /**
+   * Returns true if {@code value} is a valid cloud label string. This function can return false in
+   * cases where the label value is valid. However, it will not return true in a case where the
+   * label value is invalid. This is because a stricter set of allowed characters is used in this
+   * validator, because foreign language characters are not accepted. Thus, this should not be used
+   * as a generic validator for all cloud labels.
+   *
+   * <p>See Also: https://cloud.google.com/compute/docs/labeling-resources
+   *
+   * @param value candidate label value; null is rejected
+   * @return true if the whole of {@code value} is a valid cloud label string
+   */
+  public static boolean isValidCloudLabel(String value) {
+    if (value == null) {
+      return false;
+    }
+    // matches() checks the entire input. The previous find() with "^...$" also accepted a value
+    // ending in a line terminator, because "$" matches just before a final terminator.
+    Matcher m = VALID_CLOUD_LABEL_PATTERN.matcher(value);
+    return m.matches();
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index e8c1e29..991ec44 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -74,5 +74,8 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("query", queryDef.getQuery()));
+ builder.add(
+ DisplayData.item("launchesBigQueryJobs", true)
+ .withLabel("This transform launches BigQuery jobs to read/write elements."));
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index df6064c..035425e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -64,8 +64,10 @@ import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -140,16 +142,19 @@ class BigQueryServicesImpl implements BigQueryServices {
static class JobServiceImpl implements BigQueryServices.JobService {
private final ApiErrorExtractor errorExtractor;
private final Bigquery client;
+ private final BigQueryIOMetadata bqIOMetadata;
@VisibleForTesting
JobServiceImpl(Bigquery client) {
this.errorExtractor = new ApiErrorExtractor();
this.client = client;
+ this.bqIOMetadata = BigQueryIOMetadata.create();
}
private JobServiceImpl(BigQueryOptions options) {
this.errorExtractor = new ApiErrorExtractor();
this.client = newBigQueryClient(options).build();
+ this.bqIOMetadata = BigQueryIOMetadata.create();
}
/**
@@ -162,11 +167,14 @@ class BigQueryServicesImpl implements BigQueryServices {
@Override
public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
throws InterruptedException, IOException {
+ Map<String, String> labelMap = new HashMap<>();
Job job =
new Job()
.setJobReference(jobRef)
- .setConfiguration(new JobConfiguration().setLoad(loadConfig));
-
+ .setConfiguration(
+ new JobConfiguration()
+ .setLoad(loadConfig)
+ .setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
startJob(job, errorExtractor, client);
}
@@ -180,11 +188,14 @@ class BigQueryServicesImpl implements BigQueryServices {
@Override
public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
throws InterruptedException, IOException {
+ Map<String, String> labelMap = new HashMap<>();
Job job =
new Job()
.setJobReference(jobRef)
- .setConfiguration(new JobConfiguration().setExtract(extractConfig));
-
+ .setConfiguration(
+ new JobConfiguration()
+ .setExtract(extractConfig)
+ .setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
startJob(job, errorExtractor, client);
}
@@ -198,11 +209,14 @@ class BigQueryServicesImpl implements BigQueryServices {
@Override
public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig)
throws IOException, InterruptedException {
+ Map<String, String> labelMap = new HashMap<>();
Job job =
new Job()
.setJobReference(jobRef)
- .setConfiguration(new JobConfiguration().setQuery(queryConfig));
-
+ .setConfiguration(
+ new JobConfiguration()
+ .setQuery(queryConfig)
+ .setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
startJob(job, errorExtractor, client);
}
@@ -216,11 +230,14 @@ class BigQueryServicesImpl implements BigQueryServices {
@Override
public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
throws IOException, InterruptedException {
+ Map<String, String> labelMap = new HashMap<>();
Job job =
new Job()
.setJobReference(jobRef)
- .setConfiguration(new JobConfiguration().setCopy(copyConfig));
-
+ .setConfiguration(
+ new JobConfiguration()
+ .setCopy(copyConfig)
+ .setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
startJob(job, errorExtractor, client);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
index 1564ab6..3e6ea7a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
@@ -109,6 +109,9 @@ public class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T>
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("query", queryProvider).withLabel("Query"));
+ builder.add(
+ DisplayData.item("launchesBigQueryJobs", true)
+ .withLabel("This transform launches BigQuery jobs to read/write elements."));
}
@Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
index 0e79a29..0adce0f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
@@ -91,6 +91,8 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T>
builder.addIfNotNull(
DisplayData.item("table", BigQueryHelpers.displayTable(tableReferenceProvider))
.withLabel("Table"));
+ // Note: This transform does not set launchesBigQueryJobs because it doesn't launch
+ // BigQuery jobs, but instead uses the storage api to directly read the table.
}
@Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index 1cf9f08..6f0df14 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -88,5 +88,8 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("table", tableDef.getJsonTable()));
+ builder.add(
+ DisplayData.item("launchesBigQueryJobs", true)
+ .withLabel("This transform launches BigQuery jobs to read/write elements."));
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 9761cf4..8aeb982 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -267,6 +267,9 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, String>>, Void> {
.withLabel("Write Disposition"))
.add(
DisplayData.item("firstPaneCreateDisposition", firstPaneCreateDisposition.toString())
- .withLabel("Create Disposition"));
+ .withLabel("Create Disposition"))
+ .add(
+ DisplayData.item("launchesBigQueryJobs", true)
+ .withLabel("This transform launches BigQuery jobs to read/write elements."));
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 9cce81e..ceb02a1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -230,6 +231,13 @@ class WriteTables<DestinationT>
new PendingJobData(window, retryJob, partitionFiles, tableDestination, tableReference));
}
+    /**
+     * Adds display data advertising that this DoFn launches BigQuery jobs, after any display data
+     * contributed by the superclass.
+     */
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.add(
+          DisplayData.item("launchesBigQueryJobs", true)
+              .withLabel("This transform launches BigQuery jobs to read/write elements."));
+    }
+
@FinishBundle
public void finishBundle(FinishBundleContext c) throws Exception {
DatasetService datasetService =
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadataTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadataTest.java
new file mode 100644
index 0000000..0511535
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadataTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static junit.framework.TestCase.assertFalse;
+import static junit.framework.TestCase.assertTrue;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BigQueryIOMetadata}. */
+@RunWith(JUnit4.class)
+public class BigQueryIOMetadataTest {
+
+  @Test
+  public void testIsValidCloudLabel() {
+    // A Dataflow job ID.
+    // Lowercase letters, numbers, underscores and hyphens are allowed.
+    String testStr = "2020-06-29_15_26_09-12838749047888422749";
+    assertTrue(BigQueryIOMetadata.isValidCloudLabel(testStr));
+
+    // At least one character.
+    testStr = "0";
+    assertTrue(BigQueryIOMetadata.isValidCloudLabel(testStr));
+
+    // Up to 63 characters.
+    testStr = "0123456789abcdefghij0123456789abcdefghij0123456789abcdefghij012";
+    assertTrue(BigQueryIOMetadata.isValidCloudLabel(testStr));
+
+    // Lowercase letters allowed.
+    testStr = "abcdefghijklmnopqrstuvwxyz";
+    for (char testChar : testStr.toCharArray()) {
+      assertTrue(BigQueryIOMetadata.isValidCloudLabel(String.valueOf(testChar)));
+    }
+
+    // Digits, underscores and hyphens are allowed on their own as well.
+    testStr = "0123456789_-";
+    for (char testChar : testStr.toCharArray()) {
+      assertTrue(BigQueryIOMetadata.isValidCloudLabel(String.valueOf(testChar)));
+    }
+
+    // Empty strings not allowed.
+    testStr = "";
+    assertFalse(BigQueryIOMetadata.isValidCloudLabel(testStr));
+
+    // 64 or more characters not allowed.
+    testStr = "0123456789abcdefghij0123456789abcdefghij0123456789abcdefghij0123";
+    assertFalse(BigQueryIOMetadata.isValidCloudLabel(testStr));
+
+    // Uppercase letters not allowed.
+    testStr = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+    for (char testChar : testStr.toCharArray()) {
+      assertFalse(BigQueryIOMetadata.isValidCloudLabel(String.valueOf(testChar)));
+    }
+
+    // Special characters other than underscores and hyphens are not allowed.
+    testStr = "!@#$%^&*()+=[{]};:\'\"\\|,<.>?/`~";
+    for (char testChar : testStr.toCharArray()) {
+      assertFalse(BigQueryIOMetadata.isValidCloudLabel(String.valueOf(testChar)));
+    }
+
+    // Whitespace is not allowed.
+    testStr = " \t\n";
+    for (char testChar : testStr.toCharArray()) {
+      assertFalse(BigQueryIOMetadata.isValidCloudLabel(String.valueOf(testChar)));
+    }
+  }
+}