Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2020/03/18 21:12:45 UTC

[GitHub] [beam] rohdesamuel opened a new pull request #11163: Add better error handling to the TestStreamServiceController

rohdesamuel opened a new pull request #11163: Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163
 
 
   Change-Id: I022f25840207eb6c931c97358f159f134393c180
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#issuecomment-603464550
 
 
   retest this please


[GitHub] [beam] pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r396810340
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
 ##########
 @@ -170,8 +170,13 @@ def run_pipeline(self, pipeline, options):
               user_pipeline)):
         streaming_cache_manager = ie.current_env().cache_manager()
         if streaming_cache_manager:
+
+          def exception_handler(e):
+            _LOGGER.error(str(e))
 
 Review comment:
   Same as above. Do we just log and not stop processing?


[GitHub] [beam] pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r396766986
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
 ##########
 @@ -166,13 +169,15 @@ def _wait_until_file_exists(self, timeout_secs=30):
 
     # Wait for up to `timeout_secs` for the file to be available.
     start = time.time()
-    path = os.path.join(self._cache_dir, *self._labels)
-    while not os.path.exists(path):
+    while not os.path.exists(self._path):
       time.sleep(1)
       if time.time() - start > timeout_timestamp_secs:
+        from apache_beam.runners.interactive.pipeline_instrument import CacheKey
+        pcollection_var = CacheKey.from_str(self._labels[-1]).var
 
 Review comment:
   I hadn't stopped to think that labels are a file name too, huh? I guess the final file name is the PCollection variable name? If so, users may name their PCollections something that is not supported by the OS? (Or maybe not, since they have to be Python variable names?)
   Anyway, this is not for this PR, just something to think about.


[GitHub] [beam] rohdesamuel commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r397347169
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
 ##########
 @@ -170,8 +170,13 @@ def run_pipeline(self, pipeline, options):
               user_pipeline)):
         streaming_cache_manager = ie.current_env().cache_manager()
         if streaming_cache_manager:
+
+          def exception_handler(e):
+            _LOGGER.error(str(e))
 
 Review comment:
   Here, the exception is caught at the TestStreamService level and, for the same reason, we just log the error so that it's easier for the user to understand. Because it's caught at the service level, processing has already stopped by the time the handler runs.
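
   To illustrate the behavior described above, here is a minimal sketch of a service that catches a reader error and hands it to a logging callback. `EventService`, `events`, and `log_error` are hypothetical names for illustration only; this is not the actual TestStreamService code.

```python
import logging

_LOGGER = logging.getLogger(__name__)


class EventService:
  """Hypothetical stand-in for a service that streams events from a reader."""

  def __init__(self, reader, exception_handler=None):
    self._reader = reader
    self._exception_handler = exception_handler

  def events(self):
    try:
      for event in self._reader():
        yield event
    except Exception as e:  # pylint: disable=broad-except
      # The reader has already failed, so the stream ends here either way.
      # The handler only decides how the error is reported.
      if self._exception_handler is None:
        raise
      self._exception_handler(e)


def log_error(e):
  # Report a readable message instead of propagating the exception.
  _LOGGER.error(str(e))
```

   With `EventService(reader, log_error)`, a reader that fails partway through yields the events it produced before the failure and then the stream simply ends, with the error logged.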


[GitHub] [beam] pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#issuecomment-603460559
 
 
   retest this please


[GitHub] [beam] pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r396767924
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
 ##########
 @@ -202,14 +207,24 @@ def _emit_from_file(self, fh, tail):
         # The first line at pos = 0 is always the header. Read the line without
         # the new line.
         to_decode = line[:-1]
-        if pos == 0:
-          header = TestStreamFileHeader()
-          header.ParseFromString(self._coder.decode(to_decode))
-          yield header
+        proto_cls = TestStreamFileHeader if pos == 0 else TestStreamFileRecord
+        msg = self._try_parse_as(proto_cls, to_decode)
+        if msg:
+          yield msg
         else:
-          record = TestStreamFileRecord()
-          record.ParseFromString(self._coder.decode(to_decode))
-          yield record
+          break
+
+  def _try_parse_as(self, proto_cls, to_decode):
+    try:
+      msg = proto_cls()
+      msg.ParseFromString(self._coder.decode(to_decode))
+    except DecodeError:
+      _LOGGER.error(
+          'Could not parse as %s. This can indicate that the cache is '
+          'corrupted. Please restart the kernel. '
+          '\nfile: %s \nmessage: %s', proto_cls, self._path, to_decode)
+      msg = None
 
 Review comment:
   Do we just skip? This may mean that the file is corrupted? Should we stop consuming (i.e. rethrow the exception)?


[GitHub] [beam] rohdesamuel commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r397342416
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
 ##########
 @@ -166,13 +169,15 @@ def _wait_until_file_exists(self, timeout_secs=30):
 
     # Wait for up to `timeout_secs` for the file to be available.
     start = time.time()
-    path = os.path.join(self._cache_dir, *self._labels)
-    while not os.path.exists(path):
+    while not os.path.exists(self._path):
       time.sleep(1)
       if time.time() - start > timeout_timestamp_secs:
+        from apache_beam.runners.interactive.pipeline_instrument import CacheKey
+        pcollection_var = CacheKey.from_str(self._labels[-1]).var
 
 Review comment:
   Yeah, because these are Python variable names, they follow very strict naming rules (alphanumeric characters plus the underscore), which makes them OK to use as file names.
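
   As a rough sketch of that reasoning, a label could be checked like this. `is_safe_cache_label` is a hypothetical helper, not part of Beam. Note the assumption it makes explicit: Python 3 identifiers may also contain non-ASCII letters, so the "alphanumeric plus underscore" guarantee only holds for ASCII names. `str.isascii` requires Python 3.7 or later.

```python
import keyword


def is_safe_cache_label(name):
  """Returns True if `name` is an ASCII Python identifier and not a keyword.

  Such names contain only letters, digits, and underscores, and do not start
  with a digit, so they contain no path separators or other special characters.
  """
  return (
      name.isascii() and name.isidentifier() and not keyword.iskeyword(name))
```

   For example, `is_safe_cache_label('my_pcoll_1')` is true, while `'a/b'`, `'1abc'`, and `'class'` are all rejected.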


[GitHub] [beam] rohdesamuel commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on a change in pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#discussion_r397344411
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/caching/streaming_cache.py
 ##########
 @@ -202,14 +207,24 @@ def _emit_from_file(self, fh, tail):
         # The first line at pos = 0 is always the header. Read the line without
         # the new line.
         to_decode = line[:-1]
-        if pos == 0:
-          header = TestStreamFileHeader()
-          header.ParseFromString(self._coder.decode(to_decode))
-          yield header
+        proto_cls = TestStreamFileHeader if pos == 0 else TestStreamFileRecord
+        msg = self._try_parse_as(proto_cls, to_decode)
+        if msg:
+          yield msg
         else:
-          record = TestStreamFileRecord()
-          record.ParseFromString(self._coder.decode(to_decode))
-          yield record
+          break
+
+  def _try_parse_as(self, proto_cls, to_decode):
+    try:
+      msg = proto_cls()
+      msg.ParseFromString(self._coder.decode(to_decode))
+    except DecodeError:
+      _LOGGER.error(
+          'Could not parse as %s. This can indicate that the cache is '
+          'corrupted. Please restart the kernel. '
+          '\nfile: %s \nmessage: %s', proto_cls, self._path, to_decode)
+      msg = None
 
 Review comment:
   No, if we rethrow the exception, it will get handled by the gRPC layer above and get turned into gobbledygook. This does stop processing, because `_try_parse_as` returns `msg = None`, which breaks out of the loop.
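
   The control flow can be sketched in isolation as follows. This is a simplified stand-in, not the code in this PR: it uses JSON lines instead of the TestStream protos so that it runs without Beam, and `try_parse` and `emit_records` are hypothetical names.

```python
import json
import logging

_LOGGER = logging.getLogger(__name__)


def try_parse(line):
  """Returns the decoded record, or None if the line cannot be parsed."""
  try:
    return json.loads(line)
  except json.JSONDecodeError:
    # Log a readable message here rather than re-raising, so the error is
    # not reworded by a layer further up the stack.
    _LOGGER.error(
        'Could not parse line; the cache may be corrupted: %r', line)
    return None


def emit_records(lines):
  """Yields parsed records and stops at the first line that fails to parse."""
  for line in lines:
    record = try_parse(line)
    if record is None:
      # A failed parse ends the stream; later lines are not read.
      break
    yield record


if __name__ == '__main__':
  print(list(emit_records(['{"id": 1}', '{"id": 2}', 'not json', '{"id": 3}'])))
```

   One difference from the diff above: the sketch tests `record is None` rather than truthiness, so that an empty but valid record such as `{}` is not mistaken for a parse failure.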


[GitHub] [beam] pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#issuecomment-603460371
 
 
   retest this please


[GitHub] [beam] pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#issuecomment-603427755
 
 
   retest this please


[GitHub] [beam] pabloem merged pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163
 
 
   


[GitHub] [beam] pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#issuecomment-603478230
 
 
   Run Python PreCommit


[GitHub] [beam] pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

Posted by GitBox <gi...@apache.org>.
pabloem commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#issuecomment-603462409
 
 
   retest this please


[GitHub] [beam] rohdesamuel commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on issue #11163: [BEAM-9548] Add better error handling to the TestStreamServiceController
URL: https://github.com/apache/beam/pull/11163#issuecomment-601417922
 
 
   R: @pabloem 
