Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/25 00:53:45 UTC

[GitHub] [beam] ajamato opened a new pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

ajamato opened a new pull request #12084:
URL: https://github.com/apache/beam/pull/12084


   [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ajamato commented on a change in pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
ajamato commented on a change in pull request #12084:
URL: https://github.com/apache/beam/pull/12084#discussion_r448551682



##########
File path: sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Metadata for use in BigQueryIO, i.e. a job_id to use in BQ job labels."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from apache_beam.io.gcp import gce_metadata_util
+
+
+def CreateBigQueryIOMetadata():
+  """Creates a BigQueryIOMetadata.
+
+  This will request metadata properly based on which runner is being used.
+  """
+  dataflow_job_id = gce_metadata_util.FetchDataflowJobId()
+  # If a dataflow_job id is returned on GCE metadata. Then it means
+  # This program is running on a Dataflow GCE VM.
+  is_dataflow_runner = bool(dataflow_job_id)
+  kwargs = {}
+  if is_dataflow_runner:
+    kwargs['beam_job_id'] = dataflow_job_id
+  return BigQueryIOMetadata(**kwargs)
+
+
+class BigQueryIOMetadata(object):
+  """Metadata class for BigQueryIO. i.e. to use as BQ job labels.
+
+  Do not construct directly, use the CreateBigQueryIOMetadata factory.
+  Which will request metadata properly based on which runner is being used.
+  """
+  def __init__(self, beam_job_id=None):
+    self.beam_job_id = beam_job_id
+
+  def add_additional_bq_job_labels(self, job_labels=None):
+    job_labels = dict() if job_labels is None else job_labels
+    if self.beam_job_id and 'beam_job_id' not in job_labels:
+      job_labels['beam_job_id'] = self.beam_job_id

Review comment:
       No, it shouldn't populate the label at all in that case, because of the
   `if self.beam_job_id` check.
   
   The body of the `if` is skipped when the value is None or an empty string.
   Or have I made a mistake that I don't see?
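
A minimal sketch of the guard being discussed, written as a standalone function rather than the method in the PR. The function name and the `"hypothetical-job-id"` value are assumptions for illustration only; the point is that both `None` and `""` are falsy in Python, so neither produces a label.

```python
def add_job_id_label(job_labels, beam_job_id):
    """Return a copy of job_labels with 'beam_job_id' set only for a truthy id."""
    labels = dict(job_labels or {})
    # None and "" are both falsy, so neither one adds a label.
    # An existing 'beam_job_id' entry is left untouched.
    if beam_job_id and "beam_job_id" not in labels:
        labels["beam_job_id"] = beam_job_id
    return labels


print(add_job_id_label({}, None))
print(add_job_id_label({}, ""))
print(add_job_id_label({}, "hypothetical-job-id"))
print(add_job_id_label({"beam_job_id": "user-set"}, "hypothetical-job-id"))
```

The first two calls leave the labels empty, the third adds the label, and the fourth keeps the value that was already present.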

##########
File path: sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Metadata for use in BigQueryIO, i.e. a job_id to use in BQ job labels."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from apache_beam.io.gcp import gce_metadata_util
+
+
+def CreateBigQueryIOMetadata():
+  """Creates a BigQueryIOMetadata.
+
+  This will request metadata properly based on which runner is being used.
+  """
+  dataflow_job_id = gce_metadata_util.FetchDataflowJobId()
+  # If a dataflow_job id is returned on GCE metadata. Then it means
+  # This program is running on a Dataflow GCE VM.
+  is_dataflow_runner = bool(dataflow_job_id)
+  kwargs = {}
+  if is_dataflow_runner:
+    kwargs['beam_job_id'] = dataflow_job_id
+  return BigQueryIOMetadata(**kwargs)
+
+
+class BigQueryIOMetadata(object):
+  """Metadata class for BigQueryIO. i.e. to use as BQ job labels.
+
+  Do not construct directly, use the CreateBigQueryIOMetadata factory.
+  Which will request metadata properly based on which runner is being used.
+  """
+  def __init__(self, beam_job_id=None):
+    self.beam_job_id = beam_job_id
+
+  def add_additional_bq_job_labels(self, job_labels=None):
+    job_labels = dict() if job_labels is None else job_labels

Review comment:
       Done

##########
File path: sdks/python/apache_beam/io/gcp/gce_metadata_util.py
##########
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fetches GCE metadata if the calling process is running on a GCE VM."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import httplib2
+
+from apache_beam.internal.http_client import get_new_http
+
+BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"
+
+
+def FetchMetadata(key):
+  try:
+    h = get_new_http(timeout_secs=5)
+    headers = {"Metadata-Flavor": "Google"}
+    uri = BASE_METADATA_URL + key

Review comment:
       No, because on Windows it will add backslashes. Let's just keep it simple like this for now. Since it's a private method, we should expect proper methods like fetch_dataflow_job_id to be written for future keys.
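
The backslash concern can be reproduced on any platform, because the standard library exposes the Windows flavour of `os.path` as `ntpath`. This is only a sketch: the key below is a hypothetical example, and the base is written without its trailing slash so that a separator actually has to be inserted.

```python
import ntpath      # what os.path is on Windows; importable everywhere
import posixpath   # what os.path is on Linux and macOS

BASE = "http://metadata/computeMetadata/v1"  # trailing slash dropped on purpose
KEY = "instance/attributes/job_id"           # hypothetical key

# On Windows the inserted separator is a backslash, which is wrong for a URL.
windows_style = ntpath.join(BASE, KEY)
# On POSIX systems the inserted separator happens to be a forward slash.
posix_style = posixpath.join(BASE, KEY)

print(windows_style)
print(posix_style)
```

Plain string concatenation, as in the diff, behaves the same on every platform.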

##########
File path: sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Metadata for use in BigQueryIO, i.e. a job_id to use in BQ job labels."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from apache_beam.io.gcp import gce_metadata_util
+
+
+def CreateBigQueryIOMetadata():

Review comment:
       Done

##########
File path: sdks/python/apache_beam/io/gcp/gce_metadata_util.py
##########
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fetches GCE metadata if the calling process is running on a GCE VM."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import httplib2
+
+from apache_beam.internal.http_client import get_new_http
+
+BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"
+
+
+def FetchMetadata(key):

Review comment:
       Done

##########
File path: sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
##########
@@ -327,10 +329,13 @@ def __init__(
     self.write_disposition = write_disposition
     self.test_client = test_client
     self._observed_tables = set()
+    self.bq_io_metadata = None

Review comment:
       Yes. Done

##########
File path: sdks/python/apache_beam/io/gcp/gce_metadata_util.py
##########
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fetches GCE metadata if the calling process is running on a GCE VM."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import httplib2
+
+from apache_beam.internal.http_client import get_new_http
+
+BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"
+
+
+def FetchMetadata(key):
+  try:
+    h = get_new_http(timeout_secs=5)
+    headers = {"Metadata-Flavor": "Google"}
+    uri = BASE_METADATA_URL + key
+    resp, content = h.request(uri, "GET", headers=headers)
+    if resp.status == 200:
+      return content
+  except httplib2.HttpLib2Error:

Review comment:
       On non-Dataflow workers the URL should not resolve, so the request should fail quickly rather than wait for the full timeout. Ideally we make this request as infrequently as possible.
   
   I've also just added a method to validate the label, on the off chance that the metadata server returns bad output. I don't want this to make the BQ jobs fail (they will fail if given invalid labels).
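
A rough sketch of what such a validation step could look like; this is not the code added in the PR. It assumes a deliberately conservative rule (lowercase ASCII letters, digits, underscores and dashes, 1 to 63 characters), and the function names and sample values are hypothetical.

```python
import re

# Assumed, conservative rule for a label value. Stricter than strictly
# necessary is fine here: a rejected value just means no label is added.
_LABEL_VALUE_PATTERN = re.compile(r"[a-z0-9_-]{1,63}")


def is_valid_label_value(value):
    """Return True if value looks safe to use as a job label value."""
    # fullmatch avoids accepting a value with a trailing newline.
    return isinstance(value, str) and bool(_LABEL_VALUE_PATTERN.fullmatch(value))


def labels_with_job_id(job_labels, job_id):
    """Copy job_labels, adding 'beam_job_id' only if job_id validates."""
    labels = dict(job_labels or {})
    if is_valid_label_value(job_id) and "beam_job_id" not in labels:
        labels["beam_job_id"] = job_id
    return labels


print(labels_with_job_id({}, "2020-06-24_17_53_45-123456789"))
print(labels_with_job_id({}, "<html>unexpected error page</html>"))
```

With this shape, a malformed response from the metadata server results in an unlabeled job instead of a failed one.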







[GitHub] [beam] pabloem commented on pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12084:
URL: https://github.com/apache/beam/pull/12084#issuecomment-652225853


   Run Python 3.5 PostCommit





[GitHub] [beam] pabloem commented on pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12084:
URL: https://github.com/apache/beam/pull/12084#issuecomment-653168951


   Run Python 3.5 PostCommit





[GitHub] [beam] pabloem commented on a change in pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #12084:
URL: https://github.com/apache/beam/pull/12084#discussion_r448126874



##########
File path: sdks/python/apache_beam/io/gcp/gce_metadata_util.py
##########
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fetches GCE metadata if the calling process is running on a GCE VM."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import httplib2
+
+from apache_beam.internal.http_client import get_new_http
+
+BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"
+
+
+def FetchMetadata(key):
+  try:
+    h = get_new_http(timeout_secs=5)
+    headers = {"Metadata-Flavor": "Google"}
+    uri = BASE_METADATA_URL + key
+    resp, content = h.request(uri, "GET", headers=headers)
+    if resp.status == 200:
+      return content
+  except httplib2.HttpLib2Error:

Review comment:
       How will it work for non-Dataflow workers? Will it fail immediately, or time out?







[GitHub] [beam] ajamato commented on pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
ajamato commented on pull request #12084:
URL: https://github.com/apache/beam/pull/12084#issuecomment-652714875


   Thanks Pablo. I addressed the lint issues and pushed the changes. Hopefully it passes.





[GitHub] [beam] pabloem commented on pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12084:
URL: https://github.com/apache/beam/pull/12084#issuecomment-654395407


   Thanks @ajamato - LGTM





[GitHub] [beam] pabloem merged pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #12084:
URL: https://github.com/apache/beam/pull/12084


   





[GitHub] [beam] ajamato commented on pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
ajamato commented on pull request #12084:
URL: https://github.com/apache/beam/pull/12084#issuecomment-651539363


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] pabloem commented on pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12084:
URL: https://github.com/apache/beam/pull/12084#issuecomment-649793787


   Can you address the formatter / lint / rat / test errors?





[GitHub] [beam] pabloem commented on a change in pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #12084:
URL: https://github.com/apache/beam/pull/12084#discussion_r448121982



##########
File path: sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Metadata for use in BigQueryIO, i.e. a job_id to use in BQ job labels."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from apache_beam.io.gcp import gce_metadata_util
+
+
+def CreateBigQueryIOMetadata():
+  """Creates a BigQueryIOMetadata.
+
+  This will request metadata properly based on which runner is being used.
+  """
+  dataflow_job_id = gce_metadata_util.FetchDataflowJobId()
+  # If a dataflow_job id is returned on GCE metadata. Then it means
+  # This program is running on a Dataflow GCE VM.
+  is_dataflow_runner = bool(dataflow_job_id)
+  kwargs = {}
+  if is_dataflow_runner:
+    kwargs['beam_job_id'] = dataflow_job_id
+  return BigQueryIOMetadata(**kwargs)
+
+
+class BigQueryIOMetadata(object):
+  """Metadata class for BigQueryIO. i.e. to use as BQ job labels.
+
+  Do not construct directly, use the CreateBigQueryIOMetadata factory.
+  Which will request metadata properly based on which runner is being used.
+  """
+  def __init__(self, beam_job_id=None):
+    self.beam_job_id = beam_job_id
+
+  def add_additional_bq_job_labels(self, job_labels=None):
+    job_labels = dict() if job_labels is None else job_labels

Review comment:
       (nit) Up to you, but you could reduce it to this:
   
   ```suggestion
       job_labels = job_labels or {}
   ```
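
One difference between the two spellings is worth keeping in mind when choosing. The sketch below uses two hypothetical functions and a placeholder label value: `or {}` replaces any falsy argument, including an empty dict the caller passed in, while the `is None` form keeps the caller's dict and mutates it in place.

```python
def add_label_or(job_labels=None):
    job_labels = job_labels or {}  # a passed-in {} is falsy and gets replaced
    job_labels["beam_job_id"] = "hypothetical-id"
    return job_labels


def add_label_is_none(job_labels=None):
    job_labels = {} if job_labels is None else job_labels  # keeps a passed-in {}
    job_labels["beam_job_id"] = "hypothetical-id"
    return job_labels


caller_labels_a = {}
returned_a = add_label_or(caller_labels_a)
print(caller_labels_a, returned_a)

caller_labels_b = {}
returned_b = add_label_is_none(caller_labels_b)
print(caller_labels_b, returned_b)
```

Both functions return a dict containing the label, so the two forms are interchangeable as long as callers use the return value instead of relying on in-place mutation.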

##########
File path: sdks/python/apache_beam/io/gcp/gce_metadata_util.py
##########
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fetches GCE metadata if the calling process is running on a GCE VM."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import httplib2
+
+from apache_beam.internal.http_client import get_new_http
+
+BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"
+
+
+def FetchMetadata(key):

Review comment:
       Please make functions snake_case.

##########
File path: sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
##########
@@ -327,10 +329,13 @@ def __init__(
     self.write_disposition = write_disposition
     self.test_client = test_client
     self._observed_tables = set()
+    self.bq_io_metadata = None

Review comment:
       Do you want to add the display data item for this DoFn?

##########
File path: sdks/python/apache_beam/io/gcp/gce_metadata_util.py
##########
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fetches GCE metadata if the calling process is running on a GCE VM."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import httplib2
+
+from apache_beam.internal.http_client import get_new_http
+
+BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"
+
+
+def FetchMetadata(key):
+  try:
+    h = get_new_http(timeout_secs=5)
+    headers = {"Metadata-Flavor": "Google"}
+    uri = BASE_METADATA_URL + key

Review comment:
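       It would be good to join the URL so that it is correct whether or not the key has a leading slash. Note that `os.path.join` does not do this: it discards the base when the key starts with a slash, so stripping the slashes before concatenating is safer. Do you think that'd be good to add?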
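
A minimal sketch of the slash-tolerant join discussed in this comment, under stated assumptions: `join_metadata_url` is a hypothetical helper name (not part of the PR), and `instance/id` is only an illustrative key. It also shows why `os.path.join` (here via `posixpath.join`, its POSIX implementation) is not a drop-in fix: a key with a leading slash is treated as absolute and replaces the base.

```python
import posixpath

BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"


def join_metadata_url(base, key):
  """Joins base and key with exactly one slash between them.

  Hypothetical helper for illustration; not taken from the PR.
  """
  return base.rstrip("/") + "/" + key.lstrip("/")


# With or without a leading slash, the resulting URL is the same.
with_slash = join_metadata_url(BASE_METADATA_URL, "/instance/id")
without_slash = join_metadata_url(BASE_METADATA_URL, "instance/id")

# posixpath.join drops the base entirely when the key is absolute.
path_join_result = posixpath.join(BASE_METADATA_URL, "/instance/id")
```

Plain string handling keeps the helper free of filesystem semantics, which is the main reason to prefer it over a path-joining function for URLs.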

##########
File path: sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Metadata for use in BigQueryIO, i.e. a job_id to use in BQ job labels."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from apache_beam.io.gcp import gce_metadata_util
+
+
+def CreateBigQueryIOMetadata():

Review comment:
       We usually name functions in snake_case. Could you change this, please?

##########
File path: sdks/python/apache_beam/io/gcp/gce_metadata_util.py
##########
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fetches GCE metadata if the calling process is running on a GCE VM."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import httplib2
+
+from apache_beam.internal.http_client import get_new_http
+
+BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"
+
+
+def FetchMetadata(key):
+  try:
+    h = get_new_http(timeout_secs=5)
+    headers = {"Metadata-Flavor": "Google"}
+    uri = BASE_METADATA_URL + key

Review comment:
       (up to you)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12084:
URL: https://github.com/apache/beam/pull/12084#issuecomment-652066216


   Run Python 3.5 PostCommit





[GitHub] [beam] pabloem commented on pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12084:
URL: https://github.com/apache/beam/pull/12084#issuecomment-652671801









[GitHub] [beam] ajamato commented on pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
ajamato commented on pull request #12084:
URL: https://github.com/apache/beam/pull/12084#issuecomment-651394625


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] ajamato commented on pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
ajamato commented on pull request #12084:
URL: https://github.com/apache/beam/pull/12084#issuecomment-649150134


   @pabloem @chamikaramj 





[GitHub] [beam] aaltay commented on pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #12084:
URL: https://github.com/apache/beam/pull/12084#issuecomment-651444557


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] ajamato commented on pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
ajamato commented on pull request #12084:
URL: https://github.com/apache/beam/pull/12084#issuecomment-649888199


   retest this please





[GitHub] [beam] pabloem commented on pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12084:
URL: https://github.com/apache/beam/pull/12084#issuecomment-653268082


   Run Python 2 PostCommit

