You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/01 05:40:54 UTC

[GitHub] [beam] pabloem commented on a change in pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID

pabloem commented on a change in pull request #12084:
URL: https://github.com/apache/beam/pull/12084#discussion_r448121982



##########
File path: sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Metadata for use in BigQueryIO, i.e. a job_id to use in BQ job labels."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from apache_beam.io.gcp import gce_metadata_util
+
+
+def CreateBigQueryIOMetadata():
+  """Creates a BigQueryIOMetadata.
+
+  This will request metadata properly based on which runner is being used.
+  """
+  dataflow_job_id = gce_metadata_util.FetchDataflowJobId()
+  # If a dataflow_job id is returned on GCE metadata. Then it means
+  # This program is running on a Dataflow GCE VM.
+  is_dataflow_runner = bool(dataflow_job_id)
+  kwargs = {}
+  if is_dataflow_runner:
+    kwargs['beam_job_id'] = dataflow_job_id
+  return BigQueryIOMetadata(**kwargs)
+
+
+class BigQueryIOMetadata(object):
+  """Metadata class for BigQueryIO. i.e. to use as BQ job labels.
+
+  Do not construct directly, use the CreateBigQueryIOMetadata factory.
+  Which will request metadata properly based on which runner is being used.
+  """
+  def __init__(self, beam_job_id=None):
+    self.beam_job_id = beam_job_id
+
+  def add_additional_bq_job_labels(self, job_labels=None):
+    job_labels = dict() if job_labels is None else job_labels

Review comment:
       (nit) Up to you, but you could reduce this to:
   
   ```suggestion
       job_labels = job_labels or {}
   ```

##########
File path: sdks/python/apache_beam/io/gcp/gce_metadata_util.py
##########
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fetches GCE metadata if the calling process is running on a GCE VM."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import httplib2
+
+from apache_beam.internal.http_client import get_new_http
+
+BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"
+
+
+def FetchMetadata(key):

Review comment:
       Please make function names snake_case.

##########
File path: sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
##########
@@ -327,10 +329,13 @@ def __init__(
     self.write_disposition = write_disposition
     self.test_client = test_client
     self._observed_tables = set()
+    self.bq_io_metadata = None

Review comment:
       Do you want to add the display data item for this DoFn?

##########
File path: sdks/python/apache_beam/io/gcp/gce_metadata_util.py
##########
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fetches GCE metadata if the calling process is running on a GCE VM."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import httplib2
+
+from apache_beam.internal.http_client import get_new_http
+
+BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"
+
+
+def FetchMetadata(key):
+  try:
+    h = get_new_http(timeout_secs=5)
+    headers = {"Metadata-Flavor": "Google"}
+    uri = BASE_METADATA_URL + key

Review comment:
       `os.path.join` should ensure the URL is joined properly regardless of whether the key has a leading slash. Do you think that'd be good to add?

##########
File path: sdks/python/apache_beam/io/gcp/bigquery_io_metadata.py
##########
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Metadata for use in BigQueryIO, i.e. a job_id to use in BQ job labels."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+from apache_beam.io.gcp import gce_metadata_util
+
+
+def CreateBigQueryIOMetadata():

Review comment:
       We usually name functions in snake_case. Could you change this, please?

##########
File path: sdks/python/apache_beam/io/gcp/gce_metadata_util.py
##########
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Fetches GCE metadata if the calling process is running on a GCE VM."""
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import httplib2
+
+from apache_beam.internal.http_client import get_new_http
+
+BASE_METADATA_URL = "http://metadata/computeMetadata/v1/"
+
+
+def FetchMetadata(key):
+  try:
+    h = get_new_http(timeout_secs=5)
+    headers = {"Metadata-Flavor": "Google"}
+    uri = BASE_METADATA_URL + key

Review comment:
       (up to you)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org