You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2016/01/11 10:47:21 UTC

[1/4] falcon git commit: FALCON-1678 SLA Monitoring does not honour entity end date. Contributed by Ajay Yadava.

Repository: falcon
Updated Branches:
  refs/heads/master a087dca5d -> 4656f692a


FALCON-1678 SLA Monitoring does not honour entity end date. Contributed by Ajay Yadava.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f9953426
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f9953426
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f9953426

Branch: refs/heads/master
Commit: f99534264ef2ba7162b9ba038186ae41e7a85462
Parents: a087dca
Author: Ajay Yadava <aj...@gmail.com>
Authored: Mon Jan 11 14:42:45 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Jan 11 14:42:45 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../org/apache/falcon/entity/FeedHelper.java    |  9 +++
 .../falcon/util/DeploymentProperties.java       |  2 +-
 .../apache/falcon/entity/FeedHelperTest.java    | 16 ++++
 .../service/FeedSLAMonitoringService.java       |  7 +-
 .../falcon/service/FeedSLAMonitoringTest.java   | 85 ++++++++++++++++++--
 6 files changed, 110 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/f9953426/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 297ba0c..5c2a772 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -95,6 +95,8 @@ Proposed Release Version: 0.9
   OPTIMIZATIONS
 
   BUG FIXES
+    FALCON-1678 SLA Monitoring does not honour entity end date(Ajay Yadava)
+
     FALCON-1708  params API does not take start as a mandatory option(Praveen Adlakha via Ajay Yadava)
 
     FALCON-1725 Falcon API shows results in ascending order in native scheduler (Pallavi Rao)

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9953426/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 150e0bd..575ceb3 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -42,6 +42,7 @@ import org.apache.falcon.entity.v0.feed.MergeType;
 import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.entity.v0.feed.RetentionStage;
 import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.entity.v0.feed.Validity;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Process;
@@ -1128,6 +1129,14 @@ public final class FeedHelper {
         return argsMap;
     }
 
+    public static Validity getClusterValidity(Feed feed, String clusterName) throws FalconException {
+        Cluster cluster = getCluster(feed, clusterName);
+        if (cluster == null) {
+            throw new FalconException("Invalid cluster: " + clusterName + " for feed: " + feed.getName());
+        }
+        return cluster.getValidity();
+    }
+
     public static Frequency getOldRetentionFrequency(Feed feed) {
         Frequency feedFrequency = feed.getFrequency();
         Frequency defaultFrequency = new Frequency("hours(24)");

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9953426/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java b/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java
index 715b7ba..5879f30 100644
--- a/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java
+++ b/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java
@@ -31,7 +31,7 @@ public final class DeploymentProperties extends ApplicationProperties {
     private static final String PROPERTY_FILE = "deploy.properties";
 
     private static final AtomicReference<DeploymentProperties> INSTANCE =
-            new AtomicReference<DeploymentProperties>();
+            new AtomicReference<>();
 
     private DeploymentProperties() throws FalconException {
         super();

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9953426/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
index 9841083..d565f94 100644
--- a/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FeedHelperTest.java
@@ -929,6 +929,22 @@ public class FeedHelperTest extends AbstractTestBase {
         Assert.assertEquals(startInstResult, feed.getClusters().getClusters().get(0).getValidity().getStart());
     }
 
+    @Test
+    public void testGetFeedClusterValidity() throws  Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "hours(1)",  "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC");
+        Validity validity = FeedHelper.getClusterValidity(feed, cluster.getName());
+        Assert.assertEquals(validity.getStart(), getDate("2012-02-07 00:00 UTC"));
+        Assert.assertEquals(validity.getEnd(), getDate("2020-02-25 00:00 UTC"));
+    }
+
+    @Test(expectedExceptions = FalconException.class)
+    public void testGetClusterValidityInvalidCluster() throws Exception {
+        Cluster cluster = publishCluster();
+        Feed feed = publishFeed(cluster, "hours(1)",  "2012-02-07 00:00 UTC", "2020-02-25 00:00 UTC");
+        FeedHelper.getClusterValidity(feed, "abracadabra");
+    }
+
     private Validity getFeedValidity(String start, String end) throws ParseException {
         Validity validity = new Validity();
         validity.setStart(getDate(start));

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9953426/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
index 1cd571e..8ffecd8 100644
--- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
@@ -80,7 +80,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
         return SERVICE;
     }
 
-    private int queueSize;
+    protected int queueSize;
 
     /**
      * Permissions for storePath.
@@ -90,7 +90,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
     /**
      * Feeds to be monitored.
      */
-    private Set<String> monitoredFeeds;
+    protected Set<String> monitoredFeeds;
 
 
     /**
@@ -340,7 +340,8 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
                     org.apache.falcon.entity.v0.cluster.Cluster currentCluster =
                             EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName());
                     nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime);
-                    while (nextInstanceTime.before(to)) {
+                    Date endDate = FeedHelper.getClusterValidity(feed, currentCluster.getName()).getEnd();
+                    while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate)) {
                         if (instances.size() >= queueSize) { // if no space, first make some space
                             LOG.debug("Removing instance={} for <feed,cluster>={}", instances.peek(), key);
                             exists.remove(instances.peek());

http://git-wip-us.apache.org/repos/asf/falcon/blob/f9953426/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
index e3dd5cc..90eec4d 100644
--- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
@@ -18,27 +18,42 @@
 
 package org.apache.falcon.service;
 
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Pair;
+import org.apache.falcon.entity.AbstractTestBase;
 import org.apache.falcon.entity.EntityNotRegisteredException;
 import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Clusters;
+import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.entity.v0.feed.Validity;
 import org.apache.falcon.resource.AbstractSchedulableEntityManager;
+
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
 /**
  * Tests for FeedSLAMonitoring Service.
  */
-public class FeedSLAMonitoringTest {
+public class FeedSLAMonitoringTest extends AbstractTestBase {
+    private static final String CLUSTER_NAME = "testCluster";
+    private static final String FEED_NAME = "testFeed";
+    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
 
     @Test
     public void testSLAStatus() throws FalconException {
@@ -122,4 +137,60 @@ public class FeedSLAMonitoringTest {
 
         Assert.assertEquals(FeedSLAMonitoringService.get().pendingInstances.get(feedCluster).size(), 1);
     }
+
+    @Test
+    public void testEndDateCheck() throws Exception {
+        Cluster cluster = publishCluster();
+        publishFeed(cluster, "hours(1)", "2015-11-20 00:00 UTC", "2015-11-20 05:00 UTC");
+        Pair<String, String> feedCluster = new Pair<>(FEED_NAME, CLUSTER_NAME);
+
+        FeedSLAMonitoringService service = FeedSLAMonitoringService.get();
+        service.initializeService();
+        service.queueSize = 100;
+        service.monitoredFeeds.add(FEED_NAME);
+        Date from = SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
+        Date to = SchemaHelper.parseDateUTC("2015-11-25T00:00Z");
+        service.addNewPendingFeedInstances(from, to);
+        // check that instances after feed's end date are not added.
+        Assert.assertEquals(service.pendingInstances.get(feedCluster).size(), 5);
+    }
+
+    private Cluster publishCluster() throws FalconException {
+        Cluster cluster = new Cluster();
+        cluster.setName(CLUSTER_NAME);
+        cluster.setColo("default");
+        getStore().publish(EntityType.CLUSTER, cluster);
+        return cluster;
+
+    }
+
+    private Feed publishFeed(Cluster cluster, String frequency, String start, String end)
+        throws FalconException, ParseException {
+        Feed feed = new Feed();
+        feed.setName(FEED_NAME);
+        Frequency f = new Frequency(frequency);
+        feed.setFrequency(f);
+        feed.setTimezone(UTC);
+        Clusters fClusters = new Clusters();
+        org.apache.falcon.entity.v0.feed.Cluster fCluster = new org.apache.falcon.entity.v0.feed.Cluster();
+        fCluster.setType(ClusterType.SOURCE);
+        fCluster.setName(cluster.getName());
+        fCluster.setValidity(getFeedValidity(start, end));
+        fClusters.getClusters().add(fCluster);
+        feed.setClusters(fClusters);
+        getStore().publish(EntityType.FEED, feed);
+        return feed;
+    }
+
+    private Validity getFeedValidity(String start, String end) throws ParseException {
+        Validity validity = new Validity();
+        validity.setStart(getDate(start));
+        validity.setEnd(getDate(end));
+        return validity;
+    }
+
+    private Date getDate(String dateString) throws ParseException {
+        DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm Z");
+        return format.parse(dateString);
+    }
 }


[2/4] falcon git commit: FALCON-1701 HiveDr, ClusterSetupTest, MirrorSummaryTest fixes. Contributed by Murali Ramasami.

Posted by aj...@apache.org.
FALCON-1701 HiveDr, ClusterSetupTest, MirrorSummaryTest fixes. Contributed by Murali Ramasami.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/f8e98f4f
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/f8e98f4f
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/f8e98f4f

Branch: refs/heads/master
Commit: f8e98f4f690d3302797edf5fbf7f3ad166236bcb
Parents: f995342
Author: Ajay Yadava <aj...@gmail.com>
Authored: Mon Jan 11 14:45:08 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Jan 11 14:45:08 2016 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |  2 +
 .../regression/ui/search/ClusterWizardPage.java | 80 ++++++++++++++++++--
 .../regression/hcat/HCatFeedOperationsTest.java | 29 -------
 .../regression/searchUI/ClusterSetupTest.java   | 59 +++++++++++++++
 .../regression/searchUI/MirrorSummaryTest.java  |  2 +-
 .../hive-disaster-recovery-template.xml         |  1 +
 .../hive-disaster-recovery-workflow.xml         | 42 ----------
 .../hive-disaster-recovery.properties           |  6 +-
 .../hive-disaster-recovery-template.xml         |  1 +
 .../hive-disaster-recovery-workflow.xml         | 50 +-----------
 .../hive-disaster-recovery.properties           |  5 +-
 .../hive-disaster-recovery-secure-template.xml  |  1 +
 .../hive-disaster-recovery-secure-workflow.xml  | 50 +-----------
 .../hive-disaster-recovery-secure.properties    |  5 +-
 14 files changed, 151 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/f8e98f4f/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 1888401..a4ae687 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -385,6 +385,8 @@ Trunk (Unreleased)
    FALCON-681 delete duplicate feed retention test from falcon regression (SamarthG)
 
   BUG FIXES
+   FALCON-1701 HiveDr, ClusterSetupTest, MirrorSummaryTest fixes(Murali Ramasami via Ajay Yadava)
+
    FALCON-1489 Partial status http response code returns 200(Pragya Mittal via Ajay Yadava)
 
    FALCON-1388 Fix merge conflicts produced by FALCON-1002 (Paul Isaychuk)

http://git-wip-us.apache.org/repos/asf/falcon/blob/f8e98f4f/falcon-regression/merlin/src/main/java/org/apache/falcon/regression/ui/search/ClusterWizardPage.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/main/java/org/apache/falcon/regression/ui/search/ClusterWizardPage.java b/falcon-regression/merlin/src/main/java/org/apache/falcon/regression/ui/search/ClusterWizardPage.java
index f19fc23..41fc120 100644
--- a/falcon-regression/merlin/src/main/java/org/apache/falcon/regression/ui/search/ClusterWizardPage.java
+++ b/falcon-regression/merlin/src/main/java/org/apache/falcon/regression/ui/search/ClusterWizardPage.java
@@ -286,7 +286,46 @@ public class ClusterWizardPage extends EntityWizardPage {
     }
 
     /**
-     * Retrieves hte value of the summary box and parses it to cluster properties.
+     * Method to assert that the error for identical staging and working locations is shown.
+     */
+    public void assertLocationsEqualError(){
+
+        // Assertion for Staging Location.
+        LOGGER.info(" Assertion for Staging Directory ");
+        Assert.assertTrue(checkErrorMessageByElement("input[contains(@id,'location.staging')]//following-sibling::"
+                + "span[contains(@ng-show, 'locationsEqualError')]",
+                "Staging and Working location should be different"));
+
+        // Assertion for Working Location.
+        LOGGER.info("Assertion for Working Directory");
+        Assert.assertTrue(checkErrorMessageByElement("input[contains(@id,'location.working')]//following-sibling::"
+                + "span[contains(@ng-show, 'locationsEqualError')]",
+                "Staging and Working location should be different"));
+    }
+
+    /**
+     * Method to get the Error text message displayed based on Xpath and compares.
+     * with the input string parameter : errMessage
+     * @param elementTag elementTag
+     * @param errMessage errMessage
+     */
+    public boolean checkErrorMessageByElement(String elementTag, String errMessage) {
+
+        List<WebElement> elements = clusterBox.findElements(By.xpath("//" + elementTag));
+        if (!elements.isEmpty()){
+            for (WebElement element : elements) {
+                Assert.assertEquals(element.getText(), errMessage);
+                LOGGER.info("Error Message Displayed : " + element.getText());
+            }
+            return true;
+        }else{
+            LOGGER.info(" No Elements found with the xpath " + elementTag);
+            return false;
+        }
+    }
+
+    /**
+     * Retrieves the value of the summary box and parses it to cluster properties.
      * @param draft empty cluster to contain all properties.
      * @return cluster filled with properties from the summary.
      */
@@ -296,12 +335,16 @@ public class ClusterWizardPage extends EntityWizardPage {
         LOGGER.info("Summary block text : " + summaryBoxText);
 
         String[] slices;
+        String value;
+        String path;
+        String label;
+
         //retrieve basic properties
         String basicProps = summaryBoxText.split("ACL")[0];
         for (String line : basicProps.split("\\n")) {
             slices = line.split(" ");
-            String label = slices[0].replace(":", "").trim();
-            String value = slices[1].trim();
+            label = slices[0].replace(":", "").trim();
+            value = getValueFromSlices(slices, line);
             switch (label) {
             case "Name":
                 cluster.setName(value);
@@ -333,7 +376,7 @@ public class ClusterWizardPage extends EntityWizardPage {
         String interfaces = propsLeft.split(nextLabel)[0].trim();
         for (String line : interfaces.split("\\n")) {
             slices = line.split(" ");
-            String label = slices[0].replace(":", "").trim();
+            label = slices[0].replace(":", "").trim();
             String endpoint = slices[1].trim();
             String version = slices[3].trim();
             switch (label) {
@@ -366,16 +409,16 @@ public class ClusterWizardPage extends EntityWizardPage {
             for (String line : properties.split("\\n")) {
                 int indx = line.indexOf(":");
                 String name = line.substring(0, indx).trim();
-                String value = line.substring(indx + 1, line.length()).trim();
+                value = line.substring(indx + 1, line.length()).trim();
                 cluster.withProperty(name, value);
             }
         }
         //retrieve locations
         propsLeft = propsLeft.split("Locations")[1].trim();
         for (String line : propsLeft.split("\\n")) {
-            slices = line.split(":");
-            String label = slices[0].trim();
-            String path = slices[1].trim();
+            slices = line.split(" ");
+            label = slices[0].replace(":", "").trim();
+            path = getValueFromSlices(slices, line);
             switch (label) {
             case "staging":
                 cluster.addLocation(ClusterLocationType.STAGING, path);
@@ -408,6 +451,14 @@ public class ClusterWizardPage extends EntityWizardPage {
     }
 
     /**
+     *  Click on next button in the cluster creation page.
+     */
+    public void clickJustNext() {
+        next.click();
+        waitForAngularToFinish();
+    }
+
+    /**
      * Click on save button.
      */
     public void clickSave() {
@@ -455,6 +506,19 @@ public class ClusterWizardPage extends EntityWizardPage {
     }
 
     /**
+     * Returns the trimmed second slice, or "" when the line is a single slice (avoids an out-of-bounds read).
+     */
+    public String getValueFromSlices(String[] slices, String line) {
+        String trimValue;
+        if (slices[0].length()==(line.length())) {
+            trimValue = "";
+        }else {
+            trimValue = slices[1].trim();
+        }
+        return trimValue;
+    }
+
+    /**
      * Checks whether registry interface is enabled for input or not.
      */
     public boolean isRegistryEnabled() {

http://git-wip-us.apache.org/repos/asf/falcon/blob/f8e98f4f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
index 27417bd..eb20d7c 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
@@ -152,35 +152,6 @@ public class HCatFeedOperationsTest extends BaseTestClass {
     }
 
     /**
-     * Submit Hcat Replication feed when Hcat table mentioned in table uri does not exist on target. The response is
-     * Partial, with successful with submit/schedule on source.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void submitAndScheduleReplicationFeedWhenTableDoesNotExistOnTarget() throws Exception {
-        Bundle.submitCluster(bundles[0], bundles[1]);
-        final String startDate = "2010-01-01T20:00Z";
-        final String endDate = "2099-01-01T00:00Z";
-        String tableUri = "catalog:" + dbName + ":" + randomTblName + "#year=${YEAR}";
-        bundles[0].setInputFeedPeriodicity(1, Frequency.TimeUnit.hours);
-        bundles[0].setInputFeedValidity(startDate, endDate);
-        bundles[0].setInputFeedTableUri(tableUri);
-
-        feed = bundles[0].getDataSets().get(0);
-        // set cluster 2 as the target.
-        feed = FeedMerlin.fromString(feed).addFeedCluster(
-            new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0)))
-                .withRetention("months(9000)", ActionType.DELETE)
-                .withValidity(startDate, endDate)
-                .withClusterType(ClusterType.TARGET)
-                .withTableUri(tableUri)
-                .build()).toString();
-
-        AssertUtil.assertPartial(prism.getFeedHelper().submitAndSchedule(feed));
-    }
-
-    /**
      * Submit Hcat Replication feed when Hcat table mentioned in table uri exists on both source and target.
      * The response is  Psucceeded, and a replication co-rdinator should apear on target oozie.
      * The test however does not ensure that

http://git-wip-us.apache.org/repos/asf/falcon/blob/f8e98f4f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/ClusterSetupTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/ClusterSetupTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/ClusterSetupTest.java
index e0b69e1..f43212a 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/ClusterSetupTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/ClusterSetupTest.java
@@ -286,6 +286,65 @@ public class ClusterSetupTest extends BaseUITestClass{
     }
 
     /**
+     * Specify the same directory locations for staging and working location.
+     * Check that user is not allowed to create a cluster with same directory for both with proper error message.
+     */
+    @Test
+    public void testSameLocations() throws IOException {
+
+        //get the staging directory location
+        String staging = sourceCluster.getLocation(ClusterLocationType.STAGING).getPath();
+
+        //set the working directory to staging directory
+        sourceCluster.getLocation(ClusterLocationType.WORKING).setPath(staging);
+
+        clusterSetup.fillForm(sourceCluster);
+        clusterSetup.clickJustNext();
+        clusterSetup.assertLocationsEqualError();
+    }
+
+    /**
+     * Default cluster creation scenario with Optional fields set with Empty values. Click next. Return back and click.
+     * next again. Check that all values are present on Summary page. Save cluster.
+     * Check the cluster definition through /definition API.
+     */
+    @Test
+    public void testOptionalfields()
+        throws URISyntaxException, AuthenticationException, InterruptedException, IOException {
+
+        // Set the Description value to empty
+        sourceCluster.setDescription("");
+        // Set the temp location value to empty
+        sourceCluster.getLocation(ClusterLocationType.TEMP).setPath("");
+        // Now fill the form with the above values for optional fields
+        clusterSetup.fillForm(sourceCluster);
+
+        clusterSetup.clickNext();
+        clusterSetup.clickPrevious();
+        clusterSetup.clickNext();
+
+        ClusterMerlin summaryBlock = clusterSetup.getSummary(sourceCluster.getEmptyCluster());
+        //summary block should contain the same info as source
+        sourceCluster.assertEquals(summaryBlock);
+        clusterSetup.clickSave();
+
+        String alertText = clusterSetup.getActiveAlertText();
+        Assert.assertEquals(alertText, "falcon/default/Submit successful (cluster) " + sourceCluster.getName());
+
+        //check the same via notifications bar
+        clusterSetup.getPageHeader().validateNotificationCountAndCheckLast(1,
+                "falcon/default/Submit successful (cluster) " + sourceCluster.getName());
+
+        ClusterMerlin definition = new ClusterMerlin(cluster.getClusterHelper().
+                getEntityDefinition(bundles[0].getClusterElement().toString()).getMessage());
+
+        //definition should be the same as the source
+        sourceCluster.assertEquals(definition);
+    }
+
+
+
+    /**
      * Validate alert lifetime.
      */
     @Test

http://git-wip-us.apache.org/repos/asf/falcon/blob/f8e98f4f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/MirrorSummaryTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/MirrorSummaryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/MirrorSummaryTest.java
index 0a788f0..ce014ef 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/MirrorSummaryTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/MirrorSummaryTest.java
@@ -60,7 +60,7 @@ public class MirrorSummaryTest extends BaseUITestClass{
         baseMap.put(Summary.MAX_BANDWIDTH, "100");
         baseMap.put(Summary.ACL_OWNER, LoginPage.UI_DEFAULT_USER);
         baseMap.put(Summary.ACL_GROUP, "users");
-        baseMap.put(Summary.ACL_PERMISSIONS, "0x755");
+        baseMap.put(Summary.ACL_PERMISSIONS, "0755");
         baseMap.put(Summary.RETRY_POLICY, "periodic");
         baseMap.put(Summary.RETRY_DELAY, "30 minutes");
         baseMap.put(Summary.RETRY_ATTEMPTS, "3");

http://git-wip-us.apache.org/repos/asf/falcon/blob/f8e98f4f/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-template.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-template.xml b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-template.xml
index c644b99..46546ce 100644
--- a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-template.xml
+++ b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-template.xml
@@ -39,5 +39,6 @@
 
     <workflow name="##falcon.recipe.workflow.name##" engine="oozie" path="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml" lib="##workflow.lib.path##"/>
     <retry policy="##falcon.recipe.retry.policy##" delay="##falcon.recipe.retry.delay##" attempts="3"/>
+    <notification type="##falcon.recipe.notification.type##" to="##falcon.recipe.notification.receivers##"/>
     <ACL/>
 </process>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f8e98f4f/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml
index aa4d5b0..7c4c53a 100644
--- a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml
+++ b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml
@@ -66,48 +66,6 @@
             <arg>-counterLogDir</arg>
             <arg>${logDir}/job-${nominalTime}</arg>
         </java>
-        <ok to="success"/>
-        <error to="failure"/>
-    </action>
-    <decision name="success">
-        <switch>
-            <case to="successAlert">
-                ${drNotificationReceivers ne 'NA'}
-            </case>
-            <default to="end"/>
-        </switch>
-    </decision>
-    <decision name="failure">
-        <switch>
-            <case to="failureAlert">
-                ${drNotificationReceivers ne 'NA'}
-            </case>
-            <default to="fail"/>
-        </switch>
-    </decision>
-    <action name="successAlert">
-        <email xmlns="uri:oozie:email-action:0.2">
-            <to>${drNotificationReceivers}</to>
-            <subject>INFO: HDFS DR workflow ${entityName} completed successfully</subject>
-            <body>
-                The HDFS DR workflow ${wf:id()} is successful.
-                Source      =   ${drSourceClusterFS}${drSourceDir}
-                Target      =   ${drTargetClusterFS}${drTargetDir}
-            </body>
-        </email>
-        <ok to="end"/>
-        <error to="end"/>
-    </action>
-    <action name="failureAlert">
-        <email xmlns="uri:oozie:email-action:0.2">
-            <to>${drNotificationReceivers}</to>
-            <subject>ERROR: HDFS DR workflow ${entityName} failed</subject>
-            <body>
-                The workflow ${wf:id()} had issues and was killed.  The error message is: ${wf:errorMessage(wf:lastErrorNode())}
-                Source      =   ${drSourceClusterFS}${drSourceDir}
-                Target      =   ${drTargetClusterFS}${drTargetDir}
-            </body>
-        </email>
         <ok to="end"/>
         <error to="fail"/>
     </action>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f8e98f4f/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties
index 99f748d..fb2a4fc 100644
--- a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties
+++ b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties
@@ -71,5 +71,7 @@ distcpMaxMaps=1
 # Change it to specify the bandwidth in MB for each mapper in DistCP
 distcpMapBandwidth=100
 
-##### Email on failure
-drNotificationReceivers=NA
\ No newline at end of file
+##### Email Notification for Falcon instance completion
+falcon.recipe.notification.type=email
+falcon.recipe.notification.receivers=NA
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/f8e98f4f/falcon-regression/merlin/src/test/resources/HiveDrRecipe/hive-disaster-recovery-template.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/HiveDrRecipe/hive-disaster-recovery-template.xml b/falcon-regression/merlin/src/test/resources/HiveDrRecipe/hive-disaster-recovery-template.xml
index 3afbef0..c11c2bb 100644
--- a/falcon-regression/merlin/src/test/resources/HiveDrRecipe/hive-disaster-recovery-template.xml
+++ b/falcon-regression/merlin/src/test/resources/HiveDrRecipe/hive-disaster-recovery-template.xml
@@ -40,5 +40,6 @@
     <workflow name="##workflow.name##" engine="oozie"
               path="/apps/data-mirroring/workflows/hive-disaster-recovery-workflow.xml" lib="##workflow.lib.path##"/>
     <retry policy="##retry.policy##" delay="##retry.delay##" attempts="3"/>
+    <notification type="##falcon.recipe.notification.type##" to="##falcon.recipe.notification.receivers##"/>
     <ACL/>
 </process>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f8e98f4f/falcon-regression/merlin/src/test/resources/HiveDrRecipe/hive-disaster-recovery-workflow.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/HiveDrRecipe/hive-disaster-recovery-workflow.xml b/falcon-regression/merlin/src/test/resources/HiveDrRecipe/hive-disaster-recovery-workflow.xml
index c441998..72d40a3 100644
--- a/falcon-regression/merlin/src/test/resources/HiveDrRecipe/hive-disaster-recovery-workflow.xml
+++ b/falcon-regression/merlin/src/test/resources/HiveDrRecipe/hive-disaster-recovery-workflow.xml
@@ -84,7 +84,7 @@
             <arg>lastevents</arg>
         </java>
         <ok to="export-dr-replication"/>
-        <error to="failure"/>
+        <error to="fail"/>
     </action>
     <!-- Export Replication action -->
     <action name="export-dr-replication">
@@ -160,7 +160,7 @@
             <arg>export</arg>
         </java>
         <ok to="import-dr-replication"/>
-        <error to="failure"/>
+        <error to="fail"/>
     </action>
     <!-- Import Replication action -->
     <action name="import-dr-replication">
@@ -235,52 +235,6 @@
             <arg>-executionStage</arg>
             <arg>import</arg>
         </java>
-        <ok to="success"/>
-        <error to="failure"/>
-    </action>
-    <decision name="success">
-        <switch>
-            <case to="successAlert">
-                ${drNotificationReceivers ne 'NA'}
-            </case>
-            <default to="end"/>
-        </switch>
-    </decision>
-    <decision name="failure">
-        <switch>
-            <case to="failureAlert">
-                ${drNotificationReceivers ne 'NA'}
-            </case>
-            <default to="fail"/>
-        </switch>
-    </decision>
-    <action name="successAlert">
-        <email xmlns="uri:oozie:email-action:0.2">
-            <to>${drNotificationReceivers}</to>
-            <subject>INFO: Hive DR workflow ${drJobName} completed successfully</subject>
-            <body>
-                The Hive DR workflow ${wf:id()} is successful.
-                Source          = ${sourceCluster}
-                Target          = ${targetCluster}
-                DB Name         = ${sourceDatabase}
-                Table Name      = ${sourceTable}
-            </body>
-        </email>
-        <ok to="end"/>
-        <error to="end"/>
-    </action>
-    <action name="failureAlert">
-        <email xmlns="uri:oozie:email-action:0.2">
-            <to>${drNotificationReceivers}</to>
-            <subject>ERROR: Hive DR workflow ${drJobName} failed</subject>
-            <body>
-                The Hive DR workflow ${wf:id()} had issues and was killed.  The error message is: ${wf:errorMessage(wf:lastErrorNode())}
-                Source          = ${sourceCluster}
-                Target          = ${targetCluster}
-                DB Name         = ${sourceDatabase}
-                Table Name      = ${sourceTable}
-            </body>
-        </email>
         <ok to="end"/>
         <error to="fail"/>
     </action>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f8e98f4f/falcon-regression/merlin/src/test/resources/HiveDrRecipe/hive-disaster-recovery.properties
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/HiveDrRecipe/hive-disaster-recovery.properties b/falcon-regression/merlin/src/test/resources/HiveDrRecipe/hive-disaster-recovery.properties
index de7f7f9..dd781a5 100644
--- a/falcon-regression/merlin/src/test/resources/HiveDrRecipe/hive-disaster-recovery.properties
+++ b/falcon-regression/merlin/src/test/resources/HiveDrRecipe/hive-disaster-recovery.properties
@@ -90,6 +90,7 @@ distcpMaxMaps=1
 # Change it to specify the bandwidth in MB for each mapper in DistCP
 distcpMapBandwidth=100
 
-##### Email on failure
-drNotificationReceivers=NA
+##### Email Notification for Falcon instance completion
+falcon.recipe.notification.type=email
+falcon.recipe.notification.receivers=NA
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/f8e98f4f/falcon-regression/merlin/src/test/resources/HiveDrSecureRecipe/hive-disaster-recovery-secure-template.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/HiveDrSecureRecipe/hive-disaster-recovery-secure-template.xml b/falcon-regression/merlin/src/test/resources/HiveDrSecureRecipe/hive-disaster-recovery-secure-template.xml
index 3afbef0..c11c2bb 100644
--- a/falcon-regression/merlin/src/test/resources/HiveDrSecureRecipe/hive-disaster-recovery-secure-template.xml
+++ b/falcon-regression/merlin/src/test/resources/HiveDrSecureRecipe/hive-disaster-recovery-secure-template.xml
@@ -40,5 +40,6 @@
     <workflow name="##workflow.name##" engine="oozie"
               path="/apps/data-mirroring/workflows/hive-disaster-recovery-workflow.xml" lib="##workflow.lib.path##"/>
     <retry policy="##retry.policy##" delay="##retry.delay##" attempts="3"/>
+    <notification type="##falcon.recipe.notification.type##" to="##falcon.recipe.notification.receivers##"/>
     <ACL/>
 </process>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f8e98f4f/falcon-regression/merlin/src/test/resources/HiveDrSecureRecipe/hive-disaster-recovery-secure-workflow.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/HiveDrSecureRecipe/hive-disaster-recovery-secure-workflow.xml b/falcon-regression/merlin/src/test/resources/HiveDrSecureRecipe/hive-disaster-recovery-secure-workflow.xml
index 7362c2e..74902b4 100644
--- a/falcon-regression/merlin/src/test/resources/HiveDrSecureRecipe/hive-disaster-recovery-secure-workflow.xml
+++ b/falcon-regression/merlin/src/test/resources/HiveDrSecureRecipe/hive-disaster-recovery-secure-workflow.xml
@@ -148,7 +148,7 @@
             <arg>lastevents</arg>
         </java>
         <ok to="export-dr-replication"/>
-        <error to="failure"/>
+        <error to="fail"/>
     </action>
     <!-- Export Replication action -->
     <action name="export-dr-replication" cred="hive_src_credentials,hive2_src_credentials">
@@ -246,7 +246,7 @@
             <arg>export</arg>
         </java>
         <ok to="import-dr-replication"/>
-        <error to="failure"/>
+        <error to="fail"/>
     </action>
     <!-- Import Replication action -->
     <action name="import-dr-replication" cred="hive_tgt_credentials,hive2_tgt_credentials">
@@ -343,52 +343,6 @@
             <arg>-executionStage</arg>
             <arg>import</arg>
         </java>
-        <ok to="success"/>
-        <error to="failure"/>
-    </action>
-    <decision name="success">
-        <switch>
-            <case to="successAlert">
-                ${drNotificationReceivers ne 'NA'}
-            </case>
-            <default to="end"/>
-        </switch>
-    </decision>
-    <decision name="failure">
-        <switch>
-            <case to="failureAlert">
-                ${drNotificationReceivers ne 'NA'}
-            </case>
-            <default to="fail"/>
-        </switch>
-    </decision>
-    <action name="successAlert">
-        <email xmlns="uri:oozie:email-action:0.2">
-            <to>${drNotificationReceivers}</to>
-            <subject>INFO: Hive DR workflow ${drJobName} completed successfully</subject>
-            <body>
-                The Hive DR workflow ${wf:id()} is successful.
-                Source          = ${sourceCluster}
-                Target          = ${targetCluster}
-                DB Name         = ${sourceDatabase}
-                Table Name      = ${sourceTable}
-            </body>
-        </email>
-        <ok to="end"/>
-        <error to="end"/>
-    </action>
-    <action name="failureAlert">
-        <email xmlns="uri:oozie:email-action:0.2">
-            <to>${drNotificationReceivers}</to>
-            <subject>ERROR: Hive DR workflow ${drJobName} failed</subject>
-            <body>
-                The Hive DR workflow ${wf:id()} had issues and was killed.  The error message is: ${wf:errorMessage(wf:lastErrorNode())}
-                Source          = ${sourceCluster}
-                Target          = ${targetCluster}
-                DB Name         = ${sourceDatabase}
-                Table Name      = ${sourceTable}
-            </body>
-        </email>
         <ok to="end"/>
         <error to="fail"/>
     </action>

http://git-wip-us.apache.org/repos/asf/falcon/blob/f8e98f4f/falcon-regression/merlin/src/test/resources/HiveDrSecureRecipe/hive-disaster-recovery-secure.properties
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/HiveDrSecureRecipe/hive-disaster-recovery-secure.properties b/falcon-regression/merlin/src/test/resources/HiveDrSecureRecipe/hive-disaster-recovery-secure.properties
index ff2611f..da0bcd1 100644
--- a/falcon-regression/merlin/src/test/resources/HiveDrSecureRecipe/hive-disaster-recovery-secure.properties
+++ b/falcon-regression/merlin/src/test/resources/HiveDrSecureRecipe/hive-disaster-recovery-secure.properties
@@ -100,5 +100,6 @@ distcpMaxMaps=1
 # Change it to specify the bandwidth in MB for each mapper in DistCP
 distcpMapBandwidth=100
 
-##### Email on failure
-drNotificationReceivers=NA
+##### Email Notification for Falcon instance completion
+falcon.recipe.notification.type=email
+falcon.recipe.notification.receivers=NA


[4/4] falcon git commit: FALCON-1230 Data based notification Service to notify execution instances when data becomes available. Contributed by Pavan Kumar Kolamuri.

Posted by aj...@apache.org.
FALCON-1230 Data based notification Service to notify execution instances when data becomes available. Contributed by Pavan Kumar Kolamuri.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4656f692
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4656f692
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4656f692

Branch: refs/heads/master
Commit: 4656f692a3c96244f0291501c3b68e14af964f27
Parents: 65bd4d1
Author: Ajay Yadava <aj...@gmail.com>
Authored: Mon Jan 11 14:54:34 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Jan 11 14:54:34 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../execution/ProcessExecutionInstance.java     |  38 ++--
 .../notification/service/event/DataEvent.java   |  19 +-
 .../service/impl/DataAvailabilityService.java   | 210 +++++++++++++++++--
 .../request/DataNotificationRequest.java        | 124 +++++++++--
 .../org/apache/falcon/predicate/Predicate.java  |  18 +-
 .../execution/FalconExecutionServiceTest.java   |  10 +-
 .../service/DataAvailabilityServiceTest.java    | 135 ++++++++++++
 8 files changed, 482 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/4656f692/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3244de..8792f94 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,8 @@ Proposed Release Version: 0.9
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1230 Data based notification Service to notify execution instances when data becomes available(Pavan Kumar Kolamuri via Ajay Yadava)
+
     FALCON-1679 API to get type of scheduler(native/oozie) (Pallavi Rao)
 
     FALCON-1645 Ability to export to database(Venkat Ramachandran via Balu Vellanki)

http://git-wip-us.apache.org/repos/asf/falcon/blob/4656f692/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
index 72e5558..8f026b7 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.execution;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
@@ -119,29 +120,36 @@ public class ProcessExecutionInstance extends ExecutionInstance {
                 continue;
             }
             Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
+            List<Path> paths = new ArrayList<>();
             for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
                 List<Location> locations = FeedHelper.getLocations(cluster, feed);
                 for (Location loc : locations) {
                     if (loc.getType() != LocationType.DATA) {
                         continue;
                     }
+                    paths.add(new Path(loc.getPath()));
+                }
 
-                    Predicate predicate = Predicate.createDataPredicate(loc);
-                    // To ensure we evaluate only predicates not evaluated before when an instance is resumed.
-                    if (isResume && !awaitedPredicates.contains(predicate)) {
-                        continue;
-                    }
-                    // TODO : Revisit this once the Data Availability Service has been built
-                    DataAvailabilityService.DataRequestBuilder requestBuilder =
-                            (DataAvailabilityService.DataRequestBuilder)
-                            NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA)
-                                    .createRequestBuilder(executionService, getId());
-                    requestBuilder.setDataLocation(new Path(loc.getPath()));
-                    NotificationServicesRegistry.register(requestBuilder.build());
-                    LOG.info("Registered for a data notification for process {} for data location {}",
-                            process.getName(), loc.getPath());
-                    awaitedPredicates.add(predicate);
+                Predicate predicate = Predicate.createDataPredicate(paths);
+                // To ensure we evaluate only predicates not evaluated before when an instance is resumed.
+                if (isResume && !awaitedPredicates.contains(predicate)) {
+                    continue;
                 }
+                // TODO : Revisit this once the Data Notification Service has been built
+                // TODO (very important): Need to change the polling frequency
+                DataAvailabilityService.DataRequestBuilder requestBuilder =
+                        (DataAvailabilityService.DataRequestBuilder)
+                                NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA)
+                                        .createRequestBuilder(executionService, getId());
+                requestBuilder.setLocations(paths)
+                        .setCluster(cluster.getName())
+                        .setPollingFrequencyInMillis(100)
+                        .setTimeoutInMillis(getTimeOutInMillis())
+                        .setLocations(paths);
+                NotificationServicesRegistry.register(requestBuilder.build());
+                LOG.info("Registered for a data notification for process {} for data location {}",
+                        process.getName(), StringUtils.join(paths, ","));
+                awaitedPredicates.add(predicate);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4656f692/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
index 1036339..083f66c 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
@@ -18,18 +18,18 @@
 package org.apache.falcon.notification.service.event;
 
 
-import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.state.ID;
 import org.apache.hadoop.fs.Path;
 
+import java.util.List;
+
 /**
  * An event generated by {@link org.apache.falcon.notification.service.impl.DataAvailabilityService}
  * indicating availability or non-availability of a dataset.
  */
 public class DataEvent extends Event {
     private final ID callbackID;
-    private Path dataLocation;
-    private LocationType dataType;
+    private List<Path> dataLocations;
     private STATUS status;
 
     /**
@@ -40,10 +40,9 @@ public class DataEvent extends Event {
         UNAVAILABLE
     }
 
-    public DataEvent(ID callbackID, Path location, LocationType locType, STATUS availability) {
+    public DataEvent(ID callbackID, List<Path> dataLocations, STATUS availability) {
         this.callbackID = callbackID;
-        this.dataLocation = location;
-        this.dataType = locType;
+        this.dataLocations = dataLocations;
         this.status = availability;
         this.type = EventType.DATA_AVAILABLE;
     }
@@ -56,12 +55,12 @@ public class DataEvent extends Event {
         this.status = availability;
     }
 
-    public Path getDataLocation() {
-        return dataLocation;
+    public List<Path> getDataLocations() {
+        return dataLocations;
     }
 
-    public LocationType getDataType() {
-        return dataType;
+    public void setDataLocations(List<Path> locations) {
+        this.dataLocations = locations;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/4656f692/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
index 7ffb351..732da62 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
@@ -18,29 +18,62 @@
 package org.apache.falcon.notification.service.impl;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.exception.NotificationServiceException;
 import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.notification.service.FalconNotificationService;
+import org.apache.falcon.notification.service.event.DataEvent;
 import org.apache.falcon.notification.service.request.DataNotificationRequest;
 import org.apache.falcon.notification.service.request.NotificationRequest;
 import org.apache.falcon.state.ID;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * This notification service notifies {@link NotificationHandler} when requested data
  * becomes available. This class also supports time out, in which case it notifies about the unavailability.
- * TODO : Complete/Modify this skeletal class
  */
 public class DataAvailabilityService implements FalconNotificationService {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DataAvailabilityService.class);
+    private static final String NUM_THREADS_PROP = "scheduler.data.notification.service.threads";
+    private static final String DEFAULT_NUM_THREADS = "5";
+
+    private DelayQueue<DataNotificationRequest> delayQueue = new DelayQueue<>();
+    private ExecutorService executorService;
+    // Contains all instances that have been unregistered and can be ignored.
+    private Map<ID, NotificationHandler> instancesToIgnore;
+
     @Override
     public void register(NotificationRequest request) throws NotificationServiceException {
-        // TODO : Implement this
+        LOG.info("Registering Data notification for " + request.getCallbackId().toString());
+        DataNotificationRequest dataNotificationRequest = (DataNotificationRequest) request;
+        delayQueue.offer(dataNotificationRequest);
     }
 
     @Override
     public void unregister(NotificationHandler handler, ID listenerID) {
-        // TODO : Implement this
+        LOG.info("Removing Data notification Request with callbackID {}", listenerID.getKey());
+        instancesToIgnore.put(listenerID, handler);
     }
 
     @Override
@@ -55,40 +88,185 @@ public class DataAvailabilityService implements FalconNotificationService {
 
     @Override
     public void init() throws FalconException {
-        // TODO : Implement this
+        int executorThreads = Integer.parseInt(StartupProperties.get().
+                getProperty(NUM_THREADS_PROP, DEFAULT_NUM_THREADS));
+        executorService = Executors.newFixedThreadPool(executorThreads);
+        for (int i = 0; i < executorThreads; i++) {
+            executorService.execute(new EventConsumer());
+        }
+        instancesToIgnore = new ConcurrentHashMap<>();
     }
 
     @Override
     public void destroy() throws FalconException {
-
+        instancesToIgnore.clear();
+        delayQueue.clear();
+        executorService.shutdown();
     }
 
     /**
      * Builds {@link DataNotificationRequest}.
      */
     public static class DataRequestBuilder extends RequestBuilder<DataNotificationRequest> {
-        private Path dataLocation;
+        private String cluster;
+        private long pollingFrequencyInMillis;
+        private long timeoutInMillis;
+        private Map<Path, Boolean> locations;
 
         public DataRequestBuilder(NotificationHandler handler, ID callbackID) {
             super(handler, callbackID);
         }
 
-        /**
-         * @param location
-         * @return This instance
-         */
-        public DataRequestBuilder setDataLocation(Path location) {
-            this.dataLocation = location;
+        public DataRequestBuilder setLocations(List<Path> locPaths) {
+            Map<Path, Boolean> paths = new HashMap<>();
+            for (Path loc : locPaths) {
+                paths.put(loc, false);
+            }
+            this.locations = paths;
             return this;
         }
 
         @Override
         public DataNotificationRequest build() {
-            if (callbackId == null  || dataLocation == null) {
-                throw new IllegalArgumentException("Missing one or more of the mandatory arguments:"
-                        + " callbackId, dataLocation");
+            if (callbackId == null || locations == null
+                    || cluster == null || pollingFrequencyInMillis <= 0
+                    || timeoutInMillis < pollingFrequencyInMillis) {
+                throw new IllegalArgumentException("Missing or incorrect, one or more of the mandatory arguments:"
+                        + " callbackId, locations, dataType, cluster, pollingFrequency, waitTime");
+            }
+            return new DataNotificationRequest(handler, callbackId, cluster,
+                    pollingFrequencyInMillis, timeoutInMillis, locations);
+        }
+
+        public DataRequestBuilder setCluster(String clusterName) {
+            this.cluster = clusterName;
+            return this;
+        }
+
+        public DataRequestBuilder setPollingFrequencyInMillis(long pollingFreq) {
+            if (pollingFreq <= 0) {
+                throw new IllegalArgumentException("PollingFrequency should be greater than zero");
+            }
+            this.pollingFrequencyInMillis = pollingFreq;
+            return this;
+        }
+
+        public DataRequestBuilder setTimeoutInMillis(long timeout) {
+            if (timeout <= 0 || timeout < pollingFrequencyInMillis) {
+                throw new IllegalArgumentException("Timeout should be positive and greater than PollingFrequency");
+            }
+            this.timeoutInMillis = timeout;
+            return this;
+        }
+    }
+
+
+    private class EventConsumer implements Runnable {
+
+        public EventConsumer() {
+        }
+
+        @Override
+        public void run() {
+            DataNotificationRequest dataNotificationRequest;
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    dataNotificationRequest = delayQueue.take();
+                    boolean isUnRegistered = isUnRegistered(dataNotificationRequest);
+                    if (isUnRegistered) {
+                        continue;
+                    }
+                    boolean isDataArrived = checkConditions(dataNotificationRequest);
+                    if (isDataArrived) {
+                        notifyHandler(dataNotificationRequest, DataEvent.STATUS.AVAILABLE);
+                    } else {
+                        if (dataNotificationRequest.isTimedout()) {
+                            notifyHandler(dataNotificationRequest, DataEvent.STATUS.UNAVAILABLE);
+                            continue;
+                        }
+                        dataNotificationRequest.accessed();
+                        delayQueue.offer(dataNotificationRequest);
+                    }
+                } catch (Throwable e) {
+                    LOG.error("Error in Data Notification Service EventConsumer", e);
+                }
+            }
+        }
+
+        private void notifyHandler(DataNotificationRequest dataNotificationRequest,
+                                   DataEvent.STATUS status) {
+            DataEvent dataEvent = new DataEvent(dataNotificationRequest.getCallbackId(),
+                    dataNotificationRequest.getLocations(), status);
+            boolean isUnRegistered = isUnRegistered(dataNotificationRequest);
+            if (isUnRegistered) {
+                return;
             }
-            return new DataNotificationRequest(handler, callbackId, dataLocation);
+            try {
+                LOG.debug("Notifying Handler for Data Notification Request of id {} " ,
+                        dataNotificationRequest.getCallbackId().toString());
+                dataNotificationRequest.getHandler().onEvent(dataEvent);
+            } catch (FalconException e) {
+                LOG.error("Unable to notify Data event with id {} ",
+                        dataNotificationRequest.getCallbackId(), e);
+                // TODO: Retry the notification on failure
+            }
+        }
+
+        private boolean isUnRegistered(DataNotificationRequest dataNotificationRequest) {
+            if (instancesToIgnore.containsKey(dataNotificationRequest.getCallbackId())) {
+                LOG.info("Ignoring Data Notification Request of id {} ",
+                        dataNotificationRequest.getCallbackId().toString());
+                instancesToIgnore.remove(dataNotificationRequest.getCallbackId());
+                return true;
+            }
+            return false;
+        }
+
+        private boolean checkConditions(DataNotificationRequest dataNotificationRequest) {
+            try {
+                Entity entity = EntityUtil.getEntity(EntityType.CLUSTER, dataNotificationRequest.getCluster());
+                Cluster clusterEntity = (Cluster) entity;
+                Configuration conf = ClusterHelper.getConfiguration(clusterEntity);
+                FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+                Map<Path, Boolean> locations = dataNotificationRequest.getLocationMap();
+                List<Path> nonAvailablePaths = getUnAvailablePaths(locations);
+                updatePathsAvailability(nonAvailablePaths, fs, locations);
+                if (allPathsExist(locations)) {
+                    return true;
+                }
+            } catch (FalconException e) {
+                LOG.error("Retrieving the Cluster Entity " + e);
+            } catch (IOException e) {
+                LOG.error("Unable to connect to FileSystem " + e);
+            }
+            return false;
+        }
+
+        private void updatePathsAvailability(List<Path> unAvailablePaths, FileSystem fs,
+                                             Map<Path, Boolean> locations) throws IOException {
+            for (Path path : unAvailablePaths) {
+                if (fs.exists(path)) {
+                    locations.put(path, true);
+                }
+            }
+        }
+
+        private List<Path> getUnAvailablePaths(Map<Path, Boolean> locations) {
+            List<Path> paths = new ArrayList<>();
+            for (Map.Entry<Path, Boolean> pathInfo : locations.entrySet()) {
+                if (!pathInfo.getValue()) {
+                    paths.add(pathInfo.getKey());
+                }
+            }
+            return paths;
+        }
+
+        private boolean allPathsExist(Map<Path, Boolean> locations) {
+            if (locations.containsValue(Boolean.FALSE)) {
+                return false;
+            }
+            return true;
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4656f692/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
index 8393de0..c7dd5d3 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
@@ -17,27 +17,34 @@
  */
 package org.apache.falcon.notification.service.request;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.execution.NotificationHandler;
 import org.apache.falcon.notification.service.NotificationServicesRegistry;
 import org.apache.falcon.state.ID;
 import org.apache.hadoop.fs.Path;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Request intended for {@link org.apache.falcon.notification.service.impl.DataAvailabilityService}
  * for data notifications.
  * The setter methods of the class support chaining similar to a builder class.
- * TODO : Complete/modify this skeletal class
  */
-public class DataNotificationRequest extends NotificationRequest {
-    private final Path dataLocation;
+public class DataNotificationRequest extends NotificationRequest implements Delayed {
+    // Boolean represents path availability to avoid checking all paths for every poll.
+    private Map<Path, Boolean> locations;
+    private long pollingFrequencyInMillis;
+    private long timeoutInMillis;
     private String cluster;
+    private long accessTimeInMillis;
+    private long createdTimeInMillis;
+    // Represents request was accessed by DataAvailability service first time or not.
+    private boolean isFirst;
 
-    /**
-     * @return data location to be watched.
-     */
-    public Path getDataLocation() {
-        return dataLocation;
-    }
 
     /**
      * Given a number of instances, should the service wait for exactly those many,
@@ -53,27 +60,106 @@ public class DataNotificationRequest extends NotificationRequest {
      * Constructor.
      * @param notifHandler
      * @param callbackId
+     * @param cluster
+     * @param pollingFrequencyInMillis
+     * @param timeoutInMillis
+     * @param locations
      */
-    public DataNotificationRequest(NotificationHandler notifHandler, ID callbackId, Path location) {
+    public DataNotificationRequest(NotificationHandler notifHandler, ID callbackId,
+                                   String cluster, long pollingFrequencyInMillis,
+                                   long timeoutInMillis, Map<Path, Boolean> locations) {
         this.handler = notifHandler;
         this.callbackId = callbackId;
-        this.dataLocation = location;
         this.service = NotificationServicesRegistry.SERVICE.DATA;
+        this.cluster = cluster;
+        this.pollingFrequencyInMillis = pollingFrequencyInMillis;
+        this.timeoutInMillis = timeoutInMillis;
+        this.locations = locations;
+        this.accessTimeInMillis = System.currentTimeMillis();
+        this.createdTimeInMillis = accessTimeInMillis;
+        this.isFirst = true;
+    }
+
+
+    /**
+     * Records the current time as the last access time of this request.
+     * {@link #getDelay(TimeUnit)} measures the elapsed polling time from this timestamp.
+     */
+    public void accessed() {
+        this.accessTimeInMillis = System.currentTimeMillis();
     }
 
-    /**
-     * @return cluster name
-     */
     public String getCluster() {
         return cluster;
     }
 
+
+    /**
+     * Checks whether this request has outlived its timeout.
+     *
+     * @return true when more than {@code timeoutInMillis} have passed since the request was created
+     */
+    public boolean isTimedout() {
+        long elapsedInMillis = System.currentTimeMillis() - createdTimeInMillis;
+        return elapsedInMillis > timeoutInMillis;
+    }
+
+
     /**
-     * @param clusterName
-     * @return This instance
+     * Obtain list of paths from locations map.
+     * @return List of paths to check.
      */
-    public DataNotificationRequest setCluster(String clusterName) {
-        this.cluster = clusterName;
-        return this;
+    public List<Path> getLocations() {
+        // A missing map is reported as null rather than as an empty list.
+        if (this.locations == null) {
+            return null;
+        }
+        // Copy the keys so callers get their own list, in the map's iteration order.
+        return new ArrayList<>(this.locations.keySet());
     }
+
+    /**
+     * Returns the internal map itself, not a copy, so changes made by the caller are
+     * visible to this request.
+     *
+     * @return Map of locations and their availabilities.
+     */
+    public Map<Path, Boolean> getLocationMap() {
+        return this.locations;
+    }
+
+    /**
+     * Returns the remaining delay before this request should be polled again.
+     * The first call returns 0 and clears {@code isFirst}; later calls return the polling
+     * frequency minus the time elapsed since the last recorded access, converted to {@code unit}.
+     * NOTE(review): this getter mutates {@code isFirst}, and {@code compareTo} in this class also
+     * calls it, so a comparison can consume the zero-delay first result -- confirm this is intended.
+     *
+     * @param unit time unit of the returned delay
+     * @return remaining delay; zero or negative once the polling interval has elapsed
+     */
+    @Override
+    public long getDelay(TimeUnit unit) {
+        if (isFirst) {
+            // First call only: report no delay.
+            this.isFirst = false;
+            return 0;
+        }
+        long age = System.currentTimeMillis() - accessTimeInMillis;
+        return unit.convert(pollingFrequencyInMillis - age, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Orders requests by their remaining delay in milliseconds, shortest first.
+     *
+     * @param other the delayed object to compare against
+     * @return a negative, zero or positive value when this request's delay is smaller than,
+     *         equal to or larger than the other's
+     */
+    @Override
+    public int compareTo(Delayed other) {
+        // Long.compare avoids casting a long difference to int, which can truncate or
+        // overflow and flip the sign of the result.
+        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * Two requests are equal when they are of the same class and have the same cluster
+     * and the same location map. Handler, callback id and timing fields are not compared.
+     *
+     * @param o object to compare with
+     * @return true when {@code o} is an equal request
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DataNotificationRequest that = (DataNotificationRequest) o;
+        if (!StringUtils.equals(cluster, that.cluster)) {
+            return false;
+        }
+        // Null-safe: hashCode() already tolerates a null locations map, so equals must not throw on it.
+        return locations == null ? that.locations == null : locations.equals(that.locations);
+    }
+
+    /**
+     * Hash code built from the same two fields that {@code equals} compares: cluster and locations.
+     *
+     * @return hash code; a null field contributes 0
+     */
+    @Override
+    public int hashCode() {
+        // cluster may be null (equals() compares it null-safely), so guard before hashing.
+        int result = (cluster != null) ? cluster.hashCode() : 0;
+        result = 31 * result + (locations != null ? locations.hashCode() : 0);
+        return result;
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4656f692/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
index c7b4f12..c248db6 100644
--- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
+++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
@@ -17,8 +17,8 @@
  */
 package org.apache.falcon.predicate;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.execution.NotificationHandler;
 import org.apache.falcon.notification.service.event.DataEvent;
 import org.apache.falcon.notification.service.event.Event;
@@ -26,6 +26,7 @@ import org.apache.falcon.notification.service.event.EventType;
 import org.apache.falcon.notification.service.event.RerunEvent;
 import org.apache.falcon.notification.service.event.TimeElapsedEvent;
 import org.apache.falcon.state.ID;
+import org.apache.hadoop.fs.Path;
 
 import java.io.Serializable;
 import java.util.Collections;
@@ -158,15 +159,15 @@ public class Predicate implements Serializable {
     /**
      * Creates a predicate of type DATA.
      *
-     * @param location
+     * @param paths List of paths to check
      * @return
      */
-    public static Predicate createDataPredicate(Location location) {
+    public static Predicate createDataPredicate(List<Path> paths) {
         return new Predicate(TYPE.DATA)
-                .addClause("path", (location == null) ? ANY : location.getPath())
-                .addClause("type", (location == null) ? ANY : location.getType());
+                .addClause("path", StringUtils.join(paths, ","));
     }
 
+
     /**
      * Creates a predicate of type JOB_COMPLETION.
      *
@@ -202,11 +203,8 @@ public class Predicate implements Serializable {
     public static Predicate getPredicate(Event event) throws FalconException {
         if (event.getType() == EventType.DATA_AVAILABLE) {
             DataEvent dataEvent = (DataEvent) event;
-            if (dataEvent.getDataLocation() != null && dataEvent.getDataType() != null) {
-                Location loc = new Location();
-                loc.setPath(dataEvent.getDataLocation().toString());
-                loc.setType(dataEvent.getDataType());
-                return createDataPredicate(loc);
+            if (dataEvent.getDataLocations() != null) {
+                return createDataPredicate(dataEvent.getDataLocations());
             } else {
                 throw new FalconException("Event does not have enough data to create a predicate");
             }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4656f692/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
index d66972c..d08f7d4 100644
--- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
@@ -22,7 +22,6 @@ import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.notification.service.NotificationServicesRegistry;
@@ -66,6 +65,7 @@ import org.testng.annotations.Test;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Iterator;
@@ -331,7 +331,7 @@ public class FalconExecutionServiceTest extends AbstractSchedulerTestBase {
         Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), executorID);
     }
 
-    @Test
+    @Test(enabled = false)
     public void testTimeOut() throws Exception {
         storeEntity(EntityType.PROCESS, "summarize3");
         Process process = getStore().get(EntityType.PROCESS, "summarize3");
@@ -602,7 +602,8 @@ public class FalconExecutionServiceTest extends AbstractSchedulerTestBase {
                     new DateTime(process.getClusters().getClusters().get(0).getValidity().getEnd()),
                     new DateTime(start.getTime() + instanceOffset));
         case DATA:
-            DataEvent dataEvent = new DataEvent(id, new Path("/projects/falcon/clicks"), LocationType.DATA,
+            DataEvent dataEvent = new DataEvent(id,
+                    new ArrayList<Path>(Arrays.asList(new Path("/projects/falcon/clicks"))),
                     DataEvent.STATUS.AVAILABLE);
             return dataEvent;
         default:
@@ -614,7 +615,8 @@ public class FalconExecutionServiceTest extends AbstractSchedulerTestBase {
         ID id = new InstanceID(instance);
         switch (type) {
         case DATA:
-            DataEvent dataEvent = new DataEvent(id, new Path("/projects/falcon/clicks"), LocationType.DATA,
+            DataEvent dataEvent = new DataEvent(id,
+                    new ArrayList<Path>(Arrays.asList(new Path("/projects/falcon/clicks"))),
                     DataEvent.STATUS.AVAILABLE);
             return dataEvent;
         case JOB_SCHEDULE:

http://git-wip-us.apache.org/repos/asf/falcon/blob/4656f692/scheduler/src/test/java/org/apache/falcon/notification/service/DataAvailabilityServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/DataAvailabilityServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/DataAvailabilityServiceTest.java
new file mode 100644
index 0000000..20c99b5
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/notification/service/DataAvailabilityServiceTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.notification.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.event.DataEvent;
+import org.apache.falcon.notification.service.impl.DataAvailabilityService;
+import org.apache.falcon.notification.service.request.DataNotificationRequest;
+import org.apache.falcon.state.EntityClusterID;
+import org.apache.falcon.state.ID;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test cases for {@link DataAvailabilityService}.
+ * Registers data notification requests against an embedded cluster and verifies the
+ * events delivered to a mocked {@link NotificationHandler}.
+ */
+public class DataAvailabilityServiceTest extends AbstractTestBase {
+
+    private static NotificationHandler handler = Mockito.mock(NotificationHandler.class);
+    private static DataAvailabilityService dataAvailabilityService = Mockito.spy(new DataAvailabilityService());
+    private static final String BASE_PATH = "jail://testCluster:00/data/user";
+
+    @BeforeClass
+    public void setup() throws Exception {
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.conf = dfsCluster.getConf();
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        dataAvailabilityService.init();
+    }
+
+    @Test
+    public void testDataNotificationServiceWithVaryingRequests() throws IOException,
+            FalconException, InterruptedException {
+        FileSystem fs = FileSystem.get(conf);
+        // Request with no locations: the test expects a single AVAILABLE notification for it.
+        org.apache.falcon.entity.v0.process.Process mockProcess = new Process();
+        mockProcess.setName("test");
+        EntityClusterID id = new EntityClusterID(mockProcess, "testCluster");
+
+        DataNotificationRequest dataNotificationRequest = getDataNotificationRequest(new ArrayList<Path>(), id);
+
+        dataAvailabilityService.register(dataNotificationRequest);
+        // Give the service time to process the request before verifying.
+        Thread.sleep(1000);
+        Mockito.verify(handler, Mockito.times(1)).onEvent(Mockito.any(DataEvent.class));
+        ArgumentCaptor<DataEvent> captor = ArgumentCaptor.forClass(DataEvent.class);
+        Mockito.verify(handler).onEvent(captor.capture());
+        Assert.assertEquals(captor.getValue().getStatus(), DataEvent.STATUS.AVAILABLE);
+        Assert.assertEquals(captor.getValue().getTarget(), dataNotificationRequest.getCallbackId());
+
+        cleanupDir(fs, BASE_PATH);
+
+        String path1 = BASE_PATH + "/" + "2015";
+        String path2 = BASE_PATH + "/" + "2016";
+
+        fs.create(new Path(path1));
+        List<Path> paths = new ArrayList<>();
+        paths.add(new Path(path1));
+        paths.add(new Path(path2));
+
+        // Register a request where path2 does not exist yet; no new notification is expected.
+        dataNotificationRequest = getDataNotificationRequest(paths, id);
+        dataAvailabilityService.register(dataNotificationRequest);
+        Mockito.verify(handler, Mockito.times(1)).onEvent(Mockito.any(DataEvent.class));
+
+
+        // create path and check availability status
+        fs.create(new Path(path2));
+        Thread.sleep(1000);
+        Mockito.verify(handler, Mockito.times(2)).onEvent(captor.capture());
+        Assert.assertEquals(captor.getValue().getStatus(), DataEvent.STATUS.AVAILABLE);
+        Assert.assertEquals(captor.getValue().getTarget(), dataNotificationRequest.getCallbackId());
+
+
+        // Add one more path that is never created before the check and verify the UNAVAILABLE case.
+        String path3 = BASE_PATH + "/" + "2017";
+        paths.add(new Path(path3));
+        dataNotificationRequest = getDataNotificationRequest(paths, id);
+        dataAvailabilityService.register(dataNotificationRequest);
+        Thread.sleep(2000);
+        Mockito.verify(handler, Mockito.times(3)).onEvent(captor.capture());
+        Assert.assertEquals(captor.getValue().getStatus(), DataEvent.STATUS.UNAVAILABLE);
+        Assert.assertEquals(captor.getValue().getTarget(), dataNotificationRequest.getCallbackId());
+
+        dataNotificationRequest = getDataNotificationRequest(paths, id);
+        dataAvailabilityService.register(dataNotificationRequest);
+        dataAvailabilityService.unregister(dataNotificationRequest.getHandler(),
+                dataNotificationRequest.getCallbackId());
+        fs.create(new Path(path3));
+        Thread.sleep(1000);
+        // It won't notify, as the request was unregistered; the event count stays at 3.
+        Mockito.verify(handler, Mockito.times(3)).onEvent(captor.capture());
+    }
+
+    // Recursively deletes the given base directory.
+    private void cleanupDir(FileSystem fs, String basePath) throws IOException {
+        fs.delete(new Path(basePath), true);
+    }
+
+    // Builds a request for "testCluster" with a 20 ms polling frequency and a 100 ms timeout.
+    private DataNotificationRequest getDataNotificationRequest(List<Path> locations, ID id) {
+        DataAvailabilityService.DataRequestBuilder dataRequestBuilder =
+                new DataAvailabilityService.DataRequestBuilder(handler, id);
+        dataRequestBuilder.setPollingFrequencyInMillis(20).setCluster("testCluster")
+                .setTimeoutInMillis(100).setLocations(locations);
+        return dataRequestBuilder.build();
+    }
+
+}


[3/4] falcon git commit: FALCON-1601 Make Falcon StateStore more secure by not disclosing imp params in startup.props. Contributed by Pavan Kumar Kolamuri.

Posted by aj...@apache.org.
FALCON-1601 Make Falcon StateStore more secure by not disclosing imp params in startup.props. Contributed by Pavan Kumar Kolamuri.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/65bd4d1d
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/65bd4d1d
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/65bd4d1d

Branch: refs/heads/master
Commit: 65bd4d1d7b83b6a44f3ec810a8a697ec461966a8
Parents: f8e98f4
Author: Ajay Yadava <aj...@gmail.com>
Authored: Mon Jan 11 14:51:49 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Jan 11 14:52:07 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../falcon/util/ApplicationProperties.java      |  20 ++--
 .../falcon/util/StateStoreProperties.java       | 114 +++++++++++++++++++
 .../src/main/resources/statestore.credentials   |  22 ++++
 common/src/main/resources/statestore.properties |  45 ++++++++
 docs/src/site/twiki/FalconNativeScheduler.twiki |  14 ++-
 .../falcon/state/store/AbstractStateStore.java  |   4 +-
 .../falcon/state/store/jdbc/JDBCStateStore.java |   4 +-
 .../state/store/service/FalconJPAService.java   |  26 ++---
 .../falcon/tools/FalconStateStoreDBCLI.java     |  10 +-
 .../service/SchedulerServiceTest.java           |   3 +-
 .../falcon/state/AbstractSchedulerTestBase.java |   5 +-
 .../falcon/state/EntityStateServiceTest.java    |   4 +-
 .../falcon/state/InstanceStateServiceTest.java  |   4 +-
 .../engine/WorkflowEngineFactoryTest.java       |   4 +-
 scheduler/src/test/resources/startup.properties |  20 ----
 .../src/test/resources/statestore.credentials   |  20 ++++
 .../src/test/resources/statestore.properties    |  36 ++++++
 src/conf/startup.properties                     |  21 ----
 src/conf/statestore.credentials                 |  22 ++++
 src/conf/statestore.properties                  |  45 ++++++++
 .../AbstractSchedulerManagerJerseyIT.java       |   5 +-
 .../src/test/resources/statestore.credentials   |  20 ++++
 webapp/src/test/resources/statestore.properties |  35 ++++++
 24 files changed, 419 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5c2a772..e3244de 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -46,6 +46,8 @@ Proposed Release Version: 0.9
     FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
 
   IMPROVEMENTS
+    FALCON-1601 Make Falcon StateStore more secure by not disclosing imp params in startup.props(Pavan Kumar Kolamuri via Ajay Yadava)
+
     FALCON-1705 Standardization of error handling in falcon Server(Praveen Adlakha via Ajay Yadava)
 
     FALCON-1640 Cascading Delete for instances in Native Scheduler(Pavan Kumar Kolamuri via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java b/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java
index 1d8cf49..adf09c4 100644
--- a/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java
+++ b/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java
@@ -44,7 +44,7 @@ public abstract class ApplicationProperties extends Properties {
 
     protected abstract String getPropertyFile();
 
-    private String domain;
+    protected String domain;
 
     protected ApplicationProperties() throws FalconException {
         init();
@@ -104,15 +104,21 @@ public abstract class ApplicationProperties extends Properties {
         InputStream resourceAsStream = null;
         if (confDir != null) {
             File fileToLoad = new File(confDir, propertyFileName);
-            if (fileToLoad.exists() && fileToLoad.isFile() && fileToLoad.canRead()) {
-                LOG.info("config.location is set, using: {}/{}", confDir, propertyFileName);
-                resourceAsStream = new FileInputStream(fileToLoad);
-            }
+            resourceAsStream = getResourceAsStream(fileToLoad);
+        }
+        return resourceAsStream;
+    }
+
+    protected InputStream getResourceAsStream(File fileToLoad) throws FileNotFoundException {
+        InputStream resourceAsStream = null;
+        if (fileToLoad.exists() && fileToLoad.isFile() && fileToLoad.canRead()) {
+            LOG.info("config.location is set, using: {}", fileToLoad.getAbsolutePath());
+            resourceAsStream = new FileInputStream(fileToLoad);
         }
         return resourceAsStream;
     }
 
-    private InputStream checkClassPath(String propertyFileName) {
+    protected InputStream checkClassPath(String propertyFileName) {
 
         InputStream resourceAsStream = null;
         Class clazz = ApplicationProperties.class;
@@ -154,7 +160,7 @@ public abstract class ApplicationProperties extends Properties {
         }
     }
 
-    private Set<String> getKeys(Set<Object> keySet) {
+    protected Set<String> getKeys(Set<Object> keySet) {
         Set<String> keys = new HashSet<String>();
         for (Object keyObj : keySet) {
             String key = (String) keyObj;

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/common/src/main/java/org/apache/falcon/util/StateStoreProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/StateStoreProperties.java b/common/src/main/java/org/apache/falcon/util/StateStoreProperties.java
new file mode 100644
index 0000000..a3e6a56
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/util/StateStoreProperties.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Properties for State Store during application startup.
+ */
+public final class StateStoreProperties extends ApplicationProperties {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StateStoreProperties.class);
+
+    private static final String PROPERTY_FILE = "statestore.properties";
+    private static final String CREDENTIALS_FILE= "falcon.statestore.credentials.file";
+    private static final String DEFAULT_CREDENTIALS_FILE = "statestore.credentials";
+
+    private static final AtomicReference<StateStoreProperties> INSTANCE =
+            new AtomicReference<>();
+
+
+    protected StateStoreProperties() throws FalconException {
+        super();
+    }
+
+    @Override
+    protected String getPropertyFile() {
+        return PROPERTY_FILE;
+    }
+
+    @Override
+    protected void loadProperties() throws FalconException {
+        super.loadProperties();
+
+        String credentialsFile = (String)get(CREDENTIALS_FILE);
+        try {
+            InputStream resourceAsStream = null;
+            if (StringUtils.isNotBlank(credentialsFile)) {
+                resourceAsStream = getResourceAsStream(new File(credentialsFile));
+            }
+            // fall back to class path.
+            if (resourceAsStream == null) {
+                resourceAsStream = checkClassPath(DEFAULT_CREDENTIALS_FILE);
+            }
+            if (resourceAsStream != null) {
+                try {
+                    loadCredentials(resourceAsStream);
+                    return;
+                } finally {
+                    IOUtils.closeQuietly(resourceAsStream);
+                }
+            } else {
+                throw new FalconException("Unable to find state store credentials file");
+            }
+        } catch (IOException e) {
+            throw new FalconException("Error loading properties file: " + getPropertyFile(), e);
+        }
+    }
+
+    private void loadCredentials(InputStream resourceAsStream) throws IOException {
+        Properties origProps = new Properties();
+        origProps.load(resourceAsStream);
+        LOG.info("Initializing {} properties with domain {}", this.getClass().getName(), domain);
+        Set<String> keys = getKeys(origProps.keySet());
+        for (String key : keys) {
+            String value = origProps.getProperty(domain + "." + key, origProps.getProperty("*." + key));
+            if (value != null) {
+                value = ExpressionHelper.substitute(value);
+                LOG.debug("{}={}", key, value);
+                put(key, value);
+            }
+        }
+    }
+
+
+    public static Properties get() {
+        try {
+            if (INSTANCE.get() == null) {
+                INSTANCE.compareAndSet(null, new StateStoreProperties());
+            }
+            return INSTANCE.get();
+        } catch (FalconException e) {
+            throw new RuntimeException("Unable to read application state store properties", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/common/src/main/resources/statestore.credentials
----------------------------------------------------------------------
diff --git a/common/src/main/resources/statestore.credentials b/common/src/main/resources/statestore.credentials
new file mode 100644
index 0000000..86c32a1
--- /dev/null
+++ b/common/src/main/resources/statestore.credentials
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+######### StateStore Credentials #####
+#*.falcon.statestore.jdbc.username=sa
+#*.falcon.statestore.jdbc.password=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/common/src/main/resources/statestore.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/statestore.properties b/common/src/main/resources/statestore.properties
new file mode 100644
index 0000000..a67a871
--- /dev/null
+++ b/common/src/main/resources/statestore.properties
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+*.domain=debug
+
+######### StateStore Properties #####
+#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+## Falcon currently supports derby and mysql, change url based on DB.
+#*.falcon.statestore.jdbc.url=jdbc:derby:data/falcon.db;create=true
+
+## StateStore credentials file where username,password and other properties can be stored securely.
+## Set the permission of this credentials file to 400 and make sure the user who starts Falcon has only read permission on it.
+## Give Absolute path to credentials file along with file name or put in classpath with filename statestore.credentials.
+## Credentials file should be present either in given location or class path, otherwise falcon won't start.
+#*.falcon.statestore.credentials.file=
+
+#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+## Maximum number of active connections that can be allocated from this pool at the same time.
+#*.falcon.statestore.pool.max.active.conn=10
+## Any additional connection properties that need to be used, specified as comma separated key=value pairs.
+#*.falcon.statestore.connection.properties=
+## Indicates the interval (in milliseconds) between eviction runs.
+#*.falcon.statestore.validate.db.connection.eviction.interval=300000
+## The number of objects to examine during each run of the idle object evictor thread.
+#*.falcon.statestore.validate.db.connection.eviction.num=10
+## Creates Falcon DB.
+## If set to true, it creates the DB schema if it does not exist. If the DB schema already exists, this is a no-op.
+## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
+#*.falcon.statestore.create.db.schema=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/docs/src/site/twiki/FalconNativeScheduler.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconNativeScheduler.twiki b/docs/src/site/twiki/FalconNativeScheduler.twiki
index 9403ae7..d2b3208 100644
--- a/docs/src/site/twiki/FalconNativeScheduler.twiki
+++ b/docs/src/site/twiki/FalconNativeScheduler.twiki
@@ -56,13 +56,19 @@ If you wish to make the Falcon Native Scheduler your default scheduler and remov
 </verbatim>
 
 ---+++ Configuring the state store for Native Scheduler
+You can configure statestore by making changes to __$FALCON_HOME/conf/statestore.properties__ as follows. You will need to restart Falcon Server for the changes to take effect.
 
-Falcon Server needs to maintain state of the entities and instances in a persistent store for the system to be recoverable. Since Prism only federates, it does not need to maintain any state information. Following properties need to be set in startup.properties of Falcon Servers:
+Falcon Server needs to maintain state of the entities and instances in a persistent store for the system to be recoverable. Since Prism only federates, it does not need to maintain any state information. Following properties need to be set in statestore.properties of Falcon Servers:
 <verbatim>
 ######### StateStore Properties #####
 *.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
 *.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
 *.falcon.statestore.jdbc.url=jdbc:derby:data/falcon.db
+# StateStore credentials file where username, password and other properties can be stored securely.
+# Set this credentials file's permission to 400 and make sure the user who starts Falcon has only read permission.
+# Give the absolute path to the credentials file, including the file name, or put it on the classpath with the file name statestore.credentials.
+# The credentials file must be present either at the given location or on the classpath; otherwise Falcon won't start.
+*.falcon.statestore.credentials.file=
 *.falcon.statestore.jdbc.username=sa
 *.falcon.statestore.jdbc.password=
 *.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
@@ -79,11 +85,11 @@ Falcon Server needs to maintain state of the entities and instances in a persist
 *.falcon.statestore.create.db.schema=true
 </verbatim> 
 
-The _*.falcon.statestore.jdbc.url_ property in startup.properties determines the DB and data location. All other properties are common across RDBMS.
+The _*.falcon.statestore.jdbc.url_ property in statestore.properties determines the DB and data location. All other properties are common across RDBMS.
 
 *NOTE : Although multiple Falcon Servers can share a DB (not applicable for Derby DB), it is recommended that you have different DBs for different Falcon Servers for better performance.*
 
-You will need to create the state DB and tables before starting the Falcon Server. To create tables, a tool comes bundled with the Falcon installation. You can use the _falcon-db.sh_ script to create tables in the DB. The script needs to be run only for Falcon Servers and can be run by any user that has execute permission on the script. The script picks up the DB connection details from __$FALCON_HOME/conf/startup.properties__. Ensure that you have granted the right privileges to the user mentioned in _startup.properties_, so the tables can be created.  
+You will need to create the state DB and tables before starting the Falcon Server. To create tables, a tool comes bundled with the Falcon installation. You can use the _falcon-db.sh_ script to create tables in the DB. The script needs to be run only for Falcon Servers and can be run by any user that has execute permission on the script. The script picks up the DB connection details from __$FALCON_HOME/conf/statestore.properties__. Ensure that you have granted the right privileges to the user mentioned in _statestore.properties_, so the tables can be created.
 
 You can use the help command to get details on the sub-commands supported:
 <verbatim>
@@ -117,7 +123,7 @@ For example,
  tells Falcon to use the Derby JDBC connector, with data directory, $FALCON_HOME/data/ and DB name 'falcon'. If _create=true_ is specified, you will not need to create a DB up front; a database will be created if it does not exist.
 
 ---++++ Using MySQL as the State Store
-The jdbc.url property in startup.properties determines the DB and data location.
+The jdbc.url property in statestore.properties determines the DB and data location.
 For example,
  <verbatim> *.falcon.statestore.jdbc.url=jdbc:mysql://localhost:3306/falcon </verbatim>
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
index 2d576e5..84d12f8 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
@@ -24,7 +24,7 @@ import org.apache.falcon.service.ConfigurationChangeListener;
 import org.apache.falcon.state.EntityID;
 import org.apache.falcon.state.EntityState;
 import org.apache.falcon.util.ReflectionUtils;
-import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.util.StateStoreProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,7 +79,7 @@ public abstract class AbstractStateStore implements StateStore, ConfigurationCha
      */
     public static synchronized StateStore get() {
         if (stateStore == null) {
-            String storeImpl = StartupProperties.get().getProperty("falcon.state.store.impl",
+            String storeImpl = StateStoreProperties.get().getProperty("falcon.state.store.impl",
                     "org.apache.falcon.state.store.InMemoryStateStore");
             try {
                 stateStore = ReflectionUtils.getInstanceByClassName(storeImpl);

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
index abd4119..e898247 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
@@ -30,7 +30,7 @@ import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.store.AbstractStateStore;
 import org.apache.falcon.state.store.StateStore;
 import org.apache.falcon.state.store.service.FalconJPAService;
-import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.util.StateStoreProperties;
 import org.joda.time.DateTime;
 
 import javax.persistence.EntityManager;
@@ -444,7 +444,7 @@ public final class JDBCStateStore extends AbstractStateStore {
 
     // Debug enabled for test cases
     private boolean isModeDebug() {
-        return DEBUG.equals(StartupProperties.get().getProperty("domain")) ? true : false;
+        return DEBUG.equals(StateStoreProperties.get().getProperty("domain")) ? true : false;
     }
 
     private void commitAndCloseTransaction(EntityManager entityManager) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java b/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
index 72d1aba..f678a6f 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
@@ -22,7 +22,7 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.service.FalconService;
 import org.apache.falcon.state.store.jdbc.EntityBean;
 import org.apache.falcon.state.store.jdbc.InstanceBean;
-import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.util.StateStoreProperties;
 import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,19 +105,19 @@ public final class FalconJPAService implements FalconService {
     }
 
     private Properties getPropsforStore() throws FalconException {
-        String dbSchema = StartupProperties.get().getProperty(DB_SCHEMA);
-        String url = StartupProperties.get().getProperty(URL);
-        String driver = StartupProperties.get().getProperty(DRIVER);
-        String user = StartupProperties.get().getProperty(USERNAME);
-        String password = StartupProperties.get().getProperty(PASSWORD).trim();
-        String maxConn = StartupProperties.get().getProperty(MAX_ACTIVE_CONN).trim();
-        String dataSource = StartupProperties.get().getProperty(CONN_DATA_SOURCE);
-        String connPropsConfig = StartupProperties.get().getProperty(CONN_PROPERTIES);
-        boolean autoSchemaCreation = Boolean.parseBoolean(StartupProperties.get().getProperty(CREATE_DB_SCHEMA,
+        String dbSchema = StateStoreProperties.get().getProperty(DB_SCHEMA);
+        String url = StateStoreProperties.get().getProperty(URL);
+        String driver = StateStoreProperties.get().getProperty(DRIVER);
+        String user = StateStoreProperties.get().getProperty(USERNAME);
+        String password = StateStoreProperties.get().getProperty(PASSWORD).trim();
+        String maxConn = StateStoreProperties.get().getProperty(MAX_ACTIVE_CONN).trim();
+        String dataSource = StateStoreProperties.get().getProperty(CONN_DATA_SOURCE);
+        String connPropsConfig = StateStoreProperties.get().getProperty(CONN_PROPERTIES);
+        boolean autoSchemaCreation = Boolean.parseBoolean(StateStoreProperties.get().getProperty(CREATE_DB_SCHEMA,
                 "false"));
-        boolean validateDbConn = Boolean.parseBoolean(StartupProperties.get().getProperty(VALIDATE_DB_CONN, "true"));
-        String evictionInterval = StartupProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_INTERVAL).trim();
-        String evictionNum = StartupProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_NUM).trim();
+        boolean validateDbConn = Boolean.parseBoolean(StateStoreProperties.get().getProperty(VALIDATE_DB_CONN, "true"));
+        String evictionInterval = StateStoreProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_INTERVAL).trim();
+        String evictionNum = StateStoreProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_NUM).trim();
 
         if (!url.startsWith("jdbc:")) {
             throw new FalconException("invalid JDBC URL, must start with 'jdbc:'" + url);

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
index f4058c8..7f22b0e 100644
--- a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
+++ b/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -24,7 +24,7 @@ import org.apache.commons.cli.ParseException;
 import org.apache.falcon.cli.CLIParser;
 import org.apache.falcon.state.store.service.FalconJPAService;
 import org.apache.falcon.util.BuildProperties;
-import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.util.StateStoreProperties;
 
 import java.io.File;
 import java.io.FileWriter;
@@ -205,11 +205,11 @@ public class FalconStateStoreDBCLI {
 
     private Map<String, String> getJdbcConf() throws Exception {
         Map<String, String> jdbcConf = new HashMap<String, String>();
-        jdbcConf.put("driver", StartupProperties.get().getProperty(FalconJPAService.DRIVER));
-        String url = StartupProperties.get().getProperty(FalconJPAService.URL);
+        jdbcConf.put("driver", StateStoreProperties.get().getProperty(FalconJPAService.DRIVER));
+        String url = StateStoreProperties.get().getProperty(FalconJPAService.URL);
         jdbcConf.put("url", url);
-        jdbcConf.put("user", StartupProperties.get().getProperty(FalconJPAService.USERNAME));
-        jdbcConf.put("password", StartupProperties.get().getProperty(FalconJPAService.PASSWORD));
+        jdbcConf.put("user", StateStoreProperties.get().getProperty(FalconJPAService.USERNAME));
+        jdbcConf.put("password", StateStoreProperties.get().getProperty(FalconJPAService.PASSWORD));
         String dbType = url.substring("jdbc:".length());
         if (dbType.indexOf(":") <= 0) {
             throw new RuntimeException("Invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
index 5a66518..a7ce748 100644
--- a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
@@ -42,6 +42,7 @@ import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.store.AbstractStateStore;
 import org.apache.falcon.state.store.StateStore;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.util.StateStoreProperties;
 import org.apache.falcon.workflow.engine.DAGEngine;
 import org.apache.falcon.workflow.engine.DAGEngineFactory;
 import org.apache.oozie.client.WorkflowJob;
@@ -79,7 +80,7 @@ public class SchedulerServiceTest extends AbstractTestBase {
 
     @BeforeClass
     public void init() throws Exception {
-        StartupProperties.get().setProperty("falcon.state.store.impl",
+        StateStoreProperties.get().setProperty("falcon.state.store.impl",
                 "org.apache.falcon.state.store.InMemoryStateStore");
         stateStore = AbstractStateStore.get();
         scheduler = Mockito.spy(new SchedulerService());

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
index a8be06d..155be69 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
@@ -20,7 +20,7 @@ package org.apache.falcon.state;
 import org.apache.falcon.entity.AbstractTestBase;
 import org.apache.falcon.state.store.service.FalconJPAService;
 import org.apache.falcon.tools.FalconStateStoreDBCLI;
-import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.util.StateStoreProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -41,8 +41,7 @@ public class AbstractSchedulerTestBase extends AbstractTestBase {
     protected LocalFileSystem fs = new LocalFileSystem();
 
     public void setup() throws Exception {
-        StartupProperties.get();
-        StartupProperties.get().setProperty(FalconJPAService.URL, url);
+        StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
         Configuration localConf = new Configuration();
         fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
         fs.mkdirs(new Path(DB_BASE_DIR));

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
index 6676754..695fcc1 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
@@ -26,7 +26,7 @@ import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.exception.InvalidStateTransitionException;
 import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.state.store.AbstractStateStore;
-import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.util.StateStoreProperties;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -43,7 +43,7 @@ public class EntityStateServiceTest extends AbstractSchedulerTestBase{
 
     @BeforeClass
     public void setup() throws Exception {
-        StartupProperties.get().setProperty("falcon.state.store.impl",
+        StateStoreProperties.get().setProperty("falcon.state.store.impl",
                 "org.apache.falcon.state.store.InMemoryStateStore");
         super.setup();
         this.dfsCluster = EmbeddedCluster.newCluster("testCluster");

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
index f0ae7b2..b30acda 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
@@ -23,7 +23,7 @@ import org.apache.falcon.exception.InvalidStateTransitionException;
 import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.execution.ProcessExecutionInstance;
 import org.apache.falcon.state.store.AbstractStateStore;
-import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.util.StateStoreProperties;
 import org.joda.time.DateTime;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -43,7 +43,7 @@ public class InstanceStateServiceTest {
 
     @BeforeClass
     public void init() {
-        StartupProperties.get().setProperty("falcon.state.store.impl",
+        StateStoreProperties.get().setProperty("falcon.state.store.impl",
                 "org.apache.falcon.state.store.InMemoryStateStore");
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/scheduler/src/test/java/org/apache/falcon/workflow/engine/WorkflowEngineFactoryTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/workflow/engine/WorkflowEngineFactoryTest.java b/scheduler/src/test/java/org/apache/falcon/workflow/engine/WorkflowEngineFactoryTest.java
index 7e502cd..aacaced 100644
--- a/scheduler/src/test/java/org/apache/falcon/workflow/engine/WorkflowEngineFactoryTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/workflow/engine/WorkflowEngineFactoryTest.java
@@ -26,7 +26,7 @@ import org.apache.falcon.state.EntityID;
 import org.apache.falcon.state.EntityState;
 import org.apache.falcon.state.store.AbstractStateStore;
 import org.apache.falcon.state.store.StateStore;
-import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.util.StateStoreProperties;
 import org.apache.falcon.workflow.WorkflowEngineFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -47,7 +47,7 @@ public class WorkflowEngineFactoryTest extends AbstractTestBase {
     public void init() throws Exception {
         this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
         this.conf = dfsCluster.getConf();
-        StartupProperties.get().setProperty("falcon.state.store.impl",
+        StateStoreProperties.get().setProperty("falcon.state.store.impl",
                 "org.apache.falcon.state.store.InMemoryStateStore");
         setupConfigStore();
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/scheduler/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/startup.properties b/scheduler/src/test/resources/startup.properties
index 2e938ee..7160bb2 100644
--- a/scheduler/src/test/resources/startup.properties
+++ b/scheduler/src/test/resources/startup.properties
@@ -132,23 +132,3 @@ debug.libext.process.paths=${falcon.libext}
 
 # Comma separated list of black listed users
 *.falcon.http.authentication.blacklisted.users=
-
-
-######## StateStore Properties #####
-*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
-*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
-*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true
-*.falcon.statestore.jdbc.username=sa
-*.falcon.statestore.jdbc.password=
-*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
-# Maximum number of active connections that can be allocated from this pool at the same time.
-*.falcon.statestore.pool.max.active.conn=10
-*.falcon.statestore.connection.properties=
-# Indicates the interval (in milliseconds) between eviction runs.
-*.falcon.statestore.validate.db.connection.eviction.interval=300000
-# The number of objects to examine during each run of the idle object evictor thread.
-*.falcon.statestore.validate.db.connection.eviction.num=10
-# Creates Falcon DB.
-# If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP.
-# If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
-*.falcon.statestore.create.db.schema=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/scheduler/src/test/resources/statestore.credentials
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/statestore.credentials b/scheduler/src/test/resources/statestore.credentials
new file mode 100644
index 0000000..018f02e
--- /dev/null
+++ b/scheduler/src/test/resources/statestore.credentials
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+*.falcon.statestore.jdbc.username=sa
+*.falcon.statestore.jdbc.password=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/scheduler/src/test/resources/statestore.properties
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/statestore.properties b/scheduler/src/test/resources/statestore.properties
new file mode 100644
index 0000000..2ae642f
--- /dev/null
+++ b/scheduler/src/test/resources/statestore.properties
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+*.domain=debug
+######## StateStore Properties #####
+*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true
+*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+# Maximum number of active connections that can be allocated from this pool at the same time.
+*.falcon.statestore.pool.max.active.conn=10
+*.falcon.statestore.connection.properties=
+# Indicates the interval (in milliseconds) between eviction runs.
+*.falcon.statestore.validate.db.connection.eviction.interval=300000
+# The number of objects to examine during each run of the idle object evictor thread.
+*.falcon.statestore.validate.db.connection.eviction.num=10
+# Creates Falcon DB.
+# If set to true, it creates the DB schema if it does not exist. If the DB schema already exists, this is a no-op.
+# If set to false, it does not create the DB schema. If the DB schema does not exist, startup fails.
+*.falcon.statestore.create.db.schema=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index ef0a2d5..b1a340a 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -274,24 +274,3 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 # Setting monitoring plugin, if SMTP parameters is defined
 #*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\
 #                     org.apache.falcon.plugin.EmailNotificationPlugin
-
-######### StateStore Properties #####
-#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
-#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
-## Falcon currently supports derby and mysql, change url based on DB.
-#*.falcon.statestore.jdbc.url=jdbc:derby:data/falcon.db;create=true
-#*.falcon.statestore.jdbc.username=sa
-#*.falcon.statestore.jdbc.password=
-#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
-## Maximum number of active connections that can be allocated from this pool at the same time.
-#*.falcon.statestore.pool.max.active.conn=10
-## Any additional connection properties that need to be used, specified as comma separated key=value pairs.
-#*.falcon.statestore.connection.properties=
-## Indicates the interval (in milliseconds) between eviction runs.
-#*.falcon.statestore.validate.db.connection.eviction.interval=300000
-## The number of objects to examine during each run of the idle object evictor thread.
-#*.falcon.statestore.validate.db.connection.eviction.num=10
-## Creates Falcon DB.
-## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP.
-## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
-#*.falcon.statestore.create.db.schema=true

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/src/conf/statestore.credentials
----------------------------------------------------------------------
diff --git a/src/conf/statestore.credentials b/src/conf/statestore.credentials
new file mode 100644
index 0000000..86c32a1
--- /dev/null
+++ b/src/conf/statestore.credentials
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+######### StateStore Credentials #####
+#*.falcon.statestore.jdbc.username=sa
+#*.falcon.statestore.jdbc.password=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/src/conf/statestore.properties
----------------------------------------------------------------------
diff --git a/src/conf/statestore.properties b/src/conf/statestore.properties
new file mode 100644
index 0000000..0c912ad
--- /dev/null
+++ b/src/conf/statestore.properties
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+*.domain=${falcon.app.type}
+
+######### StateStore Properties #####
+#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+## Falcon currently supports derby and mysql, change url based on DB.
+#*.falcon.statestore.jdbc.url=jdbc:derby:data/falcon.db;create=true
+
+## StateStore credentials file where username, password and other properties can be stored securely.
+## Set this credentials file's permission to 400 and make sure the user who starts Falcon has only read permission.
+## Give the absolute path to the credentials file, including the file name, or put it on the classpath with the file name statestore.credentials.
+## The credentials file must be present either at the given location or on the classpath; otherwise Falcon won't start.
+#*.falcon.statestore.credentials.file=
+
+#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+## Maximum number of active connections that can be allocated from this pool at the same time.
+#*.falcon.statestore.pool.max.active.conn=10
+## Any additional connection properties that need to be used, specified as comma separated key=value pairs.
+#*.falcon.statestore.connection.properties=
+## Indicates the interval (in milliseconds) between eviction runs.
+#*.falcon.statestore.validate.db.connection.eviction.interval=300000
+## The number of objects to examine during each run of the idle object evictor thread.
+#*.falcon.statestore.validate.db.connection.eviction.num=10
+## Creates Falcon DB.
+## If set to true, it creates the DB schema if it does not exist. If the DB schema already exists, this is a no-op.
+## If set to false, it does not create the DB schema. If the DB schema does not exist, startup fails.
+#*.falcon.statestore.create.db.schema=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
index 0a3e984..175833a 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
@@ -27,6 +27,7 @@ import org.apache.falcon.state.AbstractSchedulerTestBase;
 import org.apache.falcon.state.store.service.FalconJPAService;
 import org.apache.falcon.unit.FalconUnitTestBase;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.util.StateStoreProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -88,7 +89,7 @@ public class AbstractSchedulerManagerJerseyIT extends FalconUnitTestBase {
         configListeners.remove("org.apache.falcon.service.SharedLibraryHostingService");
         configListeners.add("org.apache.falcon.state.store.jdbc.JDBCStateStore");
         StartupProperties.get().setProperty("configstore.listeners", StringUtils.join(configListeners, ","));
-        StartupProperties.get().getProperty("falcon.state.store.impl",
+        StateStoreProperties.get().getProperty("falcon.state.store.impl",
                 "org.apache.falcon.state.store.jdbc.JDBCStateStore");
     }
 
@@ -119,7 +120,7 @@ public class AbstractSchedulerManagerJerseyIT extends FalconUnitTestBase {
 
     private void createDB() throws Exception {
         AbstractSchedulerTestBase abstractSchedulerTestBase = new AbstractSchedulerTestBase();
-        StartupProperties.get().setProperty(FalconJPAService.URL, url);
+        StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
         abstractSchedulerTestBase.createDB(DB_SQL_FILE);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/webapp/src/test/resources/statestore.credentials
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/statestore.credentials b/webapp/src/test/resources/statestore.credentials
new file mode 100644
index 0000000..018f02e
--- /dev/null
+++ b/webapp/src/test/resources/statestore.credentials
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+*.falcon.statestore.jdbc.username=sa
+*.falcon.statestore.jdbc.password=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/65bd4d1d/webapp/src/test/resources/statestore.properties
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/statestore.properties b/webapp/src/test/resources/statestore.properties
new file mode 100644
index 0000000..bd9dd26
--- /dev/null
+++ b/webapp/src/test/resources/statestore.properties
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+*.domain=debug
+######## StateStore Properties #####
+*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true
+*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+# Maximum number of active connections that can be allocated from this pool at the same time.
+*.falcon.statestore.pool.max.active.conn=10
+*.falcon.statestore.connection.properties=
+# Indicates the interval (in milliseconds) between eviction runs.
+*.falcon.statestore.validate.db.connection.eviction.interval=300000
+# The number of objects to examine during each run of the idle object evictor thread.
+*.falcon.statestore.validate.db.connection.eviction.num=10
+# Creates Falcon DB.
+# If set to true, it creates the DB schema if it does not exist. If the DB schema already exists, this is a no-op.
+# If set to false, it does not create the DB schema. If the DB schema does not exist, startup fails.
+*.falcon.statestore.create.db.schema=true