You are viewing a plain-text version of this content. A canonical link to the original exists, but the hyperlink was not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/07/23 22:05:32 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1220] Log improvement

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f30e6f4  [GOBBLIN-1220] Log improvement
f30e6f4 is described below

commit f30e6f4cd51624b7d17894336f2787ec7bf2a9ad
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Jul 23 15:05:15 2020 -0700

    [GOBBLIN-1220] Log improvement
    
    reduce logging
    improve logging
    
    address review comment
    
    Closes #2991 from arjun4084346/logImprovement
---
 .../apache/gobblin/cluster/HelixRetriggeringJobCallable.java |  4 ++--
 .../src/main/java/org/apache/gobblin/cluster/HelixUtils.java | 12 ++++++------
 .../org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java |  2 +-
 .../runtime/spec_executorInstance/BaseServiceNodeImpl.java   |  5 +++++
 .../service/modules/flow/BaseFlowToJobSpecCompiler.java      |  4 +++-
 .../main/java/org/apache/gobblin/util/PropertiesUtils.java   |  7 +++++++
 6 files changed, 24 insertions(+), 10 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 4a6571f..bfa2dce 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -310,7 +310,7 @@ class HelixRetriggeringJobCallable implements Callable {
         // make sure the planning job is initialized (or visible) to other parallel running threads,
         // so that the same critical section check (querying Helix for job completeness)
         // can be applied.
-        HelixUtils.waitJobInitialization(planningJobManager, newPlanningId, newPlanningId, 300_000);
+        HelixUtils.waitJobInitialization(planningJobManager, newPlanningId, newPlanningId);
 
       } finally {
         // end of the critical section to check if a job with same job name is running
@@ -320,7 +320,7 @@ class HelixRetriggeringJobCallable implements Callable {
       // we can remove the job spec from the catalog because Helix will drive this job to the end.
       this.deleteJobSpec();
 
-      // If we are using non-blocking mode, this get() only guarantees the plannning job is submitted.
+      // If we are using non-blocking mode, this get() only guarantees the planning job is submitted.
       // It doesn't guarantee the job will finish because internally we won't wait for Helix completion.
       this.currentJobMonitor.get();
       this.currentJobMonitor = null;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index b825417..fe23f20 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -122,8 +122,7 @@ public class HelixUtils {
   static void waitJobInitialization(
       HelixManager helixManager,
       String workFlowName,
-      String jobName,
-      long timeoutMillis) throws Exception {
+      String jobName) throws Exception {
     WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
 
     // If the helix job is deleted from some other thread or a completely external process,
@@ -132,13 +131,14 @@ public class HelixUtils {
     // 2) it did get initialized but deleted soon after, in which case we should stop waiting
     // To overcome this issue, we wait here till workflowContext gets initialized
     long start = System.currentTimeMillis();
+    long timeoutMillis = TimeUnit.MINUTES.toMillis(5L);
     while (workflowContext == null || workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) == null) {
       if (System.currentTimeMillis() - start > timeoutMillis) {
         log.error("Job cannot be initialized within {} milliseconds, considered as an error", timeoutMillis);
         throw new JobException("Job cannot be initialized within {} milliseconds, considered as an error");
       }
       workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
-      Thread.sleep(1000);
+      Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
       log.info("Waiting for work flow initialization.");
     }
 
@@ -159,7 +159,7 @@ public class HelixUtils {
     helixTaskDriver.start(workFlow);
     log.info("Created a work flow {}", workFlowName);
 
-    waitJobInitialization(helixManager, workFlowName, jobName, Long.MAX_VALUE);
+    waitJobInitialization(helixManager, workFlowName, jobName);
   }
 
   static void waitJobCompletion(HelixManager helixManager, String workFlowName, String jobName,
@@ -189,7 +189,7 @@ public class HelixUtils {
           return;
           case STOPPING:
             log.info("Waiting for job {} to complete... State - {}", jobName, jobState);
-            Thread.sleep(1000);
+            Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
             // Workaround for a Helix bug where a job may be stuck in the STOPPING state due to an unresponsive task.
             if (System.currentTimeMillis() > stoppingStateEndTime) {
               log.info("Deleting workflow {}", workFlowName);
@@ -199,7 +199,7 @@ public class HelixUtils {
             return;
           default:
             log.info("Waiting for job {} to complete... State - {}", jobName, jobState);
-            Thread.sleep(1000);
+            Thread.sleep(TimeUnit.SECONDS.toMillis(10L));
         }
       } else {
         // We have waited for WorkflowContext to get initialized,
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 44d54ea..c32f69b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -51,9 +51,9 @@ import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalog;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecSearchObject;
 import org.apache.gobblin.runtime.api.SpecSerDe;
 import org.apache.gobblin.runtime.api.SpecStore;
-import org.apache.gobblin.runtime.api.SpecSearchObject;
 import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
 import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 import org.apache.gobblin.service.ServiceConfigKeys;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/BaseServiceNodeImpl.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/BaseServiceNodeImpl.java
index dcc0c3b..86c78d0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/BaseServiceNodeImpl.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/BaseServiceNodeImpl.java
@@ -97,4 +97,9 @@ public class BaseServiceNodeImpl implements ServiceNode {
   public int hashCode() {
     return nodeName.hashCode();
   }
+
+  @Override
+  public String toString() {
+    return nodeName;
+  }
 }
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index b42de01..244a2fa 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -59,6 +59,8 @@ import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PropertiesUtils;
+
 
 // Provide base implementation for constructing multi-hops route.
 @Alpha
@@ -226,7 +228,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
       jobSpecBuilder = jobSpecBuilder.withTemplate(flowSpec.getTemplateURIs().get().iterator().next());
       try {
         jobSpec = new ResolvedJobSpec(jobSpecBuilder.build(), templateCatalog.get());
-        log.info("Resolved JobSpec properties are: " + jobSpec.getConfigAsProperties());
+        log.info("Resolved JobSpec properties are: " + PropertiesUtils.prettyPrintProperties(jobSpec.getConfigAsProperties()));
       } catch (SpecNotFoundException | JobTemplate.TemplateException e) {
         throw new RuntimeException("Could not resolve template in JobSpec from TemplateCatalog", e);
       }
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index 51f2019..893c102 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -22,6 +22,7 @@ import java.io.StringReader;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -106,4 +107,10 @@ public class PropertiesUtils {
     reader.close();
     return properties;
   }
+
+  public static String prettyPrintProperties(Properties properties) {
+    return properties.entrySet().stream()
+        .map(entry -> "\"" + entry.getKey() + "\"" + " : " + "\"" + entry.getValue() + "\"")
+        .collect(Collectors.joining(",\n"));
+  }
 }