You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:35 UTC

[17/50] incubator-gobblin git commit: [GOBBLIN-387] pick job files in FIFO order

[GOBBLIN-387] pick job files in FIFO order

pick job files in FIFO order

delete temp file on exit

dummy commit

fix findBugsMain

Closes #2264 from arjun4084346/jobOrder


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/cd9447a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/cd9447a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/cd9447a5

Branch: refs/heads/0.12.0
Commit: cd9447a58c0bc66bc24d223803bd8f33dda31887
Parents: 94bcc16
Author: Arjun <ab...@linkedin.com>
Authored: Mon Feb 5 11:44:14 2018 -0800
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Mon Feb 5 11:44:14 2018 -0800

----------------------------------------------------------------------
 .../org/apache/gobblin/util/PullFileLoader.java | 37 +++++++++++++++-----
 .../apache/gobblin/util/PullFileLoaderTest.java | 36 ++++++++++++++++++-
 2 files changed, 64 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd9447a5/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
index a68f9ac..210615c 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PullFileLoader.java
@@ -23,6 +23,8 @@ import java.io.InputStreamReader;
 import java.io.Reader;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
@@ -74,7 +76,7 @@ public class PullFileLoader {
 
   public static final String PROPERTY_DELIMITER_PARSING_ENABLED_KEY = "property.parsing.enablekey";
   public static final boolean DEFAULT_PROPERTY_DELIMITER_PARSING_ENABLED_KEY = false;
-  
+
   private final Path rootDirectory;
   private final FileSystem fs;
   private final ExtensionFilter javaPropsPullFileFilter;
@@ -145,28 +147,36 @@ public class PullFileLoader {
   }
 
   /**
-   * Find and load all pull files under a base {@link Path} recursively.
+   * Find and load all pull files under a base {@link Path} recursively in an order sorted by last modified date.
    * @param path base {@link Path} where pull files should be found recursively.
    * @param sysProps A {@link Config} used as fallback.
    * @param loadGlobalProperties if true, will also load at most one *.properties file per directory from the
    *          {@link #rootDirectory} to the pull file {@link Path} for each pull file.
    * @return The loaded {@link Config}s.
    */
-  public Collection<Config> loadPullFilesRecursively(Path path, Config sysProps, boolean loadGlobalProperties) {
+  public List<Config> loadPullFilesRecursively(Path path, Config sysProps, boolean loadGlobalProperties) {
     try {
       Config fallback = sysProps;
       if (loadGlobalProperties && PathUtils.isAncestor(this.rootDirectory, path.getParent())) {
         fallback = loadAncestorGlobalConfigs(path.getParent(), fallback);
       }
-      return loadPullFilesRecursivelyHelper(path, fallback, loadGlobalProperties);
+      return getSortedConfigs(loadPullFilesRecursivelyHelper(path, fallback, loadGlobalProperties));
     } catch (IOException ioe) {
       return Lists.newArrayList();
     }
   }
 
-  private Collection<Config> loadPullFilesRecursivelyHelper(Path path, Config fallback, boolean loadGlobalProperties) {
-    List<Config> pullFiles = Lists.newArrayList();
+  private List<Config> getSortedConfigs(List<ConfigWithTimeStamp> configsWithTimeStamps) {
+    List<Config> sortedConfigs = Lists.newArrayList();
+    Collections.sort(configsWithTimeStamps, (config1, config2) -> (config1.timeStamp > config2.timeStamp) ? 1 : -1);
+    for (ConfigWithTimeStamp configWithTimeStamp : configsWithTimeStamps) {
+      sortedConfigs.add(configWithTimeStamp.config);
+    }
+    return sortedConfigs;
+  }
 
+  private List<ConfigWithTimeStamp> loadPullFilesRecursivelyHelper(Path path, Config fallback, boolean loadGlobalProperties) {
+    List<ConfigWithTimeStamp> pullFiles = Lists.newArrayList();
     try {
       if (loadGlobalProperties) {
         fallback = findAndLoadGlobalConfigInDirectory(path, fallback);
@@ -183,9 +193,11 @@ public class PullFileLoader {
           if (status.isDirectory()) {
             pullFiles.addAll(loadPullFilesRecursivelyHelper(status.getPath(), fallback, loadGlobalProperties));
           } else if (this.javaPropsPullFileFilter.accept(status.getPath())) {
-            pullFiles.add(loadJavaPropsWithFallback(status.getPath(), fallback).resolve());
+            log.debug("modification time of {} is {}", status.getPath(), status.getModificationTime());
+            pullFiles.add(new ConfigWithTimeStamp(status.getModificationTime(), loadJavaPropsWithFallback(status.getPath(), fallback).resolve()));
           } else if (this.hoconPullFileFilter.accept(status.getPath())) {
-            pullFiles.add(loadHoconConfigAtPath(status.getPath()).withFallback(fallback).resolve());
+            log.debug("modification time of {} is {}", status.getPath(), status.getModificationTime());
+            pullFiles.add(new ConfigWithTimeStamp(status.getModificationTime(), loadHoconConfigAtPath(status.getPath()).withFallback(fallback).resolve()));
           }
         } catch (IOException ioe) {
           // Failed to load specific subpath, try with the other subpaths in this directory
@@ -298,4 +310,13 @@ public class PullFileLoader {
     }
   }
 
+  private static class ConfigWithTimeStamp {
+    long timeStamp;
+    Config config;
+
+    public ConfigWithTimeStamp(long timeStamp, Config config) {
+      this.timeStamp = timeStamp;
+      this.config = config;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/cd9447a5/gobblin-utility/src/test/java/org/apache/gobblin/util/PullFileLoaderTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/PullFileLoaderTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/PullFileLoaderTest.java
index 2560206..1c8bff8 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/PullFileLoaderTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/PullFileLoaderTest.java
@@ -17,8 +17,11 @@
 
 package org.apache.gobblin.util;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.Collection;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
@@ -28,6 +31,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
@@ -139,6 +143,36 @@ public class PullFileLoaderTest {
     Assert.assertEquals(pullFile.entrySet().size(), 4);
   }
 
+  /**
+   * Tests to verify job written first to the job catalog is picked up first.
+   * @throws Exception
+   */
+  @Test void testJobLoadingOrder() throws Exception {
+    Properties sysProps = new Properties();
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+    Path localBasePath = new Path(tmpDir.getAbsolutePath(), "PullFileLoaderTestDir");
+    fs.mkdirs(localBasePath);
+
+    for (int i=5; i>0; i--) {
+      String job = localBasePath.toString() + "/job" + i + ".conf";
+      PrintWriter writer = new PrintWriter(job, "UTF-8");
+      writer.println("key=job" + i + "_val");
+      writer.close();
+      Thread.sleep(1000);
+    }
+
+    List<Config> configs =
+        loader.loadPullFilesRecursively(localBasePath, ConfigUtils.propertiesToConfig(sysProps), false);
+
+    int i = 5;
+    for (Config config : configs) {
+      Assert.assertEquals(config.getString("key"), "job" + i + "_val");
+      i--;
+    }
+  }
+
   @Test
   public void testJobLoadingWithSysPropsAndGlobalProps() throws Exception {
     Path path;
@@ -230,7 +264,7 @@ public class PullFileLoaderTest {
     pullFile = loader.loadPullFile(path, cfg, false);
     Assert.assertEquals(pullFile.getString("json.property.key"), pullFile.getString("json.property.key1"));
   }
-  
+
   private Config pullFileFromPath(Collection<Config> configs, Path path) throws IOException {
     for (Config config : configs) {
       if (config.getString(ConfigurationKeys.JOB_CONFIG_FILE_PATH_KEY).equals(path.toString())) {