Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/01 20:25:55 UTC

[GitHub] [beam] ajamato commented on a change in pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

ajamato commented on a change in pull request #12084:
URL: https://github.com/apache/beam/pull/12084#discussion_r448551682



##########
File path: sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Metadata for use in BigQueryIO, i.e. a job_id to use in BQ job labels."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from apache_beam.io.gcp import gce_metadata_util
+
+
+def CreateBigQueryIOMetadata():
+  """Creates a BigQueryIOMetadata.
+
+  This will request metadata properly based on which runner is being used.
+  """
+  dataflow_job_id = gce_metadata_util.FetchDataflowJobId()
+  # If a dataflow_job_id is returned from the GCE metadata server, then
+  # this program is running on a Dataflow GCE VM.
+  is_dataflow_runner = bool(dataflow_job_id)
+  kwargs = {}
+  if is_dataflow_runner:
+    kwargs['beam_job_id'] = dataflow_job_id
+  return BigQueryIOMetadata(**kwargs)
+
+
+class BigQueryIOMetadata(object):
+  """Metadata class for BigQueryIO, e.g. to use as BQ job labels.
+
+  Do not construct directly; use the CreateBigQueryIOMetadata factory,
+  which will request metadata properly based on which runner is being used.
+  """
+  def __init__(self, beam_job_id=None):
+    self.beam_job_id = beam_job_id
+
+  def add_additional_bq_job_labels(self, job_labels=None):
+    job_labels = dict() if job_labels is None else job_labels
+    if self.beam_job_id and 'beam_job_id' not in job_labels:
+      job_labels['beam_job_id'] = self.beam_job_id

Review comment:
       No, it shouldn't populate the label at all in that case, because of the
   "if self.beam_job_id" check.
   
   It won't enter the if-branch if beam_job_id is None or an empty string.
   Or have I made a mistake which I don't see?
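
   The truthiness check being discussed can be sketched as a standalone function (an illustrative rewrite for this thread, not the exact PR code, which keeps the state on the BigQueryIOMetadata instance):
   
   ```python
   def add_additional_bq_job_labels(beam_job_id, job_labels=None):
       # Standalone sketch of the label-population logic under discussion.
       # "if beam_job_id" is falsy for both None and the empty string, so no
       # label is added in either case; an existing label is never overwritten.
       job_labels = {} if job_labels is None else job_labels
       if beam_job_id and 'beam_job_id' not in job_labels:
           job_labels['beam_job_id'] = beam_job_id
       return job_labels

   print(add_additional_bq_job_labels(None))       # no label added
   print(add_additional_bq_job_labels(''))         # no label added
   print(add_additional_bq_job_labels('job-123'))  # label added
   ```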

##########
File path: sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Metadata for use in BigQueryIO, i.e. a job_id to use in BQ job labels."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from apache_beam.io.gcp import gce_metadata_util
+
+
+def CreateBigQueryIOMetadata():
+  """Creates a BigQueryIOMetadata.
+
+  This will request metadata properly based on which runner is being used.
+  """
+  dataflow_job_id = gce_metadata_util.FetchDataflowJobId()
+  # If a dataflow_job_id is returned from the GCE metadata server, then
+  # this program is running on a Dataflow GCE VM.
+  is_dataflow_runner = bool(dataflow_job_id)
+  kwargs = {}
+  if is_dataflow_runner:
+    kwargs['beam_job_id'] = dataflow_job_id
+  return BigQueryIOMetadata(**kwargs)
+
+
+class BigQueryIOMetadata(object):
+  """Metadata class for BigQueryIO, e.g. to use as BQ job labels.
+
+  Do not construct directly; use the CreateBigQueryIOMetadata factory,
+  which will request metadata properly based on which runner is being used.
+  """
+  def __init__(self, beam_job_id=None):
+    self.beam_job_id = beam_job_id
+
+  def add_additional_bq_job_labels(self, job_labels=None):
+    job_labels = dict() if job_labels is None else job_labels

Review comment:
       Done

##########
File path: sdks/python/apache_beam/io/gcp/gce_metadata_util.py
##########
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fetches GCE metadata if the calling process is running on a GCE VM."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import httplib2
+
+from apache_beam.internal.http_client import get_new_http
+
+BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"
+
+
+def FetchMetadata(key):
+  try:
+    h = get_new_http(timeout_secs=5)
+    headers = {"Metadata-Flavor": "Google"}
+    uri = BASE_METADATA_URL + key

Review comment:
       No, because on Windows it will add backslashes. Let's just keep it simple like this for now. Since it's a private method, we should expect proper methods like fetch_dataflow_job_id to be written for future keys.
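
   The Windows backslash problem with os.path.join can be demonstrated on any platform via `ntpath`, the Windows flavor of `os.path` (the URL and key below are illustrative):
   
   ```python
   import ntpath  # Windows implementation of os.path, importable on any OS

   BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"

   # On Windows, os.path.join is ntpath.join, which inserts "\" as the
   # separator and would corrupt a URL:
   joined = ntpath.join("http://metadata/computeMetadata/v1", "instance/job_id")
   print(joined)  # http://metadata/computeMetadata/v1\instance/job_id

   # Plain string concatenation keeps forward slashes on every platform:
   print(BASE_METADATA_URL + "instance/job_id")
   ```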

##########
File path: sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Metadata for use in BigQueryIO, i.e. a job_id to use in BQ job labels."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from apache_beam.io.gcp import gce_metadata_util
+
+
+def CreateBigQueryIOMetadata():

Review comment:
       Done

##########
File path: sdks/python/apache_beam/io/gcp/gce_metadata_util.py
##########
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fetches GCE metadata if the calling process is running on a GCE VM."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import httplib2
+
+from apache_beam.internal.http_client import get_new_http
+
+BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"
+
+
+def FetchMetadata(key):

Review comment:
       Done

##########
File path: sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
##########
@@ -327,10 +329,13 @@ def __init__(
     self.write_disposition = write_disposition
     self.test_client = test_client
     self._observed_tables = set()
+    self.bq_io_metadata = None

Review comment:
       Yes. Done

##########
File path: sdks/python/apache_beam/io/gcp/gce_metadata_util.py
##########
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fetches GCE metadata if the calling process is running on a GCE VM."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import httplib2
+
+from apache_beam.internal.http_client import get_new_http
+
+BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"
+
+
+def FetchMetadata(key):
+  try:
+    h = get_new_http(timeout_secs=5)
+    headers = {"Metadata-Flavor": "Google"}
+    uri = BASE_METADATA_URL + key
+    resp, content = h.request(uri, "GET", headers=headers)
+    if resp.status == 200:
+      return content
+  except httplib2.HttpLib2Error:

Review comment:
       The URL should not resolve, so it should fail quickly after attempting a request rather than waiting out the full timeout. Ideally we make this request as infrequently as possible.
   
   I've also just added a method to validate the label, on the off chance that the metadata server gives bad output. I don't want this to make the BQ jobs fail (they will fail if given invalid labels).
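
   The validation method mentioned isn't shown in this hunk; a minimal sketch of such a check, assuming BigQuery's documented label-value rules (at most 63 characters; lowercase letters, digits, underscores, and dashes; empty values allowed), might look like:
   
   ```python
   import re

   # Hypothetical validator, not necessarily the PR's actual implementation.
   # BigQuery label values may contain only lowercase letters, digits,
   # underscores, and dashes, and are limited to 63 characters.
   _VALID_LABEL_VALUE = re.compile(r'^[a-z0-9_-]{0,63}$')

   def is_valid_cloud_label_value(value):
       """Returns True if value can be used as a BigQuery job label value."""
       return _VALID_LABEL_VALUE.match(value) is not None

   print(is_valid_cloud_label_value('2020-06-29_15_26_09-12345'))  # True
   print(is_valid_cloud_label_value('Bad Value!'))                 # False
   ```
   
   Dropping an invalid label via a check like this keeps the BQ job submission from failing, at the cost of losing the label.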




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org