You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/20 03:32:32 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1092][Gobblin 1092] added some logs, fix checkstyle, removed some redundant code
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 64cc9ff [GOBBLIN-1092][Gobblin 1092] added some logs, fix checkstyle, removed some redundant code
64cc9ff is described below
commit 64cc9ffaa8a9a232a4181305a9500bbc4229cd7a
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Mar 19 20:32:25 2020 -0700
[GOBBLIN-1092][Gobblin 1092] added some logs, fix checkstyle, removed some redundant code
Closes #2932 from arjun4084346/debug
---
.../service/FlowConfigResourceLocalHandler.java | 19 ++++++------
.../service/FlowConfigV2ResourceLocalHandler.java | 7 ++++-
.../org/apache/gobblin/runtime/api/FlowSpec.java | 31 +++++++++----------
.../org/apache/gobblin/runtime/api/JobSpec.java | 2 +-
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 24 +++++++--------
.../service/modules/flow/FlowEdgeContext.java | 5 +++
.../flowgraph/pathfinder/AbstractPathFinder.java | 36 +++++++++-------------
.../flowgraph/pathfinder/BFSPathFinder.java | 4 +--
.../scheduler/GobblinServiceJobScheduler.java | 7 -----
9 files changed, 62 insertions(+), 73 deletions(-)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 24c4ce9..8a7ddd1 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -17,12 +17,6 @@
package org.apache.gobblin.service;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Properties;
-
-import org.apache.commons.lang.StringUtils;
-
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Maps;
import com.linkedin.data.template.StringMap;
@@ -35,10 +29,12 @@ import com.linkedin.restli.server.RestLiServiceException;
import com.linkedin.restli.server.UpdateResponse;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-
+import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
@@ -110,6 +106,9 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
// remove keys that were injected as part of flowSpec creation
flowProps.remove(ConfigurationKeys.JOB_SCHEDULE_KEY);
flowProps.remove(ConfigurationKeys.JOB_TEMPLATE_PATH);
+ flowProps.remove(ConfigurationKeys.FLOW_GROUP_KEY);
+ flowProps.remove(ConfigurationKeys.FLOW_NAME_KEY);
+ flowProps.remove(RequesterService.REQUESTER_LIST);
StringMap flowPropsAsStringMap = new StringMap();
flowPropsAsStringMap.putAll(Maps.fromProperties(flowProps));
@@ -141,7 +140,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
// Return conflict and take no action if flowSpec has already been created
if (this.flowCatalog.exists(flowSpec.getUri())) {
- log.warn("Flowspec with URI {} already exists, no action will be taken");
+ log.warn("Flowspec with URI {} already exists, no action will be taken", flowSpec.getUri());
return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_409_CONFLICT);
} else {
this.flowCatalog.put(flowSpec, triggerListener);
@@ -260,7 +259,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
Config config = configBuilder.build();
Config configWithFallback;
- //We first attempt to process the REST.li request as a HOCON string. If the request is not a valid HOCON string
+ // We first attempt to process the REST.li request as a HOCON string. If the request is not a valid HOCON string
// (e.g. when certain special characters such as ":" or "*" are not properly escaped), we catch the Typesafe ConfigException and
// fallback to assuming that values are literal strings.
try {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index 05a31a0..b86da5e 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -62,7 +62,7 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
// Return conflict and take no action if flowSpec has already been created
if (this.flowCatalog.exists(flowSpec.getUri())) {
- log.warn("Flowspec with URI {} already exists, no action will be taken");
+ log.warn("Flowspec with URI {} already exists, no action will be taken", flowSpec.getUri());
return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(), flowStatusId), flowConfig, HttpStatus.S_409_CONFLICT);
}
@@ -84,6 +84,11 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
httpStatus = HttpStatus.S_400_BAD_REQUEST;
}
+ // Remove unnecessary properties
+ flowConfig.getProperties().remove(ConfigurationKeys.FLOW_GROUP_KEY);
+ flowConfig.getProperties().remove(ConfigurationKeys.FLOW_NAME_KEY);
+ flowConfig.getProperties().remove(RequesterService.REQUESTER_LIST);
+
return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(), flowStatusId), flowConfig, httpStatus);
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index faa1a53..8721e41 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -17,6 +17,14 @@
package org.apache.gobblin.runtime.api;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
@@ -24,24 +32,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
-
-import com.typesafe.config.Config;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.typesafe.config.ConfigFactory;
-
+import lombok.Data;
+import lombok.EqualsAndHashCode;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.util.ConfigUtils;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
/**
* Defines a Gobblin Flow (potentially collection of {@link FlowSpec}) that can be run once, or multiple times.
@@ -349,16 +345,17 @@ public class FlowSpec implements Configurable, Spec {
this.childSpecs.get().addAll(childSpecs);
return this;
}
-
-
}
/**
* get the private uri as the primary key for this object.
- * @return
+ * @return URI of the FlowSpec
*/
public URI getUri() {
return this.uri;
}
+ public Boolean isExplain() {
+ return ConfigUtils.getBoolean(getConfig(), ConfigurationKeys.FLOW_EXPLAIN_KEY, false);
+ }
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
index 2aadad4..4043a30 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
@@ -343,7 +343,7 @@ public class JobSpec implements Configurable, Spec {
}
public Map getDefaultMetadata() {
- log.warn("Job Spec Verb is not provided, using type 'UNKNOWN'.");
+ log.debug("Job Spec Verb is not provided, using type 'UNKNOWN'.");
return ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.UNKNOWN.name());
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 6919cfc..d902fdf 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -17,6 +17,10 @@
package org.apache.gobblin.runtime.spec_catalog;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@@ -27,18 +31,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.typesafe.config.Config;
-
import javax.annotation.Nonnull;
-
+import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
@@ -58,6 +52,8 @@ import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.callbacks.CallbackResult;
import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -293,7 +289,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
*
* @param spec The Spec to be added
* @param triggerListener True if listeners should be notified.
- * @return
+ * @return a map of listeners and their {@link AddSpecResponse}s
*/
public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
Map<String, AddSpecResponse> responseMap = new HashMap<>();
@@ -315,7 +311,9 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
long startTime = System.currentTimeMillis();
metrics.updatePutSpecTime(startTime);
try {
- specStore.addSpec(spec);
+ if (!((FlowSpec) spec).isExplain()) {
+ specStore.addSpec(spec);
+ }
} catch (IOException e) {
throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
index daff8ce..ad0297b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
@@ -43,4 +43,9 @@ public class FlowEdgeContext {
private DatasetDescriptor outputDatasetDescriptor;
private Config mergedConfig;
private SpecExecutor specExecutor;
+
+ @Override
+ public String toString() {
+ return edge == null ? "Null" : edge.toString();
+ }
}
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 86e8117..741fdfd 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -17,18 +17,6 @@
package org.apache.gobblin.service.modules.flowgraph.pathfinder;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.typesafe.config.Config;
@@ -36,9 +24,15 @@ import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValue;
import com.typesafe.config.ConfigValueFactory;
-
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
-
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FlowSpec;
@@ -250,8 +244,7 @@ public abstract class AbstractPathFinder implements PathFinder {
break;
}
}
- } catch (IOException | ReflectiveOperationException | InterruptedException | ExecutionException | SpecNotFoundException
- | JobTemplate.TemplateException e) {
+ } catch (IOException | ReflectiveOperationException | SpecNotFoundException | JobTemplate.TemplateException e) {
//Skip the edge; and continue
log.warn("Skipping edge {} with config {} due to exception: {}", flowEdge.getId(), flowConfig.toString(), e);
}
@@ -296,9 +289,7 @@ public abstract class AbstractPathFinder implements PathFinder {
throws ReflectiveOperationException {
Config config = outputDescriptor.getRawConfig();
- for (Iterator<Map.Entry<String, ConfigValue>> iterator = currentDescriptor.getRawConfig().entrySet().iterator();
- iterator.hasNext(); ) {
- Map.Entry<String, ConfigValue> entry = iterator.next();
+ for (Map.Entry<String, ConfigValue> entry : currentDescriptor.getRawConfig().entrySet()) {
String entryValue = entry.getValue().unwrapped().toString();
if (!isPlaceHolder(entryValue)) {
String entryValueInOutputDescriptor = ConfigUtils.getString(config, entry.getKey(), StringUtils.EMPTY);
@@ -330,8 +321,7 @@ public abstract class AbstractPathFinder implements PathFinder {
* @param flowEdge An instance of {@link FlowEdge}.
* @return the merged config derived as described above.
*/
- private Config getMergedConfig(FlowEdge flowEdge)
- throws ExecutionException, InterruptedException {
+ private Config getMergedConfig(FlowEdge flowEdge) {
Config srcNodeConfig = this.flowGraph.getNode(flowEdge.getSrc()).getRawConfig().atPath(SOURCE_PREFIX);
Config destNodeConfig = this.flowGraph.getNode(flowEdge.getDest()).getRawConfig().atPath(DESTINATION_PREFIX);
Config mergedConfig = flowConfig.withFallback(flowEdge.getConfig()).withFallback(srcNodeConfig).withFallback(destNodeConfig);
@@ -360,13 +350,15 @@ public abstract class AbstractPathFinder implements PathFinder {
public FlowGraphPath findPath() throws PathFinderException {
FlowGraphPath flowGraphPath = new FlowGraphPath(flowSpec, flowExecutionId);
- //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the
+ // Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the
// flow graph.
for (DataNode destNode : this.destNodes) {
List<FlowEdgeContext> path = findPathUnicast(destNode);
if (path != null) {
+ log.info("Path to destination node {} found for flow {}. Path - {}", destNode.getId(), flowSpec.getUri(), path);
flowGraphPath.addPath(path);
} else {
+ log.error("Path to destination node {} could not be found for flow {}.", destNode.getId(), flowSpec.getUri());
//No path to at least one of the destination nodes.
return null;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
index 4f650ad..382d623 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
@@ -85,8 +85,8 @@ public class BFSPathFinder extends AbstractPathFinder {
return new ArrayList<>();
}
- LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>();
- edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor));
+ LinkedList<FlowEdgeContext> edgeQueue =
+ new LinkedList<>(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor));
for (FlowEdgeContext flowEdgeContext : edgeQueue) {
this.pathMap.put(flowEdgeContext, flowEdgeContext);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 01f6a13..afc401b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -307,13 +307,6 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
_log.info("No FlowSpec schedule found, so running FlowSpec: " + addedSpec);
this.jobExecutor.execute(new NonScheduledJobRunner(flowSpec.getUri(), true, jobConfig, null));
}
- } else {
- _log.info("{} Skipping adding flow spec: {}, since it is an EXPLAIN request", this.serviceName, addedSpec);
-
- if (this.flowCatalog.isPresent()) {
- _log.debug("{} Removing flow spec from FlowCatalog: {}", this.serviceName, flowSpec);
- this.flowCatalog.get().remove(flowSpec.getUri(), new Properties(), false);
- }
}
return new AddSpecResponse<>(response);
} catch (JobException je) {