Posted to commits@beam.apache.org by dh...@apache.org on 2016/12/29 04:33:18 UTC

[2/3] beam git commit: Rename options.py -> pipeline_options.py

Rename options.py -> pipeline_options.py


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd9c9f88
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd9c9f88
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd9c9f88

Branch: refs/heads/python-sdk
Commit: cd9c9f888879d425a990031f28bf3bff63d2087e
Parents: 89a5b3c
Author: Maria Garcia Herrero <ma...@google.com>
Authored: Wed Dec 28 15:28:37 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Dec 28 20:32:24 2016 -0800

----------------------------------------------------------------------
 .../examples/complete/autocomplete.py           |   4 +-
 .../examples/complete/estimate_pi.py            |   4 +-
 .../apache_beam/examples/complete/tfidf.py      |   4 +-
 .../examples/complete/top_wikipedia_sessions.py |   4 +-
 .../examples/cookbook/bigquery_side_input.py    |   4 +-
 .../apache_beam/examples/cookbook/bigshuffle.py |   4 +-
 .../apache_beam/examples/cookbook/coders.py     |   4 +-
 .../examples/cookbook/custom_ptransform.py      |   2 +-
 .../examples/cookbook/datastore_wordcount.py    |   6 +-
 .../examples/cookbook/group_with_coder.py       |   4 +-
 .../examples/cookbook/mergecontacts.py          |   4 +-
 .../examples/cookbook/multiple_output_pardo.py  |   4 +-
 .../apache_beam/examples/snippets/snippets.py   |  50 +-
 .../examples/snippets/snippets_test.py          |   2 +-
 sdks/python/apache_beam/examples/wordcount.py   |   4 +-
 .../apache_beam/examples/wordcount_debugging.py |   4 +-
 .../apache_beam/examples/wordcount_minimal.py   |   4 +-
 sdks/python/apache_beam/internal/apiclient.py   |   8 +-
 .../apache_beam/internal/apiclient_test.py      |   2 +-
 sdks/python/apache_beam/internal/auth.py        |   4 +-
 sdks/python/apache_beam/io/bigquery.py          |   2 +-
 sdks/python/apache_beam/io/bigquery_test.py     |   2 +-
 sdks/python/apache_beam/pipeline.py             |   8 +-
 .../apache_beam/runners/dataflow_runner.py      |   2 +-
 .../runners/direct/transform_evaluator.py       |   2 +-
 sdks/python/apache_beam/runners/runner_test.py  |   2 +-
 .../apache_beam/runners/template_runner_test.py |   2 +-
 .../runners/test/test_dataflow_runner.py        |   2 +-
 sdks/python/apache_beam/test_pipeline.py        |   2 +-
 sdks/python/apache_beam/test_pipeline_test.py   |   2 +-
 sdks/python/apache_beam/transforms/core.py      |   2 +-
 sdks/python/apache_beam/transforms/display.py   |   2 +-
 .../apache_beam/transforms/display_test.py      |   2 +-
 .../python/apache_beam/transforms/ptransform.py |   2 +-
 .../apache_beam/transforms/ptransform_test.py   |   4 +-
 .../transforms/write_ptransform_test.py         |   2 +-
 .../typehints/typed_pipeline_test.py            |   4 +-
 sdks/python/apache_beam/utils/dependency.py     |   4 +-
 .../python/apache_beam/utils/dependency_test.py |   6 +-
 sdks/python/apache_beam/utils/options.py        | 543 -------------------
 .../apache_beam/utils/pipeline_options.py       | 540 ++++++++++++++++++
 .../apache_beam/utils/pipeline_options_test.py  |   2 +-
 .../utils/pipeline_options_validator.py         |  14 +-
 .../utils/pipeline_options_validator_test.py    |   2 +-
 44 files changed, 639 insertions(+), 642 deletions(-)
----------------------------------------------------------------------
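This is a pure rename of the module path: every `from apache_beam.utils.options import ...` in the files above becomes `from apache_beam.utils.pipeline_options import ...`, with no change to the option classes themselves. A minimal before/after sketch (class names and constructor/`view_as` usage taken from the diff below; the flag value is illustrative only):

    # Before this commit:
    #   from apache_beam.utils.options import PipelineOptions
    #   from apache_beam.utils.options import StandardOptions
    # After this commit, only the module path changes:
    from apache_beam.utils.pipeline_options import PipelineOptions
    from apache_beam.utils.pipeline_options import StandardOptions

    # Behavior is unchanged: parse flags, then view them as a specific options class.
    options = PipelineOptions(['--runner', 'DirectRunner'])
    print(options.view_as(StandardOptions).runner)  # prints 'DirectRunner'
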


http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/complete/autocomplete.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index eaa5ca2..87e6c0c 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -24,8 +24,8 @@ import logging
 import re
 
 import apache_beam as beam
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
 
 
 def run(argv=None):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/complete/estimate_pi.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index 682c6d2..d0faefe 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -36,8 +36,8 @@ import apache_beam as beam
 from apache_beam.typehints import Any
 from apache_beam.typehints import Iterable
 from apache_beam.typehints import Tuple
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
 
 
 @beam.typehints.with_output_types(Tuple[int, int, int])

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/complete/tfidf.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index 59b2900..b4d5b45 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf.py
@@ -30,8 +30,8 @@ import re
 
 import apache_beam as beam
 from apache_beam.pvalue import AsSingleton
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
 
 
 def read_documents(pipeline, uris):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index fbce641..cbd305a 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -46,8 +46,8 @@ import logging
 import apache_beam as beam
 from apache_beam import combiners
 from apache_beam import window
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
 
 
 ONE_HOUR_IN_SECONDS = 3600

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index 8a53637..25e2c3b 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -35,8 +35,8 @@ import apache_beam as beam
 
 from apache_beam.pvalue import AsList
 from apache_beam.pvalue import AsSingleton
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
 
 
 def create_groups(group_ids, corpus, word, ignore_corpus, ignore_word):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index a076a0c..83d3881 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -25,8 +25,8 @@ import logging
 
 
 import apache_beam as beam
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
 
 
 def crc32line(line):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/coders.py b/sdks/python/apache_beam/examples/cookbook/coders.py
index bbe02b3..690ba66 100644
--- a/sdks/python/apache_beam/examples/cookbook/coders.py
+++ b/sdks/python/apache_beam/examples/cookbook/coders.py
@@ -35,8 +35,8 @@ import json
 import logging
 
 import apache_beam as beam
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
 
 
 class JsonCoder(object):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index b9d64cf..56259ed 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -27,7 +27,7 @@ import logging
 
 import apache_beam as beam
 
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
 
 
 # pylint doesn't understand our pipeline syntax:

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 9613402..dd34070 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -73,9 +73,9 @@ from googledatastore import helper as datastore_helper, PropertyFilter
 import apache_beam as beam
 from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
 from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
-from apache_beam.utils.options import GoogleCloudOptions
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
 
 empty_line_aggregator = beam.Aggregator('emptyLines')
 average_word_size_aggregator = beam.Aggregator('averageWordLength',

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
index 651a4f3..c4b8c59 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder.py
@@ -35,8 +35,8 @@ import apache_beam as beam
 from apache_beam import coders
 from apache_beam.typehints import typehints
 from apache_beam.typehints.decorators import with_output_types
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
 
 
 class Player(object):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
index bf6d1b1..6602609 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py
@@ -36,8 +36,8 @@ import logging
 import re
 
 import apache_beam as beam
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
 
 
 def run(argv=None, assert_results=None):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
index d760e5a..dd91e74 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py
@@ -54,8 +54,8 @@ import re
 
 import apache_beam as beam
 from apache_beam import pvalue
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
 
 
 class SplitLinesToWordsFn(beam.DoFn):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 64878f3..0d55125 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -88,7 +88,7 @@ def construct_pipeline(renames):
     return True
 
   # [START pipelines_constructing_creating]
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
 
   p = beam.Pipeline(options=PipelineOptions())
   # [END pipelines_constructing_creating]
@@ -125,7 +125,7 @@ def model_pipelines(argv):
   import re
 
   import apache_beam as beam
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
 
   class MyOptions(PipelineOptions):
 
@@ -161,7 +161,7 @@ def model_pcollection(argv):
 
   URL: https://cloud.google.com/dataflow/model/pcollection
   """
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
 
   class MyOptions(PipelineOptions):
 
@@ -197,7 +197,7 @@ def pipeline_options_remote(argv):
   """
 
   from apache_beam import Pipeline
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
 
   # [START pipeline_options_create]
   options = PipelineOptions(flags=argv)
@@ -212,8 +212,8 @@ def pipeline_options_remote(argv):
       parser.add_argument('--output')
   # [END pipeline_options_define_custom]
 
-  from apache_beam.utils.options import GoogleCloudOptions
-  from apache_beam.utils.options import StandardOptions
+  from apache_beam.utils.pipeline_options import GoogleCloudOptions
+  from apache_beam.utils.pipeline_options import StandardOptions
 
   # [START pipeline_options_dataflow_service]
   # Create and set your PipelineOptions.
@@ -254,7 +254,7 @@ def pipeline_options_local(argv):
   """
 
   from apache_beam import Pipeline
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
 
   options = PipelineOptions(flags=argv)
 
@@ -320,7 +320,7 @@ def pipeline_logging(lines, output):
 
   import re
   import apache_beam as beam
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
 
   # [START pipeline_logging]
   # import Python logging module.
@@ -357,7 +357,7 @@ def pipeline_monitoring(renames):
 
   import re
   import apache_beam as beam
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
 
   class WordCountOptions(PipelineOptions):
 
@@ -425,9 +425,9 @@ def examples_wordcount_minimal(renames):
 
   import apache_beam as beam
 
-  from apache_beam.utils.options import GoogleCloudOptions
-  from apache_beam.utils.options import StandardOptions
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import GoogleCloudOptions
+  from apache_beam.utils.pipeline_options import StandardOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
 
   # [START examples_wordcount_minimal_options]
   options = PipelineOptions()
@@ -485,7 +485,7 @@ def examples_wordcount_wordcount(renames):
   import re
 
   import apache_beam as beam
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
 
   argv = []
 
@@ -544,7 +544,7 @@ def examples_wordcount_debugging(renames):
   import re
 
   import apache_beam as beam
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
 
   # [START example_wordcount_debugging_logging]
   # [START example_wordcount_debugging_aggregators]
@@ -635,7 +635,7 @@ def model_custom_source(count):
   from apache_beam.io import iobase
   from apache_beam.io.range_trackers import OffsetRangeTracker
   from apache_beam.transforms.core import PTransform
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
 
   # Defining a new source.
   # [START model_custom_source_new_source]
@@ -766,7 +766,7 @@ def model_custom_sink(simplekv, KVs, final_table_name_no_ptransform,
   import apache_beam as beam
   from apache_beam.io import iobase
   from apache_beam.transforms.core import PTransform
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
 
   # Defining the new sink.
   # [START model_custom_sink_new_sink]
@@ -867,7 +867,7 @@ def model_textio(renames):
     return re.findall(r'[A-Za-z\']+', x)
 
   import apache_beam as beam
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
 
   # [START model_textio_read]
   p = beam.Pipeline(options=PipelineOptions())
@@ -902,7 +902,7 @@ def model_datastoreio():
   from google.datastore.v1 import query_pb2
   import googledatastore
   import apache_beam as beam
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
   from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
   from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
 
@@ -938,7 +938,7 @@ def model_bigqueryio():
   URL: https://cloud.google.com/dataflow/model/bigquery-io
   """
   import apache_beam as beam
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
 
   # [START model_bigqueryio_read]
   p = beam.Pipeline(options=PipelineOptions())
@@ -1009,7 +1009,7 @@ def model_composite_transform_example(contents, output_path):
   # [END composite_ptransform_apply_method]
   # [END composite_transform_example]
 
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
   p = beam.Pipeline(options=PipelineOptions())
   (p
    | beam.Create(contents)
@@ -1025,7 +1025,7 @@ def model_multiple_pcollections_flatten(contents, output_path):
   """
   some_hash_fn = lambda s: ord(s[0])
   import apache_beam as beam
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
   p = beam.Pipeline(options=PipelineOptions())
   partition_fn = lambda element, partitions: some_hash_fn(element) % partitions
 
@@ -1066,7 +1066,7 @@ def model_multiple_pcollections_partition(contents, output_path):
     """Assume i in [0,100)."""
     return i
   import apache_beam as beam
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
   p = beam.Pipeline(options=PipelineOptions())
 
   students = p | beam.Create(contents)
@@ -1096,7 +1096,7 @@ def model_group_by_key(contents, output_path):
   import re
 
   import apache_beam as beam
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
   p = beam.Pipeline(options=PipelineOptions())
   words_and_counts = (
       p
@@ -1123,7 +1123,7 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
   URL: https://cloud.google.com/dataflow/model/group-by-key
   """
   import apache_beam as beam
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
   p = beam.Pipeline(options=PipelineOptions())
   # [START model_group_by_key_cogroupbykey_tuple]
   # Each data set is represented by key-value pairs in separate PCollections.
@@ -1161,7 +1161,7 @@ def model_join_using_side_inputs(
 
   import apache_beam as beam
   from apache_beam.pvalue import AsIter
-  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.utils.pipeline_options import PipelineOptions
 
   p = beam.Pipeline(options=PipelineOptions())
   # [START model_join_using_side_inputs]

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index feb06c5..1a84a6e 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -31,7 +31,7 @@ from apache_beam import typehints
 from apache_beam.io import fileio
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
-from apache_beam.utils.options import TypeOptions
+from apache_beam.utils.pipeline_options import TypeOptions
 from apache_beam.examples.snippets import snippets
 
 # pylint: disable=expression-not-assigned

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 7f347d8..51fc2eb 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -24,8 +24,8 @@ import logging
 import re
 
 import apache_beam as beam
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
 
 
 empty_line_aggregator = beam.Aggregator('emptyLines')

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index ffbfed7..bba09b4 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -46,8 +46,8 @@ import logging
 import re
 
 import apache_beam as beam
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
 
 
 class FilterTextFn(beam.DoFn):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/examples/wordcount_minimal.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_minimal.py b/sdks/python/apache_beam/examples/wordcount_minimal.py
index 98477df..c02ec16 100644
--- a/sdks/python/apache_beam/examples/wordcount_minimal.py
+++ b/sdks/python/apache_beam/examples/wordcount_minimal.py
@@ -51,8 +51,8 @@ import logging
 import re
 
 import apache_beam as beam
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
 
 
 def run(argv=None):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 3a9ba46..001ae64 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -41,10 +41,10 @@ from apache_beam.utils import retry
 from apache_beam.utils.dependency import get_required_container_version
 from apache_beam.utils.dependency import get_sdk_name_and_version
 from apache_beam.utils.names import PropertyNames
-from apache_beam.utils.options import DebugOptions
-from apache_beam.utils.options import GoogleCloudOptions
-from apache_beam.utils.options import StandardOptions
-from apache_beam.utils.options import WorkerOptions
+from apache_beam.utils.pipeline_options import DebugOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import StandardOptions
+from apache_beam.utils.pipeline_options import WorkerOptions
 from apache_beam.internal.clients import storage
 import apache_beam.internal.clients.dataflow as dataflow
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient_test.py b/sdks/python/apache_beam/internal/apiclient_test.py
index 1a83752..188a5a8 100644
--- a/sdks/python/apache_beam/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/internal/apiclient_test.py
@@ -18,7 +18,7 @@
 
 import unittest
 
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.runners.dataflow_runner import DataflowRunner
 from apache_beam.internal import apiclient
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/internal/auth.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/auth.py b/sdks/python/apache_beam/internal/auth.py
index 056f40c..c645f24 100644
--- a/sdks/python/apache_beam/internal/auth.py
+++ b/sdks/python/apache_beam/internal/auth.py
@@ -29,8 +29,8 @@ from oauth2client.client import OAuth2Credentials
 
 from apache_beam.utils import processes
 from apache_beam.utils import retry
-from apache_beam.utils.options import GoogleCloudOptions
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
 
 
 # When we are running in GCE, we can authenticate with VM credentials.

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index 6a766d0..ea0a281 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -120,7 +120,7 @@ from apache_beam.internal.json_value import to_json_value
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils import retry
-from apache_beam.utils.options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
 
 # Protect against environments where bigquery library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/io/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py
index 3a558d7..b8682d1 100644
--- a/sdks/python/apache_beam/io/bigquery_test.py
+++ b/sdks/python/apache_beam/io/bigquery_test.py
@@ -35,7 +35,7 @@ from apache_beam.io.bigquery import TableRowJsonCoder
 from apache_beam.io.bigquery import parse_table_schema_from_json
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
 
 
 class TestRowAsDictJsonCoder(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 8b2345e..81343f3 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -55,10 +55,10 @@ from apache_beam.runners import create_runner
 from apache_beam.runners import PipelineRunner
 from apache_beam.transforms import ptransform
 from apache_beam.typehints import TypeCheckError
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
-from apache_beam.utils.options import StandardOptions
-from apache_beam.utils.options import TypeOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.utils.pipeline_options import StandardOptions
+from apache_beam.utils.pipeline_options import TypeOptions
 from apache_beam.utils.pipeline_options_validator import PipelineOptionsValidator
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 3ee95c5..3505acc 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -41,7 +41,7 @@ from apache_beam.typehints import typehints
 from apache_beam.utils import names
 from apache_beam.utils.names import PropertyNames
 from apache_beam.utils.names import TransformNames
-from apache_beam.utils.options import StandardOptions
+from apache_beam.utils.pipeline_options import StandardOptions
 from apache_beam.internal.clients import dataflow as dataflow_api
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 24ab754..b4c43ba 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -38,7 +38,7 @@ from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
 from apache_beam.typehints.typecheck import TypeCheckError
 from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn
 from apache_beam.utils import counters
-from apache_beam.utils.options import TypeOptions
+from apache_beam.utils.pipeline_options import TypeOptions
 
 
 class TransformEvaluatorRegistry(object):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 74c81c2..ff6a22e 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -36,7 +36,7 @@ from apache_beam.runners import DirectRunner
 from apache_beam.runners import TestDataflowRunner
 import apache_beam.transforms as ptransform
 from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
 
 
 class RunnerTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/runners/template_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/template_runner_test.py b/sdks/python/apache_beam/runners/template_runner_test.py
index 8cd818b..457022d 100644
--- a/sdks/python/apache_beam/runners/template_runner_test.py
+++ b/sdks/python/apache_beam/runners/template_runner_test.py
@@ -26,7 +26,7 @@ import tempfile
 import apache_beam as beam
 from apache_beam.pipeline import Pipeline
 from apache_beam.runners.dataflow_runner import DataflowRunner
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.internal import apiclient
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
index a58ab33..77655bd 100644
--- a/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/test/test_dataflow_runner.py
@@ -19,7 +19,7 @@
 
 from apache_beam.internal import pickler
 from apache_beam.runners.dataflow_runner import DataflowRunner
-from apache_beam.utils.options import TestOptions
+from apache_beam.utils.pipeline_options import TestOptions
 
 
 class TestDataflowRunner(DataflowRunner):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline.py b/sdks/python/apache_beam/test_pipeline.py
index e758c3d..48b98b2 100644
--- a/sdks/python/apache_beam/test_pipeline.py
+++ b/sdks/python/apache_beam/test_pipeline.py
@@ -22,7 +22,7 @@ import shlex
 
 from apache_beam.internal import pickler
 from apache_beam.pipeline import Pipeline
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
 from nose.plugins.skip import SkipTest
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/test_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/test_pipeline_test.py b/sdks/python/apache_beam/test_pipeline_test.py
index 747f0ef..42ba2d7 100644
--- a/sdks/python/apache_beam/test_pipeline_test.py
+++ b/sdks/python/apache_beam/test_pipeline_test.py
@@ -20,7 +20,7 @@
 import unittest
 
 from apache_beam.test_pipeline import TestPipeline
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
 
 
 class TestPipelineTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 0ba1c62..6a7bd2e 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -46,7 +46,7 @@ from apache_beam.typehints import TypeCheckError
 from apache_beam.typehints import Union
 from apache_beam.typehints import WithTypeHints
 from apache_beam.typehints.trivial_inference import element_type
-from apache_beam.utils.options import TypeOptions
+from apache_beam.utils.pipeline_options import TypeOptions
 
 # Type variables
 T = typehints.TypeVariable('T')

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/display.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
index c38fd9b..6e74512 100644
--- a/sdks/python/apache_beam/transforms/display.py
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -122,7 +122,7 @@ class DisplayData(object):
       ValueError: If the has_display_data argument is not an instance of
         HasDisplayData.
     """
-    from apache_beam.utils.options import PipelineOptions
+    from apache_beam.utils.pipeline_options import PipelineOptions
     if not isinstance(pipeline_options, PipelineOptions):
       raise ValueError(
           'Element of class {}.{} does not subclass PipelineOptions'

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/display_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py
index fc50abe..848746c 100644
--- a/sdks/python/apache_beam/transforms/display_test.py
+++ b/sdks/python/apache_beam/transforms/display_test.py
@@ -29,7 +29,7 @@ import apache_beam as beam
 from apache_beam.transforms.display import HasDisplayData
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
 
 
 class DisplayDataItemMatcher(BaseMatcher):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 39864a6..c022c5e 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -410,7 +410,7 @@ class PTransform(WithTypeHints, HasDisplayData):
       deferred = False
       # pylint: disable=wrong-import-order, wrong-import-position
       from apache_beam import pipeline
-      from apache_beam.utils.options import PipelineOptions
+      from apache_beam.utils.pipeline_options import PipelineOptions
       # pylint: enable=wrong-import-order, wrong-import-position
       p = pipeline.Pipeline(
           'DirectRunner', PipelineOptions(sys.argv))

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 5ed7d72..705e85e 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -36,8 +36,8 @@ import apache_beam.typehints as typehints
 from apache_beam.typehints import with_input_types
 from apache_beam.typehints import with_output_types
 from apache_beam.typehints.typehints_test import TypeHintTestCase
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import TypeOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import TypeOptions
 
 
 # Disable frequent lint warning due to pipe operator for chaining transforms.

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/transforms/write_ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py
index 9a1a7de..f96dffb 100644
--- a/sdks/python/apache_beam/transforms/write_ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py
@@ -25,7 +25,7 @@ from apache_beam.io import iobase
 from apache_beam.pipeline import Pipeline
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.util import assert_that, is_empty
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
 
 
 class _TestSink(iobase.Sink):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/typehints/typed_pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 329d657..8b5e3f4 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -25,8 +25,8 @@ from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.transforms.util import assert_that, equal_to
 from apache_beam.typehints import WithTypeHints
-from apache_beam.utils.options import OptionsContext
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import OptionsContext
+from apache_beam.utils.pipeline_options import PipelineOptions
 
 # These test often construct a pipeline as value | PTransform to test side
 # effects (e.g. errors).

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
index addcdb2..11a2e1c 100644
--- a/sdks/python/apache_beam/utils/dependency.py
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -66,8 +66,8 @@ from apache_beam import version as beam_version
 from apache_beam.internal import pickler
 from apache_beam.utils import names
 from apache_beam.utils import processes
-from apache_beam.utils.options import GoogleCloudOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import SetupOptions
 
 
 # Standard file names used for staging files.

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/dependency_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py
index 3549a07..a484d60 100644
--- a/sdks/python/apache_beam/utils/dependency_test.py
+++ b/sdks/python/apache_beam/utils/dependency_test.py
@@ -26,9 +26,9 @@ import unittest
 from apache_beam import utils
 from apache_beam.utils import dependency
 from apache_beam.utils import names
-from apache_beam.utils.options import GoogleCloudOptions
-from apache_beam.utils.options import PipelineOptions
-from apache_beam.utils.options import SetupOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
+from apache_beam.utils.pipeline_options import SetupOptions
 
 
 class SetupTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py
deleted file mode 100644
index 7ca0573..0000000
--- a/sdks/python/apache_beam/utils/options.py
+++ /dev/null
@@ -1,543 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Pipeline options obtained from command line parsing.
-
-TODO(silviuc): Should rename this module to pipeline_options.
-"""
-
-import argparse
-
-from apache_beam.transforms.display import HasDisplayData
-
-
-class PipelineOptions(HasDisplayData):
-  """Pipeline options class used as container for command line options.
-
-  The class is essentially a wrapper over the standard argparse Python module
-  (see https://docs.python.org/3/library/argparse.html).  To define one option
-  or a group of options you subclass from PipelineOptions::
-
-    class XyzOptions(PipelineOptions):
-
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_argument('--abc', default='start')
-        parser.add_argument('--xyz', default='end')
-
-  The arguments for the add_argument() method are exactly the ones
-  described in the argparse public documentation.
-
-  Pipeline objects require an options object during initialization.
-  This is obtained simply by initializing an options class as defined above::
-
-    p = Pipeline(options=XyzOptions())
-    if p.options.xyz == 'end':
-      raise ValueError('Option xyz has an invalid value.')
-
-  By default the options classes will use command line arguments to initialize
-  the options.
-  """
-
-  def __init__(self, flags=None, **kwargs):
-    """Initialize an options class.
-
-    The initializer will traverse all subclasses, add all their argparse
-    arguments and then parse the command line specified by flags or by default
-    the one obtained from sys.argv.
-
-    The subclasses are not expected to require a redefinition of __init__.
-
-    Args:
-      flags: An iterable of command line arguments to be used. If not specified
-        then sys.argv will be used as input for parsing arguments.
-
-      **kwargs: Add overrides for arguments passed in flags.
-    """
-    self._flags = flags
-    self._all_options = kwargs
-    parser = argparse.ArgumentParser()
-    for cls in type(self).mro():
-      if cls == PipelineOptions:
-        break
-      elif '_add_argparse_args' in cls.__dict__:
-        cls._add_argparse_args(parser)
-    # The _visible_options attribute will contain only those options from the
-    # flags (i.e., command line) that can be recognized. The _all_options
-    # field contains additional overrides.
-    self._visible_options, _ = parser.parse_known_args(flags)
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    # Override this in subclasses to provide options.
-    pass
-
-  @classmethod
-  def from_dictionary(cls, options):
-    """Returns a PipelineOptions from a dictionary of arguments.
-
-    Args:
-      options: Dictinary of argument value pairs.
-
-    Returns:
-      A PipelineOptions object representing the given arguments.
-    """
-    flags = []
-    for k, v in options.iteritems():
-      if isinstance(v, bool):
-        if v:
-          flags.append('--%s' % k)
-      else:
-        flags.append('--%s=%s' % (k, v))
-
-    return cls(flags)
-
-  def get_all_options(self, drop_default=False):
-    """Returns a dictionary of all defined arguments.
-
-    Returns a dictionary of all defined arguments (arguments that are defined in
-    any subclass of PipelineOptions) into a dictionary.
-
-    Args:
-      drop_default: If set to true, options that are equal to their default
-        values, are not returned as part of the result dictionary.
-
-    Returns:
-      Dictionary of all args and values.
-    """
-    parser = argparse.ArgumentParser()
-    for cls in PipelineOptions.__subclasses__():
-      cls._add_argparse_args(parser)  # pylint: disable=protected-access
-    known_args, _ = parser.parse_known_args(self._flags)
-    result = vars(known_args)
-
-    # Apply the overrides if any
-    for k in result.keys():
-      if k in self._all_options:
-        result[k] = self._all_options[k]
-      if drop_default and parser.get_default(k) == result[k]:
-        del result[k]
-
-    return result
-
-  def display_data(self):
-    return self.get_all_options(True)
-
-  def view_as(self, cls):
-    view = cls(self._flags)
-    view._all_options = self._all_options
-    return view
-
-  def _visible_option_list(self):
-    return sorted(option
-                  for option in dir(self._visible_options) if option[0] != '_')
-
-  def __dir__(self):
-    return sorted(dir(type(self)) + self.__dict__.keys() +
-                  self._visible_option_list())
-
-  def __getattr__(self, name):
-    # Special methods which may be accessed before the object is
-    # fully constructed (e.g. in unpickling).
-    if name[:2] == name[-2:] == '__':
-      return object.__getattr__(self, name)
-    elif name in self._visible_option_list():
-      return self._all_options.get(name, getattr(self._visible_options, name))
-    else:
-      raise AttributeError("'%s' object has no attribute '%s'" %
-                           (type(self).__name__, name))
-
-  def __setattr__(self, name, value):
-    if name in ('_flags', '_all_options', '_visible_options'):
-      super(PipelineOptions, self).__setattr__(name, value)
-    elif name in self._visible_option_list():
-      self._all_options[name] = value
-    else:
-      raise AttributeError("'%s' object has no attribute '%s'" %
-                           (type(self).__name__, name))
-
-  def __str__(self):
-    return '%s(%s)' % (type(self).__name__,
-                       ', '.join('%s=%s' % (option, getattr(self, option))
-                                 for option in self._visible_option_list()))
-
-
-class StandardOptions(PipelineOptions):
-
-  DEFAULT_RUNNER = 'DirectRunner'
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    parser.add_argument(
-        '--runner',
-        help=('Pipeline runner used to execute the workflow. Valid values are '
-              'DirectRunner, DataflowRunner, '
-              'and BlockingDataflowRunner.'))
-    # Whether to enable streaming mode.
-    parser.add_argument('--streaming',
-                        default=False,
-                        action='store_true',
-                        help='Whether to enable streaming mode.')
-
-
-class TypeOptions(PipelineOptions):
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    # TODO(laolu): Add a type inferencing option here once implemented.
-    parser.add_argument('--type_check_strictness',
-                        default='DEFAULT_TO_ANY',
-                        choices=['ALL_REQUIRED', 'DEFAULT_TO_ANY'],
-                        help='The level of exhaustive manual type-hint '
-                        'annotation required')
-    parser.add_argument('--no_pipeline_type_check',
-                        dest='pipeline_type_check',
-                        action='store_false',
-                        help='Disable type checking at pipeline construction '
-                        'time')
-    parser.add_argument('--pipeline_type_check',
-                        action='store_true',
-                        help='Enable type checking at pipeline construction '
-                        'time')
-    parser.add_argument('--runtime_type_check',
-                        default=False,
-                        action='store_true',
-                        help='Enable type checking at pipeline execution '
-                        'time. NOTE: only supported with the '
-                        'DirectRunner')
-
-
-class GoogleCloudOptions(PipelineOptions):
-  """Google Cloud Dataflow service execution options."""
-
-  BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
-  COMPUTE_API_SERVICE = 'compute.googleapis.com'
-  STORAGE_API_SERVICE = 'storage.googleapis.com'
-  DATAFLOW_ENDPOINT = 'https://dataflow.googleapis.com'
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    parser.add_argument(
-        '--dataflow_endpoint',
-        default=cls.DATAFLOW_ENDPOINT,
-        help=
-        ('The URL for the Dataflow API. If not set, the default public URL '
-         'will be used.'))
-    # Remote execution must check that this option is not None.
-    parser.add_argument('--project',
-                        default=None,
-                        help='Name of the Cloud project owning the Dataflow '
-                        'job.')
-    # Remote execution must check that this option is not None.
-    parser.add_argument('--job_name',
-                        default=None,
-                        help='Name of the Cloud Dataflow job.')
-    # Remote execution must check that this option is not None.
-    parser.add_argument('--staging_location',
-                        default=None,
-                        help='GCS path for staging code packages needed by '
-                        'workers.')
-    # Remote execution must check that this option is not None.
-    # If staging_location is not set, it defaults to temp_location.
-    parser.add_argument('--temp_location',
-                        default=None,
-                        help='GCS path for saving temporary workflow jobs.')
-    parser.add_argument('--service_account_name',
-                        default=None,
-                        help='Name of the service account for Google APIs.')
-    parser.add_argument('--service_account_key_file',
-                        default=None,
-                        help='Path to a file containing the P12 service '
-                        'credentials.')
-    parser.add_argument('--service_account_email',
-                        default=None,
-                        help='Identity to run virtual machines as.')
-    parser.add_argument('--no_auth', dest='no_auth', type=bool, default=False)
-    # Option to run templated pipelines
-    parser.add_argument('--template_location',
-                        default=None,
-                        help='Save job to specified local or GCS location.')
-
-  def validate(self, validator):
-    errors = []
-    if validator.is_service_runner():
-      errors.extend(validator.validate_cloud_options(self))
-      errors.extend(validator.validate_gcs_path(self, 'temp_location'))
-      if getattr(self, 'staging_location',
-                 None) or getattr(self, 'temp_location', None) is None:
-        errors.extend(validator.validate_gcs_path(self, 'staging_location'))
-
-    if self.view_as(DebugOptions).dataflow_job_file:
-      if self.view_as(GoogleCloudOptions).template_location:
-        errors.append('--dataflow_job_file and --template_location '
-                      'are mutually exclusive.')
-
-    return errors
-
-
-# Command line options controlling the worker pool configuration.
-# TODO(silviuc): Update description when autoscaling options are in.
-class WorkerOptions(PipelineOptions):
-  """Worker pool configuration options."""
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    parser.add_argument(
-        '--num_workers',
-        type=int,
-        default=None,
-        help=
-        ('Number of workers to use when executing the Dataflow job. If not '
-         'set, the Dataflow service will use a reasonable default.'))
-    parser.add_argument(
-        '--max_num_workers',
-        type=int,
-        default=None,
-        help=
-        ('Maximum number of workers to use when executing the Dataflow job.'))
-    parser.add_argument(
-        '--autoscaling_algorithm',
-        type=str,
-        choices=['NONE', 'THROUGHPUT_BASED'],
-        default=None,  # Meaning unset, distinct from 'NONE' meaning don't scale
-        help=
-        ('If and how to autoscale the worker pool.'))
-    parser.add_argument(
-        '--worker_machine_type',
-        dest='machine_type',
-        default=None,
-        help=('Machine type to create Dataflow worker VMs as. See '
-              'https://cloud.google.com/compute/docs/machine-types '
-              'for a list of valid options. If not set, '
-              'the Dataflow service will choose a reasonable '
-              'default.'))
-    parser.add_argument(
-        '--disk_size_gb',
-        type=int,
-        default=None,
-        help=
-        ('Remote worker disk size, in gigabytes, or 0 to use the default size. '
-         'If not set, the Dataflow service will use a reasonable default.'))
-    parser.add_argument(
-        '--worker_disk_type',
-        dest='disk_type',
-        default=None,
-        help=('Specifies what type of persistent disk should be used.'))
-    parser.add_argument(
-        '--zone',
-        default=None,
-        help=(
-            'GCE availability zone for launching workers. Default is up to the '
-            'Dataflow service.'))
-    parser.add_argument(
-        '--network',
-        default=None,
-        help=(
-            'GCE network for launching workers. Default is up to the Dataflow '
-            'service.'))
-    parser.add_argument(
-        '--worker_harness_container_image',
-        default=None,
-        help=('Docker registry location of container image to use for the '
-              'worker harness. Default is the container for the version of the '
-              'SDK. Note: currently, only approved Google Cloud Dataflow '
-              'container images may be used here.'))
-    parser.add_argument(
-        '--teardown_policy',
-        choices=['TEARDOWN_ALWAYS', 'TEARDOWN_NEVER', 'TEARDOWN_ON_SUCCESS'],
-        default=None,
-        help=
-        ('The teardown policy for the VMs. By default this is left unset and '
-         'the service sets the default policy.'))
-    parser.add_argument(
-        '--use_public_ips',
-        default=None,
-        help='Whether to assign public IP addresses to the worker machines.')
-
-  def validate(self, validator):
-    errors = []
-    if validator.is_service_runner():
-      errors.extend(
-          validator.validate_optional_argument_positive(self, 'num_workers'))
-    return errors
-
-
-class DebugOptions(PipelineOptions):
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    parser.add_argument('--dataflow_job_file',
-                        default=None,
-                        help='Debug file to write the workflow specification.')
-    parser.add_argument(
-        '--experiment', '--experiments',
-        dest='experiments',
-        action='append',
-        default=None,
-        help=
-        ('Runners may provide a number of experimental features that can be '
-         'enabled with this flag. Please sync with the owners of the runner '
-         'before enabling any experiments.'))
-
-
-class ProfilingOptions(PipelineOptions):
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    parser.add_argument('--profile_cpu',
-                        action='store_true',
-                        help='Enable work item CPU profiling.')
-    parser.add_argument('--profile_memory',
-                        action='store_true',
-                        help='Enable work item heap profiling.')
-    parser.add_argument('--profile_location',
-                        default=None,
-                        help='GCS path for saving profiler data.')
-
-
-class SetupOptions(PipelineOptions):
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    # Options for installing dependencies in the worker.
-    parser.add_argument(
-        '--requirements_file',
-        default=None,
-        help=
-        ('Path to a requirements file containing package dependencies. '
-         'Typically it is produced by a pip freeze command. More details: '
-         'https://pip.pypa.io/en/latest/reference/pip_freeze.html. '
-         'If used, all the packages specified will be downloaded, '
-         'cached (use --requirements_cache to change default location), '
-         'and then staged so that they can be automatically installed in '
-         'workers during startup. The cache is refreshed as needed '
-         'avoiding extra downloads for existing packages. Typically the '
-         'file is named requirements.txt.'))
-    parser.add_argument(
-        '--requirements_cache',
-        default=None,
-        help=
-        ('Path to a folder to cache the packages specified in '
-         'the requirements file using the --requirements_file option.'))
-    parser.add_argument(
-        '--setup_file',
-        default=None,
-        help=
-        ('Path to a setup Python file containing package dependencies. If '
-         'specified, the file\'s containing folder is assumed to have the '
-         'structure required for a setuptools setup package. The file must be '
-         'named setup.py. More details: '
-         'https://pythonhosted.org/an_example_pypi_project/setuptools.html '
-         'During job submission a source distribution will be built and the '
-         'worker will install the resulting package before running any custom '
-         'code.'))
-    parser.add_argument(
-        '--save_main_session',
-        default=False,
-        action='store_true',
-        help=
-        ('Save the main session state so that pickled functions and classes '
-         'defined in __main__ (e.g. interactive session) can be unpickled. '
-         'Some workflows do not need the session state if for instance all '
-         'their functions/classes are defined in proper modules (not __main__)'
-         ' and the modules are importable in the worker. '))
-    parser.add_argument(
-        '--sdk_location',
-        default='default',
-        help=
-        ('Override the default GitHub location from which the Dataflow SDK is '
-         'downloaded. It can be a URL, a GCS path, or a local path to an '
-         'SDK tarball. Workflow submissions will download or copy an SDK '
-         'tarball from here. If the string "default", '
-         'a standard SDK location is used. If empty, no SDK is copied.'))
-    parser.add_argument(
-        '--extra_package', '--extra_packages',
-        dest='extra_packages',
-        action='append',
-        default=None,
-        help=
-        ('Local path to a Python package file. The file is expected to be (1) '
-         'a package tarball (".tar") or (2) a compressed package tarball '
-         '(".tar.gz") which can be installed using the "pip install" command '
-         'of the standard pip package. Multiple --extra_package options can '
-         'be specified if more than one package is needed. During job '
-         'submission, the files will be staged in the staging area '
-         '(--staging_location option) and the workers will install them in '
-         'the same order they were specified on the command line.'))
-
-
-class TestOptions(PipelineOptions):
-
-  @classmethod
-  def _add_argparse_args(cls, parser):
-    # Options for e2e test pipeline.
-    parser.add_argument(
-        '--on_success_matcher',
-        default=None,
-        help=('Verify state/output of e2e test pipeline. This is a pickled '
-              'version of a matcher which should extend '
-              'hamcrest.core.base_matcher.BaseMatcher.'))
-
-  def validate(self, validator):
-    errors = []
-    if self.view_as(TestOptions).on_success_matcher:
-      errors.extend(validator.validate_test_matcher(self, 'on_success_matcher'))
-    return errors
-
-# TODO(silviuc): Add --files_to_stage option.
-# This could potentially replace the --requirements_file and --setup_file.
-
-# TODO(silviuc): Non-standard options. Keep them? If yes, add help too!
-# Remote execution must check that this option is not None.
-
-
-class OptionsContext(object):
-  """Set default pipeline options for pipelines created in this block.
-
-  This is particularly useful for pipelines implicitly created with the
-
-      [python list] | PTransform
-
-  construct.
-
-  Can also be used as a decorator.
-  """
-  overrides = []
-
-  def __init__(self, **options):
-    self.options = options
-
-  def __enter__(self):
-    self.overrides.append(self.options)
-
-  def __exit__(self, *exn_info):
-    self.overrides.pop()
-
-  def __call__(self, f, *args, **kwargs):
-
-    def wrapper(*args, **kwargs):
-      with self:
-        f(*args, **kwargs)
-
-    return wrapper
-
-  @classmethod
-  def augment_options(cls, options):
-    for override in cls.overrides:
-      for name, value in override.items():
-        setattr(options, name, value)
-    return options

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py
new file mode 100644
index 0000000..3e09a3b
--- /dev/null
+++ b/sdks/python/apache_beam/utils/pipeline_options.py
@@ -0,0 +1,540 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Pipeline options obtained from command line parsing."""
+
+import argparse
+
+from apache_beam.transforms.display import HasDisplayData
+
+
+class PipelineOptions(HasDisplayData):
+  """Pipeline options class used as container for command line options.
+
+  The class is essentially a wrapper over the standard argparse Python module
+  (see https://docs.python.org/3/library/argparse.html).  To define one option
+  or a group of options you subclass from PipelineOptions::
+
+    class XyzOptions(PipelineOptions):
+
+      @classmethod
+      def _add_argparse_args(cls, parser):
+        parser.add_argument('--abc', default='start')
+        parser.add_argument('--xyz', default='end')
+
+  The arguments for the add_argument() method are exactly the ones
+  described in the argparse public documentation.
+
+  Pipeline objects require an options object during initialization.
+  This is obtained simply by initializing an options class as defined above::
+
+    p = Pipeline(options=XyzOptions())
+    if p.options.xyz == 'end':
+      raise ValueError('Option xyz has an invalid value.')
+
+  By default the options classes will use command line arguments to initialize
+  the options.
+  """
+
+  def __init__(self, flags=None, **kwargs):
+    """Initialize an options class.
+
+    The initializer will traverse all subclasses, add all their argparse
+    arguments and then parse the command line specified by flags or by default
+    the one obtained from sys.argv.
+
+    The subclasses are not expected to require a redefinition of __init__.
+
+    Args:
+      flags: An iterable of command line arguments to be used. If not specified
+        then sys.argv will be used as input for parsing arguments.
+
+      **kwargs: Add overrides for arguments passed in flags.
+    """
+    self._flags = flags
+    self._all_options = kwargs
+    parser = argparse.ArgumentParser()
+    for cls in type(self).mro():
+      if cls == PipelineOptions:
+        break
+      elif '_add_argparse_args' in cls.__dict__:
+        cls._add_argparse_args(parser)
+    # The _visible_options attribute will contain only those options from the
+    # flags (i.e., command line) that can be recognized. The _all_options
+    # field contains additional overrides.
+    self._visible_options, _ = parser.parse_known_args(flags)
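
Editor's sketch (not part of the commit): a minimal illustration of the flags/kwargs
contract described above, reusing the hypothetical XyzOptions subclass from the class
docstring. Keyword arguments override values parsed from flags.

    from apache_beam.utils.pipeline_options import PipelineOptions

    class XyzOptions(PipelineOptions):

      @classmethod
      def _add_argparse_args(cls, parser):
        parser.add_argument('--abc', default='start')
        parser.add_argument('--xyz', default='end')

    options = XyzOptions(flags=['--abc=foo'], xyz='override')
    assert options.abc == 'foo'       # parsed from the supplied flags
    assert options.xyz == 'override'  # keyword override wins over the default
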
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    # Override this in subclasses to provide options.
+    pass
+
+  @classmethod
+  def from_dictionary(cls, options):
+    """Returns a PipelineOptions from a dictionary of arguments.
+
+    Args:
+      options: Dictionary of argument value pairs.
+
+    Returns:
+      A PipelineOptions object representing the given arguments.
+    """
+    flags = []
+    for k, v in options.iteritems():
+      if isinstance(v, bool):
+        if v:
+          flags.append('--%s' % k)
+      else:
+        flags.append('--%s=%s' % (k, v))
+
+    return cls(flags)
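
Editor's sketch (not part of the commit) of the dictionary-to-flags conversion above:
boolean values become bare flags, everything else becomes --key=value. The values
below are placeholders.

    options = PipelineOptions.from_dictionary({
        'job_name': 'my-job',   # --job_name=my-job
        'streaming': True,      # --streaming
        'num_workers': 3,       # --num_workers=3
    })
    assert options.view_as(StandardOptions).streaming
    assert options.view_as(WorkerOptions).num_workers == 3
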
+
+  def get_all_options(self, drop_default=False):
+    """Returns a dictionary of all defined arguments.
+
+    Collects the values of all arguments (defined in any subclass of
+    PipelineOptions) into a dictionary.
+
+    Args:
+      drop_default: If set to true, options that are equal to their default
+        values are not returned as part of the result dictionary.
+
+    Returns:
+      Dictionary of all args and values.
+    """
+    parser = argparse.ArgumentParser()
+    for cls in PipelineOptions.__subclasses__():
+      cls._add_argparse_args(parser)  # pylint: disable=protected-access
+    known_args, _ = parser.parse_known_args(self._flags)
+    result = vars(known_args)
+
+    # Apply the overrides if any
+    for k in result.keys():
+      if k in self._all_options:
+        result[k] = self._all_options[k]
+      if drop_default and parser.get_default(k) == result[k]:
+        del result[k]
+
+    return result
+
+  def display_data(self):
+    return self.get_all_options(True)
+
+  def view_as(self, cls):
+    view = cls(self._flags)
+    view._all_options = self._all_options
+    return view
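
Editor's sketch (not part of the commit): view_as() gives a typed view over the same
underlying flags, while get_all_options(drop_default=True) keeps only values that
differ from their defaults. The project and worker values are placeholders.

    options = PipelineOptions(['--project', 'my-project', '--num_workers', '3'])
    assert options.view_as(GoogleCloudOptions).project == 'my-project'
    assert options.view_as(WorkerOptions).num_workers == 3
    assert options.get_all_options(drop_default=True)['project'] == 'my-project'
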
+
+  def _visible_option_list(self):
+    return sorted(option
+                  for option in dir(self._visible_options) if option[0] != '_')
+
+  def __dir__(self):
+    return sorted(dir(type(self)) + self.__dict__.keys() +
+                  self._visible_option_list())
+
+  def __getattr__(self, name):
+    # Special methods which may be accessed before the object is
+    # fully constructed (e.g. in unpickling).
+    if name[:2] == name[-2:] == '__':
+      return object.__getattribute__(self, name)
+    elif name in self._visible_option_list():
+      return self._all_options.get(name, getattr(self._visible_options, name))
+    else:
+      raise AttributeError("'%s' object has no attribute '%s'" %
+                           (type(self).__name__, name))
+
+  def __setattr__(self, name, value):
+    if name in ('_flags', '_all_options', '_visible_options'):
+      super(PipelineOptions, self).__setattr__(name, value)
+    elif name in self._visible_option_list():
+      self._all_options[name] = value
+    else:
+      raise AttributeError("'%s' object has no attribute '%s'" %
+                           (type(self).__name__, name))
+
+  def __str__(self):
+    return '%s(%s)' % (type(self).__name__,
+                       ', '.join('%s=%s' % (option, getattr(self, option))
+                                 for option in self._visible_option_list()))
+
+
+class StandardOptions(PipelineOptions):
+
+  DEFAULT_RUNNER = 'DirectRunner'
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument(
+        '--runner',
+        help=('Pipeline runner used to execute the workflow. Valid values are '
+              'DirectRunner, DataflowRunner, '
+              'and BlockingDataflowRunner.'))
+    # Whether to enable streaming mode.
+    parser.add_argument('--streaming',
+                        default=False,
+                        action='store_true',
+                        help='Whether to enable streaming mode.')
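
Editor's sketch (not part of the commit) of the two flags above:

    opts = StandardOptions(['--runner=DirectRunner', '--streaming'])
    assert opts.runner == 'DirectRunner'
    assert opts.streaming
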
+
+
+class TypeOptions(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    # TODO(laolu): Add a type inferencing option here once implemented.
+    parser.add_argument('--type_check_strictness',
+                        default='DEFAULT_TO_ANY',
+                        choices=['ALL_REQUIRED', 'DEFAULT_TO_ANY'],
+                        help='The level of exhaustive manual type-hint '
+                        'annotation required')
+    parser.add_argument('--no_pipeline_type_check',
+                        dest='pipeline_type_check',
+                        action='store_false',
+                        help='Disable type checking at pipeline construction '
+                        'time')
+    parser.add_argument('--pipeline_type_check',
+                        action='store_true',
+                        help='Enable type checking at pipeline construction '
+                        'time')
+    parser.add_argument('--runtime_type_check',
+                        default=False,
+                        action='store_true',
+                        help='Enable type checking at pipeline execution '
+                        'time. NOTE: only supported with the '
+                        'DirectRunner')
+
+
+class GoogleCloudOptions(PipelineOptions):
+  """Google Cloud Dataflow service execution options."""
+
+  BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
+  COMPUTE_API_SERVICE = 'compute.googleapis.com'
+  STORAGE_API_SERVICE = 'storage.googleapis.com'
+  DATAFLOW_ENDPOINT = 'https://dataflow.googleapis.com'
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument(
+        '--dataflow_endpoint',
+        default=cls.DATAFLOW_ENDPOINT,
+        help=
+        ('The URL for the Dataflow API. If not set, the default public URL '
+         'will be used.'))
+    # Remote execution must check that this option is not None.
+    parser.add_argument('--project',
+                        default=None,
+                        help='Name of the Cloud project owning the Dataflow '
+                        'job.')
+    # Remote execution must check that this option is not None.
+    parser.add_argument('--job_name',
+                        default=None,
+                        help='Name of the Cloud Dataflow job.')
+    # Remote execution must check that this option is not None.
+    parser.add_argument('--staging_location',
+                        default=None,
+                        help='GCS path for staging code packages needed by '
+                        'workers.')
+    # Remote execution must check that this option is not None.
+    # If staging_location is not set, it defaults to temp_location.
+    parser.add_argument('--temp_location',
+                        default=None,
+                        help='GCS path for saving temporary workflow jobs.')
+    parser.add_argument('--service_account_name',
+                        default=None,
+                        help='Name of the service account for Google APIs.')
+    parser.add_argument('--service_account_key_file',
+                        default=None,
+                        help='Path to a file containing the P12 service '
+                        'credentials.')
+    parser.add_argument('--service_account_email',
+                        default=None,
+                        help='Identity to run virtual machines as.')
+    parser.add_argument('--no_auth', dest='no_auth', type=bool, default=False)
+    # Option to run templated pipelines
+    parser.add_argument('--template_location',
+                        default=None,
+                        help='Save job to specified local or GCS location.')
+
+  def validate(self, validator):
+    errors = []
+    if validator.is_service_runner():
+      errors.extend(validator.validate_cloud_options(self))
+      errors.extend(validator.validate_gcs_path(self, 'temp_location'))
+      if getattr(self, 'staging_location',
+                 None) or getattr(self, 'temp_location', None) is None:
+        errors.extend(validator.validate_gcs_path(self, 'staging_location'))
+
+    if self.view_as(DebugOptions).dataflow_job_file:
+      if self.view_as(GoogleCloudOptions).template_location:
+        errors.append('--dataflow_job_file and --template_location '
+                      'are mutually exclusive.')
+
+    return errors
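
Editor's sketch (not part of the commit): as the "Remote execution must check..."
comments above indicate, a service run needs a project, a job name and GCS
staging/temp paths. All values below are placeholders; validate() is driven by a
PipelineOptionsValidator and returns a list of error strings, empty when the options
are usable.

    gcloud = PipelineOptions([
        '--project', 'my-project',
        '--job_name', 'my-job',
        '--staging_location', 'gs://my-bucket/staging',
        '--temp_location', 'gs://my-bucket/temp',
    ]).view_as(GoogleCloudOptions)
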
+
+
+# Command line options controlling the worker pool configuration.
+# TODO(silviuc): Update description when autoscaling options are in.
+class WorkerOptions(PipelineOptions):
+  """Worker pool configuration options."""
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument(
+        '--num_workers',
+        type=int,
+        default=None,
+        help=
+        ('Number of workers to use when executing the Dataflow job. If not '
+         'set, the Dataflow service will use a reasonable default.'))
+    parser.add_argument(
+        '--max_num_workers',
+        type=int,
+        default=None,
+        help=
+        ('Maximum number of workers to use when executing the Dataflow job.'))
+    parser.add_argument(
+        '--autoscaling_algorithm',
+        type=str,
+        choices=['NONE', 'THROUGHPUT_BASED'],
+        default=None,  # Meaning unset, distinct from 'NONE' meaning don't scale
+        help=
+        ('If and how to autoscale the worker pool.'))
+    parser.add_argument(
+        '--worker_machine_type',
+        dest='machine_type',
+        default=None,
+        help=('Machine type to create Dataflow worker VMs as. See '
+              'https://cloud.google.com/compute/docs/machine-types '
+              'for a list of valid options. If not set, '
+              'the Dataflow service will choose a reasonable '
+              'default.'))
+    parser.add_argument(
+        '--disk_size_gb',
+        type=int,
+        default=None,
+        help=
+        ('Remote worker disk size, in gigabytes, or 0 to use the default size. '
+         'If not set, the Dataflow service will use a reasonable default.'))
+    parser.add_argument(
+        '--worker_disk_type',
+        dest='disk_type',
+        default=None,
+        help=('Specifies what type of persistent disk should be used.'))
+    parser.add_argument(
+        '--zone',
+        default=None,
+        help=(
+            'GCE availability zone for launching workers. Default is up to the '
+            'Dataflow service.'))
+    parser.add_argument(
+        '--network',
+        default=None,
+        help=(
+            'GCE network for launching workers. Default is up to the Dataflow '
+            'service.'))
+    parser.add_argument(
+        '--worker_harness_container_image',
+        default=None,
+        help=('Docker registry location of container image to use for the '
+              'worker harness. Default is the container for the version of the '
+              'SDK. Note: currently, only approved Google Cloud Dataflow '
+              'container images may be used here.'))
+    parser.add_argument(
+        '--teardown_policy',
+        choices=['TEARDOWN_ALWAYS', 'TEARDOWN_NEVER', 'TEARDOWN_ON_SUCCESS'],
+        default=None,
+        help=
+        ('The teardown policy for the VMs. By default this is left unset and '
+         'the service sets the default policy.'))
+    parser.add_argument(
+        '--use_public_ips',
+        default=None,
+        help='Whether to assign public IP addresses to the worker machines.')
+
+  def validate(self, validator):
+    errors = []
+    if validator.is_service_runner():
+      errors.extend(
+          validator.validate_optional_argument_positive(self, 'num_workers'))
+    return errors
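
Editor's sketch (not part of the commit): note that --worker_machine_type and
--worker_disk_type are stored under the dest names machine_type and disk_type.
The machine type below is only an example value.

    workers = WorkerOptions(['--num_workers', '5',
                             '--worker_machine_type', 'n1-standard-4'])
    assert workers.num_workers == 5
    assert workers.machine_type == 'n1-standard-4'  # note the dest rename
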
+
+
+class DebugOptions(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument('--dataflow_job_file',
+                        default=None,
+                        help='Debug file to write the workflow specification.')
+    parser.add_argument(
+        '--experiment', '--experiments',
+        dest='experiments',
+        action='append',
+        default=None,
+        help=
+        ('Runners may provide a number of experimental features that can be '
+         'enabled with this flag. Please sync with the owners of the runner '
+         'before enabling any experiments.'))
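
Editor's sketch (not part of the commit): --experiment can be repeated and the values
accumulate under the experiments attribute (experiment names below are hypothetical).

    debug = PipelineOptions(['--experiment=feature_x',
                             '--experiment=feature_y']).view_as(DebugOptions)
    assert debug.experiments == ['feature_x', 'feature_y']
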
+
+
+class ProfilingOptions(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    parser.add_argument('--profile_cpu',
+                        action='store_true',
+                        help='Enable work item CPU profiling.')
+    parser.add_argument('--profile_memory',
+                        action='store_true',
+                        help='Enable work item heap profiling.')
+    parser.add_argument('--profile_location',
+                        default=None,
+                        help='GCS path for saving profiler data.')
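
Editor's sketch (not part of the commit), with a placeholder GCS path:

    profiling = PipelineOptions([
        '--profile_cpu', '--profile_memory',
        '--profile_location', 'gs://my-bucket/profiles',
    ]).view_as(ProfilingOptions)
    assert profiling.profile_cpu and profiling.profile_memory
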
+
+
+class SetupOptions(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    # Options for installing dependencies in the worker.
+    parser.add_argument(
+        '--requirements_file',
+        default=None,
+        help=
+        ('Path to a requirements file containing package dependencies. '
+         'Typically it is produced by a pip freeze command. More details: '
+         'https://pip.pypa.io/en/latest/reference/pip_freeze.html. '
+         'If used, all the packages specified will be downloaded, '
+         'cached (use --requirements_cache to change default location), '
+         'and then staged so that they can be automatically installed in '
+         'workers during startup. The cache is refreshed as needed '
+         'avoiding extra downloads for existing packages. Typically the '
+         'file is named requirements.txt.'))
+    parser.add_argument(
+        '--requirements_cache',
+        default=None,
+        help=
+        ('Path to a folder to cache the packages specified in '
+         'the requirements file using the --requirements_file option.'))
+    parser.add_argument(
+        '--setup_file',
+        default=None,
+        help=
+        ('Path to a setup Python file containing package dependencies. If '
+         'specified, the file\'s containing folder is assumed to have the '
+         'structure required for a setuptools setup package. The file must be '
+         'named setup.py. More details: '
+         'https://pythonhosted.org/an_example_pypi_project/setuptools.html '
+         'During job submission a source distribution will be built and the '
+         'worker will install the resulting package before running any custom '
+         'code.'))
+    parser.add_argument(
+        '--save_main_session',
+        default=False,
+        action='store_true',
+        help=
+        ('Save the main session state so that pickled functions and classes '
+         'defined in __main__ (e.g. interactive session) can be unpickled. '
+         'Some workflows do not need the session state if for instance all '
+         'their functions/classes are defined in proper modules (not __main__)'
+         ' and the modules are importable in the worker. '))
+    parser.add_argument(
+        '--sdk_location',
+        default='default',
+        help=
+        ('Override the default GitHub location from which the Dataflow SDK is '
+         'downloaded. It can be a URL, a GCS path, or a local path to an '
+         'SDK tarball. Workflow submissions will download or copy an SDK '
+         'tarball from here. If the string "default", '
+         'a standard SDK location is used. If empty, no SDK is copied.'))
+    parser.add_argument(
+        '--extra_package', '--extra_packages',
+        dest='extra_packages',
+        action='append',
+        default=None,
+        help=
+        ('Local path to a Python package file. The file is expected to be (1) '
+         'a package tarball (".tar") or (2) a compressed package tarball '
+         '(".tar.gz") which can be installed using the "pip install" command '
+         'of the standard pip package. Multiple --extra_package options can '
+         'be specified if more than one package is needed. During job '
+         'submission, the files will be staged in the staging area '
+         '(--staging_location option) and the workers will install them in '
+         'the same order they were specified on the command line.'))
+
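
Editor's sketch (not part of the commit) of a typical dependency-staging configuration
using the flags above; file names are placeholders.

    setup = PipelineOptions([
        '--requirements_file', 'requirements.txt',
        '--extra_package', 'dist/my_package-0.1.tar.gz',
        '--save_main_session',
    ]).view_as(SetupOptions)
    assert setup.save_main_session
    assert setup.extra_packages == ['dist/my_package-0.1.tar.gz']
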
+
+class TestOptions(PipelineOptions):
+
+  @classmethod
+  def _add_argparse_args(cls, parser):
+    # Options for e2e test pipeline.
+    parser.add_argument(
+        '--on_success_matcher',
+        default=None,
+        help=('Verify state/output of e2e test pipeline. This is a pickled '
+              'version of a matcher which should extend '
+              'hamcrest.core.base_matcher.BaseMatcher.'))
+
+  def validate(self, validator):
+    errors = []
+    if self.view_as(TestOptions).on_success_matcher:
+      errors.extend(validator.validate_test_matcher(self, 'on_success_matcher'))
+    return errors
+
+# TODO(silviuc): Add --files_to_stage option.
+# This could potentially replace the --requirements_file and --setup_file.
+
+# TODO(silviuc): Non-standard options. Keep them? If yes, add help too!
+# Remote execution must check that this option is not None.
+
+
+class OptionsContext(object):
+  """Set default pipeline options for pipelines created in this block.
+
+  This is particularly useful for pipelines implicitly created with the
+
+      [python list] | PTransform
+
+  construct.
+
+  Can also be used as a decorator.
+  """
+  overrides = []
+
+  def __init__(self, **options):
+    self.options = options
+
+  def __enter__(self):
+    self.overrides.append(self.options)
+
+  def __exit__(self, *exn_info):
+    self.overrides.pop()
+
+  def __call__(self, f, *args, **kwargs):
+
+    def wrapper(*args, **kwargs):
+      with self:
+        f(*args, **kwargs)
+
+    return wrapper
+
+  @classmethod
+  def augment_options(cls, options):
+    for override in cls.overrides:
+      for name, value in override.items():
+        setattr(options, name, value)
+    return options
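
Editor's sketch (not part of the commit): the docstring above says OptionsContext works
both as a context manager and as a decorator. The overrides only take effect for code
paths that call OptionsContext.augment_options() on their options.

    with OptionsContext(runner='DirectRunner'):
      pass  # pipelines implicitly created here can pick up runner='DirectRunner'

    @OptionsContext(num_workers=2)
    def build():
      pass  # the same overrides apply while build() runs

    build()
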

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/pipeline_options_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_test.py b/sdks/python/apache_beam/utils/pipeline_options_test.py
index 75bd6a2..054b6a5 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_test.py
@@ -23,7 +23,7 @@ import unittest
 import hamcrest as hc
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.pipeline_options import PipelineOptions
 
 
 class PipelineOptionsTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/cd9c9f88/sdks/python/apache_beam/utils/pipeline_options_validator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_validator.py b/sdks/python/apache_beam/utils/pipeline_options_validator.py
index c1243ce..85fdc4d 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_validator.py
@@ -20,13 +20,13 @@
 import re
 
 from apache_beam.internal import pickler
-from apache_beam.utils.options import DebugOptions
-from apache_beam.utils.options import GoogleCloudOptions
-from apache_beam.utils.options import SetupOptions
-from apache_beam.utils.options import StandardOptions
-from apache_beam.utils.options import TestOptions
-from apache_beam.utils.options import TypeOptions
-from apache_beam.utils.options import WorkerOptions
+from apache_beam.utils.pipeline_options import DebugOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import SetupOptions
+from apache_beam.utils.pipeline_options import StandardOptions
+from apache_beam.utils.pipeline_options import TestOptions
+from apache_beam.utils.pipeline_options import TypeOptions
+from apache_beam.utils.pipeline_options import WorkerOptions
 
 
 class PipelineOptionsValidator(object):
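
Editor's sketch (not part of the commit): downstream code picks up the rename with the
same one-line import change shown in these hunks, e.g. in a user pipeline:

    # old: from apache_beam.utils.options import PipelineOptions
    from apache_beam.utils.pipeline_options import PipelineOptions
    from apache_beam.utils.pipeline_options import SetupOptions

    options = PipelineOptions(['--runner=DirectRunner'])
    options.view_as(SetupOptions).save_main_session = True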