You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/07/23 22:05:32 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1220] Log
improvement
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f30e6f4 [GOBBLIN-1220] Log improvement
f30e6f4 is described below
commit f30e6f4cd51624b7d17894336f2787ec7bf2a9ad
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Jul 23 15:05:15 2020 -0700
[GOBBLIN-1220] Log improvement
reduce logging
improve logging
address review comment
Closes #2991 from arjun4084346/logImprovement
---
.../apache/gobblin/cluster/HelixRetriggeringJobCallable.java | 4 ++--
.../src/main/java/org/apache/gobblin/cluster/HelixUtils.java | 12 ++++++------
.../org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java | 2 +-
.../runtime/spec_executorInstance/BaseServiceNodeImpl.java | 5 +++++
.../service/modules/flow/BaseFlowToJobSpecCompiler.java | 4 +++-
.../main/java/org/apache/gobblin/util/PropertiesUtils.java | 7 +++++++
6 files changed, 24 insertions(+), 10 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 4a6571f..bfa2dce 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -310,7 +310,7 @@ class HelixRetriggeringJobCallable implements Callable {
// make sure the planning job is initialized (or visible) to other parallel running threads,
// so that the same critical section check (querying Helix for job completeness)
// can be applied.
- HelixUtils.waitJobInitialization(planningJobManager, newPlanningId, newPlanningId, 300_000);
+ HelixUtils.waitJobInitialization(planningJobManager, newPlanningId, newPlanningId);
} finally {
// end of the critical section to check if a job with same job name is running
@@ -320,7 +320,7 @@ class HelixRetriggeringJobCallable implements Callable {
// we can remove the job spec from the catalog because Helix will drive this job to the end.
this.deleteJobSpec();
- // If we are using non-blocking mode, this get() only guarantees the plannning job is submitted.
+ // If we are using non-blocking mode, this get() only guarantees the planning job is submitted.
// It doesn't guarantee the job will finish because internally we won't wait for Helix completion.
this.currentJobMonitor.get();
this.currentJobMonitor = null;
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index b825417..fe23f20 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -122,8 +122,7 @@ public class HelixUtils {
static void waitJobInitialization(
HelixManager helixManager,
String workFlowName,
- String jobName,
- long timeoutMillis) throws Exception {
+ String jobName) throws Exception {
WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
// If the helix job is deleted from some other thread or a completely external process,
@@ -132,13 +131,14 @@ public class HelixUtils {
// 2) it did get initialized but deleted soon after, in which case we should stop waiting
// To overcome this issue, we wait here till workflowContext gets initialized
long start = System.currentTimeMillis();
+ long timeoutMillis = TimeUnit.MINUTES.toMillis(5L);
while (workflowContext == null || workflowContext.getJobState(TaskUtil.getNamespacedJobName(workFlowName, jobName)) == null) {
if (System.currentTimeMillis() - start > timeoutMillis) {
log.error("Job cannot be initialized within {} milliseconds, considered as an error", timeoutMillis);
throw new JobException("Job cannot be initialized within {} milliseconds, considered as an error");
}
workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
- Thread.sleep(1000);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
log.info("Waiting for work flow initialization.");
}
@@ -159,7 +159,7 @@ public class HelixUtils {
helixTaskDriver.start(workFlow);
log.info("Created a work flow {}", workFlowName);
- waitJobInitialization(helixManager, workFlowName, jobName, Long.MAX_VALUE);
+ waitJobInitialization(helixManager, workFlowName, jobName);
}
static void waitJobCompletion(HelixManager helixManager, String workFlowName, String jobName,
@@ -189,7 +189,7 @@ public class HelixUtils {
return;
case STOPPING:
log.info("Waiting for job {} to complete... State - {}", jobName, jobState);
- Thread.sleep(1000);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
// Workaround for a Helix bug where a job may be stuck in the STOPPING state due to an unresponsive task.
if (System.currentTimeMillis() > stoppingStateEndTime) {
log.info("Deleting workflow {}", workFlowName);
@@ -199,7 +199,7 @@ public class HelixUtils {
return;
default:
log.info("Waiting for job {} to complete... State - {}", jobName, jobState);
- Thread.sleep(1000);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(10L));
}
} else {
// We have waited for WorkflowContext to get initialized,
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 44d54ea..c32f69b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -51,9 +51,9 @@ import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalog;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecSearchObject;
import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.api.SpecStore;
-import org.apache.gobblin.runtime.api.SpecSearchObject;
import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
import org.apache.gobblin.service.ServiceConfigKeys;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/BaseServiceNodeImpl.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/BaseServiceNodeImpl.java
index dcc0c3b..86c78d0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/BaseServiceNodeImpl.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/BaseServiceNodeImpl.java
@@ -97,4 +97,9 @@ public class BaseServiceNodeImpl implements ServiceNode {
public int hashCode() {
return nodeName.hashCode();
}
+
+ @Override
+ public String toString() {
+ return nodeName;
+ }
}
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index b42de01..244a2fa 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -59,6 +59,8 @@ import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PropertiesUtils;
+
// Provide base implementation for constructing multi-hops route.
@Alpha
@@ -226,7 +228,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
jobSpecBuilder = jobSpecBuilder.withTemplate(flowSpec.getTemplateURIs().get().iterator().next());
try {
jobSpec = new ResolvedJobSpec(jobSpecBuilder.build(), templateCatalog.get());
- log.info("Resolved JobSpec properties are: " + jobSpec.getConfigAsProperties());
+ log.info("Resolved JobSpec properties are: " + PropertiesUtils.prettyPrintProperties(jobSpec.getConfigAsProperties()));
} catch (SpecNotFoundException | JobTemplate.TemplateException e) {
throw new RuntimeException("Could not resolve template in JobSpec from TemplateCatalog", e);
}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index 51f2019..893c102 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -22,6 +22,7 @@ import java.io.StringReader;
import java.io.StringWriter;
import java.util.Map;
import java.util.Properties;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@@ -106,4 +107,10 @@ public class PropertiesUtils {
reader.close();
return properties;
}
+
+ public static String prettyPrintProperties(Properties properties) {
+ return properties.entrySet().stream()
+ .map(entry -> "\"" + entry.getKey() + "\"" + " : " + "\"" + entry.getValue() + "\"")
+ .collect(Collectors.joining(",\n"));
+ }
}