Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2020/03/18 21:12:45 UTC
[GitHub] [beam] rohdesamuel opened a new pull request #11163: Add better
error handling to the TestStreamServiceController
rohdesamuel opened a new pull request #11163: Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163
Change-Id: I022f25840207eb6c931c97358f159f134393c180
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
[GitHub] [beam] pabloem commented on issue #11163: [BEAM-9548] Add better
error handling to the TestStreamServiceController
Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#issuecomment-603464550
retest this please
----------------------------------------------------------------
[GitHub] [beam] pabloem commented on a change in pull request #11163:
[BEAM-9548] Add better error handling to the TestStreamServiceController
Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r396810340
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -170,8 +170,13 @@ def run_pipeline(self, pipeline, options):
               user_pipeline)):
         streaming_cache_manager = ie.current_env().cache_manager()
         if streaming_cache_manager:
+
+          def exception_handler(e):
+            _LOGGER.error(str(e))
Review comment:
Same as above. Do we just log and not stop processing?
----------------------------------------------------------------
[GitHub] [beam] pabloem commented on a change in pull request #11163:
[BEAM-9548] Add better error handling to the TestStreamServiceController
Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r396766986
##########
File path: sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
##########
@@ -166,13 +169,15 @@ def _wait_until_file_exists(self, timeout_secs=30):
     # Wait for up to `timeout_secs` for the file to be available.
     start = time.time()
-    path = os.path.join(self._cache_dir, *self._labels)
-    while not os.path.exists(path):
+    while not os.path.exists(self._path):
       time.sleep(1)
       if time.time() - start > timeout_timestamp_secs:
+        from apache_beam.runners.interactive.pipeline_instrument import CacheKey
+        pcollection_var = CacheKey.from_str(self._labels[-1]).var
Review comment:
I hadn't stopped to think that the labels are also a file name. I guess the final file name is the PCollection variable name? If so, could users name their PCollections something that the OS does not support? (Or maybe not, since they have to be valid Python variable names.)
Anyway, this is not for this PR, just something to think about.
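
For readers following the hunk above, the wait-with-timeout pattern it touches can be sketched on its own. This is a minimal illustration, not the Beam implementation: the function name, the `poll_secs` parameter, and the error message are assumptions. Note that the quoted hunk declares `timeout_secs` in the signature but compares against `timeout_timestamp_secs`; the hunk does not show where that second name is defined, so the sketch simply uses one name throughout.

```python
import os
import time


def wait_until_file_exists(path, timeout_secs=30.0, poll_secs=1.0):
  """Blocks until `path` exists; raises RuntimeError after `timeout_secs`.

  Hypothetical helper for illustration only.
  """
  # A monotonic clock is not affected by system clock adjustments.
  deadline = time.monotonic() + timeout_secs
  while not os.path.exists(path):
    if time.monotonic() >= deadline:
      raise RuntimeError(
          'Timed out after %s seconds waiting for %s' % (timeout_secs, path))
    time.sleep(poll_secs)
  return path
```

Checking the deadline before sleeping means a zero timeout fails immediately instead of waiting one full polling interval.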
----------------------------------------------------------------
[GitHub] [beam] rohdesamuel commented on a change in pull request #11163:
[BEAM-9548] Add better error handling to the TestStreamServiceController
Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r397347169
##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -170,8 +170,13 @@ def run_pipeline(self, pipeline, options):
               user_pipeline)):
         streaming_cache_manager = ie.current_env().cache_manager()
         if streaming_cache_manager:
+
+          def exception_handler(e):
+            _LOGGER.error(str(e))
Review comment:
Here, this is caught at the TestStreamService level, and for the same reason we just log the error so that it's easier for the user to understand. Because it's caught at the service level, processing has already stopped.
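
The point that a handler invoked at the service level runs only after processing has already stopped can be shown with a small stand-alone sketch. `EventService` and `flaky_events` are hypothetical names invented for this illustration; they are not Beam's `TestStreamServiceController`, and the real controller's constructor and behavior may differ.

```python
import logging

_LOGGER = logging.getLogger(__name__)


class EventService:
  """Hypothetical stand-in for a streaming service (not Beam's controller)."""
  def __init__(self, events, exception_handler=None):
    self._events = events
    self._exception_handler = exception_handler

  def stream(self):
    try:
      for event in self._events():
        yield event
    except Exception as e:  # Broad on purpose: this is the service boundary.
      # The source generator has already terminated at this point, so the
      # handler only decides how the failure is reported.
      if self._exception_handler is None:
        raise
      self._exception_handler(e)


def exception_handler(e):
  # Log a readable message instead of letting the error propagate upward.
  _LOGGER.error(str(e))


def flaky_events():
  yield 'a'
  yield 'b'
  raise ValueError('cache file is unreadable')


received = list(EventService(flaky_events, exception_handler).stream())
```

In this sketch `received` holds only the two events produced before the failure; nothing after the exception is emitted, whether or not a handler is installed.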
----------------------------------------------------------------
[GitHub] [beam] pabloem commented on issue #11163: [BEAM-9548] Add better
error handling to the TestStreamServiceController
Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#issuecomment-603460559
retest this please
----------------------------------------------------------------
[GitHub] [beam] pabloem commented on a change in pull request #11163:
[BEAM-9548] Add better error handling to the TestStreamServiceController
Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r396767924
##########
File path: sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
##########
@@ -202,14 +207,24 @@ def _emit_from_file(self, fh, tail):
         # The first line at pos = 0 is always the header. Read the line without
         # the new line.
         to_decode = line[:-1]
-        if pos == 0:
-          header = TestStreamFileHeader()
-          header.ParseFromString(self._coder.decode(to_decode))
-          yield header
+        proto_cls = TestStreamFileHeader if pos == 0 else TestStreamFileRecord
+        msg = self._try_parse_as(proto_cls, to_decode)
+        if msg:
+          yield msg
         else:
-          record = TestStreamFileRecord()
-          record.ParseFromString(self._coder.decode(to_decode))
-          yield record
+          break
+
+  def _try_parse_as(self, proto_cls, to_decode):
+    try:
+      msg = proto_cls()
+      msg.ParseFromString(self._coder.decode(to_decode))
+    except DecodeError:
+      _LOGGER.error(
+          'Could not parse as %s. This can indicate that the cache is '
+          'corruputed. Please restart the kernel. '
+          '\nfile: %s \nmessage: %s', proto_cls, self._path, to_decode)
+      msg = None
Review comment:
Do we just skip? This may mean that the file is corrupted? Should we stop consuming (i.e. rethrow the exception)?
----------------------------------------------------------------
[GitHub] [beam] rohdesamuel commented on a change in pull request #11163:
[BEAM-9548] Add better error handling to the TestStreamServiceController
Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r397342416
##########
File path: sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
##########
@@ -166,13 +169,15 @@ def _wait_until_file_exists(self, timeout_secs=30):
     # Wait for up to `timeout_secs` for the file to be available.
     start = time.time()
-    path = os.path.join(self._cache_dir, *self._labels)
-    while not os.path.exists(path):
+    while not os.path.exists(self._path):
       time.sleep(1)
       if time.time() - start > timeout_timestamp_secs:
+        from apache_beam.runners.interactive.pipeline_instrument import CacheKey
+        pcollection_var = CacheKey.from_str(self._labels[-1]).var
Review comment:
Yeah, because these are Python variables, they follow very strict naming conventions (alphanumeric with the addition of an underscore), which are OK to use for file names.
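
The "alphanumeric plus underscore" description matches Python 2 identifiers and ASCII names in general. Under Python 3, identifiers may also contain non-ASCII letters, and a few valid identifiers such as `CON` or `NUL` are reserved device names on Windows. A small check along these lines could guard against both cases. This is only a sketch: `is_portable_file_stem` and the partial reserved-name set are assumptions for illustration and are not part of Beam.

```python
# Partial list of Windows reserved device names (COM1-9 and LPT1-9 omitted).
_WINDOWS_RESERVED = {'CON', 'PRN', 'AUX', 'NUL'}


def is_portable_file_stem(name):
  """True if `name` is an ASCII identifier and not a reserved device name.

  Hypothetical helper for illustration only.
  """
  is_ascii = all(ord(c) < 128 for c in name)
  return (
      name.isidentifier() and is_ascii and
      name.upper() not in _WINDOWS_RESERVED)


# 'caf\u00e9' is a valid Python 3 identifier, yet it is not ASCII.
examples = ['my_pcoll_1', 'caf\u00e9', 'con', '1abc']
portable = [n for n in examples if is_portable_file_stem(n)]
```

Whether non-ASCII file names are actually a problem depends on the file system in use; the sketch takes the conservative view and rejects them.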
----------------------------------------------------------------
[GitHub] [beam] rohdesamuel commented on a change in pull request #11163:
[BEAM-9548] Add better error handling to the TestStreamServiceController
Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r397344411
##########
File path: sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
##########
@@ -202,14 +207,24 @@ def _emit_from_file(self, fh, tail):
         # The first line at pos = 0 is always the header. Read the line without
         # the new line.
         to_decode = line[:-1]
-        if pos == 0:
-          header = TestStreamFileHeader()
-          header.ParseFromString(self._coder.decode(to_decode))
-          yield header
+        proto_cls = TestStreamFileHeader if pos == 0 else TestStreamFileRecord
+        msg = self._try_parse_as(proto_cls, to_decode)
+        if msg:
+          yield msg
         else:
-          record = TestStreamFileRecord()
-          record.ParseFromString(self._coder.decode(to_decode))
-          yield record
+          break
+
+  def _try_parse_as(self, proto_cls, to_decode):
+    try:
+      msg = proto_cls()
+      msg.ParseFromString(self._coder.decode(to_decode))
+    except DecodeError:
+      _LOGGER.error(
+          'Could not parse as %s. This can indicate that the cache is '
+          'corruputed. Please restart the kernel. '
+          '\nfile: %s \nmessage: %s', proto_cls, self._path, to_decode)
+      msg = None
Review comment:
No, if we rethrow the exception, it will get handled by the gRPC layer above and get turned into gobbledygook. This does stop processing, because it returns msg = None, which breaks out of the loop.
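
The control flow described here (log a readable error, return `None`, and let the caller break out of its loop) can be sketched without protobuf. The example below swaps in JSON decoding as a stand-in for `ParseFromString`, and the names `try_parse` and `emit_records` are hypothetical; it shows the pattern rather than the Beam code.

```python
import json
import logging

_LOGGER = logging.getLogger(__name__)


def try_parse(line, path):
  """Returns the decoded record, or None after logging a readable error."""
  try:
    return json.loads(line)
  except ValueError:  # json.JSONDecodeError is a subclass of ValueError.
    _LOGGER.error(
        'Could not parse record. This can indicate that the cache is '
        'corrupted.\nfile: %s\nmessage: %r', path, line)
    return None


def emit_records(lines, path='<memory>'):
  """Yields parsed records and stops at the first one that fails to parse."""
  for line in lines:
    msg = try_parse(line, path)
    if msg is None:
      break  # Stop consuming: everything after a corrupt record is suspect.
    yield msg


records = list(emit_records(['{"a": 1}', '{}', 'not json', '{"b": 2}']))
```

The sketch tests `msg is None` rather than truthiness so that a valid but empty record (here `{}`) is still emitted; only a parse failure ends the stream.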
----------------------------------------------------------------
[GitHub] [beam] pabloem commented on issue #11163: [BEAM-9548] Add better
error handling to the TestStreamServiceController
Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#issuecomment-603460371
retest this please
----------------------------------------------------------------
[GitHub] [beam] pabloem commented on issue #11163: [BEAM-9548] Add better
error handling to the TestStreamServiceController
Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#issuecomment-603427755
retest this please
----------------------------------------------------------------
[GitHub] [beam] pabloem merged pull request #11163: [BEAM-9548] Add better
error handling to the TestStreamServiceController
Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163
----------------------------------------------------------------
[GitHub] [beam] pabloem commented on issue #11163: [BEAM-9548] Add better
error handling to the TestStreamServiceController
Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#issuecomment-603478230
Run Python PreCommit
----------------------------------------------------------------
[GitHub] [beam] pabloem commented on issue #11163: [BEAM-9548] Add better
error handling to the TestStreamServiceController
Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#issuecomment-603462409
retest this please
----------------------------------------------------------------
[GitHub] [beam] rohdesamuel commented on issue #11163: [BEAM-9548] Add
better error handling to the TestStreamServiceController
Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#issuecomment-601417922
R: @pabloem
----------------------------------------------------------------