You are viewing a plain text version of this content. The canonical link for it (a hyperlink in the original archive page) was not preserved in this copy.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/12/29 04:33:17 UTC
[1/3] beam git commit: Rename options.py -> pipeline_options.py
Repository: beam
Updated Branches:
refs/heads/python-sdk 89a5b3c3e -> 8bf4c8059
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
index 8878c3f..ffed048 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator_test.py
@@ -21,7 +21,7 @@ import logging
import unittest
from apache_beam.internal import pickler
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options_validator import PipelineOptionsValidator
from hamcrest.core.base_matcher import BaseMatcher
[3/3] beam git commit: This closes #1714
Posted by dh...@apache.org.
This closes #1714
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8bf4c805
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8bf4c805
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8bf4c805
Branch: refs/heads/python-sdk
Commit: 8bf4c8059a240d187edb1489d266b2ec09b67069
Parents: 89a5b3c cd9c9f8
Author: Dan Halperin <dh...@google.com>
Authored: Wed Dec 28 20:32:27 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 20:32:27 2016 -0800
----------------------------------------------------------------------
.../examples/complete/autocomplete.py | 4 +-
.../examples/complete/estimate_pi.py | 4 +-
.../apache_beam/examples/complete/tfidf.py | 4 +-
.../examples/complete/top_wikipedia_sessions.py | 4 +-
.../examples/cookbook/bigquery_side_input.py | 4 +-
.../apache_beam/examples/cookbook/bigshuffle.py | 4 +-
.../apache_beam/examples/cookbook/coders.py | 4 +-
.../examples/cookbook/custom_ptransform.py | 2 +-
.../examples/cookbook/datastore_wordcount.py | 6 +-
.../examples/cookbook/group_with_coder.py | 4 +-
.../examples/cookbook/mergecontacts.py | 4 +-
.../examples/cookbook/multiple_output_pardo.py | 4 +-
.../apache_beam/examples/snippets/snippets.py | 50 +-
.../examples/snippets/snippets_test.py | 2 +-
sdks/python/apache_beam/examples/wordcount.py | 4 +-
.../apache_beam/examples/wordcount_debugging.py | 4 +-
.../apache_beam/examples/wordcount_minimal.py | 4 +-
sdks/python/apache_beam/internal/apiclient.py | 8 +-
.../apache_beam/internal/apiclient_test.py | 2 +-
sdks/python/apache_beam/internal/auth.py | 4 +-
sdks/python/apache_beam/io/bigquery.py | 2 +-
sdks/python/apache_beam/io/bigquery_test.py | 2 +-
sdks/python/apache_beam/pipeline.py | 8 +-
.../apache_beam/runners/dataflow_runner.py | 2 +-
.../runners/direct/transform_evaluator.py | 2 +-
sdks/python/apache_beam/runners/runner_test.py | 2 +-
.../apache_beam/runners/template_runner_test.py | 2 +-
.../runners/test/test_dataflow_runner.py | 2 +-
sdks/python/apache_beam/test_pipeline.py | 2 +-
sdks/python/apache_beam/test_pipeline_test.py | 2 +-
sdks/python/apache_beam/transforms/core.py | 2 +-
sdks/python/apache_beam/transforms/display.py | 2 +-
.../apache_beam/transforms/display_test.py | 2 +-
.../python/apache_beam/transforms/ptransform.py | 2 +-
.../apache_beam/transforms/ptransform_test.py | 4 +-
.../transforms/write_ptransform_test.py | 2 +-
.../typehints/typed_pipeline_test.py | 4 +-
sdks/python/apache_beam/utils/dependency.py | 4 +-
.../python/apache_beam/utils/dependency_test.py | 6 +-
sdks/python/apache_beam/utils/options.py | 543 -------------------
.../apache_beam/utils/pipeline_options.py | 540 ++++++++++++++++++
.../apache_beam/utils/pipeline_options_test.py | 2 +-
.../utils/pipeline_options_validator.py | 14 +-
.../utils/pipeline_options_validator_test.py | 2 +-
44 files changed, 639 insertions(+), 642 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: Rename options.py -> pipeline_options.py
Posted by dh...@apache.org.
Rename options.py -> pipeline_options.py
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd9c9f88
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd9c9f88
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd9c9f88
Branch: refs/heads/python-sdk
Commit: cd9c9f888879d425a990031f28bf3bff63d2087e
Parents: 89a5b3c
Author: Maria Garcia Herrero <ma...@google.com>
Authored: Wed Dec 28 15:28:37 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 20:32:24 2016 -0800
----------------------------------------------------------------------
.../examples/complete/autocomplete.py | 4 +-
.../examples/complete/estimate_pi.py | 4 +-
.../apache_beam/examples/complete/tfidf.py | 4 +-
.../examples/complete/top_wikipedia_sessions.py | 4 +-
.../examples/cookbook/bigquery_side_input.py | 4 +-
.../apache_beam/examples/cookbook/bigshuffle.py | 4 +-
.../apache_beam/examples/cookbook/coders.py | 4 +-
.../examples/cookbook/custom_ptransform.py | 2 +-
.../examples/cookbook/datastore_wordcount.py | 6 +-
.../examples/cookbook/group_with_coder.py | 4 +-
.../examples/cookbook/mergecontacts.py | 4 +-
.../examples/cookbook/multiple_output_pardo.py | 4 +-
.../apache_beam/examples/snippets/snippets.py | 50 +-
.../examples/snippets/snippets_test.py | 2 +-
sdks/python/apache_beam/examples/wordcount.py | 4 +-
.../apache_beam/examples/wordcount_debugging.py | 4 +-
.../apache_beam/examples/wordcount_minimal.py | 4 +-
sdks/python/apache_beam/internal/apiclient.py | 8 +-
.../apache_beam/internal/apiclient_test.py | 2 +-
sdks/python/apache_beam/internal/auth.py | 4 +-
sdks/python/apache_beam/io/bigquery.py | 2 +-
sdks/python/apache_beam/io/bigquery_test.py | 2 +-
sdks/python/apache_beam/pipeline.py | 8 +-
.../apache_beam/runners/dataflow_runner.py | 2 +-
.../runners/direct/transform_evaluator.py | 2 +-
sdks/python/apache_beam/runners/runner_test.py | 2 +-
.../apache_beam/runners/template_runner_test.py | 2 +-
.../runners/test/test_dataflow_runner.py | 2 +-
sdks/python/apache_beam/test_pipeline.py | 2 +-
sdks/python/apache_beam/test_pipeline_test.py | 2 +-
sdks/python/apache_beam/transforms/core.py | 2 +-
sdks/python/apache_beam/transforms/display.py | 2 +-
.../apache_beam/transforms/display_test.py | 2 +-
.../python/apache_beam/transforms/ptransform.py | 2 +-
.../apache_beam/transforms/ptransform_test.py | 4 +-
.../transforms/write_ptransform_test.py | 2 +-
.../typehints/typed_pipeline_test.py | 4 +-
sdks/python/apache_beam/utils/dependency.py | 4 +-
.../python/apache_beam/utils/dependency_test.py | 6 +-
sdks/python/apache_beam/utils/options.py | 543 -------------------
.../apache_beam/utils/pipeline_options.py | 540 ++++++++++++++++++
.../apache_beam/utils/pipeline_options_test.py | 2 +-
.../utils/pipeline_options_validator.py | 14 +-
.../utils/pipeline_options_validator_test.py | 2 +-
44 files changed, 639 insertions(+), 642 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index eaa5ca2..87e6c0c 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -24,8 +24,8 @@ import logging
import re
import apache_beam as beam
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
def run(argv=None):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index 682c6d2..d0faefe 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -36,8 +36,8 @@ import apache_beam as beam
from apache_beam.typehints import Any
from apache_beam.typehints import Iterable
from apache_beam.typehints import Tuple
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
@beam.typehints.with_output_types(Tuple[int, int, int])
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 59b2900..b4d5b45 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -30,8 +30,8 @@ import re
import apache_beam as beam
from apache_beam.pvalue import AsSingleton
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
def read_documents(pipeline, uris):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index fbce641..cbd305a 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -46,8 +46,8 @@ import logging
import apache_beam as beam
from apache_beam import combiners
from apache_beam import window
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
ONE_HOUR_IN_SECONDS = 3600
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index 8a53637..25e2c3b 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -35,8 +35,8 @@ import apache_beam as beam
from apache_beam.pvalue import AsList
from apache_beam.pvalue import AsSingleton
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
def create_groups(group_ids, corpus, word, ignore_corpus, ignore_word):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index a076a0c..83d3881 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -25,8 +25,8 @@ import logging
import apache_beam as beam
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
def crc32line(line):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py
index bbe02b3..690ba66 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders.py
@@ -35,8 +35,8 @@ import json
import logging
import apache_beam as beam
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
class JsonCoder(object):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index b9d64cf..56259ed 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -27,7 +27,7 @@ import logging
import apache_beam as beam
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
# pylint doesn't understand our pipeline syntax:
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 9613402..dd34070 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -73,9 +73,9 @@ from googledatastore import helper as datastore_helper, PropertyFilter
import apache_beam as beam
from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
-from apache_beam.utils.options import GoogleCloudOptions
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
empty_line_aggregator = beam.Aggregator('emptyLines')
average_word_size_aggregator = beam.Aggregator('averageWordLength',
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 651a4f3..c4b8c59 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -35,8 +35,8 @@ import apache_beam as beam
from apache_beam import coders
from apache_beam.typehints import typehints
from apache_beam.typehints.decorators import with_output_types
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
class Player(object):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index bf6d1b1..6602609 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -36,8 +36,8 @@ import logging
import re
import apache_beam as beam
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
def run(argv=None, assert_results=None):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index d760e5a..dd91e74 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -54,8 +54,8 @@ import re
import apache_beam as beam
from apache_beam import pvalue
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
class SplitLinesToWordsFn(beam.DoFn):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 64878f3..0d55125 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -88,7 +88,7 @@ def construct_pipeline(renames):
return True
# [START pipelines_constructing_creating]
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
p = beam.Pipeline(options=PipelineOptions())
# [END pipelines_constructing_creating]
@@ -125,7 +125,7 @@ def model_pipelines(argv):
import re
import apache_beam as beam
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@@ -161,7 +161,7 @@ def model_pcollection(argv):
URL: https://cloud.google.com/dataflow/model/pcollection
"""
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@@ -197,7 +197,7 @@ def pipeline_options_remote(argv):
"""
from apache_beam import Pipeline
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
# [START pipeline_options_create]
options = PipelineOptions(flags=argv)
@@ -212,8 +212,8 @@ def pipeline_options_remote(argv):
parser.add_argument('--output')
# [END pipeline_options_define_custom]
- from apache_beam.utils.options import GoogleCloudOptions
- from apache_beam.utils.options import StandardOptions
+ from apache_beam.utils.pipeline_options import GoogleCloudOptions
+ from apache_beam.utils.pipeline_options import StandardOptions
# [START pipeline_options_dataflow_service]
# Create and set your PipelineOptions.
@@ -254,7 +254,7 @@ def pipeline_options_local(argv):
"""
from apache_beam import Pipeline
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
options = PipelineOptions(flags=argv)
@@ -320,7 +320,7 @@ def pipeline_logging(lines, output):
import re
import apache_beam as beam
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
# [START pipeline_logging]
# import Python logging module.
@@ -357,7 +357,7 @@ def pipeline_monitoring(renames):
import re
import apache_beam as beam
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
class WordCountOptions(PipelineOptions):
@@ -425,9 +425,9 @@ def examples_wordcount_minimal(renames):
import apache_beam as beam
- from apache_beam.utils.options import GoogleCloudOptions
- from apache_beam.utils.options import StandardOptions
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import GoogleCloudOptions
+ from apache_beam.utils.pipeline_options import StandardOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
# [START examples_wordcount_minimal_options]
options = PipelineOptions()
@@ -485,7 +485,7 @@ def examples_wordcount_wordcount(renames):
import re
import apache_beam as beam
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
argv = []
@@ -544,7 +544,7 @@ def examples_wordcount_debugging(renames):
import re
import apache_beam as beam
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
# [START example_wordcount_debugging_logging]
# [START example_wordcount_debugging_aggregators]
@@ -635,7 +635,7 @@ def model_custom_source(count):
from apache_beam.io import iobase
from apache_beam.io.range_trackers import OffsetRangeTracker
from apache_beam.transforms.core import PTransform
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
# Defining a new source.
# [START model_custom_source_new_source]
@@ -766,7 +766,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.transforms.core import PTransform
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
# Defining the new sink.
# [START model_custom_sink_new_sink]
@@ -867,7 +867,7 @@ def model_textio(renames):
return re.findall(r'[A-Za-z\']+', x)
import apache_beam as beam
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
# [START model_textio_read]
p = beam.Pipeline(options=PipelineOptions())
@@ -902,7 +902,7 @@ def model_datastoreio():
from google.datastore.v1 import query_pb2
import googledatastore
import apache_beam as beam
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
@@ -938,7 +938,7 @@ def model_bigqueryio():
URL: https://cloud.google.com/dataflow/model/bigquery-io
"""
import apache_beam as beam
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
# [START model_bigqueryio_read]
p = beam.Pipeline(options=PipelineOptions())
@@ -1009,7 +1009,7 @@ def model_composite_transform_example(contents, output_path):
# [END composite_ptransform_apply_method]
# [END composite_transform_example]
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
p = beam.Pipeline(options=PipelineOptions())
(p
| beam.Create(contents)
@@ -1025,7 +1025,7 @@ def model_multiple_pcollections_flatten(contents, output_path):
"""
some_hash_fn = lambda s: ord(s[0])
import apache_beam as beam
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
p = beam.Pipeline(options=PipelineOptions())
partition_fn = lambda element, partitions: some_hash_fn(element) % partitions
@@ -1066,7 +1066,7 @@ def model_multiple_pcollections_partition(contents, output_path):
"""Assume i in [0,100)."""
return i
import apache_beam as beam
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
p = beam.Pipeline(options=PipelineOptions())
students = p | beam.Create(contents)
@@ -1096,7 +1096,7 @@ def model_group_by_key(contents, output_path):
import re
import apache_beam as beam
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
p = beam.Pipeline(options=PipelineOptions())
words_and_counts = (
p
@@ -1123,7 +1123,7 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
URL: https://cloud.google.com/dataflow/model/group-by-key
"""
import apache_beam as beam
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
p = beam.Pipeline(options=PipelineOptions())
# [START model_group_by_key_cogroupbykey_tuple]
# Each data set is represented by key-value pairs in separate PCollections.
@@ -1161,7 +1161,7 @@ def model_join_using_side_inputs(
import apache_beam as beam
from apache_beam.pvalue import AsIter
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
p = beam.Pipeline(options=PipelineOptions())
# [START model_join_using_side_inputs]
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index feb06c5..1a84a6e 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -31,7 +31,7 @@ from apache_beam import typehints
from apache_beam.io import fileio
from apache_beam.transforms.util import assert_that
from apache_beam.transforms.util import equal_to
-from apache_beam.utils.options import TypeOptions
+from apache_beam.utils.pipeline_options import TypeOptions
from apache_beam.examples.snippets import snippets
# pylint: disable=expression-not-assigned
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 7f347d8..51fc2eb 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -24,8 +24,8 @@ import logging
import re
import apache_beam as beam
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
empty_line_aggregator = beam.Aggregator('emptyLines')
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index ffbfed7..bba09b4 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -46,8 +46,8 @@ import logging
import re
import apache_beam as beam
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
class FilterTextFn(beam.DoFn):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 98477df..c02ec16 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -51,8 +51,8 @@ import logging
import re
import apache_beam as beam
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
def run(argv=None):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 3a9ba46..001ae64 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -41,10 +41,10 @@ from apache_beam.utils import retry
from apache_beam.utils.dependency import get_required_container_version
from apache_beam.utils.dependency import get_sdk_name_and_version
from apache_beam.utils.names import PropertyNames
-from apache_beam.utils.options import DebugOptions
-from apache_beam.utils.options import GoogleCloudOptions
-from apache_beam.utils.options import StandardOptions
-from apache_beam.utils.options import WorkerOptions
+from apache_beam.utils.pipeline_options import DebugOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import StandardOptions
+from apache_beam.utils.pipeline_options import WorkerOptions
from apache_beam.internal.clients import storage
import apache_beam.internal.clients.dataflow as dataflow
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient_test.py b/sdks/python/apache_beam/internal/apiclient_test.py
index 1a83752..188a5a8 100644
--- a/sdks/python/apache_beam/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/internal/apiclient_test.py
@@ -18,7 +18,7 @@
import unittest
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.runners.dataflow_runner import DataflowRunner
from apache_beam.internal import apiclient
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/internal/auth.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/auth.py b/sdks/python/apache_beam/internal/auth.py
index 056f40c..c645f24 100644
--- a/sdks/python/apache_beam/internal/auth.py
+++ b/sdks/python/apache_beam/internal/auth.py
@@ -29,8 +29,8 @@ from oauth2client.client import OAuth2Credentials
from apache_beam.utils import processes
from apache_beam.utils import retry
-from apache_beam.utils.options import GoogleCloudOptions
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
# When we are running in GCE, we can authenticate with VM credentials.
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index 6a766d0..ea0a281 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -120,7 +120,7 @@ from apache_beam.internal.json_value import to_json_value
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.utils import retry
-from apache_beam.utils.options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/io/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py
index 3a558d7..b8682d1 100644
--- a/sdks/python/apache_beam/io/bigquery_test.py
+++ b/sdks/python/apache_beam/io/bigquery_test.py
@@ -35,7 +35,7 @@ from apache_beam.io.bigquery import TableRowJsonCoder
from apache_beam.io.bigquery import parse_table_schema_from_json
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
class TestRowAsDictJsonCoder(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 8b2345e..81343f3 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -55,10 +55,10 @@ from apache_beam.runners import create_runner
from apache_beam.runners import PipelineRunner
from apache_beam.transforms import ptransform
from apache_beam.typehints import TypeCheckError
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
-from apache_beam.utils.options import StandardOptions
-from apache_beam.utils.options import TypeOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.utils.pipeline_options import StandardOptions
+from apache_beam.utils.pipeline_options import TypeOptions
from apache_beam.utils.pipeline_options_validator import PipelineOptionsValidator
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 3ee95c5..3505acc 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -41,7 +41,7 @@ from apache_beam.typehints import typehints
from apache_beam.utils import names
from apache_beam.utils.names import PropertyNames
from apache_beam.utils.names import TransformNames
-from apache_beam.utils.options import StandardOptions
+from apache_beam.utils.pipeline_options import StandardOptions
from apache_beam.internal.clients import dataflow as dataflow_api
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 24ab754..b4c43ba 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -38,7 +38,7 @@ from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
from apache_beam.typehints.typecheck import TypeCheckError
from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn
from apache_beam.utils import counters
-from apache_beam.utils.options import TypeOptions
+from apache_beam.utils.pipeline_options import TypeOptions
class TransformEvaluatorRegistry(object):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 74c81c2..ff6a22e 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -36,7 +36,7 @@ from apache_beam.runners import DirectRunner
from apache_beam.runners import TestDataflowRunner
import apache_beam.transforms as ptransform
from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
class RunnerTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/runners/template_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/template_runner_test.py b/sdks/python/apache_beam/runners/template_runner_test.py
index 8cd818b..457022d 100644
--- a/sdks/python/apache_beam/runners/template_runner_test.py
+++ b/sdks/python/apache_beam/runners/template_runner_test.py
@@ -26,7 +26,7 @@ import tempfile
import apache_beam as beam
from apache_beam.pipeline import Pipeline
from apache_beam.runners.dataflow_runner import DataflowRunner
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.internal import apiclient
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
index a58ab33..77655bd 100644
--- a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
@@ -19,7 +19,7 @@
from apache_beam.internal import pickler
from apache_beam.runners.dataflow_runner import DataflowRunner
-from apache_beam.utils.options import TestOptions
+from apache_beam.utils.pipeline_options import TestOptions
class TestDataflowRunner(DataflowRunner):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index e758c3d..48b98b2 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -22,7 +22,7 @@ import shlex
from apache_beam.internal import pickler
from apache_beam.pipeline import Pipeline
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
from nose.plugins.skip import SkipTest
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/test_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline_test.py b/sdks/python/apache_beam/test_pipeline_test.py
index 747f0ef..42ba2d7 100644
--- a/sdks/python/apache_beam/test_pipeline_test.py
+++ b/sdks/python/apache_beam/test_pipeline_test.py
@@ -20,7 +20,7 @@
import unittest
from apache_beam.test_pipeline import TestPipeline
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
class TestPipelineTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 0ba1c62..6a7bd2e 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -46,7 +46,7 @@ from apache_beam.typehints import TypeCheckError
from apache_beam.typehints import Union
from apache_beam.typehints import WithTypeHints
from apache_beam.typehints.trivial_inference import element_type
-from apache_beam.utils.options import TypeOptions
+from apache_beam.utils.pipeline_options import TypeOptions
# Type variables
T = typehints.TypeVariable('T')
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/display.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
index c38fd9b..6e74512 100644
--- a/sdks/python/apache_beam/transforms/display.py
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -122,7 +122,7 @@ class DisplayData(object):
ValueError: If the has_display_data argument is not an instance of
HasDisplayData.
"""
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
if not isinstance(pipeline_options, PipelineOptions):
raise ValueError(
'Element of class {}.{} does not subclass PipelineOptions'
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/display_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py
index fc50abe..848746c 100644
--- a/sdks/python/apache_beam/transforms/display_test.py
+++ b/sdks/python/apache_beam/transforms/display_test.py
@@ -29,7 +29,7 @@ import apache_beam as beam
from apache_beam.transforms.display import HasDisplayData
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
class DisplayDataItemMatcher(BaseMatcher):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 39864a6..c022c5e 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -410,7 +410,7 @@ class PTransform(WithTypeHints, HasDisplayData):
deferred = False
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import pipeline
- from apache_beam.utils.options import PipelineOptions
+ from apache_beam.utils.pipeline_options import PipelineOptions
# pylint: enable=wrong-import-order, wrong-import-position
p = pipeline.Pipeline(
'DirectRunner', PipelineOptions(sys.argv))
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 5ed7d72..705e85e 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -36,8 +36,8 @@ import apache_beam.typehints as typehints
from apache_beam.typehints import with_input_types
from apache_beam.typehints import with_output_types
from apache_beam.typehints.typehints_test import TypeHintTestCase
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import TypeOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import TypeOptions
# Disable frequent lint warning due to pipe operator for chaining transforms.
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index 9a1a7de..f96dffb 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -25,7 +25,7 @@ from apache_beam.io import iobase
from apache_beam.pipeline import Pipeline
from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.util import assert_that, is_empty
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
class _TestSink(iobase.Sink):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 329d657..8b5e3f4 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -25,8 +25,8 @@ from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.transforms.util import assert_that, equal_to
from apache_beam.typehints import WithTypeHints
-from apache_beam.utils.options import OptionsContext
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import OptionsContext
+from apache_beam.utils.pipeline_options import PipelineOptions
# These test often construct a pipeline as value | PTransform to test side
# effects (e.g. errors).
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
index addcdb2..11a2e1c 100644
--- a/sdks/python/apache_beam/utils/dependency.py
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -66,8 +66,8 @@ from apache_beam import version as beam_version
from apache_beam.internal import pickler
from apache_beam.utils import names
from apache_beam.utils import processes
-from apache_beam.utils.options import GoogleCloudOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import SetupOptions
# Standard file names used for staging files.
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py
index 3549a07..a484d60 100644
--- a/sdks/python/apache_beam/utils/dependency_test.py
+++ b/sdks/python/apache_beam/utils/dependency_test.py
@@ -26,9 +26,9 @@ import unittest
from apache_beam import utils
from apache_beam.utils import dependency
from apache_beam.utils import names
-from apache_beam.utils.options import GoogleCloudOptions
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
class SetupTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py
deleted file mode 100644
index 7ca0573..0000000
--- a/sdks/python/apache_beam/utils/options.py
+++ /dev/null
@@ -1,543 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Pipeline options obtained from command line parsing.
-
-TODO(silviuc): Should rename this module to pipeline_options.
-"""
-
-import argparse
-
-from apache_beam.transforms.display import HasDisplayData
-
-
-class PipelineOptions(HasDisplayData):
- """Pipeline options class used as container for command line options.
-
- The class is essentially a wrapper over the standard argparse Python module
- (see https://docs.python.org/3/library/argparse.html). To define one option
- or a group of options you subclass from PipelineOptions::
-
- class XyzOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument('--abc', default='start')
- parser.add_argument('--xyz', default='end')
-
- The arguments for the add_argument() method are exactly the ones
- described in the argparse public documentation.
-
- Pipeline objects require an options object during initialization.
- This is obtained simply by initializing an options class as defined above::
-
- p = Pipeline(options=XyzOptions())
- if p.options.xyz == 'end':
- raise ValueError('Option xyz has an invalid value.')
-
- By default the options classes will use command line arguments to initialize
- the options.
- """
-
- def __init__(self, flags=None, **kwargs):
- """Initialize an options class.
-
- The initializer will traverse all subclasses, add all their argparse
- arguments and then parse the command line specified by flags or by default
- the one obtained from sys.argv.
-
- The subclasses are not expected to require a redefinition of __init__.
-
- Args:
- flags: An iterable of command line arguments to be used. If not specified
- then sys.argv will be used as input for parsing arguments.
-
- **kwargs: Add overrides for arguments passed in flags.
- """
- self._flags = flags
- self._all_options = kwargs
- parser = argparse.ArgumentParser()
- for cls in type(self).mro():
- if cls == PipelineOptions:
- break
- elif '_add_argparse_args' in cls.__dict__:
- cls._add_argparse_args(parser)
- # The _visible_options attribute will contain only those options from the
- # flags (i.e., command line) that can be recognized. The _all_options
- # field contains additional overrides.
- self._visible_options, _ = parser.parse_known_args(flags)
-
- @classmethod
- def _add_argparse_args(cls, parser):
- # Override this in subclasses to provide options.
- pass
-
- @classmethod
- def from_dictionary(cls, options):
- """Returns a PipelineOptions from a dictionary of arguments.
-
- Args:
- options: Dictinary of argument value pairs.
-
- Returns:
- A PipelineOptions object representing the given arguments.
- """
- flags = []
- for k, v in options.iteritems():
- if isinstance(v, bool):
- if v:
- flags.append('--%s' % k)
- else:
- flags.append('--%s=%s' % (k, v))
-
- return cls(flags)
-
- def get_all_options(self, drop_default=False):
- """Returns a dictionary of all defined arguments.
-
- Returns a dictionary of all defined arguments (arguments that are defined in
- any subclass of PipelineOptions) into a dictionary.
-
- Args:
- drop_default: If set to true, options that are equal to their default
- values, are not returned as part of the result dictionary.
-
- Returns:
- Dictionary of all args and values.
- """
- parser = argparse.ArgumentParser()
- for cls in PipelineOptions.__subclasses__():
- cls._add_argparse_args(parser) # pylint: disable=protected-access
- known_args, _ = parser.parse_known_args(self._flags)
- result = vars(known_args)
-
- # Apply the overrides if any
- for k in result.keys():
- if k in self._all_options:
- result[k] = self._all_options[k]
- if drop_default and parser.get_default(k) == result[k]:
- del result[k]
-
- return result
-
- def display_data(self):
- return self.get_all_options(True)
-
- def view_as(self, cls):
- view = cls(self._flags)
- view._all_options = self._all_options
- return view
-
- def _visible_option_list(self):
- return sorted(option
- for option in dir(self._visible_options) if option[0] != '_')
-
- def __dir__(self):
- return sorted(dir(type(self)) + self.__dict__.keys() +
- self._visible_option_list())
-
- def __getattr__(self, name):
- # Special methods which may be accessed before the object is
- # fully constructed (e.g. in unpickling).
- if name[:2] == name[-2:] == '__':
- return object.__getattr__(self, name)
- elif name in self._visible_option_list():
- return self._all_options.get(name, getattr(self._visible_options, name))
- else:
- raise AttributeError("'%s' object has no attribute '%s'" %
- (type(self).__name__, name))
-
- def __setattr__(self, name, value):
- if name in ('_flags', '_all_options', '_visible_options'):
- super(PipelineOptions, self).__setattr__(name, value)
- elif name in self._visible_option_list():
- self._all_options[name] = value
- else:
- raise AttributeError("'%s' object has no attribute '%s'" %
- (type(self).__name__, name))
-
- def __str__(self):
- return '%s(%s)' % (type(self).__name__,
- ', '.join('%s=%s' % (option, getattr(self, option))
- for option in self._visible_option_list()))
-
-
-class StandardOptions(PipelineOptions):
-
- DEFAULT_RUNNER = 'DirectRunner'
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument(
- '--runner',
- help=('Pipeline runner used to execute the workflow. Valid values are '
- 'DirectRunner, DataflowRunner, '
- 'and BlockingDataflowRunner.'))
- # Whether to enable streaming mode.
- parser.add_argument('--streaming',
- default=False,
- action='store_true',
- help='Whether to enable streaming mode.')
-
-
-class TypeOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- # TODO(laolu): Add a type inferencing option here once implemented.
- parser.add_argument('--type_check_strictness',
- default='DEFAULT_TO_ANY',
- choices=['ALL_REQUIRED', 'DEFAULT_TO_ANY'],
- help='The level of exhaustive manual type-hint '
- 'annotation required')
- parser.add_argument('--no_pipeline_type_check',
- dest='pipeline_type_check',
- action='store_false',
- help='Disable type checking at pipeline construction '
- 'time')
- parser.add_argument('--pipeline_type_check',
- action='store_true',
- help='Enable type checking at pipeline construction '
- 'time')
- parser.add_argument('--runtime_type_check',
- default=False,
- action='store_true',
- help='Enable type checking at pipeline execution '
- 'time. NOTE: only supported with the '
- 'DirectRunner')
-
-
-class GoogleCloudOptions(PipelineOptions):
- """Google Cloud Dataflow service execution options."""
-
- BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
- COMPUTE_API_SERVICE = 'compute.googleapis.com'
- STORAGE_API_SERVICE = 'storage.googleapis.com'
- DATAFLOW_ENDPOINT = 'https://dataflow.googleapis.com'
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument(
- '--dataflow_endpoint',
- default=cls.DATAFLOW_ENDPOINT,
- help=
- ('The URL for the Dataflow API. If not set, the default public URL '
- 'will be used.'))
- # Remote execution must check that this option is not None.
- parser.add_argument('--project',
- default=None,
- help='Name of the Cloud project owning the Dataflow '
- 'job.')
- # Remote execution must check that this option is not None.
- parser.add_argument('--job_name',
- default=None,
- help='Name of the Cloud Dataflow job.')
- # Remote execution must check that this option is not None.
- parser.add_argument('--staging_location',
- default=None,
- help='GCS path for staging code packages needed by '
- 'workers.')
- # Remote execution must check that this option is not None.
- # If staging_location is not set, it defaults to temp_location.
- parser.add_argument('--temp_location',
- default=None,
- help='GCS path for saving temporary workflow jobs.')
- parser.add_argument('--service_account_name',
- default=None,
- help='Name of the service account for Google APIs.')
- parser.add_argument('--service_account_key_file',
- default=None,
- help='Path to a file containing the P12 service '
- 'credentials.')
- parser.add_argument('--service_account_email',
- default=None,
- help='Identity to run virtual machines as.')
- parser.add_argument('--no_auth', dest='no_auth', type=bool, default=False)
- # Option to run templated pipelines
- parser.add_argument('--template_location',
- default=None,
- help='Save job to specified local or GCS location.')
-
- def validate(self, validator):
- errors = []
- if validator.is_service_runner():
- errors.extend(validator.validate_cloud_options(self))
- errors.extend(validator.validate_gcs_path(self, 'temp_location'))
- if getattr(self, 'staging_location',
- None) or getattr(self, 'temp_location', None) is None:
- errors.extend(validator.validate_gcs_path(self, 'staging_location'))
-
- if self.view_as(DebugOptions).dataflow_job_file:
- if self.view_as(GoogleCloudOptions).template_location:
- errors.append('--dataflow_job_file and --template_location '
- 'are mutually exclusive.')
-
- return errors
-
-
-# Command line options controlling the worker pool configuration.
-# TODO(silviuc): Update description when autoscaling options are in.
-class WorkerOptions(PipelineOptions):
- """Worker pool configuration options."""
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument(
- '--num_workers',
- type=int,
- default=None,
- help=
- ('Number of workers to use when executing the Dataflow job. If not '
- 'set, the Dataflow service will use a reasonable default.'))
- parser.add_argument(
- '--max_num_workers',
- type=int,
- default=None,
- help=
- ('Maximum number of workers to use when executing the Dataflow job.'))
- parser.add_argument(
- '--autoscaling_algorithm',
- type=str,
- choices=['NONE', 'THROUGHPUT_BASED'],
- default=None, # Meaning unset, distinct from 'NONE' meaning don't scale
- help=
- ('If and how to auotscale the workerpool.'))
- parser.add_argument(
- '--worker_machine_type',
- dest='machine_type',
- default=None,
- help=('Machine type to create Dataflow worker VMs as. See '
- 'https://cloud.google.com/compute/docs/machine-types '
- 'for a list of valid options. If not set, '
- 'the Dataflow service will choose a reasonable '
- 'default.'))
- parser.add_argument(
- '--disk_size_gb',
- type=int,
- default=None,
- help=
- ('Remote worker disk size, in gigabytes, or 0 to use the default size. '
- 'If not set, the Dataflow service will use a reasonable default.'))
- parser.add_argument(
- '--worker_disk_type',
- dest='disk_type',
- default=None,
- help=('Specifies what type of persistent disk should be used.'))
- parser.add_argument(
- '--zone',
- default=None,
- help=(
- 'GCE availability zone for launching workers. Default is up to the '
- 'Dataflow service.'))
- parser.add_argument(
- '--network',
- default=None,
- help=(
- 'GCE network for launching workers. Default is up to the Dataflow '
- 'service.'))
- parser.add_argument(
- '--worker_harness_container_image',
- default=None,
- help=('Docker registry location of container image to use for the '
- 'worker harness. Default is the container for the version of the '
- 'SDK. Note: currently, only approved Google Cloud Dataflow '
- 'container images may be used here.'))
- parser.add_argument(
- '--teardown_policy',
- choices=['TEARDOWN_ALWAYS', 'TEARDOWN_NEVER', 'TEARDOWN_ON_SUCCESS'],
- default=None,
- help=
- ('The teardown policy for the VMs. By default this is left unset and '
- 'the service sets the default policy.'))
- parser.add_argument(
- '--use_public_ips',
- default=None,
- help='Whether to assign public IP addresses to the worker machines.')
-
- def validate(self, validator):
- errors = []
- if validator.is_service_runner():
- errors.extend(
- validator.validate_optional_argument_positive(self, 'num_workers'))
- return errors
-
-
-class DebugOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument('--dataflow_job_file',
- default=None,
- help='Debug file to write the workflow specification.')
- parser.add_argument(
- '--experiment', '--experiments',
- dest='experiments',
- action='append',
- default=None,
- help=
- ('Runners may provide a number of experimental features that can be '
- 'enabled with this flag. Please sync with the owners of the runner '
- 'before enabling any experiments.'))
-
-
-class ProfilingOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- parser.add_argument('--profile_cpu',
- action='store_true',
- help='Enable work item CPU profiling.')
- parser.add_argument('--profile_memory',
- action='store_true',
- help='Enable work item heap profiling.')
- parser.add_argument('--profile_location',
- default=None,
- help='GCS path for saving profiler data.')
-
-
-class SetupOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- # Options for installing dependencies in the worker.
- parser.add_argument(
- '--requirements_file',
- default=None,
- help=
- ('Path to a requirements file containing package dependencies. '
- 'Typically it is produced by a pip freeze command. More details: '
- 'https://pip.pypa.io/en/latest/reference/pip_freeze.html. '
- 'If used, all the packages specified will be downloaded, '
- 'cached (use --requirements_cache to change default location), '
- 'and then staged so that they can be automatically installed in '
- 'workers during startup. The cache is refreshed as needed '
- 'avoiding extra downloads for existing packages. Typically the '
- 'file is named requirements.txt.'))
- parser.add_argument(
- '--requirements_cache',
- default=None,
- help=
- ('Path to a folder to cache the packages specified in '
- 'the requirements file using the --requirements_file option.'))
- parser.add_argument(
- '--setup_file',
- default=None,
- help=
- ('Path to a setup Python file containing package dependencies. If '
- 'specified, the file\'s containing folder is assumed to have the '
- 'structure required for a setuptools setup package. The file must be '
- 'named setup.py. More details: '
- 'https://pythonhosted.org/an_example_pypi_project/setuptools.html '
- 'During job submission a source distribution will be built and the '
- 'worker will install the resulting package before running any custom '
- 'code.'))
- parser.add_argument(
- '--save_main_session',
- default=False,
- action='store_true',
- help=
- ('Save the main session state so that pickled functions and classes '
- 'defined in __main__ (e.g. interactive session) can be unpickled. '
- 'Some workflows do not need the session state if for instance all '
- 'their functions/classes are defined in proper modules (not __main__)'
- ' and the modules are importable in the worker. '))
- parser.add_argument(
- '--sdk_location',
- default='default',
- help=
- ('Override the default GitHub location from where Dataflow SDK is '
- 'downloaded. It can be an URL, a GCS path, or a local path to an '
- 'SDK tarball. Workflow submissions will download or copy an SDK '
- 'tarball from here. If the string "default", '
- 'a standard SDK location is used. If empty, no SDK is copied.'))
- parser.add_argument(
- '--extra_package', '--extra_packages',
- dest='extra_packages',
- action='append',
- default=None,
- help=
- ('Local path to a Python package file. The file is expected to be (1) '
- 'a package tarball (".tar") or (2) a compressed package tarball '
- '(".tar.gz") which can be installed using the "pip install" command '
- 'of the standard pip package. Multiple --extra_package options can '
- 'be specified if more than one package is needed. During job '
- 'submission, the files will be staged in the staging area '
- '(--staging_location option) and the workers will install them in '
- 'same order they were specified on the command line.'))
-
-
-class TestOptions(PipelineOptions):
-
- @classmethod
- def _add_argparse_args(cls, parser):
- # Options for e2e test pipeline.
- parser.add_argument(
- '--on_success_matcher',
- default=None,
- help=('Verify state/output of e2e test pipeline. This is pickled '
- 'version of the matcher which should extends '
- 'hamcrest.core.base_matcher.BaseMatcher.'))
-
- def validate(self, validator):
- errors = []
- if self.view_as(TestOptions).on_success_matcher:
- errors.extend(validator.validate_test_matcher(self, 'on_success_matcher'))
- return errors
-
-# TODO(silviuc): Add --files_to_stage option.
-# This could potentially replace the --requirements_file and --setup_file.
-
-# TODO(silviuc): Non-standard options. Keep them? If yes, add help too!
-# Remote execution must check that this option is not None.
-
-
-class OptionsContext(object):
- """Set default pipeline options for pipelines created in this block.
-
- This is particularly useful for pipelines implicitly created with the
-
- [python list] | PTransform
-
- construct.
-
- Can also be used as a decorator.
- """
- overrides = []
-
- def __init__(self, **options):
- self.options = options
-
- def __enter__(self):
- self.overrides.append(self.options)
-
- def __exit__(self, *exn_info):
- self.overrides.pop()
-
- def __call__(self, f, *args, **kwargs):
-
- def wrapper(*args, **kwargs):
- with self:
- f(*args, **kwargs)
-
- return wrapper
-
- @classmethod
- def augment_options(cls, options):
- for override in cls.overrides:
- for name, value in override.items():
- setattr(options, name, value)
- return options
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py
new file mode 100644
index 0000000..3e09a3b
--- /dev/null
+++ b/sdks/python/apache_beam/utils/pipeline_options.py
@@ -0,0 +1,540 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Pipeline options obtained from command line parsing."""
+
+import argparse
+
+from apache_beam.transforms.display import HasDisplayData
+
+
+class PipelineOptions(HasDisplayData):
+ """Pipeline options class used as container for command line options.
+
+ The class is essentially a wrapper over the standard argparse Python module
+ (see https://docs.python.org/3/library/argparse.html). To define one option
+ or a group of options you subclass from PipelineOptions::
+
+ class XyzOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument('--abc', default='start')
+ parser.add_argument('--xyz', default='end')
+
+ The arguments for the add_argument() method are exactly the ones
+ described in the argparse public documentation.
+
+ Pipeline objects require an options object during initialization.
+ This is obtained simply by initializing an options class as defined above::
+
+ p = Pipeline(options=XyzOptions())
+ if p.options.xyz == 'end':
+ raise ValueError('Option xyz has an invalid value.')
+
+ By default the options classes will use command line arguments to initialize
+ the options.
+ """
+
+ def __init__(self, flags=None, **kwargs):
+ """Initialize an options class.
+
+ The initializer will traverse all subclasses, add all their argparse
+ arguments and then parse the command line specified by flags or by default
+ the one obtained from sys.argv.
+
+ The subclasses are not expected to require a redefinition of __init__.
+
+ Args:
+ flags: An iterable of command line arguments to be used. If not specified
+ then sys.argv will be used as input for parsing arguments.
+
+ **kwargs: Add overrides for arguments passed in flags.
+ """
+ self._flags = flags
+ self._all_options = kwargs
+ parser = argparse.ArgumentParser()
+ for cls in type(self).mro():
+ if cls == PipelineOptions:
+ break
+ elif '_add_argparse_args' in cls.__dict__:
+ cls._add_argparse_args(parser)
+ # The _visible_options attribute will contain only those options from the
+ # flags (i.e., command line) that can be recognized. The _all_options
+ # field contains additional overrides.
+ self._visible_options, _ = parser.parse_known_args(flags)
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ # Override this in subclasses to provide options.
+ pass
+
+ @classmethod
+ def from_dictionary(cls, options):
+ """Returns a PipelineOptions from a dictionary of arguments.
+
+ Args:
+ options: Dictinary of argument value pairs.
+
+ Returns:
+ A PipelineOptions object representing the given arguments.
+ """
+ flags = []
+ for k, v in options.iteritems():
+ if isinstance(v, bool):
+ if v:
+ flags.append('--%s' % k)
+ else:
+ flags.append('--%s=%s' % (k, v))
+
+ return cls(flags)
+
+ def get_all_options(self, drop_default=False):
+ """Returns a dictionary of all defined arguments.
+
+ Returns a dictionary of all defined arguments (arguments that are defined in
+ any subclass of PipelineOptions) into a dictionary.
+
+ Args:
+ drop_default: If set to true, options that are equal to their default
+ values, are not returned as part of the result dictionary.
+
+ Returns:
+ Dictionary of all args and values.
+ """
+ parser = argparse.ArgumentParser()
+ for cls in PipelineOptions.__subclasses__():
+ cls._add_argparse_args(parser) # pylint: disable=protected-access
+ known_args, _ = parser.parse_known_args(self._flags)
+ result = vars(known_args)
+
+ # Apply the overrides if any
+ for k in result.keys():
+ if k in self._all_options:
+ result[k] = self._all_options[k]
+ if drop_default and parser.get_default(k) == result[k]:
+ del result[k]
+
+ return result
+
+ def display_data(self):
+ return self.get_all_options(True)
+
+ def view_as(self, cls):
+ view = cls(self._flags)
+ view._all_options = self._all_options
+ return view
+
+ def _visible_option_list(self):
+ return sorted(option
+ for option in dir(self._visible_options) if option[0] != '_')
+
+ def __dir__(self):
+ return sorted(dir(type(self)) + self.__dict__.keys() +
+ self._visible_option_list())
+
+ def __getattr__(self, name):
+ # Special methods which may be accessed before the object is
+ # fully constructed (e.g. in unpickling).
+ if name[:2] == name[-2:] == '__':
+ return object.__getattr__(self, name)
+ elif name in self._visible_option_list():
+ return self._all_options.get(name, getattr(self._visible_options, name))
+ else:
+ raise AttributeError("'%s' object has no attribute '%s'" %
+ (type(self).__name__, name))
+
+ def __setattr__(self, name, value):
+ if name in ('_flags', '_all_options', '_visible_options'):
+ super(PipelineOptions, self).__setattr__(name, value)
+ elif name in self._visible_option_list():
+ self._all_options[name] = value
+ else:
+ raise AttributeError("'%s' object has no attribute '%s'" %
+ (type(self).__name__, name))
+
+ def __str__(self):
+ return '%s(%s)' % (type(self).__name__,
+ ', '.join('%s=%s' % (option, getattr(self, option))
+ for option in self._visible_option_list()))
+
+
+class StandardOptions(PipelineOptions):
+
+ DEFAULT_RUNNER = 'DirectRunner'
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument(
+ '--runner',
+ help=('Pipeline runner used to execute the workflow. Valid values are '
+ 'DirectRunner, DataflowRunner, '
+ 'and BlockingDataflowRunner.'))
+ # Whether to enable streaming mode.
+ parser.add_argument('--streaming',
+ default=False,
+ action='store_true',
+ help='Whether to enable streaming mode.')
+
+
+class TypeOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ # TODO(laolu): Add a type inferencing option here once implemented.
+ parser.add_argument('--type_check_strictness',
+ default='DEFAULT_TO_ANY',
+ choices=['ALL_REQUIRED', 'DEFAULT_TO_ANY'],
+ help='The level of exhaustive manual type-hint '
+ 'annotation required')
+ parser.add_argument('--no_pipeline_type_check',
+ dest='pipeline_type_check',
+ action='store_false',
+ help='Disable type checking at pipeline construction '
+ 'time')
+ parser.add_argument('--pipeline_type_check',
+ action='store_true',
+ help='Enable type checking at pipeline construction '
+ 'time')
+ parser.add_argument('--runtime_type_check',
+ default=False,
+ action='store_true',
+ help='Enable type checking at pipeline execution '
+ 'time. NOTE: only supported with the '
+ 'DirectRunner')
+
+
+class GoogleCloudOptions(PipelineOptions):
+ """Google Cloud Dataflow service execution options."""
+
+ BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
+ COMPUTE_API_SERVICE = 'compute.googleapis.com'
+ STORAGE_API_SERVICE = 'storage.googleapis.com'
+ DATAFLOW_ENDPOINT = 'https://dataflow.googleapis.com'
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument(
+ '--dataflow_endpoint',
+ default=cls.DATAFLOW_ENDPOINT,
+ help=
+ ('The URL for the Dataflow API. If not set, the default public URL '
+ 'will be used.'))
+ # Remote execution must check that this option is not None.
+ parser.add_argument('--project',
+ default=None,
+ help='Name of the Cloud project owning the Dataflow '
+ 'job.')
+ # Remote execution must check that this option is not None.
+ parser.add_argument('--job_name',
+ default=None,
+ help='Name of the Cloud Dataflow job.')
+ # Remote execution must check that this option is not None.
+ parser.add_argument('--staging_location',
+ default=None,
+ help='GCS path for staging code packages needed by '
+ 'workers.')
+ # Remote execution must check that this option is not None.
+ # If staging_location is not set, it defaults to temp_location.
+ parser.add_argument('--temp_location',
+ default=None,
+ help='GCS path for saving temporary workflow jobs.')
+ parser.add_argument('--service_account_name',
+ default=None,
+ help='Name of the service account for Google APIs.')
+ parser.add_argument('--service_account_key_file',
+ default=None,
+ help='Path to a file containing the P12 service '
+ 'credentials.')
+ parser.add_argument('--service_account_email',
+ default=None,
+ help='Identity to run virtual machines as.')
+ parser.add_argument('--no_auth', dest='no_auth', type=bool, default=False)
+ # Option to run templated pipelines
+ parser.add_argument('--template_location',
+ default=None,
+ help='Save job to specified local or GCS location.')
+
+ def validate(self, validator):
+ errors = []
+ if validator.is_service_runner():
+ errors.extend(validator.validate_cloud_options(self))
+ errors.extend(validator.validate_gcs_path(self, 'temp_location'))
+ if getattr(self, 'staging_location',
+ None) or getattr(self, 'temp_location', None) is None:
+ errors.extend(validator.validate_gcs_path(self, 'staging_location'))
+
+ if self.view_as(DebugOptions).dataflow_job_file:
+ if self.view_as(GoogleCloudOptions).template_location:
+ errors.append('--dataflow_job_file and --template_location '
+ 'are mutually exclusive.')
+
+ return errors
+
+
+# Command line options controlling the worker pool configuration.
+# TODO(silviuc): Update description when autoscaling options are in.
+class WorkerOptions(PipelineOptions):
+ """Worker pool configuration options."""
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument(
+ '--num_workers',
+ type=int,
+ default=None,
+ help=
+ ('Number of workers to use when executing the Dataflow job. If not '
+ 'set, the Dataflow service will use a reasonable default.'))
+ parser.add_argument(
+ '--max_num_workers',
+ type=int,
+ default=None,
+ help=
+ ('Maximum number of workers to use when executing the Dataflow job.'))
+ parser.add_argument(
+ '--autoscaling_algorithm',
+ type=str,
+ choices=['NONE', 'THROUGHPUT_BASED'],
+ default=None, # Meaning unset, distinct from 'NONE' meaning don't scale
+ help=
+ ('If and how to auotscale the workerpool.'))
+ parser.add_argument(
+ '--worker_machine_type',
+ dest='machine_type',
+ default=None,
+ help=('Machine type to create Dataflow worker VMs as. See '
+ 'https://cloud.google.com/compute/docs/machine-types '
+ 'for a list of valid options. If not set, '
+ 'the Dataflow service will choose a reasonable '
+ 'default.'))
+ parser.add_argument(
+ '--disk_size_gb',
+ type=int,
+ default=None,
+ help=
+ ('Remote worker disk size, in gigabytes, or 0 to use the default size. '
+ 'If not set, the Dataflow service will use a reasonable default.'))
+ parser.add_argument(
+ '--worker_disk_type',
+ dest='disk_type',
+ default=None,
+ help=('Specifies what type of persistent disk should be used.'))
+ parser.add_argument(
+ '--zone',
+ default=None,
+ help=(
+ 'GCE availability zone for launching workers. Default is up to the '
+ 'Dataflow service.'))
+ parser.add_argument(
+ '--network',
+ default=None,
+ help=(
+ 'GCE network for launching workers. Default is up to the Dataflow '
+ 'service.'))
+ parser.add_argument(
+ '--worker_harness_container_image',
+ default=None,
+ help=('Docker registry location of container image to use for the '
+ 'worker harness. Default is the container for the version of the '
+ 'SDK. Note: currently, only approved Google Cloud Dataflow '
+ 'container images may be used here.'))
+ parser.add_argument(
+ '--teardown_policy',
+ choices=['TEARDOWN_ALWAYS', 'TEARDOWN_NEVER', 'TEARDOWN_ON_SUCCESS'],
+ default=None,
+ help=
+ ('The teardown policy for the VMs. By default this is left unset and '
+ 'the service sets the default policy.'))
+ parser.add_argument(
+ '--use_public_ips',
+ default=None,
+ help='Whether to assign public IP addresses to the worker machines.')
+
+ def validate(self, validator):
+ errors = []
+ if validator.is_service_runner():
+ errors.extend(
+ validator.validate_optional_argument_positive(self, 'num_workers'))
+ return errors
+
+
+class DebugOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument('--dataflow_job_file',
+ default=None,
+ help='Debug file to write the workflow specification.')
+ parser.add_argument(
+ '--experiment', '--experiments',
+ dest='experiments',
+ action='append',
+ default=None,
+ help=
+ ('Runners may provide a number of experimental features that can be '
+ 'enabled with this flag. Please sync with the owners of the runner '
+ 'before enabling any experiments.'))
+
+
+class ProfilingOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ parser.add_argument('--profile_cpu',
+ action='store_true',
+ help='Enable work item CPU profiling.')
+ parser.add_argument('--profile_memory',
+ action='store_true',
+ help='Enable work item heap profiling.')
+ parser.add_argument('--profile_location',
+ default=None,
+ help='GCS path for saving profiler data.')
+
+
+class SetupOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ # Options for installing dependencies in the worker.
+ parser.add_argument(
+ '--requirements_file',
+ default=None,
+ help=
+ ('Path to a requirements file containing package dependencies. '
+ 'Typically it is produced by a pip freeze command. More details: '
+ 'https://pip.pypa.io/en/latest/reference/pip_freeze.html. '
+ 'If used, all the packages specified will be downloaded, '
+ 'cached (use --requirements_cache to change default location), '
+ 'and then staged so that they can be automatically installed in '
+ 'workers during startup. The cache is refreshed as needed '
+ 'avoiding extra downloads for existing packages. Typically the '
+ 'file is named requirements.txt.'))
+ parser.add_argument(
+ '--requirements_cache',
+ default=None,
+ help=
+ ('Path to a folder to cache the packages specified in '
+ 'the requirements file using the --requirements_file option.'))
+ parser.add_argument(
+ '--setup_file',
+ default=None,
+ help=
+ ('Path to a setup Python file containing package dependencies. If '
+ 'specified, the file\'s containing folder is assumed to have the '
+ 'structure required for a setuptools setup package. The file must be '
+ 'named setup.py. More details: '
+ 'https://pythonhosted.org/an_example_pypi_project/setuptools.html '
+ 'During job submission a source distribution will be built and the '
+ 'worker will install the resulting package before running any custom '
+ 'code.'))
+ parser.add_argument(
+ '--save_main_session',
+ default=False,
+ action='store_true',
+ help=
+ ('Save the main session state so that pickled functions and classes '
+ 'defined in __main__ (e.g. interactive session) can be unpickled. '
+ 'Some workflows do not need the session state if for instance all '
+ 'their functions/classes are defined in proper modules (not __main__)'
+ ' and the modules are importable in the worker. '))
+ parser.add_argument(
+ '--sdk_location',
+ default='default',
+ help=
+ ('Override the default GitHub location from where Dataflow SDK is '
+ 'downloaded. It can be an URL, a GCS path, or a local path to an '
+ 'SDK tarball. Workflow submissions will download or copy an SDK '
+ 'tarball from here. If the string "default", '
+ 'a standard SDK location is used. If empty, no SDK is copied.'))
+ parser.add_argument(
+ '--extra_package', '--extra_packages',
+ dest='extra_packages',
+ action='append',
+ default=None,
+ help=
+ ('Local path to a Python package file. The file is expected to be (1) '
+ 'a package tarball (".tar") or (2) a compressed package tarball '
+ '(".tar.gz") which can be installed using the "pip install" command '
+ 'of the standard pip package. Multiple --extra_package options can '
+ 'be specified if more than one package is needed. During job '
+ 'submission, the files will be staged in the staging area '
+ '(--staging_location option) and the workers will install them in '
+ 'same order they were specified on the command line.'))
+
+
+class TestOptions(PipelineOptions):
+
+ @classmethod
+ def _add_argparse_args(cls, parser):
+ # Options for e2e test pipeline.
+ parser.add_argument(
+ '--on_success_matcher',
+ default=None,
+ help=('Verify state/output of e2e test pipeline. This is pickled '
+ 'version of the matcher which should extends '
+ 'hamcrest.core.base_matcher.BaseMatcher.'))
+
+ def validate(self, validator):
+ errors = []
+ if self.view_as(TestOptions).on_success_matcher:
+ errors.extend(validator.validate_test_matcher(self, 'on_success_matcher'))
+ return errors
+
+# TODO(silviuc): Add --files_to_stage option.
+# This could potentially replace the --requirements_file and --setup_file.
+
+# TODO(silviuc): Non-standard options. Keep them? If yes, add help too!
+# Remote execution must check that this option is not None.
+
+
+class OptionsContext(object):
+ """Set default pipeline options for pipelines created in this block.
+
+ This is particularly useful for pipelines implicitly created with the
+
+ [python list] | PTransform
+
+ construct.
+
+ Can also be used as a decorator.
+ """
+ overrides = []
+
+ def __init__(self, **options):
+ self.options = options
+
+ def __enter__(self):
+ self.overrides.append(self.options)
+
+ def __exit__(self, *exn_info):
+ self.overrides.pop()
+
+ def __call__(self, f, *args, **kwargs):
+
+ def wrapper(*args, **kwargs):
+ with self:
+ f(*args, **kwargs)
+
+ return wrapper
+
+ @classmethod
+ def augment_options(cls, options):
+ for override in cls.overrides:
+ for name, value in override.items():
+ setattr(options, name, value)
+ return options
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/pipeline_options_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_test.py b/sdks/python/apache_beam/utils/pipeline_options_test.py
index 75bd6a2..054b6a5 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_test.py
@@ -23,7 +23,7 @@ import unittest
import hamcrest as hc
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
class PipelineOptionsTest(unittest.TestCase):
http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator.py b/sdks/python/apache_beam/utils/pipeline_options_validator.py
index c1243ce..85fdc4d 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator.py
@@ -20,13 +20,13 @@
import re
from apache_beam.internal import pickler
-from apache_beam.utils.options import DebugOptions
-from apache_beam.utils.options import GoogleCloudOptions
-from apache_beam.utils.options import SetupOptions
-from apache_beam.utils.options import StandardOptions
-from apache_beam.utils.options import TestOptions
-from apache_beam.utils.options import TypeOptions
-from apache_beam.utils.options import WorkerOptions
+from apache_beam.utils.pipeline_options import DebugOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.utils.pipeline_options import StandardOptions
+from apache_beam.utils.pipeline_options import TestOptions
+from apache_beam.utils.pipeline_options import TypeOptions
+from apache_beam.utils.pipeline_options import WorkerOptions
class PipelineOptionsValidator(object):