Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/06/18 20:58:40 UTC

[GitHub] [samza] cameronlee314 opened a new pull request #1506: SAMZA-2303: Exclude side inputs when handling end-of-stream and watermarks for high-level

cameronlee314 opened a new pull request #1506:
URL: https://github.com/apache/samza/pull/1506


   Symptom:
   1. End-of-stream and watermarks are not properly propagated through Samza when side inputs are used.
   2. This prevents many tests from using the `TestRunner` framework, since the `TestRunner` framework relies on having tasks shut themselves down based on end-of-stream messages.
   
   Cause:
   OperatorImplGraph builds EndOfStreamStates and WatermarkStates objects with all of the input SSPs from the job model, which includes side-input SSPs. However, high-level operator tasks aren't given messages from side-input SSPs, so high-level operators should not need to track end-of-stream and watermarks for them. As a result, end-of-stream and watermark handling includes side inputs but never updates their states, which can prevent tasks from exiting properly (end-of-stream) and cause watermarks to be calculated incorrectly.
   Note: We currently have tests which use partitionBy and side inputs, but they only use a single partition, so RunLoop is able to shut down the task (RunLoop doesn't check side inputs when determining whether the task has reached the end of all streams).
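A minimal sketch of the failure mode described above, assuming a simplified tracker in which SSPs are plain strings. `EosTracker` and `withoutSideInputs` are hypothetical names, not Samza's `EndOfStreamStates` API. A tracker built from every SSP, including a side input that high-level operators never receive messages from, can never report that all streams have ended; a tracker built from the filtered set can.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified end-of-stream tracking; not Samza's EndOfStreamStates. */
public class EosTrackingSketch {

  /** Tracks which SSPs have not yet reached end-of-stream. */
  static final class EosTracker {
    private final Set<String> pending;

    EosTracker(Set<String> ssps) {
      this.pending = new HashSet<>(ssps);
    }

    /** Records that an SSP delivered an end-of-stream message. */
    void onEndOfStream(String ssp) {
      pending.remove(ssp);
    }

    /** True only once every tracked SSP has reached end-of-stream. */
    boolean allEnded() {
      return pending.isEmpty();
    }
  }

  /** Returns the task's SSPs minus its side-input SSPs. */
  static Set<String> withoutSideInputs(Set<String> allSsps, Set<String> sideInputSsps) {
    Set<String> result = new HashSet<>(allSsps);
    result.removeAll(sideInputSsps);
    return result;
  }

  public static void main(String[] args) {
    Set<String> all = Set.of("input[0]", "side-input[0]");
    Set<String> sideInputs = Set.of("side-input[0]");

    // Before the fix: the side input is tracked, but high-level operators never
    // see its messages, so it stays pending forever.
    EosTracker before = new EosTracker(all);
    before.onEndOfStream("input[0]");
    System.out.println("before fix, all ended: " + before.allEnded()); // prints false

    // After the fix: side inputs are excluded before tracking starts.
    EosTracker after = new EosTracker(withoutSideInputs(all, sideInputs));
    after.onEndOfStream("input[0]");
    System.out.println("after fix, all ended: " + after.allEnded()); // prints true
  }
}
```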
   
   Changes:
   1. Pass the set of SSPs, excluding side inputs, to high-level operators so that they don't read directly from the task model, which does include side inputs. High-level operators will then handle end-of-stream and watermark propagation without considering side-input SSPs.
   2. Change `InMemoryManager` to only use `IncomingMessageEnvelope.END_OF_STREAM_OFFSET` when the `taskName` in the `EndOfStreamMessage` is null. This prevents the SAMZA-2300 issue, which causes end-of-stream messages to not be properly aggregated and then broadcast to all partitions (see SAMZA-2300 for more details). Some existing tests would fail without this change.
   3. Add a unique `app.id` in `TestRunner` for each test. This helps prevent clashes between different tests. For example, `ControlMessageSender` has a static cache keyed by the stream id of intermediate streams, and multiple tests could end up using the same key in that cache. By using a unique app id, the intermediate streams are unique, so multiple tests won't use the same key in the cache.
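A minimal sketch of change 3, assuming the test config is a plain `Map<String, String>`. `withDefaultAppId` and `intermediateStreamId` are hypothetical helpers, and the stream-id format is an illustration rather than Samza's actual naming scheme; using `putIfAbsent` so that an explicitly configured `app.id` still wins is also an assumption. The point is that two tests with the same app name and operator id end up with different intermediate stream ids, so a static cache keyed by stream id keeps them apart.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical sketch of per-test app.id defaulting; not TestRunner's code. */
public class UniqueAppIdSketch {

  /**
   * Returns a copy of the test config with "app.id" defaulted to a randomly
   * generated "in-memory scope". An app.id already present is kept (assumption).
   */
  static Map<String, String> withDefaultAppId(Map<String, String> testConfig) {
    Map<String, String> config = new HashMap<>(testConfig);
    String inMemoryScope = UUID.randomUUID().toString();
    config.putIfAbsent("app.id", inMemoryScope);
    return config;
  }

  /** Hypothetical intermediate stream id; Samza's real naming scheme may differ. */
  static String intermediateStreamId(Map<String, String> config, String operatorId) {
    return config.get("app.name") + "-" + config.get("app.id") + "-partition_by-" + operatorId;
  }

  public static void main(String[] args) {
    Map<String, String> testA = withDefaultAppId(Map.of("app.name", "my-app"));
    Map<String, String> testB = withDefaultAppId(Map.of("app.name", "my-app"));
    // Same app name and operator id in both tests, but different app.id values,
    // so a static cache keyed by stream id sees two distinct keys.
    System.out.println(intermediateStreamId(testA, "p1").equals(intermediateStreamId(testB, "p1")));
  }
}
```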
   
   Tests: Rewrote some tests to use the `TestRunner` and to use multiple partitions. This tests that end-of-stream messages are being propagated for the multiple-partition case.
   
   API changes (impacts testing framework only):
   1. The default `app.id` used for tests executed by `TestRunner` is set to the "in-memory scope", which is a string that is randomly generated for each test. Before this change, the `app.id` was not set.
   2. `InMemoryManager` only uses `IncomingMessageEnvelope.END_OF_STREAM_OFFSET` when the `EndOfStreamMessage` has a null `taskName`. Before this change, `InMemoryManager` used `IncomingMessageEnvelope.END_OF_STREAM_OFFSET` for all `EndOfStreamMessage`s.
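A minimal sketch of the offset rule in API change 2, assuming simplified stand-ins: `EosOffsetSketch`, `EndOfStream`, `offsetFor`, and the `nextOffset` parameter are hypothetical, and the `END_OF_STREAM_OFFSET` value is a placeholder for `IncomingMessageEnvelope.END_OF_STREAM_OFFSET` rather than a copy of it. It is not the actual `InMemoryManager` code.

```java
/** Hypothetical sketch of the end-of-stream offset rule; not InMemoryManager's code. */
public class EosOffsetSketch {
  // Placeholder standing in for IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
  static final String END_OF_STREAM_OFFSET = "END_OF_STREAM";

  /** Minimal stand-in for EndOfStreamMessage, keeping only the field this rule inspects. */
  static final class EndOfStream {
    final String taskName; // null unless the message is for an intermediate stream

    EndOfStream(String taskName) {
      this.taskName = taskName;
    }
  }

  /**
   * Chooses the offset for an end-of-stream envelope in the in-memory system.
   * nextOffset is the regular offset the message would otherwise get (hypothetical).
   */
  static String offsetFor(EndOfStream message, long nextOffset) {
    if (message.taskName == null) {
      // No task name: use the special end-of-stream offset, as before.
      return END_OF_STREAM_OFFSET;
    }
    // Task name set (intermediate streams): keep a regular offset, so the message is
    // delivered like any other and can be aggregated across upstream tasks (SAMZA-2300).
    return String.valueOf(nextOffset);
  }

  public static void main(String[] args) {
    System.out.println(offsetFor(new EndOfStream(null), 7));          // prints END_OF_STREAM
    System.out.println(offsetFor(new EndOfStream("Partition 0"), 7)); // prints 7
  }
}
```

With this rule, an end-of-stream message that carries a `taskName` gets an ordinary offset instead of the special marker.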
   
   Upgrade/usage instructions:
   1. If tests are written using `TestRunner`, and those tests rely on `app.id` being unset, then those will need to be updated to use/read the new `app.id`. It isn't expected to be a common use case that tests rely on `app.id`.
   2. If the in-memory system is being used (which includes tests written using `TestRunner`), and it is expected that the in-memory system sets `END_OF_STREAM_OFFSET` for messages when the `taskName` is non-null, then that usage will need to be removed. The `taskName` is intended for use by intermediate streams, so it shouldn't be used outside of Samza internals anyway.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [samza] mynameborat commented on pull request #1506: SAMZA-2303: Exclude side inputs when handling end-of-stream and watermarks for high-level

Posted by GitBox <gi...@apache.org>.
mynameborat commented on pull request #1506:
URL: https://github.com/apache/samza/pull/1506#issuecomment-869976157


   > My initial thoughts are that if we think it is useful for the general TaskModel API to separate out the side inputs (for use by actual applications), then that could help the implementation here
   
   That is one of the benefits of having the divide exposed in the TaskModel. While that requires concrete use cases from the application perspective, the other purpose was to simplify a lot of flow within ContainerStorageManager and the newly introduced classes for state restoration, which right now need to know which SSPs are side inputs vs. changelog SSPs vs. input SSPs. @dxichen do you have any input here on whether this would benefit/simplify the existing implementation?
   
   If we decide against it, then the changes look good to me. It is useful to understand the long-term path and potential cleanup options if we want to remove this hack.





[GitHub] [samza] mynameborat commented on pull request #1506: SAMZA-2303: Exclude side inputs when handling end-of-stream and watermarks for high-level

mynameborat commented on pull request #1506:
URL: https://github.com/apache/samza/pull/1506#issuecomment-869869912


   A few questions
   
   1. Would it be better to have a separation within TaskModel instead of conflating inputs and side inputs into a single set of SSPs? Doing so would eliminate yet another hack addition to `TaskContextImpl` and potentially solve scenarios that require this divide information upstream. Checking if you have evaluated this option and what the initial thoughts are.
   2. Can we separate out the tests into two categories (one that needs to belong as part of the fix vs one that isn't) and have a separate PR for the latter category of tests?
   3. It looks like the fix for SAMZA-2300 is going in as part of this PR. Why is that? Can we separate it into another PR to keep the scope of this PR to a single issue?





[GitHub] [samza] cameronlee314 merged pull request #1506: SAMZA-2303: Exclude side inputs when handling end-of-stream and watermarks for high-level

cameronlee314 merged pull request #1506:
URL: https://github.com/apache/samza/pull/1506


   





[GitHub] [samza] cameronlee314 commented on pull request #1506: SAMZA-2303: Exclude side inputs when handling end-of-stream and watermarks for high-level

cameronlee314 commented on pull request #1506:
URL: https://github.com/apache/samza/pull/1506#issuecomment-870003549


   > > My initial thoughts are that if we think it is useful for the general TaskModel API to separate out the side inputs (for use by actual applications), then that could help the implementation here
   > 
   > That is one of the benefits of having the divide exposed in the TaskModel. While that requires concrete use cases from the application perspective, the other purpose was to simplify a lot of flow within ContainerStorageManager and the newly introduced classes for state restoration, which right now need to know which SSPs are side inputs vs. changelog SSPs vs. input SSPs. @dxichen do you have any input here on whether this would benefit/simplify the existing implementation?
   > 
   > If we decide against it, then the changes look good to me. It is useful to understand the long-term path and potential cleanup options if we want to remove this hack.
   
   Do we need to put all of this information in the main `TaskModel` API layer though? I would imagine that the state restoration layer can have multiple implementations, so it could be helpful to expose this granular SSP information somewhere, but if we put the info in `TaskModel`, would that be overloading the scope of `TaskModel` too much? Maybe not, because this is all I/O that a task is dealing with. What do others think?





[GitHub] [samza] dxichen commented on pull request #1506: SAMZA-2303: Exclude side inputs when handling end-of-stream and watermarks for high-level

dxichen commented on pull request #1506:
URL: https://github.com/apache/samza/pull/1506#issuecomment-871746930


   > > > My initial thoughts are that if we think it is useful for the general TaskModel API to separate out the side inputs (for use by actual applications), then that could help the implementation here
   > > 
   > > 
   > > That is one of the benefits of having the divide exposed in the TaskModel. While that requires concrete use cases from the application perspective, the other purpose was to simplify a lot of flow within ContainerStorageManager and the newly introduced classes for state restoration, which right now need to know which SSPs are side inputs vs. changelog SSPs vs. input SSPs. @dxichen do you have any input here on whether this would benefit/simplify the existing implementation?
   > > If we decide against it, then the changes look good to me. It is useful to understand the long-term path and potential cleanup options if we want to remove this hack.
   > 
   > Do we need to put all of this information in the main `TaskModel` API layer though? I would imagine that the state restoration layer can have multiple implementations, so it could be helpful to expose this granular SSP information somewhere, but if we put the info in `TaskModel`, would that be overloading the scope of `TaskModel` too much? Maybe not, because this is all I/O that a task is dealing with. What do others think?
   
   The state restore changes would indirectly benefit from having the side-input SSPs separated from the main SSPs in the task model, since ContainerStorageManager (CSM) would be simplified if that were the case. Currently CSM also does much of the config parsing to determine the store names/SSPs of the side inputs, so this would be a good step toward simplifying the overloaded CSM implementation. Adding it to the task model may help the specific implementations of the restore manager layer (which has multiple implementations) decide whether they want to restore side-input topics, but currently that is not used in Blob or changelog restore.
   Note that all of this parsing and side-input handler initialization happens in the CSM constructor, which means the TaskInstance has not been created yet, so the TaskContext has not been instantiated either. CSM can currently only use the TaskModel. In general, I think this side-input info could live in the task model as well, from the perspective of simplifying CSM.
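To illustrate the kind of config parsing CSM does today, here is a minimal sketch that maps store names to their side-input streams. It assumes side inputs are declared with keys of the form `stores.<store-name>.side.inputs` whose values are comma-separated `system.stream` names; verify the exact key and value format against Samza's `StorageConfig` before relying on it. `SideInputConfigSketch` and `sideInputStreamsByStore` are hypothetical names, and the factory value in `main` is a placeholder.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of side-input config parsing; not CSM's code. */
public class SideInputConfigSketch {
  // Assumed key format: stores.<store-name>.side.inputs
  private static final String PREFIX = "stores.";
  private static final String SUFFIX = ".side.inputs";

  /** Maps each store name to the side-input streams configured for it. */
  static Map<String, Set<String>> sideInputStreamsByStore(Map<String, String> config) {
    Map<String, Set<String>> result = new HashMap<>();
    for (Map.Entry<String, String> entry : config.entrySet()) {
      String key = entry.getKey();
      // Skip unrelated keys and keys too short to contain a store name.
      if (!key.startsWith(PREFIX) || !key.endsWith(SUFFIX)
          || key.length() <= PREFIX.length() + SUFFIX.length()) {
        continue;
      }
      String storeName = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
      Set<String> streams = new HashSet<>();
      for (String stream : entry.getValue().split(",")) {
        String trimmed = stream.trim();
        if (!trimmed.isEmpty()) {
          streams.add(trimmed);
        }
      }
      result.put(storeName, streams);
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> config = Map.of(
        "stores.profiles.factory", "some.StorageEngineFactory",
        "stores.profiles.side.inputs", "kafka.profile-updates");
    System.out.println(sideInputStreamsByStore(config)); // prints {profiles=[kafka.profile-updates]}
  }
}
```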





[GitHub] [samza] cameronlee314 commented on pull request #1506: SAMZA-2303: Exclude side inputs when handling end-of-stream and watermarks for high-level

cameronlee314 commented on pull request #1506:
URL: https://github.com/apache/samza/pull/1506#issuecomment-871784644


   Summarizing some discussion in a different channel:
   
   - Even if we updated the `TaskModel` API right now, it wouldn't clean up `InternalTaskContext` because we still have to figure out how to deal with passing around `StreamMetadataCache`. Cleaning up the wiring for `StreamMetadataCache` doesn't seem high on the TODO list, so this `InternalTaskContext` would still be around for a while.
   - State restore (SEP-28/SEP-29) doesn't plan to update `TaskModel` to add explicit information about side inputs, so the priority on updating `TaskModel` isn't that high.
   - This PR provides functionality benefits (control message handling) and productivity benefits (testing improvements), so we will get it in now.
   
   https://issues.apache.org/jira/browse/SAMZA-2661 (Add a way to determine which inputs are side inputs in TaskModel) and https://issues.apache.org/jira/browse/SAMZA-2662 (Remove InternalTaskContext hacks for wiring objects to high-level API) have been created for future cleanup.





[GitHub] [samza] dxichen commented on pull request #1506: SAMZA-2303: Exclude side inputs when handling end-of-stream and watermarks for high-level

dxichen commented on pull request #1506:
URL: https://github.com/apache/samza/pull/1506#issuecomment-872407336


   Lgtm, thanks for creating the follow up tickets!






[GitHub] [samza] cameronlee314 commented on pull request #1506: SAMZA-2303: Exclude side inputs when handling end-of-stream and watermarks for high-level

cameronlee314 commented on pull request #1506:
URL: https://github.com/apache/samza/pull/1506#issuecomment-871754032


   IMO, if we decide to add side inputs to `TaskModel`, it should primarily be because we think it is useful as a user API (and I think it does make sense to add to `TaskModel` because side inputs are external I/O as well). It is just a bonus that it makes implementation of internal components easier.
   Would you prefer to wait for the `TaskModel` API update before getting the rest of these changes in? We still need the `InternalTaskContext` hack in order to pass `StreamMetadataCache` around anyway, even if `TaskModel` gets updated to have side inputs.





[GitHub] [samza] cameronlee314 commented on pull request #1506: SAMZA-2303: Exclude side inputs when handling end-of-stream and watermarks for high-level

cameronlee314 commented on pull request #1506:
URL: https://github.com/apache/samza/pull/1506#issuecomment-869951777


   > 1. Would it be better to have a separation within TaskModel instead of conflating inputs and side inputs into a single set of SSPs? Doing so would eliminate yet another hack addition to `TaskContextImpl` and potentially solve scenarios that require this divide information upstream. Checking if you have evaluated this option and what the initial thoughts are.
   
   I didn't really consider changing the `TaskModel`. My initial thoughts are that if we think it is useful for the general `TaskModel` API to separate out the side inputs (for use by actual applications), then that could help the implementation here. However, if this won't be helpful for the general API, then I don't think we should bloat/modify the API just to fix this issue (for backwards compat, we probably would need to add a new field and not modify the current one). I'm not currently convinced that this is useful enough for the general API, so I don't think that we should change the general API for this bug fix. What do you think?
   I think the root reason for why we need this `TaskContextImpl` hack is because we don't have a clean way to wire in internal components for the high-level implementation (e.g. we also need the `TaskContextImpl` hack to wire through `StreamMetadataCache` into high-level). I don't see a clean way to fix this root reason right now though.
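For comparison, a minimal sketch of the backwards-compatible shape mentioned above: a new field alongside the existing one rather than a change to it. `TaskModelSketch` and its accessors are hypothetical and use strings for SSPs; this is not Samza's `TaskModel`. The existing accessor keeps returning every SSP, while a derived accessor gives high-level operators the set without side inputs.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of a task's I/O with side inputs identified separately. */
public class TaskModelSketch {
  private final String taskName;
  // Existing field: every SSP assigned to the task, side inputs included.
  private final Set<String> systemStreamPartitions;
  // Hypothetical new field: the subset of the SSPs above that are side inputs.
  private final Set<String> sideInputSystemStreamPartitions;

  TaskModelSketch(String taskName, Set<String> ssps, Set<String> sideInputSsps) {
    if (!ssps.containsAll(sideInputSsps)) {
      throw new IllegalArgumentException("side-input SSPs must be a subset of all SSPs");
    }
    this.taskName = taskName;
    this.systemStreamPartitions = Collections.unmodifiableSet(new HashSet<>(ssps));
    this.sideInputSystemStreamPartitions = Collections.unmodifiableSet(new HashSet<>(sideInputSsps));
  }

  String getTaskName() {
    return taskName;
  }

  /** Unchanged accessor, so existing callers keep seeing every SSP. */
  Set<String> getSystemStreamPartitions() {
    return systemStreamPartitions;
  }

  /** New accessor for the side-input subset. */
  Set<String> getSideInputSystemStreamPartitions() {
    return sideInputSystemStreamPartitions;
  }

  /** The set high-level operators would use for end-of-stream and watermark tracking. */
  Set<String> getNonSideInputSystemStreamPartitions() {
    Set<String> result = new HashSet<>(systemStreamPartitions);
    result.removeAll(sideInputSystemStreamPartitions);
    return Collections.unmodifiableSet(result);
  }
}
```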
   
   > 3. It looks like the fix for SAMZA-2300 is going in as part of this PR. Why is that? Can we separate it into another PR to keep the scope of this PR to a single issue?
   
   Answering #3 first: In order to test the side inputs fix (i.e. `TestLocalTableWithSideInputsEndToEnd`), I needed to modify existing tests to use multiple partitions. The current tests (only single partition) don't rely on the end-of-stream propagation. This is why we didn't recognize this bug with the current tests. However, when I switch to using multiple partitions, then the bug in SAMZA-2300 gets triggered. So it was not ideal to split them up. I also don't think it is ideal to fix both tickets in the same commit (since it makes the single commit more complex), but I felt that the benefit towards having more complete tests for the side inputs fix (by merging into a single commit) was better. 
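A minimal sketch of why single-partition tests hid the problem, assuming a simplified model of end-of-stream aggregation for an intermediate stream: each upstream task sends an end-of-stream message tagged with its task name, and a downstream partition counts as ended only after hearing from every expected upstream task. `IntermediateEosSketch` and `onEndOfStream` are hypothetical, not Samza classes. With one upstream task, the first message completes aggregation immediately, so the aggregation path is only really exercised with multiple partitions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical end-of-stream aggregation for an intermediate stream. */
public class IntermediateEosSketch {
  private final int expectedUpstreamTasks;
  // Downstream partition -> upstream task names that have sent end-of-stream.
  private final Map<Integer, Set<String>> eosSendersByPartition = new HashMap<>();

  IntermediateEosSketch(int expectedUpstreamTasks) {
    this.expectedUpstreamTasks = expectedUpstreamTasks;
  }

  /** Records end-of-stream from upstreamTask on partition; true once all upstream tasks are done. */
  boolean onEndOfStream(int partition, String upstreamTask) {
    Set<String> senders = eosSendersByPartition.computeIfAbsent(partition, p -> new HashSet<>());
    senders.add(upstreamTask); // duplicates from the same task are counted once
    return senders.size() >= expectedUpstreamTasks;
  }

  public static void main(String[] args) {
    // Two upstream tasks repartition into downstream partition 0.
    IntermediateEosSketch agg = new IntermediateEosSketch(2);
    System.out.println(agg.onEndOfStream(0, "Partition 0")); // prints false
    System.out.println(agg.onEndOfStream(0, "Partition 0")); // prints false
    System.out.println(agg.onEndOfStream(0, "Partition 1")); // prints true
  }
}
```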
   
   > 2. Can we separate out the tests into two categories (one that needs to belong as part of the fix vs one that isn't) and have a separate PR for the latter category of tests?
   
   I believe the only test change that isn't directly related to the side inputs fix is `TestLocalTableEndToEnd`. However, it does help add confidence that the SAMZA-2300 change is working as expected, which is why I included the changes to `TestLocalTableEndToEnd` here. I do have another set of changes for migrating other tests, which I kept separate (I will post a PR for those when this one is done), but I wanted to at least have this one test in this PR.




