You are viewing a plain text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@gobblin.apache.org by jb...@apache.org on 2018/01/13 07:02:30 UTC

incubator-gobblin git commit: [GOBBLIN-207] Use hadoop filesystem to download job data

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 9f69cd395 -> 4bdd0482e


[GOBBLIN-207] Use hadoop filesystem to download job data

This adds the ability to download the job package
using the Hadoop FileSystem API.  This opens up new
choices of where to store the job package and
allows for better authentication (authN) support.
If both config keys `JOB_CONF_SOURCE_FILE_FS_URI_KEY`
and `JOB_CONF_SOURCE_FILE_PATH_KEY` are set, the
Hadoop method is used (it takes precedence).
Otherwise, if the config key `JOB_CONF_S3_URI_KEY`
is set, the legacy method is used; that key is now
deprecated and a warning is logged when it is used.

Closes #2059 from kadaan/Use_hadoop_filesystem_to_download_job_package


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/4bdd0482
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/4bdd0482
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/4bdd0482

Branch: refs/heads/master
Commit: 4bdd0482e815013ee016ede4385a9ba339621f1b
Parents: 9f69cd3
Author: Joel Baranick <jb...@apache.org>
Authored: Fri Jan 12 22:58:30 2018 -0800
Committer: Joel Baranick <jo...@ensighten.com>
Committed: Fri Jan 12 22:58:46 2018 -0800

----------------------------------------------------------------------
 .../gobblin/aws/AWSJobConfigurationManager.java |  82 ++++++++--
 .../aws/GobblinAWSConfigurationKeys.java        |   2 +
 .../aws/AWSJobConfigurationManagerTest.java     | 126 +--------------
 .../aws/BaseAWSJobConfigurationManagerTest.java | 156 +++++++++++++++++++
 .../LegacyAWSJobConfigurationManagerTest.java   |  39 +++++
 5 files changed, 271 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4bdd0482/gobblin-aws/src/main/java/org/apache/gobblin/aws/AWSJobConfigurationManager.java
----------------------------------------------------------------------
diff --git a/gobblin-aws/src/main/java/org/apache/gobblin/aws/AWSJobConfigurationManager.java b/gobblin-aws/src/main/java/org/apache/gobblin/aws/AWSJobConfigurationManager.java
index 0e45064..6ad15c2 100644
--- a/gobblin-aws/src/main/java/org/apache/gobblin/aws/AWSJobConfigurationManager.java
+++ b/gobblin-aws/src/main/java/org/apache/gobblin/aws/AWSJobConfigurationManager.java
@@ -21,6 +21,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.URI;
 import java.net.URL;
 import java.util.Enumeration;
 import java.util.List;
@@ -32,10 +33,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
 
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,6 +41,12 @@ import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
 import com.typesafe.config.Config;
 
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.cluster.GobblinHelixJobScheduler;
@@ -52,10 +55,13 @@ import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.SchedulerUtils;
 
 import static org.apache.gobblin.aws.GobblinAWSUtils.appendSlash;
 
+import lombok.Value;
+
 
 /**
  * Class for managing AWS Gobblin job configurations.
@@ -75,7 +81,7 @@ public class AWSJobConfigurationManager extends JobConfigurationManager {
 
   private static final long DEFAULT_JOB_CONF_REFRESH_INTERVAL = 60;
 
-  private Optional<String> jobConfS3Uri;
+  private Optional<JobArchiveRetriever> jobArchiveRetriever;
   private Map<String, Properties> jobConfFiles;
 
   private final long refreshIntervalInSeconds;
@@ -100,9 +106,7 @@ public class AWSJobConfigurationManager extends JobConfigurationManager {
     this.jobConfDirPath =
         config.hasPath(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY) ? Optional
             .of(config.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY)) : Optional.<String>absent();
-    this.jobConfS3Uri =
-        config.hasPath(GobblinAWSConfigurationKeys.JOB_CONF_S3_URI_KEY) ? Optional
-            .of(config.getString(GobblinAWSConfigurationKeys.JOB_CONF_S3_URI_KEY)) : Optional.<String>absent();
+    this.jobArchiveRetriever = this.getJobArchiveRetriever(config);
   }
 
   @Override
@@ -133,14 +137,10 @@ public class AWSJobConfigurationManager extends JobConfigurationManager {
 
     // TODO: Eventually when config store supports job files as well
     // .. we can replace this logic with config store
-    if (this.jobConfS3Uri.isPresent() && this.jobConfDirPath.isPresent()) {
-
+    if (this.jobArchiveRetriever.isPresent() && this.jobConfDirPath.isPresent()) {
       // Download the zip file
-      final String zipFile = appendSlash(this.jobConfDirPath.get()) +
-          StringUtils.substringAfterLast(this.jobConfS3Uri.get(), File.separator);
-      LOGGER.debug("Downloading to zip: " + zipFile + " from uri: " + this.jobConfS3Uri.get());
+      final String zipFile = this.jobArchiveRetriever.get().retrieve(this.config, this.jobConfDirPath.get());
 
-      FileUtils.copyURLToFile(new URL(this.jobConfS3Uri.get()), new File(zipFile));
       final String extractedPullFilesPath = appendSlash(this.jobConfDirPath.get()) + "files";
 
       // Extract the zip file
@@ -233,4 +233,58 @@ public class AWSJobConfigurationManager extends JobConfigurationManager {
   protected void shutDown() throws Exception {
     GobblinAWSUtils.shutdownExecutorService(this.getClass(), this.fetchJobConfExecutor, LOGGER);
   }
+
+  private Optional<JobArchiveRetriever> getJobArchiveRetriever(Config config) {
+    if (config.hasPath(GobblinAWSConfigurationKeys.JOB_CONF_SOURCE_FILE_FS_URI_KEY) &&
+            config.hasPath(GobblinAWSConfigurationKeys.JOB_CONF_SOURCE_FILE_PATH_KEY)) {
+      return Optional.of(new HadoopJobArchiveRetriever(config.getString(GobblinAWSConfigurationKeys.JOB_CONF_SOURCE_FILE_FS_URI_KEY),
+              config.getString(GobblinAWSConfigurationKeys.JOB_CONF_SOURCE_FILE_PATH_KEY)));
+    }
+
+    if (config.hasPath(GobblinAWSConfigurationKeys.JOB_CONF_S3_URI_KEY)) {
+      LOGGER.warn("GobblinAWSConfigurationKeys.JOB_CONF_S3_URI_KEY is deprecated.  " +
+              "Switch to GobblinAWSConfigurationKeys.JOB_CONF_SOURCE_FILE_FS_URI_KEY and " +
+              "GobblinAWSConfigurationKeys.JOB_CONF_SOURCE_FILE_PATH_KEY.");
+      return Optional.of(new LegacyJobArchiveRetriever(config.getString(GobblinAWSConfigurationKeys.JOB_CONF_S3_URI_KEY)));
+    }
+
+    return Optional.absent();
+  }
+
+  private interface JobArchiveRetriever {
+    String retrieve(Config config, String targetDir) throws IOException;
+  }
+
+  @Value
+  private static class LegacyJobArchiveRetriever implements JobArchiveRetriever {
+    String uri;
+
+    @Override
+    public String retrieve(Config config, String targetDir) throws IOException {
+      final String zipFile = appendSlash(targetDir) +
+              StringUtils.substringAfterLast(this.uri, File.separator);
+      LOGGER.debug("Downloading to zip: " + zipFile + " from uri: " + uri);
+      FileUtils.copyURLToFile(new URL(this.uri), new File(zipFile));
+      return zipFile;
+    }
+  }
+
+  @Value
+  private static class HadoopJobArchiveRetriever implements JobArchiveRetriever {
+    String fsUri;
+    String path;
+
+    @Override
+    public String retrieve(Config config, String targetDir) throws IOException {
+      URI uri = URI.create(this.fsUri);
+      FileSystem fs = FileSystem.get(uri, HadoopUtils.getConfFromState(ConfigUtils.configToState(config)));
+
+      final Path sourceFile = new Path(path);
+      final String zipFile = appendSlash(targetDir) +
+              StringUtils.substringAfterLast(this.path, File.separator);
+      LOGGER.debug("Downloading to zip: " + zipFile + " from uri: " + sourceFile);
+      fs.copyToLocalFile(sourceFile, new Path(zipFile));
+      return zipFile;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4bdd0482/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSConfigurationKeys.java b/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSConfigurationKeys.java
index 3b50947..13d1272 100644
--- a/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSConfigurationKeys.java
+++ b/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSConfigurationKeys.java
@@ -85,6 +85,8 @@ public class GobblinAWSConfigurationKeys {
 
   // Job conf properties.
   public static final String JOB_CONF_S3_URI_KEY = GOBBLIN_AWS_PREFIX + "job.conf.s3.uri";
+  public static final String JOB_CONF_SOURCE_FILE_FS_URI_KEY = GOBBLIN_AWS_PREFIX + "job.conf.source.file.fs.uri";
+  public static final String JOB_CONF_SOURCE_FILE_PATH_KEY = GOBBLIN_AWS_PREFIX + "job.conf.source.file.path";
   public static final String JOB_CONF_REFRESH_INTERVAL = GOBBLIN_AWS_PREFIX + "job.conf.refresh.interval";
 
   // Work environment properties.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4bdd0482/gobblin-aws/src/test/java/org/apache/gobblin/aws/AWSJobConfigurationManagerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-aws/src/test/java/org/apache/gobblin/aws/AWSJobConfigurationManagerTest.java b/gobblin-aws/src/test/java/org/apache/gobblin/aws/AWSJobConfigurationManagerTest.java
index 8bcf048..c576bd2 100644
--- a/gobblin-aws/src/test/java/org/apache/gobblin/aws/AWSJobConfigurationManagerTest.java
+++ b/gobblin-aws/src/test/java/org/apache/gobblin/aws/AWSJobConfigurationManagerTest.java
@@ -17,28 +17,11 @@
 
 package org.apache.gobblin.aws;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.URL;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
-import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
-import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
 
 
 /**
@@ -47,108 +30,11 @@ import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
  * @author Abhishek Tiwari
  */
 @Test(groups = { "gobblin.aws" })
-public class AWSJobConfigurationManagerTest {
-  private static final int NUM_JOB_CONFIG_FILES = 1;
-
-  private static final String JOB_NAME_KEY = "job.name";
-  private static final String JOB_FIRST_NAME = "PullFromWikipedia1";
-  private static final String JOB_FIRST_ZIP = "wikipedia1.zip";
-  private static final String JOB_SECOND_NAME = "PullFromWikipedia2";
-  private static final String JOB_SECOND_ZIP = "wikipedia2.zip";
-  private static final String URI_ZIP_NAME = "wikipedia.zip";
-
-  private static final String JOB_CONFIG_DIR_NAME = AWSJobConfigurationManagerTest.class.getSimpleName();
-
-  private final File jobConfigFileDir = new File(JOB_CONFIG_DIR_NAME + "_" + System.currentTimeMillis());
-  private final EventBus eventBus = new EventBus();
-  private AWSJobConfigurationManager jobConfigurationManager;
-
-  private final List<Properties> receivedJobConfigs = Lists.newLinkedList();
-  private final CountDownLatch countDownLatchBootUp = new CountDownLatch(NUM_JOB_CONFIG_FILES);
-  private final CountDownLatch countDownLatchUpdate = new CountDownLatch(NUM_JOB_CONFIG_FILES);
-
-  @BeforeClass
-  public void setUp() throws Exception {
-    this.eventBus.register(this);
-
-    // Prepare the test url to download the job conf from
-    final URL url = GobblinAWSClusterLauncherTest.class.getClassLoader().getResource(JOB_FIRST_ZIP);
-    final String jobConfZipUri = getJobConfigZipUri(new File(url.toURI()));
-
-    // Prepare the test dir to download the job conf to
-    if (this.jobConfigFileDir.exists()) {
-      FileUtils.deleteDirectory(this.jobConfigFileDir);
-    }
-    Assert.assertTrue(this.jobConfigFileDir.mkdirs(), "Failed to create " + this.jobConfigFileDir);
-
-    final Config config = ConfigFactory.empty()
-        .withValue(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY, ConfigValueFactory.fromAnyRef(this.jobConfigFileDir.toString()))
-        .withValue(GobblinAWSConfigurationKeys.JOB_CONF_S3_URI_KEY, ConfigValueFactory.fromAnyRef(jobConfZipUri))
-        .withValue(GobblinAWSConfigurationKeys.JOB_CONF_REFRESH_INTERVAL, ConfigValueFactory.fromAnyRef("10s"));
-    this.jobConfigurationManager = new AWSJobConfigurationManager(this.eventBus, config);
-    this.jobConfigurationManager.startAsync().awaitRunning();
-  }
-
-  @Test(enabled = false)
-  private String getJobConfigZipUri(File source) throws IOException {
-    final File destination = new File(StringUtils.substringBeforeLast(source.toString(), File.separator) + File.separator
-        + URI_ZIP_NAME);
-    if (destination.exists()) {
-      if (!destination.delete()) {
-        throw new IOException("Cannot clean destination job conf zip file: " + destination);
-      }
-    }
-    FileUtils.copyFile(source, destination);
-
-    return destination.toURI().toString();
-  }
-
-  @Test
-  public void testBootUpNewJobConfigs() throws Exception {
-    // Wait for all job configs to be received
-    this.countDownLatchBootUp.await();
-
-    // Wikipedia1.zip has only 1 conf file, so we should only receive that
-    Assert.assertEquals(this.receivedJobConfigs.size(), 1);
-    Assert.assertEquals(this.receivedJobConfigs.get(0).getProperty(JOB_NAME_KEY), JOB_FIRST_NAME);
-
-  }
-
-  @Test(dependsOnMethods = "testBootUpNewJobConfigs")
-  public void testUpdatedNewJobConfigs() throws Exception {
-    // Change zip file in the Uri that JobConfigManager is watching
-    final URL url = GobblinAWSClusterLauncherTest.class.getClassLoader().getResource(JOB_SECOND_ZIP);
-    final String jobConfZipUri = getJobConfigZipUri(new File(url.toURI()));
-
-    // Wait for all job configs to be received (after scheduled execution of 1 minute)
-    this.countDownLatchUpdate.await();
-
-    // Wikipedia2.zip has only 2 conf files:
-    // 1. The original job conf that is not changed
-    // 2. A new job conf that has been added
-    // So, we should only receive one new / updated job conf (ie. total number of configs = 2)
-    Assert.assertEquals(this.receivedJobConfigs.size(), 2);
-    Assert.assertEquals(this.receivedJobConfigs.get(1).getProperty(JOB_NAME_KEY), JOB_SECOND_NAME);
-  }
-
-  @AfterClass
-  public void tearDown() throws IOException {
-    this.jobConfigurationManager.stopAsync().awaitTerminated();
-    if (this.jobConfigFileDir.exists()) {
-      FileUtils.deleteDirectory(this.jobConfigFileDir);
-    }
-  }
-
-  @Test(enabled = false)
-  @Subscribe
-  public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobConfigArrivalEvent) {
-    Properties jobConfig = newJobConfigArrivalEvent.getJobConfig();
-    this.receivedJobConfigs.add(jobConfig);
-
-    if (jobConfig.getProperty(JOB_NAME_KEY).equalsIgnoreCase(JOB_FIRST_NAME)) {
-      this.countDownLatchBootUp.countDown();
-    } else {
-      this.countDownLatchUpdate.countDown();
-    }
+public class AWSJobConfigurationManagerTest extends BaseAWSJobConfigurationManagerTest {
+  @Override
+  protected Config getConfig(String jobConfZipUri) {
+    return ConfigFactory.empty()
+        .withValue(GobblinAWSConfigurationKeys.JOB_CONF_SOURCE_FILE_FS_URI_KEY, ConfigValueFactory.fromAnyRef("file:///"))
+        .withValue(GobblinAWSConfigurationKeys.JOB_CONF_SOURCE_FILE_PATH_KEY, ConfigValueFactory.fromAnyRef(jobConfZipUri));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4bdd0482/gobblin-aws/src/test/java/org/apache/gobblin/aws/BaseAWSJobConfigurationManagerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-aws/src/test/java/org/apache/gobblin/aws/BaseAWSJobConfigurationManagerTest.java b/gobblin-aws/src/test/java/org/apache/gobblin/aws/BaseAWSJobConfigurationManagerTest.java
new file mode 100644
index 0000000..c8f9040
--- /dev/null
+++ b/gobblin-aws/src/test/java/org/apache/gobblin/aws/BaseAWSJobConfigurationManagerTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.aws;
+
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+
+/**
+ * Unit tests for {@link AWSJobConfigurationManager}.
+ *
+ * @author Abhishek Tiwari
+ */
+@Test(groups = { "gobblin.aws" })
+public abstract class BaseAWSJobConfigurationManagerTest {
+  private static final int NUM_JOB_CONFIG_FILES = 1;
+
+  private static final String JOB_NAME_KEY = "job.name";
+  private static final String JOB_FIRST_NAME = "PullFromWikipedia1";
+  private static final String JOB_FIRST_ZIP = "wikipedia1.zip";
+  private static final String JOB_SECOND_NAME = "PullFromWikipedia2";
+  private static final String JOB_SECOND_ZIP = "wikipedia2.zip";
+  private static final String URI_ZIP_NAME = "wikipedia.zip";
+
+  private static final String JOB_CONFIG_DIR_NAME = BaseAWSJobConfigurationManagerTest.class.getSimpleName();
+
+  private final File jobConfigFileDir = new File(JOB_CONFIG_DIR_NAME + "_" + System.currentTimeMillis());
+  private final EventBus eventBus = new EventBus();
+  private AWSJobConfigurationManager jobConfigurationManager;
+
+  private final List<Properties> receivedJobConfigs = Lists.newLinkedList();
+  private final CountDownLatch countDownLatchBootUp = new CountDownLatch(NUM_JOB_CONFIG_FILES);
+  private final CountDownLatch countDownLatchUpdate = new CountDownLatch(NUM_JOB_CONFIG_FILES);
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.eventBus.register(this);
+
+    // Prepare the test url to download the job conf from
+    final URL url = GobblinAWSClusterLauncherTest.class.getClassLoader().getResource(JOB_FIRST_ZIP);
+    final String jobConfZipUri = getJobConfigZipUri(new File(url.toURI()));
+
+    // Prepare the test dir to download the job conf to
+    if (this.jobConfigFileDir.exists()) {
+      FileUtils.deleteDirectory(this.jobConfigFileDir);
+    }
+    Assert.assertTrue(this.jobConfigFileDir.mkdirs(), "Failed to create " + this.jobConfigFileDir);
+
+    final Config config = getConfig(jobConfZipUri)
+            .withValue(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY, ConfigValueFactory.fromAnyRef(this.jobConfigFileDir.toString()))
+            .withValue(GobblinAWSConfigurationKeys.JOB_CONF_REFRESH_INTERVAL, ConfigValueFactory.fromAnyRef("10s"));
+
+    this.jobConfigurationManager = new AWSJobConfigurationManager(this.eventBus, config);
+    this.jobConfigurationManager.startAsync().awaitRunning();
+  }
+
+  protected abstract Config getConfig(String jobConfZipUri);
+
+  @Test(enabled = false)
+  private String getJobConfigZipUri(File source) throws IOException {
+    final File destination = new File(StringUtils.substringBeforeLast(source.toString(), File.separator) + File.separator
+        + URI_ZIP_NAME);
+    if (destination.exists()) {
+      if (!destination.delete()) {
+        throw new IOException("Cannot clean destination job conf zip file: " + destination);
+      }
+    }
+    FileUtils.copyFile(source, destination);
+
+    return destination.toURI().toString();
+  }
+
+  @Test
+  public void testBootUpNewJobConfigs() throws Exception {
+    // Wait for all job configs to be received
+    this.countDownLatchBootUp.await();
+
+    // Wikipedia1.zip has only 1 conf file, so we should only receive that
+    Assert.assertEquals(this.receivedJobConfigs.size(), 1);
+    Assert.assertEquals(this.receivedJobConfigs.get(0).getProperty(JOB_NAME_KEY), JOB_FIRST_NAME);
+
+  }
+
+  @Test(dependsOnMethods = "testBootUpNewJobConfigs")
+  public void testUpdatedNewJobConfigs() throws Exception {
+    // Change zip file in the Uri that JobConfigManager is watching
+    final URL url = GobblinAWSClusterLauncherTest.class.getClassLoader().getResource(JOB_SECOND_ZIP);
+    final String jobConfZipUri = getJobConfigZipUri(new File(url.toURI()));
+
+    // Wait for all job configs to be received (after scheduled execution of 1 minute)
+    this.countDownLatchUpdate.await();
+
+    // Wikipedia2.zip has only 2 conf files:
+    // 1. The original job conf that is not changed
+    // 2. A new job conf that has been added
+    // So, we should only receive one new / updated job conf (ie. total number of configs = 2)
+    Assert.assertEquals(this.receivedJobConfigs.size(), 2);
+    Assert.assertEquals(this.receivedJobConfigs.get(1).getProperty(JOB_NAME_KEY), JOB_SECOND_NAME);
+  }
+
+  @AfterClass
+  public void tearDown() throws IOException {
+    this.jobConfigurationManager.stopAsync().awaitTerminated();
+    if (this.jobConfigFileDir.exists()) {
+      FileUtils.deleteDirectory(this.jobConfigFileDir);
+    }
+  }
+
+  @Test(enabled = false)
+  @Subscribe
+  public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobConfigArrivalEvent) {
+    Properties jobConfig = newJobConfigArrivalEvent.getJobConfig();
+    this.receivedJobConfigs.add(jobConfig);
+
+    if (jobConfig.getProperty(JOB_NAME_KEY).equalsIgnoreCase(JOB_FIRST_NAME)) {
+      this.countDownLatchBootUp.countDown();
+    } else {
+      this.countDownLatchUpdate.countDown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4bdd0482/gobblin-aws/src/test/java/org/apache/gobblin/aws/LegacyAWSJobConfigurationManagerTest.java
----------------------------------------------------------------------
diff --git a/gobblin-aws/src/test/java/org/apache/gobblin/aws/LegacyAWSJobConfigurationManagerTest.java b/gobblin-aws/src/test/java/org/apache/gobblin/aws/LegacyAWSJobConfigurationManagerTest.java
new file mode 100644
index 0000000..cb5e793
--- /dev/null
+++ b/gobblin-aws/src/test/java/org/apache/gobblin/aws/LegacyAWSJobConfigurationManagerTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.aws;
+
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+
+/**
+ * Unit tests for {@link AWSJobConfigurationManager}.
+ *
+ * @author Abhishek Tiwari
+ */
+@Test(groups = { "gobblin.aws" })
+public class LegacyAWSJobConfigurationManagerTest extends BaseAWSJobConfigurationManagerTest {
+  @Override
+  protected Config getConfig(String jobConfZipUri) {
+    return ConfigFactory.empty()
+            .withValue(GobblinAWSConfigurationKeys.JOB_CONF_S3_URI_KEY, ConfigValueFactory.fromAnyRef(jobConfZipUri));
+  }
+}