Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/03/11 23:55:50 UTC

[GitHub] [incubator-gobblin] arjun4084346 opened a new pull request #2921: compile a flow before storing it in spec catalog

arjun4084346 opened a new pull request #2921: compile a flow before storing it in spec catalog
URL: https://github.com/apache/incubator-gobblin/pull/2921
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below! @jack-moseley  @sv2000  please review
   
   
   ### JIRA
   - [x] My PR addresses the following [GOBBLIN-1082](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if applicable):
   Compile a flow, and store it in the spec catalog only if compilation succeeds.
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   trivial changes
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #2921: compile a flow before storing it in spec catalog

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #2921: compile a flow before storing it in spec catalog
URL: https://github.com/apache/incubator-gobblin/pull/2921#discussion_r391785880
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
 ##########
 @@ -296,27 +297,44 @@ public Spec getSpecWrapper(URI uri) {
    */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
     Map<String, AddSpecResponse> responseMap = new HashMap<>();
-    try {
-      Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
-      Preconditions.checkNotNull(spec);
+    Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
+    Preconditions.checkNotNull(spec);
+
+    log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
+        ((FlowSpec) spec).getConfigAsProperties()));
+    if (triggerListener) {
+      AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+      for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
+        responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
+      }
+    }
 
+    boolean compileSuccess = isCompileSuccessful(responseMap);
+
+    if (compileSuccess) {
       long startTime = System.currentTimeMillis();
-      log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
-          ((FlowSpec) spec).getConfigAsProperties()));
-      specStore.addSpec(spec);
       metrics.updatePutSpecTime(startTime);
-      if (triggerListener) {
-        AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
-        for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
-          responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
-        }
+      try {
+        specStore.addSpec(spec);
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
       }
-    } catch (IOException e) {
-      throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
+    } else {
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false"));
     }
+
     return responseMap;
   }
 
+  private boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
+    AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
 
 Review comment:
   I had that null in the first commit, but then the tests in FlowCatalogTest started failing because they do not set any listeners.
   So I changed the logic to treat the flow as "compiledSuccessfully" if the GobblinServiceJobScheduler listener (it is a SpecCatalogListener) is not present.
   What do you think?

[GitHub] [incubator-gobblin] jack-moseley commented on a change in pull request #2921: compile a flow before storing it in spec catalog

Posted by GitBox <gi...@apache.org>.
jack-moseley commented on a change in pull request #2921: compile a flow before storing it in spec catalog
URL: https://github.com/apache/incubator-gobblin/pull/2921#discussion_r391779606
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
 ##########
 @@ -296,27 +297,44 @@ public Spec getSpecWrapper(URI uri) {
    */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
     Map<String, AddSpecResponse> responseMap = new HashMap<>();
-    try {
-      Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
-      Preconditions.checkNotNull(spec);
+    Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
+    Preconditions.checkNotNull(spec);
+
+    log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
+        ((FlowSpec) spec).getConfigAsProperties()));
+    if (triggerListener) {
+      AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+      for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
+        responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
+      }
+    }
 
+    boolean compileSuccess = isCompileSuccessful(responseMap);
+
+    if (compileSuccess) {
       long startTime = System.currentTimeMillis();
-      log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
-          ((FlowSpec) spec).getConfigAsProperties()));
-      specStore.addSpec(spec);
       metrics.updatePutSpecTime(startTime);
-      if (triggerListener) {
-        AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
-        for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
-          responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
-        }
+      try {
+        specStore.addSpec(spec);
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
       }
-    } catch (IOException e) {
-      throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
+    } else {
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false"));
     }
+
     return responseMap;
   }
 
+  private boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
+    AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
 
 Review comment:
   Should this be `responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, null);` so that `isCompileSuccessful` returns false if the key is not set? (It seems like it would return true now.)
   
   Also, where does the key `ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS` get set? I couldn't figure it out by looking at the code.
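
To make the question concrete, here is a minimal sketch of how the two defaults differ when no listener has written to the map. `Response` is a hypothetical stand-in for `AddSpecResponse`, `SCHEDULER_KEY` is a placeholder for `ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS`, and the `getValue() != null` check is an assumption, since the body of `isCompileSuccessful` is cut off in the diff above.

```java
import java.util.HashMap;
import java.util.Map;

final class DefaultValueSketch {
  // Hypothetical stand-in for AddSpecResponse<String>.
  static final class Response {
    private final String value;
    Response(String value) { this.value = value; }
    String getValue() { return value; }
  }

  // Placeholder for ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS.
  static final String SCHEDULER_KEY = "scheduler.listener.class";

  // Mirrors the PR: a missing key falls back to an empty, non-null response.
  static boolean withEmptyDefault(Map<String, Response> responseMap) {
    Response r = responseMap.getOrDefault(SCHEDULER_KEY, new Response(""));
    return r.getValue() != null; // assumed success check, not shown in the diff
  }

  // Variant raised in review: a missing key falls back to null.
  static boolean withNullDefault(Map<String, Response> responseMap) {
    Response r = responseMap.getOrDefault(SCHEDULER_KEY, null);
    return r != null && r.getValue() != null;
  }

  public static void main(String[] args) {
    Map<String, Response> noListeners = new HashMap<>();
    System.out.println(withEmptyDefault(noListeners));
    System.out.println(withNullDefault(noListeners));
  }
}
```

Under these assumptions, an empty map counts as a successful compile with the empty default and as a failure with the null default, which is the behavioral difference being asked about.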

[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #2921: compile a flow before storing it in spec catalog

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #2921: compile a flow before storing it in spec catalog
URL: https://github.com/apache/incubator-gobblin/pull/2921#discussion_r391793647
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
 ##########
 @@ -296,27 +297,44 @@ public Spec getSpecWrapper(URI uri) {
    */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
     Map<String, AddSpecResponse> responseMap = new HashMap<>();
-    try {
-      Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
-      Preconditions.checkNotNull(spec);
+    Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
+    Preconditions.checkNotNull(spec);
+
+    log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
+        ((FlowSpec) spec).getConfigAsProperties()));
+    if (triggerListener) {
+      AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+      for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
+        responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
+      }
+    }
 
+    boolean compileSuccess = isCompileSuccessful(responseMap);
+
+    if (compileSuccess) {
       long startTime = System.currentTimeMillis();
-      log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
-          ((FlowSpec) spec).getConfigAsProperties()));
-      specStore.addSpec(spec);
       metrics.updatePutSpecTime(startTime);
-      if (triggerListener) {
-        AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
-        for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
-          responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
-        }
+      try {
+        specStore.addSpec(spec);
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
       }
-    } catch (IOException e) {
-      throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
+    } else {
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false"));
     }
+
     return responseMap;
   }
 
+  private boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
+    AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
 
 Review comment:
   I think that is not possible, because GobblinServiceJobScheduler will always be on the listeners list; otherwise jobs would not be scheduled (see `this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);`).
   
   And if it is on the listeners list, it will add its entry to responseMap.

[GitHub] [incubator-gobblin] asfgit closed pull request #2921: compile a flow before storing it in spec catalog

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #2921: compile a flow before storing it in spec catalog
URL: https://github.com/apache/incubator-gobblin/pull/2921
 
 
   

[GitHub] [incubator-gobblin] codecov-io commented on issue #2921: compile a flow before storing it in spec catalog

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #2921: compile a flow before storing it in spec catalog
URL: https://github.com/apache/incubator-gobblin/pull/2921#issuecomment-598013567
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2921?src=pr&el=h1) Report
   > Merging [#2921](https://codecov.io/gh/apache/incubator-gobblin/pull/2921?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/6418dcfbb928aaade047ef51366595957d7bdf81?src=pr&el=desc) will **decrease** coverage by `0.39%`.
   > The diff coverage is `48.38%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2921/graphs/tree.svg?width=650&token=4MgURJ0bGc&height=150&src=pr)](https://codecov.io/gh/apache/incubator-gobblin/pull/2921?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##             master    #2921     +/-   ##
   ===========================================
   - Coverage     45.87%   45.47%   -0.4%     
   + Complexity     9188     9109     -79     
   ===========================================
     Files          1934     1936      +2     
     Lines         72930    73057    +127     
     Branches       8044     8051      +7     
   ===========================================
   - Hits          33455    33223    -232     
   - Misses        36404    36772    +368     
   + Partials       3071     3062      -9
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2921?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../org/apache/gobblin/service/ServiceConfigKeys.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2921/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc2VydmljZS9TZXJ2aWNlQ29uZmlnS2V5cy5qYXZh) | `0% <ø> (ø)` | `0 <0> (ø)` | :arrow_down: |
   | [...blin/service/FlowConfigV2ResourceLocalHandler.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2921/diff?src=pr&el=tree#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi1mbG93LWNvbmZpZy1zZXJ2aWNlL2dvYmJsaW4tZmxvdy1jb25maWctc2VydmljZS1zZXJ2ZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc2VydmljZS9GbG93Q29uZmlnVjJSZXNvdXJjZUxvY2FsSGFuZGxlci5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
   | [.../modules/scheduler/GobblinServiceJobScheduler.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2921/diff?src=pr&el=tree#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9zY2hlZHVsZXIvR29iYmxpblNlcnZpY2VKb2JTY2hlZHVsZXIuamF2YQ==) | `50.85% <50%> (-2.86%)` | `15 <0> (-4)` | |
   | [...ache/gobblin/runtime/spec\_catalog/FlowCatalog.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2921/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvc3BlY19jYXRhbG9nL0Zsb3dDYXRhbG9nLmphdmE=) | `47.69% <57.14%> (-0.25%)` | `16 <1> (+1)` | |
   | [...bblin/runtime/job\_monitor/KafkaAvroJobMonitor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2921/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvam9iX21vbml0b3IvS2Fma2FBdnJvSm9iTW9uaXRvci5qYXZh) | `0% <0%> (-93.11%)` | `0% <0%> (-7%)` | |
   | [...pache/gobblin/runtime/kafka/HighLevelConsumer.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2921/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUva2Fma2EvSGlnaExldmVsQ29uc3VtZXIuamF2YQ==) | `0% <0%> (-81.04%)` | `0% <0%> (-12%)` | |
   | [...n/runtime/job\_monitor/SLAEventKafkaJobMonitor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2921/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvam9iX21vbml0b3IvU0xBRXZlbnRLYWZrYUpvYk1vbml0b3IuamF2YQ==) | `0% <0%> (-80.27%)` | `0% <0%> (-17%)` | |
   | [...blin/service/monitoring/KafkaJobStatusMonitor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2921/diff?src=pr&el=tree#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9uaXRvcmluZy9LYWZrYUpvYlN0YXR1c01vbml0b3IuamF2YQ==) | `0% <0%> (-72.23%)` | `0% <0%> (-16%)` | |
   | [...e/gobblin/runtime/job\_monitor/KafkaJobMonitor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2921/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvam9iX21vbml0b3IvS2Fma2FKb2JNb25pdG9yLmphdmE=) | `0% <0%> (-65.22%)` | `0% <0%> (-9%)` | |
   | [.../service/monitoring/KafkaAvroJobStatusMonitor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2921/diff?src=pr&el=tree#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9uaXRvcmluZy9LYWZrYUF2cm9Kb2JTdGF0dXNNb25pdG9yLmphdmE=) | `0% <0%> (-56.67%)` | `0% <0%> (-13%)` | |
   | ... and [21 more](https://codecov.io/gh/apache/incubator-gobblin/pull/2921/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2921?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2921?src=pr&el=footer). Last update [6418dcf...82525a5](https://codecov.io/gh/apache/incubator-gobblin/pull/2921?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

[GitHub] [incubator-gobblin] jack-moseley commented on a change in pull request #2921: compile a flow before storing it in spec catalog

Posted by GitBox <gi...@apache.org>.
jack-moseley commented on a change in pull request #2921: compile a flow before storing it in spec catalog
URL: https://github.com/apache/incubator-gobblin/pull/2921#discussion_r391399050
 
 

 ##########
 File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
 ##########
 @@ -68,19 +67,27 @@ public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean triggerL
     }
 
     Map<String, AddSpecResponse> responseMap = this.flowCatalog.put(flowSpec, triggerListener);
-    HttpStatus httpStatus = HttpStatus.S_201_CREATED;
+    HttpStatus httpStatus;
 
     if (flowConfig.hasExplain() && flowConfig.isExplain()) {
       //This is an Explain request. So no resource is actually created.
       //Enrich original FlowConfig entity by adding the compiledFlow to the properties map.
       StringMap props = flowConfig.getProperties();
-      AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, null);
+      AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, null);
       props.put("gobblin.flow.compiled",
           addSpecResponse != null && addSpecResponse.getValue() != null ? StringEscapeUtils.escapeJson(addSpecResponse.getValue()) : "");
       flowConfig.setProperties(props);
+    }
+
+    if (flowConfig.hasExplain() && flowConfig.isExplain()) {
 
 Review comment:
   I think you're closing an if block and then immediately opening an identical one (`if (flowConfig.hasExplain() && flowConfig.isExplain())`), instead of just leaving the first block open here?
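
The suggestion amounts to keeping a single explain branch. A minimal sketch of that shape follows; `FlowConfigStub` and `Status` are hypothetical stand-ins, and the status values are placeholders because the body of the second block is not shown in the diff.

```java
final class SingleExplainBranchSketch {
  // Hypothetical stand-in for the FlowConfig record used by the handler.
  static final class FlowConfigStub {
    private final Boolean explain; // null models "explain not set"
    FlowConfigStub(Boolean explain) { this.explain = explain; }
    boolean hasExplain() { return explain != null; }
    boolean isExplain() { return explain; }
  }

  // Placeholder outcomes; the real handler works with HttpStatus values.
  enum Status { EXPLAINED, STORED }

  static Status handle(FlowConfigStub flowConfig, StringBuilder log) {
    if (flowConfig.hasExplain() && flowConfig.isExplain()) {
      // Work from the first block: enrich the config with the compiled flow.
      log.append("enrich;");
      // Work from the second block continues here, without re-testing the condition.
      log.append("status;");
      return Status.EXPLAINED;
    }
    return Status.STORED;
  }

  public static void main(String[] args) {
    StringBuilder log = new StringBuilder();
    System.out.println(handle(new FlowConfigStub(true), log) + " " + log);
  }
}
```

Both pieces of work run under one condition check, so the two blocks cannot drift apart if the condition is edited later.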

[GitHub] [incubator-gobblin] jack-moseley commented on a change in pull request #2921: compile a flow before storing it in spec catalog

Posted by GitBox <gi...@apache.org>.
jack-moseley commented on a change in pull request #2921: compile a flow before storing it in spec catalog
URL: https://github.com/apache/incubator-gobblin/pull/2921#discussion_r391789476
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
 ##########
 @@ -296,27 +297,44 @@ public Spec getSpecWrapper(URI uri) {
    */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
     Map<String, AddSpecResponse> responseMap = new HashMap<>();
-    try {
-      Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
-      Preconditions.checkNotNull(spec);
+    Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
+    Preconditions.checkNotNull(spec);
+
+    log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
+        ((FlowSpec) spec).getConfigAsProperties()));
+    if (triggerListener) {
+      AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+      for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
+        responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
+      }
+    }
 
+    boolean compileSuccess = isCompileSuccessful(responseMap);
+
+    if (compileSuccess) {
       long startTime = System.currentTimeMillis();
-      log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
-          ((FlowSpec) spec).getConfigAsProperties()));
-      specStore.addSpec(spec);
       metrics.updatePutSpecTime(startTime);
-      if (triggerListener) {
-        AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
-        for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
-          responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
-        }
+      try {
+        specStore.addSpec(spec);
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
       }
-    } catch (IOException e) {
-      throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
+    } else {
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false"));
     }
+
     return responseMap;
   }
 
+  private boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
+    AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
 
 Review comment:
   Hmm, I guess that is fine. But is it possible for that key to be missing in actual GaaS?

[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #2921: compile a flow before storing it in spec catalog

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #2921: compile a flow before storing it in spec catalog
URL: https://github.com/apache/incubator-gobblin/pull/2921#discussion_r391782393
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
 ##########
 @@ -296,27 +297,44 @@ public Spec getSpecWrapper(URI uri) {
    */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
     Map<String, AddSpecResponse> responseMap = new HashMap<>();
-    try {
-      Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
-      Preconditions.checkNotNull(spec);
+    Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
+    Preconditions.checkNotNull(spec);
+
+    log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
+        ((FlowSpec) spec).getConfigAsProperties()));
+    if (triggerListener) {
+      AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+      for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
+        responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
+      }
+    }
 
+    boolean compileSuccess = isCompileSuccessful(responseMap);
+
+    if (compileSuccess) {
       long startTime = System.currentTimeMillis();
-      log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
-          ((FlowSpec) spec).getConfigAsProperties()));
-      specStore.addSpec(spec);
       metrics.updatePutSpecTime(startTime);
-      if (triggerListener) {
-        AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
-        for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
-          responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
-        }
+      try {
+        specStore.addSpec(spec);
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
       }
-    } catch (IOException e) {
-      throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
+    } else {
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false"));
     }
+
     return responseMap;
   }
 
+  private boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
+    AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
 
 Review comment:
   `FlowCatalog::put` inserts this key with
   `responseMap.put(entry.getKey().getName(), entry.getValue().getResult());`
   and `entry.getKey().getName()` is the default method
   `default String getName() { return getClass().getName(); }`
   so the key is the listener's class name.
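
Put together, the mechanism can be sketched as below. `Listener` and `SchedulerListener` are hypothetical stand-ins for `SpecCatalogListener` and `GobblinServiceJobScheduler`; the sketch assumes, as the thread implies but the diff does not show, that `ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS` holds the scheduler's fully qualified class name.

```java
import java.util.HashMap;
import java.util.Map;

final class ListenerKeySketch {
  // Hypothetical stand-in for SpecCatalogListener.
  interface Listener {
    String onAddSpec(String spec);

    // Same default as in the comment: the name is the runtime class name.
    default String getName() {
      return getClass().getName();
    }
  }

  // Hypothetical stand-in for GobblinServiceJobScheduler.
  static final class SchedulerListener implements Listener {
    @Override
    public String onAddSpec(String spec) {
      return "compiled:" + spec;
    }
  }

  // Collects one response per listener, keyed by listener name, as put() does.
  static Map<String, String> collect(String spec, Listener... listeners) {
    Map<String, String> responseMap = new HashMap<>();
    for (Listener listener : listeners) {
      responseMap.put(listener.getName(), listener.onAddSpec(spec));
    }
    return responseMap;
  }

  public static void main(String[] args) {
    Map<String, String> responses = collect("flowA", new SchedulerListener());
    // The lookup key is the listener's class name, not a value set elsewhere.
    System.out.println(responses.get(SchedulerListener.class.getName()));
  }
}
```

No code sets the key explicitly; it appears in the map only when a listener of that class is registered and returns a result.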
