You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/03/27 22:50:45 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-713] Lazy load job specification from job catalog to avoid OOM issue.

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1068edb  [GOBBLIN-713] Lazy load job specification from job catalog to avoid OOM issue.
1068edb is described below

commit 1068edbdda140fa1e1f9db33c85f79c060c1c5f8
Author: Kuai Yu <ku...@linkedin.com>
AuthorDate: Wed Mar 27 15:50:35 2019 -0700

    [GOBBLIN-713] Lazy load job specification from job catalog to avoid OOM issue.
    
    Closes #2581 from yukuai518/oom
---
 .../org/apache/gobblin/runtime/api/JobCatalog.java |  5 ++
 .../runtime/job_catalog/ImmutableFSJobCatalog.java | 38 ++++++++++
 .../runtime/job_catalog/JobCatalogBase.java        | 18 ++++-
 .../org/apache/gobblin/util/PullFileLoader.java    | 86 +++++++++++++---------
 4 files changed, 109 insertions(+), 38 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
index 5cfa8b9..7d3b229 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.runtime.api;
 
 import java.net.URI;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -50,6 +51,10 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
   /** Returns an immutable {@link Collection} of {@link JobSpec}s that are known to the catalog. */
   Collection<JobSpec> getJobs();
 
+  default Iterator<JobSpec> getJobSpecIterator() {
+    return getJobs().iterator();
+  }
+
   /** Metrics for the job catalog; null if
    * ({@link #isInstrumentationEnabled()}) is false. */
   JobCatalog.StandardMetrics getMetrics();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
index 626cac2..fb08bc0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
@@ -22,11 +22,13 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
 import javax.annotation.Nullable;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,6 +39,7 @@ import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
@@ -166,6 +169,41 @@ public class ImmutableFSJobCatalog extends JobCatalogBase implements JobCatalog
   }
 
   /**
+   * Returns an iterator over all the job specifications.
+   * Unlike {@link #getJobs()}, this method avoids loading all the job
+   * configs into memory up front. Instead it collects only the job file
+   * paths initially and creates each JobSpec and its underlying
+   * {@link Config} lazily during the iteration.
+   *
+   * @return an iterator over job specs; elements may be null, e.g. when a job file cannot be loaded.
+   */
+  @Override
+  public synchronized Iterator<JobSpec> getJobSpecIterator() {
+    List<Path> jobFiles = loader.fetchJobFilesRecursively(loader.getRootDirectory());
+    Iterator<JobSpec> jobSpecIterator = Iterators.transform(jobFiles.iterator(), new Function<Path, JobSpec>() {
+      @Nullable
+      @Override
+      public JobSpec apply(@Nullable Path jobFile) {
+        if (jobFile == null) {
+          return null;
+        }
+
+        try {
+          Config config = ImmutableFSJobCatalog.this.loader.loadPullFile(jobFile,
+            ImmutableFSJobCatalog.this.sysConfig, ImmutableFSJobCatalog.this.shouldLoadGlobalConf());
+
+          return ImmutableFSJobCatalog.this.converter.apply(config);
+        } catch (IOException e) {
+          log.error("Cannot load job from {} due to {}", jobFile, ExceptionUtils.getFullStackTrace(e));
+          return null;
+        }
+      }
+    });
+
+    return jobSpecIterator;
+  }
+
+  /**
    * Fetch single job file based on its URI,
    * return null requested URI not existed
    * @param uri The relative Path to the target job configuration.
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
index dcb0723..471db8a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.runtime.job_catalog;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -112,6 +113,13 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC
     return jobSpecs;
   }
 
+  private Iterator<JobSpec> getJobSpecsWithTimeUpdate() {
+    long startTime = System.currentTimeMillis();
+    Iterator<JobSpec> jobSpecs = getJobSpecIterator();
+    this.metrics.updateGetJobTime(startTime);
+    return jobSpecs;
+  }
+
   /**{@inheritDoc}*/
   @Override
   public synchronized void addListener(JobCatalogListener jobListener) {
@@ -119,9 +127,13 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC
     this.listeners.addListener(jobListener);
 
     if (state() == State.RUNNING) {
-      for (JobSpec jobSpec : getJobsWithTimeUpdate()) {
-        JobCatalogListener.AddJobCallback addJobCallback = new JobCatalogListener.AddJobCallback(jobSpec);
-        this.listeners.callbackOneListener(addJobCallback, jobListener);
+      Iterator<JobSpec> jobSpecItr = getJobSpecsWithTimeUpdate();
+      while (jobSpecItr.hasNext()) {
+        JobSpec jobSpec = jobSpecItr.next();
+        if (jobSpec != null) {
+          JobCatalogListener.AddJobCallback addJobCallback = new JobCatalogListener.AddJobCallback(jobSpec);
+          this.listeners.callbackOneListener(addJobCallback, jobListener);
+        }
       }
     }
   }
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
index 9270491..5e0ce90 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
@@ -32,12 +32,14 @@ import java.util.Set;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
@@ -51,6 +53,7 @@ import com.typesafe.config.ConfigSyntax;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 
+import javax.annotation.Nullable;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -155,57 +158,60 @@ public class PullFileLoader {
    * @return The loaded {@link Config}s.
    */
   public List<Config> loadPullFilesRecursively(Path path, Config sysProps, boolean loadGlobalProperties) {
-    try {
-      Config fallback = sysProps;
-      if (loadGlobalProperties && PathUtils.isAncestor(this.rootDirectory, path.getParent())) {
-        fallback = loadAncestorGlobalConfigs(path.getParent(), fallback);
+    return Lists.transform(this.fetchJobFilesRecursively(path), new Function<Path, Config>() {
+      @Nullable
+      @Override
+      public Config apply(@Nullable Path jobFile) {
+        if (jobFile == null) {
+          return null;
+        }
+
+        try {
+          return PullFileLoader.this.loadPullFile(jobFile,
+              sysProps, loadGlobalProperties);
+        } catch (IOException e) {
+          log.error("Cannot load job from {} due to {}", jobFile, ExceptionUtils.getFullStackTrace(e));
+          return null;
+        }
       }
-      return getSortedConfigs(loadPullFilesRecursivelyHelper(path, fallback, loadGlobalProperties));
-    } catch (IOException ioe) {
-      return Lists.newArrayList();
-    }
+    });
+  }
+
+  public List<Path> fetchJobFilesRecursively(Path path) {
+    return getSortedPaths(fetchJobFilePathsRecursivelyHelper(path));
   }
 
-  private List<Config> getSortedConfigs(List<ConfigWithTimeStamp> configsWithTimeStamps) {
-    List<Config> sortedConfigs = Lists.newArrayList();
-    Collections.sort(configsWithTimeStamps, Comparator.comparingLong(o -> o.timeStamp));
-    for (ConfigWithTimeStamp configWithTimeStamp : configsWithTimeStamps) {
-      sortedConfigs.add(configWithTimeStamp.config);
+  private List<Path> getSortedPaths(List<PathWithTimeStamp> pathsWithTimeStamps) {
+    List<Path> sortedPaths = Lists.newArrayList();
+    Collections.sort(pathsWithTimeStamps, Comparator.comparingLong(o -> o.timeStamp));
+    for (PathWithTimeStamp pathWithTimeStamp : pathsWithTimeStamps) {
+      sortedPaths.add(pathWithTimeStamp.path);
     }
-    return sortedConfigs;
+    return sortedPaths;
   }
 
-  private List<ConfigWithTimeStamp> loadPullFilesRecursivelyHelper(Path path, Config fallback, boolean loadGlobalProperties) {
-    List<ConfigWithTimeStamp> pullFiles = Lists.newArrayList();
+  private List<PathWithTimeStamp> fetchJobFilePathsRecursivelyHelper(Path path) {
+    List<PathWithTimeStamp> paths = Lists.newArrayList();
     try {
-      if (loadGlobalProperties) {
-        fallback = findAndLoadGlobalConfigInDirectory(path, fallback);
-      }
-
       FileStatus[] statuses = this.fs.listStatus(path);
       if (statuses == null) {
         log.error("Path does not exist: " + path);
-        return pullFiles;
+        return paths;
       }
 
       for (FileStatus status : statuses) {
-        try {
-          if (status.isDirectory()) {
-            pullFiles.addAll(loadPullFilesRecursivelyHelper(status.getPath(), fallback, loadGlobalProperties));
-          } else if (this.javaPropsPullFileFilter.accept(status.getPath())) {
-            log.debug("modification time of {} is {}", status.getPath(), status.getModificationTime());
-            pullFiles.add(new ConfigWithTimeStamp(status.getModificationTime(), loadJavaPropsWithFallback(status.getPath(), fallback).resolve()));
-          } else if (this.hoconPullFileFilter.accept(status.getPath())) {
-            log.debug("modification time of {} is {}", status.getPath(), status.getModificationTime());
-            pullFiles.add(new ConfigWithTimeStamp(status.getModificationTime(), loadHoconConfigAtPath(status.getPath()).withFallback(fallback).resolve()));
-          }
-        } catch (IOException ioe) {
-          // Failed to load specific subpath, try with the other subpaths in this directory
-          log.error(String.format("Failed to load %s. Skipping.", status.getPath()));
+        if (status.isDirectory()) {
+          paths.addAll(fetchJobFilePathsRecursivelyHelper(status.getPath()));
+        } else if (this.javaPropsPullFileFilter.accept(status.getPath())) {
+          log.debug("modification time of {} is {}", status.getPath(), status.getModificationTime());
+          paths.add(new PathWithTimeStamp(status.getModificationTime(), status.getPath()));
+        } else if (this.hoconPullFileFilter.accept(status.getPath())) {
+          log.debug("modification time of {} is {}", status.getPath(), status.getModificationTime());
+          paths.add(new PathWithTimeStamp(status.getModificationTime(), status.getPath()));
         }
       }
 
-      return pullFiles;
+      return paths;
     } catch (IOException ioe) {
       log.error("Could not load properties at path: " + path, ioe);
       return Lists.newArrayList();
@@ -310,6 +316,16 @@ public class PullFileLoader {
     }
   }
 
+  private static class PathWithTimeStamp {
+    long timeStamp;
+    Path path;
+
+    public PathWithTimeStamp(long timeStamp, Path path) {
+      this.timeStamp = timeStamp;
+      this.path = path;
+    }
+  }
+
   private static class ConfigWithTimeStamp {
     long timeStamp;
     Config config;