Posted to dev@gobblin.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/05/06 23:38:00 UTC

[jira] [Work logged] (GOBBLIN-1144) move spec store delete to gobblinservice job scheduler

     [ https://issues.apache.org/jira/browse/GOBBLIN-1144?focusedWorklogId=431493&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-431493 ]

ASF GitHub Bot logged work on GOBBLIN-1144:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/May/20 23:37
            Start Date: 06/May/20 23:37
    Worklog Time Spent: 10m 
      Work Description: enjoyear commented on a change in pull request #2981:
URL: https://github.com/apache/incubator-gobblin/pull/2981#discussion_r421151364



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
##########
@@ -293,30 +296,27 @@ public Spec getSpecWrapper(URI uri) {
    */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
     Map<String, AddSpecResponse> responseMap = new HashMap<>();
+    FlowSpec flowSpec = (FlowSpec) spec;
     Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
-    Preconditions.checkNotNull(spec);
+    Preconditions.checkNotNull(flowSpec);
+
+    log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", flowSpec.getUri(), flowSpec.getConfigAsProperties()));
+    try {
+      long startTime = System.currentTimeMillis();
+      specStore.addSpec(flowSpec);
+      metrics.updatePutSpecTime(startTime);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot add Spec to Spec store: " + flowSpec, e);
+    }
 
-    log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
-        ((FlowSpec) spec).getConfigAsProperties()));
     if (triggerListener) {
-      AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+      AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(flowSpec);
       for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
         responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
       }
     }
 
-    boolean compileSuccess = isCompileSuccessful(responseMap);
-
-    if (compileSuccess) {
-      long startTime = System.currentTimeMillis();
-      metrics.updatePutSpecTime(startTime);
-      try {
-        if (!((FlowSpec) spec).isExplain()) {
-          specStore.addSpec(spec);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
-      }
+    if (isCompileSuccessful(responseMap)) {
       responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
     } else {
       responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false"));

Review comment:
       Where is the logic that removes the flow spec when compilation fails?
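
       To make the concern concrete: in the diff above, the spec is now written to the store before the listeners compile it, so a failed compilation would leave it persisted unless something deletes it. Below is a minimal, self-contained sketch of one possible "add first, roll back on failure" shape. Every name in it (PutWithRollbackSketch, InMemorySpecStore, Compiler) is a hypothetical stand-in, not Gobblin's SpecStore or FlowCatalog API, and per the issue title the actual delete may be intended to live in the job scheduler instead.

```java
import java.util.HashMap;
import java.util.Map;

public class PutWithRollbackSketch {

  /** Hypothetical in-memory stand-in for a spec store (not Gobblin's SpecStore). */
  public static class InMemorySpecStore {
    private final Map<String, String> specs = new HashMap<>();

    public void add(String uri, String spec) { specs.put(uri, spec); }
    public void delete(String uri) { specs.remove(uri); }
    public boolean exists(String uri) { return specs.containsKey(uri); }
  }

  /** Hypothetical compilation hook; returns true when the spec compiles. */
  public interface Compiler {
    boolean compile(String spec);
  }

  /**
   * Persists the spec first, then compiles it. If compilation reports failure
   * or throws, the spec is removed again so the store never keeps a spec that
   * did not compile.
   */
  public static boolean put(InMemorySpecStore store, Compiler compiler, String uri, String spec) {
    store.add(uri, spec);
    boolean compiled = false;
    try {
      compiled = compiler.compile(spec);
    } finally {
      if (!compiled) {
        store.delete(uri); // roll back the earlier add
      }
    }
    return compiled;
  }

  public static void main(String[] args) {
    InMemorySpecStore store = new InMemorySpecStore();
    // Assumption for the demo only: an empty config string "fails" compilation.
    Compiler compiler = s -> !s.isEmpty();

    System.out.println(put(store, compiler, "flow:/ok", "some-config"));
    System.out.println(store.exists("flow:/ok"));
    System.out.println(put(store, compiler, "flow:/bad", ""));
    System.out.println(store.exists("flow:/bad"));
  }
}
```

       The finally block also covers the case where compilation throws, which a plain if/else on the result would miss.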

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
##########
@@ -293,30 +296,27 @@ public Spec getSpecWrapper(URI uri) {
    */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
     Map<String, AddSpecResponse> responseMap = new HashMap<>();
+    FlowSpec flowSpec = (FlowSpec) spec;
     Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
-    Preconditions.checkNotNull(spec);
+    Preconditions.checkNotNull(flowSpec);
+
+    log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", flowSpec.getUri(), flowSpec.getConfigAsProperties()));
+    try {
+      long startTime = System.currentTimeMillis();
+      specStore.addSpec(flowSpec);
+      metrics.updatePutSpecTime(startTime);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot add Spec to Spec store: " + flowSpec, e);
+    }
 
-    log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
-        ((FlowSpec) spec).getConfigAsProperties()));
     if (triggerListener) {

Review comment:
       Can you move this if block into the try block, as before?






Issue Time Tracking
-------------------

            Worklog Id:     (was: 431493)
    Remaining Estimate: 0h
            Time Spent: 10m

> move spec store delete to gobblinservice job scheduler
> ------------------------------------------------------
>
>                 Key: GOBBLIN-1144
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1144
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Arjun Singh Bora
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>



