You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:35:09 UTC
[20/50] beam git commit: soft-enable the use of streaming flag
soft-enable the use of streaming flag
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51139509
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51139509
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51139509
Branch: refs/heads/DSL_SQL
Commit: 51139509b65b5fa04a39c31f584a02f1a29170dc
Parents: e3139a3
Author: Ahmet Altay <al...@google.com>
Authored: Tue Jun 6 13:34:09 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue Jun 6 13:56:57 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/options/pipeline_options.py | 5 +++--
.../apache_beam/options/pipeline_options_validator_test.py | 8 --------
sdks/python/apache_beam/runners/dataflow/dataflow_runner.py | 2 +-
.../apache_beam/runners/dataflow/internal/apiclient.py | 2 +-
4 files changed, 5 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/51139509/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 777926a..daef3a7 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -18,6 +18,7 @@
"""Pipeline options obtained from command line parsing."""
import argparse
+import warnings
from apache_beam.transforms.display import HasDisplayData
from apache_beam.options.value_provider import StaticValueProvider
@@ -278,12 +279,12 @@ class StandardOptions(PipelineOptions):
action='store_true',
help='Whether to enable streaming mode.')
- # TODO(BEAM-1265): Remove this error, once at least one runner supports
+ # TODO(BEAM-1265): Remove this warning, once at least one runner supports
# streaming pipelines.
def validate(self, validator):
errors = []
if self.view_as(StandardOptions).streaming:
- errors.append('Streaming pipelines are not supported.')
+ warnings.warn('Streaming pipelines are not supported.')
return errors
http://git-wip-us.apache.org/repos/asf/beam/blob/51139509/sdks/python/apache_beam/options/pipeline_options_validator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator_test.py b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
index 28fcbe3..97834cc 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
@@ -300,14 +300,6 @@ class SetupTest(unittest.TestCase):
errors = validator.validate()
self.assertFalse(errors)
- def test_streaming(self):
- pipeline_options = PipelineOptions(['--streaming'])
- runner = MockRunners.TestDataflowRunner()
- validator = PipelineOptionsValidator(pipeline_options, runner)
- errors = validator.validate()
-
- self.assertIn('Streaming pipelines are not supported.', errors)
-
def test_test_matcher(self):
def get_validator(matcher):
options = ['--project=example:example',
http://git-wip-us.apache.org/repos/asf/beam/blob/51139509/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 3e0e268..62cea33 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -64,7 +64,7 @@ class DataflowRunner(PipelineRunner):
# a job submission and is used by the service to establish what features
# are expected by the workers.
BATCH_ENVIRONMENT_MAJOR_VERSION = '6'
- STREAMING_ENVIRONMENT_MAJOR_VERSION = '0'
+ STREAMING_ENVIRONMENT_MAJOR_VERSION = '1'
def __init__(self, cache=None):
# Cache of CloudWorkflowStep protos generated while the runner
http://git-wip-us.apache.org/repos/asf/beam/blob/51139509/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index bfdd5e4..df1a3f2 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -145,7 +145,7 @@ class Environment(object):
# Version information.
self.proto.version = dataflow.Environment.VersionValue()
if self.standard_options.streaming:
- job_type = 'PYTHON_STREAMING'
+ job_type = 'FNAPI_STREAMING'
else:
job_type = 'PYTHON_BATCH'
self.proto.version.additionalProperties.extend([