You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/06/04 03:26:05 UTC

[GitHub] [incubator-gobblin] arjun4084346 opened a new pull request #3027: cancel flow execution when dag manager is disabled

arjun4084346 opened a new pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027


   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-XXX
   
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if applicable):
   
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r441110026



##########
File path: gobblin-modules/gobblin-kafka-09/build.gradle
##########
@@ -87,6 +87,9 @@ artifacts {
 }
 
 test {
+  // this module's test cases hang often when running in parallel.
+  // making every test class execute in a forked test process till tests are fixed
+  forkEvery = 1

Review comment:
       yes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] codecov-commenter edited a comment on pull request #3027: cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#issuecomment-641729270


   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=h1) Report
   > Merging [#3027](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/31f2ef1478c3b5d4887cfc772f0e85057cde1c56&el=desc) will **increase** coverage by `0.03%`.
   > The diff coverage is `16.54%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3027      +/-   ##
   ============================================
   + Coverage     45.66%   45.70%   +0.03%     
   - Complexity     9296     9303       +7     
   ============================================
     Files          1956     1957       +1     
     Lines         74397    74562     +165     
     Branches       8247     8279      +32     
   ============================================
   + Hits          33977    34079     +102     
   - Misses        37242    37295      +53     
   - Partials       3178     3188      +10     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...a/org/apache/gobblin/runtime/api/SpecExecutor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9hcGkvU3BlY0V4ZWN1dG9yLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...a/org/apache/gobblin/runtime/api/SpecProducer.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9hcGkvU3BlY1Byb2R1Y2VyLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [.../apache/gobblin/cluster/GobblinClusterManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkNsdXN0ZXJNYW5hZ2VyLmphdmE=) | `52.09% <ø> (-0.23%)` | `22.00 <0.00> (-1.00)` | |
   | [...ache/gobblin/cluster/GobblinHelixJobScheduler.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkhlbGl4Sm9iU2NoZWR1bGVyLmphdmE=) | `37.57% <0.00%> (-1.24%)` | `6.00 <0.00> (ø)` | |
   | [...pache/gobblin/cluster/JobConfigurationManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSm9iQ29uZmlndXJhdGlvbk1hbmFnZXIuamF2YQ==) | `81.39% <0.00%> (-6.11%)` | `10.00 <0.00> (ø)` | |
   | [...blin/cluster/StreamingJobConfigurationManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvU3RyZWFtaW5nSm9iQ29uZmlndXJhdGlvbk1hbmFnZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...lin/cluster/event/CancelJobConfigArrivalEvent.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvZXZlbnQvQ2FuY2VsSm9iQ29uZmlnQXJyaXZhbEV2ZW50LmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...apache/gobblin/runtime/api/JobCatalogListener.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL0pvYkNhdGFsb2dMaXN0ZW5lci5qYXZh) | `74.07% <0.00%> (-25.93%)` | `0.00 <0.00> (ø)` | |
   | [.../apache/gobblin/runtime/api/MutableJobCatalog.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL011dGFibGVKb2JDYXRhbG9nLmphdmE=) | `81.25% <0.00%> (-5.42%)` | `0.00 <0.00> (ø)` | |
   | [...n/runtime/job\_catalog/JobCatalogListenersList.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvam9iX2NhdGFsb2cvSm9iQ2F0YWxvZ0xpc3RlbmVyc0xpc3QuamF2YQ==) | `63.63% <0.00%> (-10.05%)` | `10.00 <0.00> (ø)` | |
   | ... and [28 more](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=footer). Last update [31f2ef1...459a424](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r522486541



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -298,24 +296,14 @@ synchronized public void stopDag(URI uri) throws IOException {
   /**
    * Add the specified flow to {@link DagManager#cancelQueue}
    */
-  private void killFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException {
+  void killFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException {
     int queueId =  DagManagerUtils.getDagQueueId(flowExecutionId, this.numThreads);
     String dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
     if (!this.cancelQueue[queueId].offer(dagId)) {
       throw new IOException("Could not add dag " + dagId + " to cancellation queue.");
     }
   }
 
-  @Subscribe

Review comment:
       It is doing that, right? see GobblinServiceManager.java




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r442500219



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
##########
@@ -385,6 +400,42 @@ public void remove(Spec spec, Properties headers) throws IOException {
     }
   }
 
+  @Subscribe
+  public void handleKillFlowEvent(KillFlowEvent killFlowEvent) {

Review comment:
       I am a little concerned with the extent of hacks we are introducing based on whether the dag manager mode is enabled. I think we should evaluate doing this against just making DagManager the standard de-facto code path.  




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] arjun4084346 closed pull request #3027: cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
arjun4084346 closed pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] codecov-commenter edited a comment on pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#issuecomment-641729270


   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=h1) Report
   > Merging [#3027](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/31f2ef1478c3b5d4887cfc772f0e85057cde1c56&el=desc) will **increase** coverage by `0.17%`.
   > The diff coverage is `16.54%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3027      +/-   ##
   ============================================
   + Coverage     45.66%   45.84%   +0.17%     
   - Complexity     9296     9330      +34     
   ============================================
     Files          1956     1957       +1     
     Lines         74397    74575     +178     
     Branches       8247     8279      +32     
   ============================================
   + Hits          33977    34187     +210     
   + Misses        37242    37181      -61     
   - Partials       3178     3207      +29     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...a/org/apache/gobblin/runtime/api/SpecExecutor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9hcGkvU3BlY0V4ZWN1dG9yLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...a/org/apache/gobblin/runtime/api/SpecProducer.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9hcGkvU3BlY1Byb2R1Y2VyLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [.../apache/gobblin/cluster/GobblinClusterManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkNsdXN0ZXJNYW5hZ2VyLmphdmE=) | `52.09% <ø> (-0.23%)` | `22.00 <0.00> (-1.00)` | |
   | [...ache/gobblin/cluster/GobblinHelixJobScheduler.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkhlbGl4Sm9iU2NoZWR1bGVyLmphdmE=) | `37.57% <0.00%> (-1.24%)` | `6.00 <0.00> (ø)` | |
   | [...pache/gobblin/cluster/JobConfigurationManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSm9iQ29uZmlndXJhdGlvbk1hbmFnZXIuamF2YQ==) | `81.39% <0.00%> (-6.11%)` | `10.00 <0.00> (ø)` | |
   | [...blin/cluster/StreamingJobConfigurationManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvU3RyZWFtaW5nSm9iQ29uZmlndXJhdGlvbk1hbmFnZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...lin/cluster/event/CancelJobConfigArrivalEvent.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvZXZlbnQvQ2FuY2VsSm9iQ29uZmlnQXJyaXZhbEV2ZW50LmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...apache/gobblin/runtime/api/JobCatalogListener.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL0pvYkNhdGFsb2dMaXN0ZW5lci5qYXZh) | `74.07% <0.00%> (-25.93%)` | `0.00 <0.00> (ø)` | |
   | [.../apache/gobblin/runtime/api/MutableJobCatalog.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL011dGFibGVKb2JDYXRhbG9nLmphdmE=) | `81.25% <0.00%> (-5.42%)` | `0.00 <0.00> (ø)` | |
   | [...n/runtime/job\_catalog/JobCatalogListenersList.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvam9iX2NhdGFsb2cvSm9iQ2F0YWxvZ0xpc3RlbmVyc0xpc3QuamF2YQ==) | `63.63% <0.00%> (-10.05%)` | `10.00 <0.00> (ø)` | |
   | ... and [39 more](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=footer). Last update [31f2ef1...fe2d657](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r441117149



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
##########
@@ -127,17 +121,10 @@ protected void createMetrics() {
       }
     }
 
-    String verbName = record.getMetadata().get(VERB_KEY);

Review comment:
       VERB_KEY should be guaranteed, otherwise this would have thrown exception




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] codecov-commenter edited a comment on pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#issuecomment-641729270


   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=h1) Report
   > Merging [#3027](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/31f2ef1478c3b5d4887cfc772f0e85057cde1c56&el=desc) will **decrease** coverage by `36.39%`.
   > The diff coverage is `4.51%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #3027       +/-   ##
   ============================================
   - Coverage     45.66%   9.27%   -36.40%     
   + Complexity     9296    1695     -7601     
   ============================================
     Files          1956    1957        +1     
     Lines         74397   74564      +167     
     Branches       8247    8279       +32     
   ============================================
   - Hits          33977    6919    -27058     
   - Misses        37242   66971    +29729     
   + Partials       3178     674     -2504     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...a/org/apache/gobblin/runtime/api/SpecExecutor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9hcGkvU3BlY0V4ZWN1dG9yLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...a/org/apache/gobblin/runtime/api/SpecProducer.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9hcGkvU3BlY1Byb2R1Y2VyLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [.../apache/gobblin/cluster/GobblinClusterManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkNsdXN0ZXJNYW5hZ2VyLmphdmE=) | `42.79% <ø> (-9.53%)` | `19.00 <0.00> (-4.00)` | |
   | [...ache/gobblin/cluster/GobblinHelixJobScheduler.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkhlbGl4Sm9iU2NoZWR1bGVyLmphdmE=) | `17.19% <0.00%> (-21.62%)` | `2.00 <0.00> (-4.00)` | |
   | [...pache/gobblin/cluster/JobConfigurationManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSm9iQ29uZmlndXJhdGlvbk1hbmFnZXIuamF2YQ==) | `65.11% <0.00%> (-22.39%)` | `7.00 <0.00> (-3.00)` | |
   | [...blin/cluster/StreamingJobConfigurationManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvU3RyZWFtaW5nSm9iQ29uZmlndXJhdGlvbk1hbmFnZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...lin/cluster/event/CancelJobConfigArrivalEvent.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvZXZlbnQvQ2FuY2VsSm9iQ29uZmlnQXJyaXZhbEV2ZW50LmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...apache/gobblin/kafka/writer/Kafka09DataWriter.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtMDkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4va2Fma2Evd3JpdGVyL0thZmthMDlEYXRhV3JpdGVyLmphdmE=) | `0.00% <0.00%> (-76.79%)` | `0.00 <0.00> (-9.00)` | |
   | [...apache/gobblin/runtime/api/JobCatalogListener.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL0pvYkNhdGFsb2dMaXN0ZW5lci5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00 <0.00> (ø)` | |
   | [.../apache/gobblin/runtime/api/MutableJobCatalog.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL011dGFibGVKb2JDYXRhbG9nLmphdmE=) | `0.00% <0.00%> (-86.67%)` | `0.00 <0.00> (ø)` | |
   | ... and [1049 more](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=footer). Last update [31f2ef1...64537f3](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] codecov-commenter commented on pull request #3027: cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#issuecomment-641729270


   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=h1) Report
   > Merging [#3027](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/31f2ef1478c3b5d4887cfc772f0e85057cde1c56&el=desc) will **increase** coverage by `0.03%`.
   > The diff coverage is `16.54%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3027      +/-   ##
   ============================================
   + Coverage     45.66%   45.70%   +0.03%     
   - Complexity     9296     9303       +7     
   ============================================
     Files          1956     1957       +1     
     Lines         74397    74562     +165     
     Branches       8247     8279      +32     
   ============================================
   + Hits          33977    34079     +102     
   - Misses        37242    37295      +53     
   - Partials       3178     3188      +10     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...a/org/apache/gobblin/runtime/api/SpecExecutor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9hcGkvU3BlY0V4ZWN1dG9yLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...a/org/apache/gobblin/runtime/api/SpecProducer.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9hcGkvU3BlY1Byb2R1Y2VyLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [.../apache/gobblin/cluster/GobblinClusterManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkNsdXN0ZXJNYW5hZ2VyLmphdmE=) | `52.09% <ø> (-0.23%)` | `22.00 <0.00> (-1.00)` | |
   | [...ache/gobblin/cluster/GobblinHelixJobScheduler.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkhlbGl4Sm9iU2NoZWR1bGVyLmphdmE=) | `37.57% <0.00%> (-1.24%)` | `6.00 <0.00> (ø)` | |
   | [...pache/gobblin/cluster/JobConfigurationManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSm9iQ29uZmlndXJhdGlvbk1hbmFnZXIuamF2YQ==) | `81.39% <0.00%> (-6.11%)` | `10.00 <0.00> (ø)` | |
   | [...blin/cluster/StreamingJobConfigurationManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvU3RyZWFtaW5nSm9iQ29uZmlndXJhdGlvbk1hbmFnZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...lin/cluster/event/CancelJobConfigArrivalEvent.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvZXZlbnQvQ2FuY2VsSm9iQ29uZmlnQXJyaXZhbEV2ZW50LmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...apache/gobblin/runtime/api/JobCatalogListener.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL0pvYkNhdGFsb2dMaXN0ZW5lci5qYXZh) | `74.07% <0.00%> (-25.93%)` | `0.00 <0.00> (ø)` | |
   | [.../apache/gobblin/runtime/api/MutableJobCatalog.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL011dGFibGVKb2JDYXRhbG9nLmphdmE=) | `81.25% <0.00%> (-5.42%)` | `0.00 <0.00> (ø)` | |
   | [...n/runtime/job\_catalog/JobCatalogListenersList.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvam9iX2NhdGFsb2cvSm9iQ2F0YWxvZ0xpc3RlbmVyc0xpc3QuamF2YQ==) | `63.63% <0.00%> (-10.05%)` | `10.00 <0.00> (ø)` | |
   | ... and [28 more](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=footer). Last update [31f2ef1...459a424](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] codecov-commenter edited a comment on pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#issuecomment-641729270


   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=h1) Report
   > Merging [#3027](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/31f2ef1478c3b5d4887cfc772f0e85057cde1c56&el=desc) will **increase** coverage by `0.03%`.
   > The diff coverage is `16.54%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3027      +/-   ##
   ============================================
   + Coverage     45.66%   45.70%   +0.03%     
   - Complexity     9296     9304       +8     
   ============================================
     Files          1956     1957       +1     
     Lines         74397    74564     +167     
     Branches       8247     8279      +32     
   ============================================
   + Hits          33977    34080     +103     
   - Misses        37242    37299      +57     
   - Partials       3178     3185       +7     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...a/org/apache/gobblin/runtime/api/SpecExecutor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9hcGkvU3BlY0V4ZWN1dG9yLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...a/org/apache/gobblin/runtime/api/SpecProducer.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9hcGkvU3BlY1Byb2R1Y2VyLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [.../apache/gobblin/cluster/GobblinClusterManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkNsdXN0ZXJNYW5hZ2VyLmphdmE=) | `52.09% <ø> (-0.23%)` | `22.00 <0.00> (-1.00)` | |
   | [...ache/gobblin/cluster/GobblinHelixJobScheduler.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkhlbGl4Sm9iU2NoZWR1bGVyLmphdmE=) | `37.57% <0.00%> (-1.24%)` | `6.00 <0.00> (ø)` | |
   | [...pache/gobblin/cluster/JobConfigurationManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSm9iQ29uZmlndXJhdGlvbk1hbmFnZXIuamF2YQ==) | `81.39% <0.00%> (-6.11%)` | `10.00 <0.00> (ø)` | |
   | [...blin/cluster/StreamingJobConfigurationManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvU3RyZWFtaW5nSm9iQ29uZmlndXJhdGlvbk1hbmFnZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...lin/cluster/event/CancelJobConfigArrivalEvent.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvZXZlbnQvQ2FuY2VsSm9iQ29uZmlnQXJyaXZhbEV2ZW50LmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...apache/gobblin/runtime/api/JobCatalogListener.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL0pvYkNhdGFsb2dMaXN0ZW5lci5qYXZh) | `74.07% <0.00%> (-25.93%)` | `0.00 <0.00> (ø)` | |
   | [.../apache/gobblin/runtime/api/MutableJobCatalog.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL011dGFibGVKb2JDYXRhbG9nLmphdmE=) | `81.25% <0.00%> (-5.42%)` | `0.00 <0.00> (ø)` | |
   | [...n/runtime/job\_catalog/JobCatalogListenersList.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvam9iX2NhdGFsb2cvSm9iQ2F0YWxvZ0xpc3RlbmVyc0xpc3QuamF2YQ==) | `63.63% <0.00%> (-10.05%)` | `10.00 <0.00> (ø)` | |
   | ... and [30 more](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=footer). Last update [31f2ef1...64537f3](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] autumnust commented on pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
autumnust commented on pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#issuecomment-646305647


   The PR itself LGTM, but I don’t know indication of this change for the service itself, especially the backward-incompatible part. Will leave it to more knowledgable reviewers for this point


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r443074977



##########
File path: gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
##########
@@ -84,6 +83,7 @@ public long bytesWritten() {
       };
 
   private final Producer<K, V> producer;
+  @EqualsAndHashCode.Include

Review comment:
       We instantiate different executors(producers) for different kafka topics to send specs to. So I found only topic name to be relevant in comparing producers.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r441128943



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
##########
@@ -100,17 +101,29 @@ protected void shutdownMetrics()
   @Override
   protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
     try {
-      Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.getValue());
-      for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
-        if (parsedMessage instanceof Either.Left) {
-          this.newSpecs.inc();
-          this.jobCatalog.put(((Either.Left<JobSpec, URI>) parsedMessage).getLeft());
-        } else if (parsedMessage instanceof Either.Right) {
-          this.removedSpecs.inc();
-          URI jobSpecUri = ((Either.Right<JobSpec, URI>) parsedMessage).getRight();
-          this.jobCatalog.remove(jobSpecUri);
-          // Delete the job state if it is a delete spec request
-          deleteStateStore(jobSpecUri);
+      Collection<JobSpec> parsedCollection = parseJobSpec(message.getValue());
+      for (JobSpec parsedMessage : parsedCollection) {
+        SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(parsedMessage.getMetadata().get(VERB_KEY));
+
+        switch (verb) {
+          case ADD:
+          case UPDATE:
+          case UNKNOWN: // unknown are considered as add request to maintain backward compatibility
+            this.newSpecs.inc();
+            this.jobCatalog.put(parsedMessage);
+            break;
+          case DELETE:
+            this.removedSpecs.inc();
+            URI jobSpecUri = parsedMessage.getUri();
+            this.jobCatalog.remove(jobSpecUri);
+            // Delete the job state if it is a delete spec request
+            deleteStateStore(jobSpecUri);
+            break;
+          case CANCEL:
+            this.jobCatalog.cancel(parsedMessage);
+            break;
+          default:

Review comment:
       If we throw an error, job monitor thread will die and then we will not be able to process other messages.
   Really not sure which approach is better.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] codecov-commenter edited a comment on pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#issuecomment-641729270


   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=h1) Report
   > Merging [#3027](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/31f2ef1478c3b5d4887cfc772f0e85057cde1c56&el=desc) will **increase** coverage by `0.14%`.
   > The diff coverage is `16.54%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3027      +/-   ##
   ============================================
   + Coverage     45.66%   45.81%   +0.14%     
   - Complexity     9296     9329      +33     
   ============================================
     Files          1956     1957       +1     
     Lines         74397    74575     +178     
     Branches       8247     8279      +32     
   ============================================
   + Hits          33977    34164     +187     
   + Misses        37242    37206      -36     
   - Partials       3178     3205      +27     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...a/org/apache/gobblin/runtime/api/SpecExecutor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9hcGkvU3BlY0V4ZWN1dG9yLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...a/org/apache/gobblin/runtime/api/SpecProducer.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vcnVudGltZS9hcGkvU3BlY1Byb2R1Y2VyLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [.../apache/gobblin/cluster/GobblinClusterManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkNsdXN0ZXJNYW5hZ2VyLmphdmE=) | `52.09% <ø> (-0.23%)` | `22.00 <0.00> (-1.00)` | |
   | [...ache/gobblin/cluster/GobblinHelixJobScheduler.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkhlbGl4Sm9iU2NoZWR1bGVyLmphdmE=) | `37.57% <0.00%> (-1.24%)` | `6.00 <0.00> (ø)` | |
   | [...pache/gobblin/cluster/JobConfigurationManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSm9iQ29uZmlndXJhdGlvbk1hbmFnZXIuamF2YQ==) | `81.39% <0.00%> (-6.11%)` | `10.00 <0.00> (ø)` | |
   | [...blin/cluster/StreamingJobConfigurationManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvU3RyZWFtaW5nSm9iQ29uZmlndXJhdGlvbk1hbmFnZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...lin/cluster/event/CancelJobConfigArrivalEvent.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvZXZlbnQvQ2FuY2VsSm9iQ29uZmlnQXJyaXZhbEV2ZW50LmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...apache/gobblin/runtime/api/JobCatalogListener.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL0pvYkNhdGFsb2dMaXN0ZW5lci5qYXZh) | `74.07% <0.00%> (-25.93%)` | `0.00 <0.00> (ø)` | |
   | [.../apache/gobblin/runtime/api/MutableJobCatalog.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL011dGFibGVKb2JDYXRhbG9nLmphdmE=) | `81.25% <0.00%> (-5.42%)` | `0.00 <0.00> (ø)` | |
   | [...n/runtime/job\_catalog/JobCatalogListenersList.java](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvam9iX2NhdGFsb2cvSm9iQ2F0YWxvZ0xpc3RlbmVyc0xpc3QuamF2YQ==) | `63.63% <0.00%> (-10.05%)` | `10.00 <0.00> (ø)` | |
   | ... and [40 more](https://codecov.io/gh/apache/incubator-gobblin/pull/3027/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=footer). Last update [31f2ef1...fe2d657](https://codecov.io/gh/apache/incubator-gobblin/pull/3027?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r441117149



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
##########
@@ -127,17 +121,10 @@ protected void createMetrics() {
       }
     }
 
-    String verbName = record.getMetadata().get(VERB_KEY);

Review comment:
       VERB_KEY should be guaranteed, otherwise this will throw exception




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r443075074



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalogListener.java
##########
@@ -91,4 +94,18 @@ public Void apply(JobCatalogListener listener) {
     }
   }
 
+  public static class CancelJobCallback extends Callback<JobCatalogListener, Void> {
+    private final JobSpec _cancelJob;
+
+    public CancelJobCallback(JobSpec cancelJob) {
+      super(Objects.toStringHelper("onCancelJob")
+          .add("cancelJob", cancelJob).toString());
+      _cancelJob = cancelJob;
+    }
+
+    @Override public Void apply(JobCatalogListener listener) {

Review comment:
       you mean a line gap b/w L106 and L107?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r441115099



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
##########
@@ -100,17 +101,29 @@ protected void shutdownMetrics()
   @Override
   protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
     try {
-      Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.getValue());
-      for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
-        if (parsedMessage instanceof Either.Left) {
-          this.newSpecs.inc();
-          this.jobCatalog.put(((Either.Left<JobSpec, URI>) parsedMessage).getLeft());
-        } else if (parsedMessage instanceof Either.Right) {
-          this.removedSpecs.inc();
-          URI jobSpecUri = ((Either.Right<JobSpec, URI>) parsedMessage).getRight();
-          this.jobCatalog.remove(jobSpecUri);
-          // Delete the job state if it is a delete spec request
-          deleteStateStore(jobSpecUri);
+      Collection<JobSpec> parsedCollection = parseJobSpec(message.getValue());
+      for (JobSpec parsedMessage : parsedCollection) {
+        SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(parsedMessage.getMetadata().get(VERB_KEY));
+
+        switch (verb) {
+          case ADD:
+          case UPDATE:
+          case UNKNOWN: // unknown are considered as add request to maintain backward compatibility

Review comment:
       yes, verb is guaranteed to be set. unknown is its default value
   https://github.com/apache/incubator-gobblin/blob/master/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java#L347




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r443075847



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
##########
@@ -293,28 +301,34 @@ public void orchestrate(Spec spec) throws Exception {
         this.dagManager.get().addDag(jobExecutionPlanDag, true);
       } else {
         // Schedule all compiled JobSpecs on their respective Executor
+        // This assumes that the JobSpecs do not have any dependency on each other and all can run together
         for (Dag.DagNode<JobExecutionPlan> dagNode : jobExecutionPlanDag.getNodes()) {
           DagManagerUtils.incrementJobAttempt(dagNode);
           JobExecutionPlan jobExecutionPlan = dagNode.getValue();
 
           // Run this spec on selected executor
-          SpecProducer producer = null;
+          SpecProducer<Spec> producer = null;
           try {
             producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
-            Spec jobSpec = jobExecutionPlan.getJobSpec();
+            JobSpec jobSpec = jobExecutionPlan.getJobSpec();
 
-            if (!((JobSpec) jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+            if (!jobSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
               _log.warn("JobSpec does not contain flowExecutionId.");
             }
 
             Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
-            _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, producer));
+            _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec.toString(), producer));
 
             TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
                 getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
 
             producer.addSpec(jobSpec);
 
+            if (!specProducerToSpecs.containsKey(producer)) {

Review comment:
       Correct me if I misunderstood; specProducerToSpecs is a mapping from producer -> list of specs processed by that producer. This map should have size of k, where k is the number of kafka topics. Orchestrator.remove() is removing the specs from the the value part of this map.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r439162340



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
##########
@@ -108,10 +102,10 @@ protected void createMetrics() {
   /**
    * Creates a {@link JobSpec} or {@link URI} from the {@link AvroJobSpec} record.
    * @param record the record as an {@link AvroJobSpec}
-   * @return a {@link JobSpec} or {@link URI} wrapped in a {@link Collection} of {@link Either}
+   * @return a {@link JobSpec}
    */
   @Override
-  public Collection<Either<JobSpec, URI>> parseJobSpec(AvroJobSpec record) {
+  public Collection<JobSpec> parseJobSpec(AvroJobSpec record) {

Review comment:
       In KafkaJobMonitor, caller of this api is supposed to decide whether it is an insert/update or delete type of job spec based on Either c. I could not found any other way to represent a 3rd type of job spec. It also makes less sense to me, because the job spec itself has sufficient information for caller to decide what kind of request it is; jobspec's metadata has a field VERB to store this information.
   Yes it is a backward incompatible change, I changed all the implementations of KafkaJobMonitor using this new return type, including this one (AvroJobSpecKafkaJobMonitor). If there still is any implementation of KafkaJobMonitor by external developers, probably this change can help them how to remove either?
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r442480654



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalogListener.java
##########
@@ -91,4 +94,18 @@ public Void apply(JobCatalogListener listener) {
     }
   }
 
+  public static class CancelJobCallback extends Callback<JobCatalogListener, Void> {
+    private final JobSpec _cancelJob;
+
+    public CancelJobCallback(JobSpec cancelJob) {
+      super(Objects.toStringHelper("onCancelJob")
+          .add("cancelJob", cancelJob).toString());
+      _cancelJob = cancelJob;
+    }
+
+    @Override public Void apply(JobCatalogListener listener) {

Review comment:
       start the method definition on a new line.

##########
File path: gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
##########
@@ -84,6 +83,7 @@ public long bytesWritten() {
       };
 
   private final Producer<K, V> producer;
+  @EqualsAndHashCode.Include

Review comment:
       Why does the equals check only include the topic name and not the other fields?

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
##########
@@ -385,6 +400,42 @@ public void remove(Spec spec, Properties headers) throws IOException {
     }
   }
 
+  @Subscribe
+  public void handleKillFlowEvent(KillFlowEvent killFlowEvent) {

Review comment:
       I am a little concerned with the extent of hacks we are introducing because of the dag manager mode being present or not. I think we should evaluate doing this against just making DagManager the standard de-facto code path.  

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
##########
@@ -293,28 +301,34 @@ public void orchestrate(Spec spec) throws Exception {
         this.dagManager.get().addDag(jobExecutionPlanDag, true);
       } else {
         // Schedule all compiled JobSpecs on their respective Executor
+        // This assumes that the JobSpecs do not have any dependency on each other and all can run together
         for (Dag.DagNode<JobExecutionPlan> dagNode : jobExecutionPlanDag.getNodes()) {
           DagManagerUtils.incrementJobAttempt(dagNode);
           JobExecutionPlan jobExecutionPlan = dagNode.getValue();
 
           // Run this spec on selected executor
-          SpecProducer producer = null;
+          SpecProducer<Spec> producer = null;
           try {
             producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
-            Spec jobSpec = jobExecutionPlan.getJobSpec();
+            JobSpec jobSpec = jobExecutionPlan.getJobSpec();
 
-            if (!((JobSpec) jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+            if (!jobSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
               _log.warn("JobSpec does not contain flowExecutionId.");
             }
 
             Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
-            _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, producer));
+            _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec.toString(), producer));
 
             TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
                 getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
 
             producer.addSpec(jobSpec);
 
+            if (!specProducerToSpecs.containsKey(producer)) {

Review comment:
       It looks like the specProducerToSpecs map will keep growing unless the spec deletion is triggered via DELETE request. The map should be pruned on job completions, either or on success or failure.

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
##########
@@ -298,24 +296,14 @@ synchronized public void stopDag(URI uri) throws IOException {
   /**
    * Add the specified flow to {@link DagManager#cancelQueue}
    */
-  private void killFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException {
+  void killFlow(String flowGroup, String flowName, long flowExecutionId) throws IOException {
     int queueId =  DagManagerUtils.getDagQueueId(flowExecutionId, this.numThreads);
     String dagId = DagManagerUtils.generateDagId(flowGroup, flowName, flowExecutionId);
     if (!this.cancelQueue[queueId].offer(dagId)) {
       throw new IOException("Could not add dag " + dagId + " to cancellation queue.");
     }
   }
 
-  @Subscribe

Review comment:
       I see that the handleKillFlowEvent is now moved to Orchestrator. Shouldn't the Orchestrator be now registering itself with the eventbus instance to receive the messages? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r441108741



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
##########
@@ -100,17 +101,29 @@ protected void shutdownMetrics()
   @Override
   protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
     try {
-      Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.getValue());
-      for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
-        if (parsedMessage instanceof Either.Left) {
-          this.newSpecs.inc();
-          this.jobCatalog.put(((Either.Left<JobSpec, URI>) parsedMessage).getLeft());
-        } else if (parsedMessage instanceof Either.Right) {
-          this.removedSpecs.inc();
-          URI jobSpecUri = ((Either.Right<JobSpec, URI>) parsedMessage).getRight();
-          this.jobCatalog.remove(jobSpecUri);
-          // Delete the job state if it is a delete spec request
-          deleteStateStore(jobSpecUri);
+      Collection<JobSpec> parsedCollection = parseJobSpec(message.getValue());
+      for (JobSpec parsedMessage : parsedCollection) {
+        SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(parsedMessage.getMetadata().get(VERB_KEY));
+
+        switch (verb) {
+          case ADD:
+          case UPDATE:
+          case UNKNOWN: // unknown are considered as add request to maintain backward compatibility

Review comment:
       Shouldn't the default block be the place to maintain compatibility? Is the VERB guaranteed to be set previously ? 

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
##########
@@ -100,17 +101,29 @@ protected void shutdownMetrics()
   @Override
   protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
     try {
-      Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.getValue());
-      for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
-        if (parsedMessage instanceof Either.Left) {
-          this.newSpecs.inc();
-          this.jobCatalog.put(((Either.Left<JobSpec, URI>) parsedMessage).getLeft());
-        } else if (parsedMessage instanceof Either.Right) {
-          this.removedSpecs.inc();
-          URI jobSpecUri = ((Either.Right<JobSpec, URI>) parsedMessage).getRight();
-          this.jobCatalog.remove(jobSpecUri);
-          // Delete the job state if it is a delete spec request
-          deleteStateStore(jobSpecUri);
+      Collection<JobSpec> parsedCollection = parseJobSpec(message.getValue());
+      for (JobSpec parsedMessage : parsedCollection) {
+        SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(parsedMessage.getMetadata().get(VERB_KEY));
+
+        switch (verb) {
+          case ADD:
+          case UPDATE:
+          case UNKNOWN: // unknown are considered as add request to maintain backward compatibility
+            this.newSpecs.inc();
+            this.jobCatalog.put(parsedMessage);
+            break;
+          case DELETE:
+            this.removedSpecs.inc();
+            URI jobSpecUri = parsedMessage.getUri();
+            this.jobCatalog.remove(jobSpecUri);
+            // Delete the job state if it is a delete spec request
+            deleteStateStore(jobSpecUri);
+            break;
+          case CANCEL:
+            this.jobCatalog.cancel(parsedMessage);
+            break;
+          default:

Review comment:
       Shall we throw exceptions here instead of just printing log ? 

##########
File path: gobblin-modules/gobblin-kafka-09/build.gradle
##########
@@ -87,6 +87,9 @@ artifacts {
 }
 
 test {
+  // this module's test cases hang often when running in parallel.
+  // making every test class execute in a forked test process till tests are fixed
+  forkEvery = 1

Review comment:
       Q: Is this the only change targeting for the travis issue ? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r439150290



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
##########
@@ -108,10 +102,10 @@ protected void createMetrics() {
   /**
    * Creates a {@link JobSpec} or {@link URI} from the {@link AvroJobSpec} record.
    * @param record the record as an {@link AvroJobSpec}
-   * @return a {@link JobSpec} or {@link URI} wrapped in a {@link Collection} of {@link Either}
+   * @return a {@link JobSpec}
    */
   @Override
-  public Collection<Either<JobSpec, URI>> parseJobSpec(AvroJobSpec record) {
+  public Collection<JobSpec> parseJobSpec(AvroJobSpec record) {

Review comment:
       A failure in maintaining backward compatibility ?

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
##########
@@ -121,5 +125,11 @@ public synchronized void remove(URI jobURI) {
       LOGGER.warn("No file with URI:" + jobURI + " is found. Deletion failed.");
     }
   }
+
+  @Override
+  public synchronized void cancel(JobSpec jobSpec) {
+    Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));

Review comment:
       I am confused by this: Check the state equals to running but log "xxx is not running" ? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #3027: [GOBBLIN-1187] cancel flow execution when dag manager is disabled

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3027:
URL: https://github.com/apache/incubator-gobblin/pull/3027#discussion_r439155559



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/NonObservingFSJobCatalog.java
##########
@@ -121,5 +125,11 @@ public synchronized void remove(URI jobURI) {
       LOGGER.warn("No file with URI:" + jobURI + " is found. Deletion failed.");
     }
   }
+
+  @Override
+  public synchronized void cancel(JobSpec jobSpec) {
+    Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));

Review comment:
       Yea, the message is supposed to be "exception message to use if the check fails" (from Preconditions# checkState () javadoc)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org