You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/03/27 22:50:45 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-713] Lazy load
job specification from job catalog to avoid OOM issue.
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1068edb [GOBBLIN-713] Lazy load job specification from job catalog to avoid OOM issue.
1068edb is described below
commit 1068edbdda140fa1e1f9db33c85f79c060c1c5f8
Author: Kuai Yu <ku...@linkedin.com>
AuthorDate: Wed Mar 27 15:50:35 2019 -0700
[GOBBLIN-713] Lazy load job specification from job catalog to avoid OOM issue.
Closes #2581 from yukuai518/oom
---
.../org/apache/gobblin/runtime/api/JobCatalog.java | 5 ++
.../runtime/job_catalog/ImmutableFSJobCatalog.java | 38 ++++++++++
.../runtime/job_catalog/JobCatalogBase.java | 18 ++++-
.../org/apache/gobblin/util/PullFileLoader.java | 86 +++++++++++++---------
4 files changed, 109 insertions(+), 38 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
index 5cfa8b9..7d3b229 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.runtime.api;
import java.net.URI;
import java.util.Collection;
+import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -50,6 +51,10 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable
/** Returns an immutable {@link Collection} of {@link JobSpec}s that are known to the catalog. */
Collection<JobSpec> getJobs();
+ default Iterator<JobSpec> getJobSpecIterator() {
+ return getJobs().iterator();
+ }
+
/** Metrics for the job catalog; null if
* ({@link #isInstrumentationEnabled()}) is false. */
JobCatalog.StandardMetrics getMetrics();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
index 626cac2..fb08bc0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
@@ -22,11 +22,13 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,6 +39,7 @@ import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
@@ -166,6 +169,41 @@ public class ImmutableFSJobCatalog extends JobCatalogBase implements JobCatalog
}
/**
+ * Return an iterator that can fetch all the job specifications.
+ * Different than {@link #getJobs()}, this method prevents loading
+ * all the job configs into memory in the very beginning. Instead it
+ * only loads file paths initially and creates the JobSpec and the
+ * underlying {@link Config} during the iteration.
+ *
+ * @return an iterator that contains job specs (job specs can be null).
+ */
+ @Override
+ public synchronized Iterator<JobSpec> getJobSpecIterator() {
+ List<Path> jobFiles = loader.fetchJobFilesRecursively(loader.getRootDirectory());
+ Iterator<JobSpec> jobSpecIterator = Iterators.transform(jobFiles.iterator(), new Function<Path, JobSpec>() {
+ @Nullable
+ @Override
+ public JobSpec apply(@Nullable Path jobFile) {
+ if (jobFile == null) {
+ return null;
+ }
+
+ try {
+ Config config = ImmutableFSJobCatalog.this.loader.loadPullFile(jobFile,
+ ImmutableFSJobCatalog.this.sysConfig, ImmutableFSJobCatalog.this.shouldLoadGlobalConf());
+
+ return ImmutableFSJobCatalog.this.converter.apply(config);
+ } catch (IOException e) {
+ log.error("Cannot load job from {} due to {}", jobFile, ExceptionUtils.getFullStackTrace(e));
+ return null;
+ }
+ }
+ });
+
+ return jobSpecIterator;
+ }
+
+ /**
* Fetch single job file based on its URI,
* return null requested URI not existed
* @param uri The relative Path to the target job configuration.
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
index dcb0723..471db8a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/JobCatalogBase.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.runtime.job_catalog;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
@@ -112,6 +113,13 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC
return jobSpecs;
}
+ private Iterator<JobSpec> getJobSpecsWithTimeUpdate() {
+ long startTime = System.currentTimeMillis();
+ Iterator<JobSpec> jobSpecs = getJobSpecIterator();
+ this.metrics.updateGetJobTime(startTime);
+ return jobSpecs;
+ }
+
/**{@inheritDoc}*/
@Override
public synchronized void addListener(JobCatalogListener jobListener) {
@@ -119,9 +127,13 @@ public abstract class JobCatalogBase extends AbstractIdleService implements JobC
this.listeners.addListener(jobListener);
if (state() == State.RUNNING) {
- for (JobSpec jobSpec : getJobsWithTimeUpdate()) {
- JobCatalogListener.AddJobCallback addJobCallback = new JobCatalogListener.AddJobCallback(jobSpec);
- this.listeners.callbackOneListener(addJobCallback, jobListener);
+ Iterator<JobSpec> jobSpecItr = getJobSpecsWithTimeUpdate();
+ while (jobSpecItr.hasNext()) {
+ JobSpec jobSpec = jobSpecItr.next();
+ if (jobSpec != null) {
+ JobCatalogListener.AddJobCallback addJobCallback = new JobCatalogListener.AddJobCallback(jobSpec);
+ this.listeners.callbackOneListener(addJobCallback, jobListener);
+ }
}
}
}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
index 9270491..5e0ce90 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
@@ -32,12 +32,14 @@ import java.util.Set;
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import com.google.common.base.Charsets;
+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
@@ -51,6 +53,7 @@ import com.typesafe.config.ConfigSyntax;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import javax.annotation.Nullable;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -155,57 +158,60 @@ public class PullFileLoader {
* @return The loaded {@link Config}s.
*/
public List<Config> loadPullFilesRecursively(Path path, Config sysProps, boolean loadGlobalProperties) {
- try {
- Config fallback = sysProps;
- if (loadGlobalProperties && PathUtils.isAncestor(this.rootDirectory, path.getParent())) {
- fallback = loadAncestorGlobalConfigs(path.getParent(), fallback);
+ return Lists.transform(this.fetchJobFilesRecursively(path), new Function<Path, Config>() {
+ @Nullable
+ @Override
+ public Config apply(@Nullable Path jobFile) {
+ if (jobFile == null) {
+ return null;
+ }
+
+ try {
+ return PullFileLoader.this.loadPullFile(jobFile,
+ sysProps, loadGlobalProperties);
+ } catch (IOException e) {
+ log.error("Cannot load job from {} due to {}", jobFile, ExceptionUtils.getFullStackTrace(e));
+ return null;
+ }
}
- return getSortedConfigs(loadPullFilesRecursivelyHelper(path, fallback, loadGlobalProperties));
- } catch (IOException ioe) {
- return Lists.newArrayList();
- }
+ });
+ }
+
+ public List<Path> fetchJobFilesRecursively(Path path) {
+ return getSortedPaths(fetchJobFilePathsRecursivelyHelper(path));
}
- private List<Config> getSortedConfigs(List<ConfigWithTimeStamp> configsWithTimeStamps) {
- List<Config> sortedConfigs = Lists.newArrayList();
- Collections.sort(configsWithTimeStamps, Comparator.comparingLong(o -> o.timeStamp));
- for (ConfigWithTimeStamp configWithTimeStamp : configsWithTimeStamps) {
- sortedConfigs.add(configWithTimeStamp.config);
+ private List<Path> getSortedPaths(List<PathWithTimeStamp> pathsWithTimeStamps) {
+ List<Path> sortedPaths = Lists.newArrayList();
+ Collections.sort(pathsWithTimeStamps, Comparator.comparingLong(o -> o.timeStamp));
+ for (PathWithTimeStamp pathWithTimeStamp : pathsWithTimeStamps) {
+ sortedPaths.add(pathWithTimeStamp.path);
}
- return sortedConfigs;
+ return sortedPaths;
}
- private List<ConfigWithTimeStamp> loadPullFilesRecursivelyHelper(Path path, Config fallback, boolean loadGlobalProperties) {
- List<ConfigWithTimeStamp> pullFiles = Lists.newArrayList();
+ private List<PathWithTimeStamp> fetchJobFilePathsRecursivelyHelper(Path path) {
+ List<PathWithTimeStamp> paths = Lists.newArrayList();
try {
- if (loadGlobalProperties) {
- fallback = findAndLoadGlobalConfigInDirectory(path, fallback);
- }
-
FileStatus[] statuses = this.fs.listStatus(path);
if (statuses == null) {
log.error("Path does not exist: " + path);
- return pullFiles;
+ return paths;
}
for (FileStatus status : statuses) {
- try {
- if (status.isDirectory()) {
- pullFiles.addAll(loadPullFilesRecursivelyHelper(status.getPath(), fallback, loadGlobalProperties));
- } else if (this.javaPropsPullFileFilter.accept(status.getPath())) {
- log.debug("modification time of {} is {}", status.getPath(), status.getModificationTime());
- pullFiles.add(new ConfigWithTimeStamp(status.getModificationTime(), loadJavaPropsWithFallback(status.getPath(), fallback).resolve()));
- } else if (this.hoconPullFileFilter.accept(status.getPath())) {
- log.debug("modification time of {} is {}", status.getPath(), status.getModificationTime());
- pullFiles.add(new ConfigWithTimeStamp(status.getModificationTime(), loadHoconConfigAtPath(status.getPath()).withFallback(fallback).resolve()));
- }
- } catch (IOException ioe) {
- // Failed to load specific subpath, try with the other subpaths in this directory
- log.error(String.format("Failed to load %s. Skipping.", status.getPath()));
+ if (status.isDirectory()) {
+ paths.addAll(fetchJobFilePathsRecursivelyHelper(status.getPath()));
+ } else if (this.javaPropsPullFileFilter.accept(status.getPath())) {
+ log.debug("modification time of {} is {}", status.getPath(), status.getModificationTime());
+ paths.add(new PathWithTimeStamp(status.getModificationTime(), status.getPath()));
+ } else if (this.hoconPullFileFilter.accept(status.getPath())) {
+ log.debug("modification time of {} is {}", status.getPath(), status.getModificationTime());
+ paths.add(new PathWithTimeStamp(status.getModificationTime(), status.getPath()));
}
}
- return pullFiles;
+ return paths;
} catch (IOException ioe) {
log.error("Could not load properties at path: " + path, ioe);
return Lists.newArrayList();
@@ -310,6 +316,16 @@ public class PullFileLoader {
}
}
+ private static class PathWithTimeStamp {
+ long timeStamp;
+ Path path;
+
+ public PathWithTimeStamp(long timeStamp, Path path) {
+ this.timeStamp = timeStamp;
+ this.path = path;
+ }
+ }
+
private static class ConfigWithTimeStamp {
long timeStamp;
Config config;