You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/20 03:32:32 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1092][Gobblin 1092] added some logs, fix checkstyle, removed some redundant code

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 64cc9ff  [GOBBLIN-1092][Gobblin 1092] added some logs, fix checkstyle, removed some redundant code
64cc9ff is described below

commit 64cc9ffaa8a9a232a4181305a9500bbc4229cd7a
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Mar 19 20:32:25 2020 -0700

    [GOBBLIN-1092][Gobblin 1092] added some logs, fix checkstyle, removed some redundant code
    
    Closes #2932 from arjun4084346/debug
---
 .../service/FlowConfigResourceLocalHandler.java    | 19 ++++++------
 .../service/FlowConfigV2ResourceLocalHandler.java  |  7 ++++-
 .../org/apache/gobblin/runtime/api/FlowSpec.java   | 31 +++++++++----------
 .../org/apache/gobblin/runtime/api/JobSpec.java    |  2 +-
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  | 24 +++++++--------
 .../service/modules/flow/FlowEdgeContext.java      |  5 +++
 .../flowgraph/pathfinder/AbstractPathFinder.java   | 36 +++++++++-------------
 .../flowgraph/pathfinder/BFSPathFinder.java        |  4 +--
 .../scheduler/GobblinServiceJobScheduler.java      |  7 -----
 9 files changed, 62 insertions(+), 73 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 24c4ce9..8a7ddd1 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -17,12 +17,6 @@
 
 package org.apache.gobblin.service;
 
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Properties;
-
-import org.apache.commons.lang.StringUtils;
-
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.collect.Maps;
 import com.linkedin.data.template.StringMap;
@@ -35,10 +29,12 @@ import com.linkedin.restli.server.RestLiServiceException;
 import com.linkedin.restli.server.UpdateResponse;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
+import org.apache.commons.lang.StringUtils;
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
@@ -110,6 +106,9 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
       // remove keys that were injected as part of flowSpec creation
       flowProps.remove(ConfigurationKeys.JOB_SCHEDULE_KEY);
       flowProps.remove(ConfigurationKeys.JOB_TEMPLATE_PATH);
+      flowProps.remove(ConfigurationKeys.FLOW_GROUP_KEY);
+      flowProps.remove(ConfigurationKeys.FLOW_NAME_KEY);
+      flowProps.remove(RequesterService.REQUESTER_LIST);
 
       StringMap flowPropsAsStringMap = new StringMap();
       flowPropsAsStringMap.putAll(Maps.fromProperties(flowProps));
@@ -141,7 +140,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
     FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
     // Return conflict and take no action if flowSpec has already been created
     if (this.flowCatalog.exists(flowSpec.getUri())) {
-      log.warn("Flowspec with URI {} already exists, no action will be taken");
+      log.warn("Flowspec with URI {} already exists, no action will be taken", flowSpec.getUri());
       return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_409_CONFLICT);
     } else {
       this.flowCatalog.put(flowSpec, triggerListener);
@@ -260,7 +259,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
     Config config = configBuilder.build();
 
     Config configWithFallback;
-    //We first attempt to process the REST.li request as a HOCON string. If the request is not a valid HOCON string
+    // We first attempt to process the REST.li request as a HOCON string. If the request is not a valid HOCON string
     // (e.g. when certain special characters such as ":" or "*" are not properly escaped), we catch the Typesafe ConfigException and
     // fallback to assuming that values are literal strings.
     try {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index 05a31a0..b86da5e 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -62,7 +62,7 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
 
     // Return conflict and take no action if flowSpec has already been created
     if (this.flowCatalog.exists(flowSpec.getUri())) {
-      log.warn("Flowspec with URI {} already exists, no action will be taken");
+      log.warn("Flowspec with URI {} already exists, no action will be taken", flowSpec.getUri());
       return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(), flowStatusId), flowConfig, HttpStatus.S_409_CONFLICT);
     }
 
@@ -84,6 +84,11 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
       httpStatus = HttpStatus.S_400_BAD_REQUEST;
     }
 
+    // Remove unnecessary properties
+    flowConfig.getProperties().remove(ConfigurationKeys.FLOW_GROUP_KEY);
+    flowConfig.getProperties().remove(ConfigurationKeys.FLOW_NAME_KEY);
+    flowConfig.getProperties().remove(RequesterService.REQUESTER_LIST);
+
     return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(), flowStatusId), flowConfig, httpStatus);
   }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index faa1a53..8721e41 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -17,6 +17,14 @@
 
 package org.apache.gobblin.runtime.api;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
@@ -24,24 +32,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
-
-import com.typesafe.config.Config;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.typesafe.config.ConfigFactory;
-
+import lombok.Data;
+import lombok.EqualsAndHashCode;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.util.ConfigUtils;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
 
 /**
  * Defines a Gobblin Flow (potentially collection of {@link FlowSpec}) that can be run once, or multiple times.
@@ -349,16 +345,17 @@ public class FlowSpec implements Configurable, Spec {
       this.childSpecs.get().addAll(childSpecs);
       return this;
     }
-
-
   }
 
   /**
    * get the private uri as the primary key for this object.
-   * @return
+   * @return URI of the FlowSpec
    */
   public URI getUri() {
     return this.uri;
   }
 
+  public Boolean isExplain() {
+    return ConfigUtils.getBoolean(getConfig(), ConfigurationKeys.FLOW_EXPLAIN_KEY, false);
+  }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
index 2aadad4..4043a30 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
@@ -343,7 +343,7 @@ public class JobSpec implements Configurable, Spec {
     }
 
     public Map getDefaultMetadata() {
-      log.warn("Job Spec Verb is not provided, using type 'UNKNOWN'.");
+      log.debug("Job Spec Verb is not provided, using type 'UNKNOWN'.");
       return ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.UNKNOWN.name());
     }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 6919cfc..d902fdf 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -17,6 +17,10 @@
 
 package org.apache.gobblin.runtime.spec_catalog;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
@@ -27,18 +31,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.typesafe.config.Config;
-
 import javax.annotation.Nonnull;
-
+import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
@@ -58,6 +52,8 @@ import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.callbacks.CallbackResult;
 import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -293,7 +289,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
    *
    * @param spec The Spec to be added
    * @param triggerListener True if listeners should be notified.
-   * @return
+   * @return a map of listeners and their {@link AddSpecResponse}s
    */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
     Map<String, AddSpecResponse> responseMap = new HashMap<>();
@@ -315,7 +311,9 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
       long startTime = System.currentTimeMillis();
       metrics.updatePutSpecTime(startTime);
       try {
-        specStore.addSpec(spec);
+        if (!((FlowSpec) spec).isExplain()) {
+          specStore.addSpec(spec);
+        }
       } catch (IOException e) {
         throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
       }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
index daff8ce..ad0297b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeContext.java
@@ -43,4 +43,9 @@ public class FlowEdgeContext {
   private DatasetDescriptor outputDatasetDescriptor;
   private Config mergedConfig;
   private SpecExecutor specExecutor;
+
+  @Override
+  public String toString() {
+    return edge == null ? "Null" : edge.toString();
+  }
 }
\ No newline at end of file
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 86e8117..741fdfd 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -17,18 +17,6 @@
 
 package org.apache.gobblin.service.modules.flowgraph.pathfinder;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.typesafe.config.Config;
@@ -36,9 +24,15 @@ import com.typesafe.config.ConfigException;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValue;
 import com.typesafe.config.ConfigValueFactory;
-
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
-
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
@@ -250,8 +244,7 @@ public abstract class AbstractPathFinder implements PathFinder {
             break;
           }
         }
-      } catch (IOException | ReflectiveOperationException | InterruptedException | ExecutionException | SpecNotFoundException
-          | JobTemplate.TemplateException e) {
+      } catch (IOException | ReflectiveOperationException | SpecNotFoundException | JobTemplate.TemplateException e) {
         //Skip the edge; and continue
         log.warn("Skipping edge {} with config {} due to exception: {}", flowEdge.getId(), flowConfig.toString(), e);
       }
@@ -296,9 +289,7 @@ public abstract class AbstractPathFinder implements PathFinder {
       throws ReflectiveOperationException {
     Config config = outputDescriptor.getRawConfig();
 
-    for (Iterator<Map.Entry<String, ConfigValue>> iterator = currentDescriptor.getRawConfig().entrySet().iterator();
-        iterator.hasNext(); ) {
-      Map.Entry<String, ConfigValue> entry = iterator.next();
+    for (Map.Entry<String, ConfigValue> entry : currentDescriptor.getRawConfig().entrySet()) {
       String entryValue = entry.getValue().unwrapped().toString();
       if (!isPlaceHolder(entryValue)) {
         String entryValueInOutputDescriptor = ConfigUtils.getString(config, entry.getKey(), StringUtils.EMPTY);
@@ -330,8 +321,7 @@ public abstract class AbstractPathFinder implements PathFinder {
    * @param flowEdge An instance of {@link FlowEdge}.
    * @return the merged config derived as described above.
    */
-  private Config getMergedConfig(FlowEdge flowEdge)
-      throws ExecutionException, InterruptedException {
+  private Config getMergedConfig(FlowEdge flowEdge) {
     Config srcNodeConfig = this.flowGraph.getNode(flowEdge.getSrc()).getRawConfig().atPath(SOURCE_PREFIX);
     Config destNodeConfig = this.flowGraph.getNode(flowEdge.getDest()).getRawConfig().atPath(DESTINATION_PREFIX);
     Config mergedConfig = flowConfig.withFallback(flowEdge.getConfig()).withFallback(srcNodeConfig).withFallback(destNodeConfig);
@@ -360,13 +350,15 @@ public abstract class AbstractPathFinder implements PathFinder {
   public FlowGraphPath findPath() throws PathFinderException {
 
     FlowGraphPath flowGraphPath = new FlowGraphPath(flowSpec, flowExecutionId);
-    //Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the
+    // Path computation must be thread-safe to guarantee read consistency. In other words, we prevent concurrent read/write access to the
     // flow graph.
     for (DataNode destNode : this.destNodes) {
       List<FlowEdgeContext> path = findPathUnicast(destNode);
       if (path != null) {
+        log.info("Path to destination node {} found for flow {}. Path - {}", destNode.getId(), flowSpec.getUri(), path);
         flowGraphPath.addPath(path);
       } else {
+        log.error("Path to destination node {} could not be found for flow {}.", destNode.getId(), flowSpec.getUri());
         //No path to at least one of the destination nodes.
         return null;
       }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
index 4f650ad..382d623 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/BFSPathFinder.java
@@ -85,8 +85,8 @@ public class BFSPathFinder extends AbstractPathFinder {
       return new ArrayList<>();
     }
 
-    LinkedList<FlowEdgeContext> edgeQueue = new LinkedList<>();
-    edgeQueue.addAll(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor));
+    LinkedList<FlowEdgeContext> edgeQueue =
+        new LinkedList<>(getNextEdges(srcNode, srcDatasetDescriptor, destDatasetDescriptor));
     for (FlowEdgeContext flowEdgeContext : edgeQueue) {
       this.pathMap.put(flowEdgeContext, flowEdgeContext);
     }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 01f6a13..afc401b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -307,13 +307,6 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
             _log.info("No FlowSpec schedule found, so running FlowSpec: " + addedSpec);
             this.jobExecutor.execute(new NonScheduledJobRunner(flowSpec.getUri(), true, jobConfig, null));
           }
-        } else {
-          _log.info("{} Skipping adding flow spec: {}, since it is an EXPLAIN request", this.serviceName, addedSpec);
-
-          if (this.flowCatalog.isPresent()) {
-            _log.debug("{} Removing flow spec from FlowCatalog: {}", this.serviceName, flowSpec);
-            this.flowCatalog.get().remove(flowSpec.getUri(), new Properties(), false);
-          }
         }
         return new AddSpecResponse<>(response);
       } catch (JobException je) {