You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:35:09 UTC

[20/50] beam git commit: soft-enable the use of streaming flag

soft-enable the use of streaming flag


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51139509
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51139509
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51139509

Branch: refs/heads/DSL_SQL
Commit: 51139509b65b5fa04a39c31f584a02f1a29170dc
Parents: e3139a3
Author: Ahmet Altay <al...@google.com>
Authored: Tue Jun 6 13:34:09 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Jun 6 13:56:57 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/options/pipeline_options.py          | 5 +++--
 .../apache_beam/options/pipeline_options_validator_test.py   | 8 --------
 sdks/python/apache_beam/runners/dataflow/dataflow_runner.py  | 2 +-
 .../apache_beam/runners/dataflow/internal/apiclient.py       | 2 +-
 4 files changed, 5 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/51139509/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 777926a..daef3a7 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -18,6 +18,7 @@
 """Pipeline options obtained from command line parsing."""
 
 import argparse
+import warnings
 
 from apache_beam.transforms.display import HasDisplayData
 from apache_beam.options.value_provider import StaticValueProvider
@@ -278,12 +279,12 @@ class StandardOptions(PipelineOptions):
                         action='store_true',
                         help='Whether to enable streaming mode.')
 
-  # TODO(BEAM-1265): Remove this error, once at least one runner supports
+  # TODO(BEAM-1265): Remove this warning, once at least one runner supports
   # streaming pipelines.
   def validate(self, validator):
     errors = []
     if self.view_as(StandardOptions).streaming:
-      errors.append('Streaming pipelines are not supported.')
+      warnings.warn('Streaming pipelines are not supported.')
     return errors
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/51139509/sdks/python/apache_beam/options/pipeline_options_validator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator_test.py b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
index 28fcbe3..97834cc 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
@@ -300,14 +300,6 @@ class SetupTest(unittest.TestCase):
     errors = validator.validate()
     self.assertFalse(errors)
 
-  def test_streaming(self):
-    pipeline_options = PipelineOptions(['--streaming'])
-    runner = MockRunners.TestDataflowRunner()
-    validator = PipelineOptionsValidator(pipeline_options, runner)
-    errors = validator.validate()
-
-    self.assertIn('Streaming pipelines are not supported.', errors)
-
   def test_test_matcher(self):
     def get_validator(matcher):
       options = ['--project=example:example',

http://git-wip-us.apache.org/repos/asf/beam/blob/51139509/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 3e0e268..62cea33 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -64,7 +64,7 @@ class DataflowRunner(PipelineRunner):
   # a job submission and is used by the service to establish what features
   # are expected by the workers.
   BATCH_ENVIRONMENT_MAJOR_VERSION = '6'
-  STREAMING_ENVIRONMENT_MAJOR_VERSION = '0'
+  STREAMING_ENVIRONMENT_MAJOR_VERSION = '1'
 
   def __init__(self, cache=None):
     # Cache of CloudWorkflowStep protos generated while the runner

http://git-wip-us.apache.org/repos/asf/beam/blob/51139509/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index bfdd5e4..df1a3f2 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -145,7 +145,7 @@ class Environment(object):
     # Version information.
     self.proto.version = dataflow.Environment.VersionValue()
     if self.standard_options.streaming:
-      job_type = 'PYTHON_STREAMING'
+      job_type = 'FNAPI_STREAMING'
     else:
       job_type = 'PYTHON_BATCH'
     self.proto.version.additionalProperties.extend([