You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/10/30 21:42:44 UTC

[1/3] beam git commit: Unit test for label pipeline option

Repository: beam
Updated Branches:
  refs/heads/release-2.2.0 6db1db7b4 -> c6b6668a5


Unit test for label pipeline option


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6f07e3f9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6f07e3f9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6f07e3f9

Branch: refs/heads/release-2.2.0
Commit: 6f07e3f9e735768e57ba7b902d8a80b3285e9a93
Parents: 48ae7d1
Author: Ahmet Altay <al...@google.com>
Authored: Fri Oct 13 15:53:15 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Oct 30 14:42:12 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/internal/apiclient_test.py | 28 ++++++++++++++++++++
 1 file changed, 28 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6f07e3f9/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index ecd6003..79cbd1c 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -195,6 +195,34 @@ class UtilTest(unittest.TestCase):
         for experiment in env.proto.experiments:
           self.assertNotIn('runner_harness_container_image=', experiment)
 
+  def test_labels(self):
+    pipeline_options = PipelineOptions(
+        ['--project', 'test_project', '--job_name', 'test_job_name',
+         '--temp_location', 'gs://test-location/temp'])
+    job = apiclient.Job(pipeline_options)
+    self.assertIsNone(job.proto.labels)
+
+    pipeline_options = PipelineOptions(
+        ['--project', 'test_project', '--job_name', 'test_job_name',
+         '--temp_location', 'gs://test-location/temp',
+         '--label', 'key1=value1',
+         '--label', 'key2',
+         '--label', 'key3=value3',
+         '--labels', 'key4=value4',
+         '--labels', 'key5'])
+    job = apiclient.Job(pipeline_options)
+    self.assertEqual(5, len(job.proto.labels.additionalProperties))
+    self.assertEqual('key1', job.proto.labels.additionalProperties[0].key)
+    self.assertEqual('value1', job.proto.labels.additionalProperties[0].value)
+    self.assertEqual('key2', job.proto.labels.additionalProperties[1].key)
+    self.assertEqual('', job.proto.labels.additionalProperties[1].value)
+    self.assertEqual('key3', job.proto.labels.additionalProperties[2].key)
+    self.assertEqual('value3', job.proto.labels.additionalProperties[2].value)
+    self.assertEqual('key4', job.proto.labels.additionalProperties[3].key)
+    self.assertEqual('value4', job.proto.labels.additionalProperties[3].value)
+    self.assertEqual('key5', job.proto.labels.additionalProperties[4].key)
+    self.assertEqual('', job.proto.labels.additionalProperties[4].value)
+
 
 if __name__ == '__main__':
   unittest.main()


[3/3] beam git commit: This closes #3993

Posted by al...@apache.org.
This closes #3993


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c6b6668a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c6b6668a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c6b6668a

Branch: refs/heads/release-2.2.0
Commit: c6b6668a5239a791bf96c0828c305e1482a081fd
Parents: 6db1db7 6f07e3f
Author: Ahmet Altay <al...@google.com>
Authored: Mon Oct 30 14:42:29 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Oct 30 14:42:29 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/options/pipeline_options.py     |  7 +++++
 .../runners/dataflow/internal/apiclient.py      | 11 ++++++++
 .../runners/dataflow/internal/apiclient_test.py | 28 ++++++++++++++++++++
 3 files changed, 46 insertions(+)
----------------------------------------------------------------------



[2/3] beam git commit: Add an option for dataflow job labels.

Posted by al...@apache.org.
Add an option for dataflow job labels.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/48ae7d1d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/48ae7d1d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/48ae7d1d

Branch: refs/heads/release-2.2.0
Commit: 48ae7d1d8243c6a3a037ad454162376f98dec3ab
Parents: 6db1db7
Author: Ahmet Altay <al...@google.com>
Authored: Thu Oct 12 19:17:28 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Mon Oct 30 14:42:12 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/options/pipeline_options.py      |  7 +++++++
 .../apache_beam/runners/dataflow/internal/apiclient.py   | 11 +++++++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/48ae7d1d/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 3abcbf2..37703fe 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -374,6 +374,13 @@ class GoogleCloudOptions(PipelineOptions):
     parser.add_argument('--template_location',
                         default=None,
                         help='Save job to specified local or GCS location.')
+    parser.add_argument(
+        '--label', '--labels',
+        dest='labels',
+        action='append',
+        default=None,
+        help='Labels that will be applied to this Dataflow job. Labels are key '
+        'value pairs separated by = (e.g. --label key=value).')
 
   def validate(self, validator):
     errors = []

http://git-wip-us.apache.org/repos/asf/beam/blob/48ae7d1d/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index e48b58c..eec598a 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -363,6 +363,17 @@ class Job(object):
       self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING
     else:
       self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH
+
+    # Labels.
+    if self.google_cloud_options.labels:
+      self.proto.labels = dataflow.Job.LabelsValue()
+      for label in self.google_cloud_options.labels:
+        parts = label.split('=', 1)
+        key = parts[0]
+        value = parts[1] if len(parts) > 1 else ''
+        self.proto.labels.additionalProperties.append(
+            dataflow.Job.LabelsValue.AdditionalProperty(key=key, value=value))
+
     self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$')
     self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$')