Posted to commits@beam.apache.org by pa...@apache.org on 2020/07/06 23:31:32 UTC

[beam] branch master updated: [BEAM-10317] Java - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0dede89  [BEAM-10317] Java - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
     new 96bf514  Merge pull request #12083 from [BEAM-10317] Java - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
0dede89 is described below

commit 0dede89a89e6b1b8c5a62b7c5d30759841badbe0
Author: Alex Amato <aj...@google.com>
AuthorDate: Wed Jun 24 17:35:19 2020 -0700

    [BEAM-10317] Java - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
---
 .../beam/examples/cookbook/BigQueryTornadoes.java  | 46 ++++++++++---
 .../examples/cookbook/BigQueryTornadoesIT.java     | 27 ++++++++
 .../sdk/extensions/gcp/util/GceMetadataUtil.java   | 69 +++++++++++++++++++
 .../sdk/io/gcp/bigquery/BigQueryIOMetadata.java    | 78 ++++++++++++++++++++++
 .../sdk/io/gcp/bigquery/BigQueryQuerySource.java   |  3 +
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 33 ++++++---
 .../gcp/bigquery/BigQueryStorageQuerySource.java   |  3 +
 .../gcp/bigquery/BigQueryStorageTableSource.java   |  2 +
 .../sdk/io/gcp/bigquery/BigQueryTableSource.java   |  3 +
 .../beam/sdk/io/gcp/bigquery/WriteRename.java      |  5 +-
 .../beam/sdk/io/gcp/bigquery/WriteTables.java      |  8 +++
 .../io/gcp/bigquery/BigQueryIOMetadataTest.java    | 72 ++++++++++++++++++++
 12 files changed, 329 insertions(+), 20 deletions(-)
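
The net effect of this change is that load, extract, query, and copy jobs launched by
BigQueryIO while running on Dataflow carry a "beam_job_id" label. As a hedged sketch (not
part of this commit), the label could be read back with the same
com.google.api.services.bigquery model classes the diff uses below; the client construction,
project id, and BigQuery job id are placeholders:

    import com.google.api.services.bigquery.Bigquery;
    import com.google.api.services.bigquery.model.Job;
    import java.io.IOException;
    import java.util.Map;

    public class CheckBeamJobIdLabel {
      // Prints the beam_job_id label of a BigQuery job launched by BigQueryIO, if present.
      static void printBeamJobIdLabel(Bigquery client, String projectId, String bqJobId)
          throws IOException {
        Job job = client.jobs().get(projectId, bqJobId).execute();
        Map<String, String> labels = job.getConfiguration().getLabels();
        System.out.println(labels == null ? "(no labels)" : labels.get("beam_job_id"));
      }
    }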

diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index b4b775e..3d70ab1 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -139,6 +139,12 @@ public class BigQueryTornadoes {
 
     void setInput(String value);
 
+    @Description("SQL query to read from. Will be used if Input is not set.")
+    @Default.String("")
+    String getInputQuery();
+
+    void setInputQuery(String value);
+
     @Description("Mode to use when reading from BigQuery")
     @Default.Enum("EXPORT")
     TypedRead.Method getReadMethod();
@@ -167,20 +173,38 @@ public class BigQueryTornadoes {
 
     switch (options.getReadMethod()) {
       case DIRECT_READ:
-        rowsFromBigQuery =
-            p.apply(
-                BigQueryIO.readTableRows()
-                    .from(options.getInput())
-                    .withMethod(Method.DIRECT_READ)
-                    .withSelectedFields(Lists.newArrayList("month", "tornado")));
+        if (!options.getInputQuery().isEmpty()) {
+          rowsFromBigQuery =
+              p.apply(
+                  BigQueryIO.readTableRows()
+                      .fromQuery(options.getInputQuery())
+                      .usingStandardSql()
+                      .withMethod(Method.DIRECT_READ));
+        } else {
+          rowsFromBigQuery =
+              p.apply(
+                  BigQueryIO.readTableRows()
+                      .from(options.getInput())
+                      .withMethod(Method.DIRECT_READ)
+                      .withSelectedFields(Lists.newArrayList("month", "tornado")));
+        }
         break;
 
       default:
-        rowsFromBigQuery =
-            p.apply(
-                BigQueryIO.readTableRows()
-                    .from(options.getInput())
-                    .withMethod(options.getReadMethod()));
+        if (!options.getInputQuery().isEmpty()) {
+          rowsFromBigQuery =
+              p.apply(
+                  BigQueryIO.readTableRows()
+                      .fromQuery(options.getInputQuery())
+                      .usingStandardSql()
+                      .withMethod(options.getReadMethod()));
+        } else {
+          rowsFromBigQuery =
+              p.apply(
+                  BigQueryIO.readTableRows()
+                      .from(options.getInput())
+                      .withMethod(options.getReadMethod()));
+        }
         break;
     }
 
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
index 0825478..ea10fb1 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
@@ -96,4 +96,31 @@ public class BigQueryTornadoesIT {
 
     runE2EBigQueryTornadoesTest(options);
   }
+
+  @Test
+  public void testE2EBigQueryTornadoesWithExportUsingQuery() throws Exception {
+    BigQueryTornadoesITOptions options =
+        TestPipeline.testingPipelineOptions().as(BigQueryTornadoesITOptions.class);
+    options.setReadMethod(Method.EXPORT);
+    options.setOutput(
+        String.format(
+            "%s.%s", "BigQueryTornadoesIT", "monthly_tornadoes_" + System.currentTimeMillis()));
+    options.setInputQuery("SELECT * FROM `clouddataflow-readonly.samples.weather_stations`");
+
+    runE2EBigQueryTornadoesTest(options);
+  }
+
+  @Test
+  public void testE2eBigQueryTornadoesWithStorageApiUsingQuery() throws Exception {
+    BigQueryTornadoesITOptions options =
+        TestPipeline.testingPipelineOptions().as(BigQueryTornadoesITOptions.class);
+    options.setReadMethod(Method.DIRECT_READ);
+    options.setOutput(
+        String.format(
+            "%s.%s",
+            "BigQueryTornadoesIT", "monthly_tornadoes_storage_" + System.currentTimeMillis()));
+    options.setInputQuery("SELECT * FROM `clouddataflow-readonly.samples.weather_stations`");
+
+    runE2EBigQueryTornadoesTest(options);
+  }
 }
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
new file mode 100644
index 0000000..6736844
--- /dev/null
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.gcp.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.Charset;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CharStreams;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+
+/** Utility for fetching values from the GCE metadata server, e.g. the Dataflow job id. */
+public class GceMetadataUtil {
+  private static final String BASE_METADATA_URL = "http://metadata/computeMetadata/v1/";
+
+  static String fetchMetadata(String key) {
+    int timeoutMillis = 5000;
+    final HttpParams httpParams = new BasicHttpParams();
+    HttpConnectionParams.setConnectionTimeout(httpParams, timeoutMillis);
+    HttpClient client = new DefaultHttpClient(httpParams);
+    HttpGet request = new HttpGet(BASE_METADATA_URL + key);
+    request.setHeader("Metadata-Flavor", "Google");
+
+    try {
+      HttpResponse response = client.execute(request);
+      if (response.getStatusLine().getStatusCode() != 200) {
+        // May mean it's not running on Dataflow, in which case it's perfectly normal.
+        return "";
+      }
+      InputStream in = response.getEntity().getContent();
+      try (final Reader reader = new InputStreamReader(in, Charset.defaultCharset())) {
+        return CharStreams.toString(reader);
+      }
+    } catch (IOException e) {
+      // May mean it's not running on Dataflow, in which case it's perfectly normal.
+    }
+    return "";
+  }
+
+  private static String fetchCustomGceMetadata(String customMetadataKey) {
+    return GceMetadataUtil.fetchMetadata("instance/attributes/" + customMetadataKey);
+  }
+
+  public static String fetchDataflowJobId() {
+    return GceMetadataUtil.fetchCustomGceMetadata("job_id");
+  }
+}
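
GceMetadataUtil above fetches values from the GCE metadata server at
http://metadata/computeMetadata/v1/ using the "Metadata-Flavor: Google" header, and treats
any failure as "not running on Dataflow". For reference, a minimal sketch of the same lookup
with java.net.HttpURLConnection (the URL, header, timeout, and empty-string fallback mirror
the class above; everything else is illustrative and not part of this commit):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.stream.Collectors;

    public class GceMetadataSketch {
      static String fetch(String key) {
        try {
          HttpURLConnection conn =
              (HttpURLConnection)
                  new URL("http://metadata/computeMetadata/v1/" + key).openConnection();
          conn.setConnectTimeout(5000);
          conn.setReadTimeout(5000);
          conn.setRequestProperty("Metadata-Flavor", "Google");
          if (conn.getResponseCode() != 200) {
            return ""; // Not on GCE (or not a Dataflow worker); treat as "no metadata".
          }
          try (BufferedReader reader =
              new BufferedReader(
                  new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            return reader.lines().collect(Collectors.joining("\n"));
          }
        } catch (IOException e) {
          return ""; // Same fallback as GceMetadataUtil: missing metadata is not an error.
        }
      }
    }
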
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java
new file mode 100644
index 0000000..e2d058d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadata.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.extensions.gcp.util.GceMetadataUtil;
+
+/** Metadata class for BigQueryIO, e.g. for use as BigQuery job labels. */
+final class BigQueryIOMetadata {
+
+  private String beamJobId;
+
+  private BigQueryIOMetadata(String beamJobId) {
+    this.beamJobId = beamJobId;
+  }
+
+  private static final Pattern VALID_CLOUD_LABEL_PATTERN =
+      Pattern.compile("^[a-z0-9\\_\\-]{1,63}$");
+
+  /**
+   * Creates a BigQueryIOMetadata. This requests the appropriate metadata based on which runner
+   * is being used.
+   */
+  public static BigQueryIOMetadata create() {
+    String dataflowJobId = GceMetadataUtil.fetchDataflowJobId();
+    // If a Dataflow job id is returned from GCE metadata, then this program is running on a
+    // Dataflow GCE VM.
+    boolean isDataflowRunner = dataflowJobId != null && !dataflowJobId.isEmpty();
+
+    String beamJobId = null;
+    if (isDataflowRunner) {
+      if (BigQueryIOMetadata.isValidCloudLabel(dataflowJobId)) {
+        beamJobId = dataflowJobId;
+      }
+    }
+    return new BigQueryIOMetadata(beamJobId);
+  }
+
+  public Map<String, String> addAdditionalJobLabels(Map<String, String> jobLabels) {
+    if (this.beamJobId != null && !jobLabels.containsKey("beam_job_id")) {
+      jobLabels.put("beam_job_id", this.beamJobId);
+    }
+    return jobLabels;
+  }
+
+  /**
+   * Returns true if the given value is a valid cloud label string. This validator uses a
+   * stricter set of allowed characters than cloud labels do (non-ASCII characters are not
+   * accepted), so it may return false for a value that is actually valid, but it never returns
+   * true for an invalid value. Thus, it should not be used as a generic validator for all cloud
+   * labels.
+   *
+   * <p>See Also: https://cloud.google.com/compute/docs/labeling-resources
+   *
+   * @return true if the value is a valid cloud label string.
+   */
+  public static boolean isValidCloudLabel(String value) {
+    Matcher m = VALID_CLOUD_LABEL_PATTERN.matcher(value);
+    return m.find();
+  }
+}
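
The validator above accepts 1 to 63 characters drawn from lowercase letters, digits,
underscores, and hyphens. A self-contained sketch of the same check (the class name is
illustrative; the sample values are taken from BigQueryIOMetadataTest further down):

    import java.util.regex.Pattern;

    public class CloudLabelCheckSketch {
      private static final Pattern VALID_CLOUD_LABEL_PATTERN =
          Pattern.compile("^[a-z0-9\\_\\-]{1,63}$");

      static boolean isValidCloudLabel(String value) {
        return VALID_CLOUD_LABEL_PATTERN.matcher(value).find();
      }

      public static void main(String[] args) {
        System.out.println(isValidCloudLabel("2020-06-29_15_26_09-12838749047888422749")); // true
        System.out.println(isValidCloudLabel("ABCDEFGHIJKLMNOPQRSTUVWXYZ")); // false: uppercase
        System.out.println(isValidCloudLabel("")); // false: at least one character is required
      }
    }
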
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index e8c1e29..991ec44 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -74,5 +74,8 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
     builder.add(DisplayData.item("query", queryDef.getQuery()));
+    builder.add(
+        DisplayData.item("launchesBigQueryJobs", true)
+            .withLabel("This transform launches BigQuery jobs to read/write elements."));
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index df6064c..035425e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -64,8 +64,10 @@ import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -140,16 +142,19 @@ class BigQueryServicesImpl implements BigQueryServices {
   static class JobServiceImpl implements BigQueryServices.JobService {
     private final ApiErrorExtractor errorExtractor;
     private final Bigquery client;
+    private final BigQueryIOMetadata bqIOMetadata;
 
     @VisibleForTesting
     JobServiceImpl(Bigquery client) {
       this.errorExtractor = new ApiErrorExtractor();
       this.client = client;
+      this.bqIOMetadata = BigQueryIOMetadata.create();
     }
 
     private JobServiceImpl(BigQueryOptions options) {
       this.errorExtractor = new ApiErrorExtractor();
       this.client = newBigQueryClient(options).build();
+      this.bqIOMetadata = BigQueryIOMetadata.create();
     }
 
     /**
@@ -162,11 +167,14 @@ class BigQueryServicesImpl implements BigQueryServices {
     @Override
     public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
         throws InterruptedException, IOException {
+      Map<String, String> labelMap = new HashMap<>();
       Job job =
           new Job()
               .setJobReference(jobRef)
-              .setConfiguration(new JobConfiguration().setLoad(loadConfig));
-
+              .setConfiguration(
+                  new JobConfiguration()
+                      .setLoad(loadConfig)
+                      .setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
       startJob(job, errorExtractor, client);
     }
 
@@ -180,11 +188,14 @@ class BigQueryServicesImpl implements BigQueryServices {
     @Override
     public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
         throws InterruptedException, IOException {
+      Map<String, String> labelMap = new HashMap<>();
       Job job =
           new Job()
               .setJobReference(jobRef)
-              .setConfiguration(new JobConfiguration().setExtract(extractConfig));
-
+              .setConfiguration(
+                  new JobConfiguration()
+                      .setExtract(extractConfig)
+                      .setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
       startJob(job, errorExtractor, client);
     }
 
@@ -198,11 +209,14 @@ class BigQueryServicesImpl implements BigQueryServices {
     @Override
     public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig)
         throws IOException, InterruptedException {
+      Map<String, String> labelMap = new HashMap<>();
       Job job =
           new Job()
               .setJobReference(jobRef)
-              .setConfiguration(new JobConfiguration().setQuery(queryConfig));
-
+              .setConfiguration(
+                  new JobConfiguration()
+                      .setQuery(queryConfig)
+                      .setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
       startJob(job, errorExtractor, client);
     }
 
@@ -216,11 +230,14 @@ class BigQueryServicesImpl implements BigQueryServices {
     @Override
     public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
         throws IOException, InterruptedException {
+      Map<String, String> labelMap = new HashMap<>();
       Job job =
           new Job()
               .setJobReference(jobRef)
-              .setConfiguration(new JobConfiguration().setCopy(copyConfig));
-
+              .setConfiguration(
+                  new JobConfiguration()
+                      .setCopy(copyConfig)
+                      .setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
       startJob(job, errorExtractor, client);
     }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
index 1564ab6..3e6ea7a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
@@ -109,6 +109,9 @@ public class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T>
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
     builder.add(DisplayData.item("query", queryProvider).withLabel("Query"));
+    builder.add(
+        DisplayData.item("launchesBigQueryJobs", true)
+            .withLabel("This transform launches BigQuery jobs to read/write elements."));
   }
 
   @Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
index 0e79a29..0adce0f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
@@ -91,6 +91,8 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T>
     builder.addIfNotNull(
         DisplayData.item("table", BigQueryHelpers.displayTable(tableReferenceProvider))
             .withLabel("Table"));
+    // Note: This transform does not set launchesBigQueryJobs because it doesn't launch
+    // BigQuery jobs; it uses the BigQuery Storage API to read the table directly.
   }
 
   @Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index 1cf9f08..6f0df14 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -88,5 +88,8 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
     builder.add(DisplayData.item("table", tableDef.getJsonTable()));
+    builder.add(
+        DisplayData.item("launchesBigQueryJobs", true)
+            .withLabel("This transform launches BigQuery jobs to read/write elements."));
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 9761cf4..8aeb982 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -267,6 +267,9 @@ class WriteRename extends DoFn<Iterable<KV<TableDestination, String>>, Void> {
                 .withLabel("Write Disposition"))
         .add(
             DisplayData.item("firstPaneCreateDisposition", firstPaneCreateDisposition.toString())
-                .withLabel("Create Disposition"));
+                .withLabel("Create Disposition"))
+        .add(
+            DisplayData.item("launchesBigQueryJobs", true)
+                .withLabel("This transform launches BigQuery jobs to read/write elements."));
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 9cce81e..ceb02a1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -230,6 +231,13 @@ class WriteTables<DestinationT>
           new PendingJobData(window, retryJob, partitionFiles, tableDestination, tableReference));
     }
 
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.add(
+          DisplayData.item("launchesBigQueryJobs", true)
+              .withLabel("This transform launches BigQuery jobs to read/write elements."));
+    }
+
     @FinishBundle
     public void finishBundle(FinishBundleContext c) throws Exception {
       DatasetService datasetService =
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadataTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadataTest.java
new file mode 100644
index 0000000..0511535
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOMetadataTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static junit.framework.TestCase.assertFalse;
+import static junit.framework.TestCase.assertTrue;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for BigQueryIOMetadata. */
+@RunWith(JUnit4.class)
+public class BigQueryIOMetadataTest {
+
+  @Test
+  public void testIsValidCloudLabel() {
+    // A Dataflow job ID.
+    // Lowercase letters, numbers, underscores and hyphens are allowed.
+    String testStr = "2020-06-29_15_26_09-12838749047888422749";
+    assertTrue(BigQueryIOMetadata.isValidCloudLabel(testStr));
+
+    // At least one character.
+    testStr = "0";
+    assertTrue(BigQueryIOMetadata.isValidCloudLabel(testStr));
+
+    // Up to 63 characters.
+    testStr = "0123456789abcdefghij0123456789abcdefghij0123456789abcdefghij012";
+    assertTrue(BigQueryIOMetadata.isValidCloudLabel(testStr));
+
+    // Lowercase letters allowed
+    testStr = "abcdefghijklmnopqrstuvwxyz";
+    for (char testChar : testStr.toCharArray()) {
+      assertTrue(BigQueryIOMetadata.isValidCloudLabel(String.valueOf(testChar)));
+    }
+
+    // Empty strings not allowed.
+    testStr = "";
+    assertFalse(BigQueryIOMetadata.isValidCloudLabel(testStr));
+
+    // 64 or more characters not allowed.
+    testStr = "0123456789abcdefghij0123456789abcdefghij0123456789abcdefghij0123";
+    assertFalse(BigQueryIOMetadata.isValidCloudLabel(testStr));
+
+    // Uppercase letters not allowed
+    testStr = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+    for (char testChar : testStr.toCharArray()) {
+      assertFalse(BigQueryIOMetadata.isValidCloudLabel(String.valueOf(testChar)));
+    }
+
+    // Special characters other than hyphens and underscores are not allowed.
+    testStr = "!@#$%^&*()+=[{]};:\'\"\\|,<.>?/`~";
+    for (char testChar : testStr.toCharArray()) {
+      assertFalse(BigQueryIOMetadata.isValidCloudLabel(String.valueOf(testChar)));
+    }
+  }
+}