You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:35 UTC
[17/50] incubator-gobblin git commit: [GOBBLIN-387] pick job files in FIFO order
[GOBBLIN-387] pick job files in FIFO order
pick job files in FIFO order
delete temp file on exit
dummy commit
fix findBugsMain
Closes #2264 from arjun4084346/jobOrder
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/cd9447a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/cd9447a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/cd9447a5
Branch: refs/heads/0.12.0
Commit: cd9447a58c0bc66bc24d223803bd8f33dda31887
Parents: 94bcc16
Author: Arjun <ab...@linkedin.com>
Authored: Mon Feb 5 11:44:14 2018 -0800
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Mon Feb 5 11:44:14 2018 -0800
----------------------------------------------------------------------
.../org/apache/gobblin/util/PullFileLoader.java | 37 +++++++++++++++-----
.../apache/gobblin/util/PullFileLoaderTest.java | 36 ++++++++++++++++++-
2 files changed, 64 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd9447a5/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
index a68f9ac..210615c 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
@@ -23,6 +23,8 @@ import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@@ -74,7 +76,7 @@ public class PullFileLoader {
public static final String PROPERTY_DELIMITER_PARSING_ENABLED_KEY = "property.parsing.enablekey";
public static final boolean DEFAULT_PROPERTY_DELIMITER_PARSING_ENABLED_KEY = false;
-
+
private final Path rootDirectory;
private final FileSystem fs;
private final ExtensionFilter javaPropsPullFileFilter;
@@ -145,28 +147,36 @@ public class PullFileLoader {
}
/**
- * Find and load all pull files under a base {@link Path} recursively.
+ * Find and load all pull files under a base {@link Path} recursively in an order sorted by last modified date.
* @param path base {@link Path} where pull files should be found recursively.
* @param sysProps A {@link Config} used as fallback.
* @param loadGlobalProperties if true, will also load at most one *.properties file per directory from the
* {@link #rootDirectory} to the pull file {@link Path} for each pull file.
* @return The loaded {@link Config}s.
*/
- public Collection<Config> loadPullFilesRecursively(Path path, Config sysProps, boolean loadGlobalProperties) {
+ public List<Config> loadPullFilesRecursively(Path path, Config sysProps, boolean loadGlobalProperties) {
try {
Config fallback = sysProps;
if (loadGlobalProperties && PathUtils.isAncestor(this.rootDirectory, path.getParent())) {
fallback = loadAncestorGlobalConfigs(path.getParent(), fallback);
}
- return loadPullFilesRecursivelyHelper(path, fallback, loadGlobalProperties);
+ return getSortedConfigs(loadPullFilesRecursivelyHelper(path, fallback, loadGlobalProperties));
} catch (IOException ioe) {
return Lists.newArrayList();
}
}
- private Collection<Config> loadPullFilesRecursivelyHelper(Path path, Config fallback, boolean loadGlobalProperties) {
- List<Config> pullFiles = Lists.newArrayList();
+ /** Sorts the entries in place, oldest modification time first, and returns their configs in that order. */
+ private List<Config> getSortedConfigs(List<ConfigWithTimeStamp> configsWithTimeStamps) {
+ // Long.compare returns 0 for equal timestamps, as the Comparator contract requires; "? 1 : -1" never did.
+ Collections.sort(configsWithTimeStamps, (config1, config2) -> Long.compare(config1.timeStamp, config2.timeStamp));
+ List<Config> sortedConfigs = Lists.newArrayList();
+ configsWithTimeStamps.forEach(configWithTimeStamp -> sortedConfigs.add(configWithTimeStamp.config));
+ return sortedConfigs;
+ }
+ private List<ConfigWithTimeStamp> loadPullFilesRecursivelyHelper(Path path, Config fallback, boolean loadGlobalProperties) {
+ List<ConfigWithTimeStamp> pullFiles = Lists.newArrayList();
try {
if (loadGlobalProperties) {
fallback = findAndLoadGlobalConfigInDirectory(path, fallback);
@@ -183,9 +193,11 @@ public class PullFileLoader {
if (status.isDirectory()) {
pullFiles.addAll(loadPullFilesRecursivelyHelper(status.getPath(), fallback, loadGlobalProperties));
} else if (this.javaPropsPullFileFilter.accept(status.getPath())) {
- pullFiles.add(loadJavaPropsWithFallback(status.getPath(), fallback).resolve());
+ log.debug("modification time of {} is {}", status.getPath(), status.getModificationTime());
+ pullFiles.add(new ConfigWithTimeStamp(status.getModificationTime(), loadJavaPropsWithFallback(status.getPath(), fallback).resolve()));
} else if (this.hoconPullFileFilter.accept(status.getPath())) {
- pullFiles.add(loadHoconConfigAtPath(status.getPath()).withFallback(fallback).resolve());
+ log.debug("modification time of {} is {}", status.getPath(), status.getModificationTime());
+ pullFiles.add(new ConfigWithTimeStamp(status.getModificationTime(), loadHoconConfigAtPath(status.getPath()).withFallback(fallback).resolve()));
}
} catch (IOException ioe) {
// Failed to load specific subpath, try with the other subpaths in this directory
@@ -298,4 +310,13 @@ public class PullFileLoader {
}
}
+ /** Immutable pair of a loaded {@link Config} and the timestamp it is sorted by (the pull file's modification time). */
+ private static class ConfigWithTimeStamp {
+ private final long timeStamp;
+ private final Config config;
+ public ConfigWithTimeStamp(long timeStamp, Config config) {
+ this.timeStamp = timeStamp;
+ this.config = config;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd9447a5/gobblin-utility/src/test/java/org/apache/gobblin/util/PullFileLoaderTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/PullFileLoaderTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/PullFileLoaderTest.java
index 2560206..1c8bff8 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/PullFileLoaderTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/PullFileLoaderTest.java
@@ -17,8 +17,11 @@
package org.apache.gobblin.util;
+import java.io.File;
import java.io.IOException;
+import java.io.PrintWriter;
import java.util.Collection;
+import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
@@ -28,6 +31,7 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -139,6 +143,36 @@ public class PullFileLoaderTest {
Assert.assertEquals(pullFile.entrySet().size(), 4);
}
+ /**
+ * Verifies that the job written first to the job catalog is returned first, and that every written job is returned.
+ * @throws Exception if a job file cannot be written or the wait between writes is interrupted
+ */
+ @Test public void testJobLoadingOrder() throws Exception {
+ Properties sysProps = new Properties();
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+ File tmpDir = Files.createTempDir();
+ tmpDir.deleteOnExit();
+ Path localBasePath = new Path(tmpDir.getAbsolutePath(), "PullFileLoaderTestDir");
+ fs.mkdirs(localBasePath);
+
+ for (int i=5; i>0; i--) {
+ String job = localBasePath.toString() + "/job" + i + ".conf";
+ try (PrintWriter writer = new PrintWriter(job, "UTF-8")) {
+ writer.println("key=job" + i + "_val");
+ }
+ Thread.sleep(1000); // space the writes out so each file gets a distinct modification time
+ }
+
+ List<Config> configs =
+ loader.loadPullFilesRecursively(localBasePath, ConfigUtils.propertiesToConfig(sysProps), false);
+ Assert.assertEquals(configs.size(), 5); // without this, an empty result would skip the loop below and pass
+ int i = 5;
+ for (Config config : configs) {
+ Assert.assertEquals(config.getString("key"), "job" + i + "_val");
+ i--;
+ }
+ }
+
@Test
public void testJobLoadingWithSysPropsAndGlobalProps() throws Exception {
Path path;
@@ -230,7 +264,7 @@ public class PullFileLoaderTest {
pullFile = loader.loadPullFile(path, cfg, false);
Assert.assertEquals(pullFile.getString("json.property.key"), pullFile.getString("json.property.key1"));
}
-
+
private Config pullFileFromPath(Collection<Config> configs, Path path) throws IOException {
for (Config config : configs) {
if (config.getString(ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY).equals(path.toString())) {