You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2016/03/28 15:39:24 UTC

[1/3] falcon git commit: FALCON-1865 Persist Feed sla data to database

Repository: falcon
Updated Branches:
  refs/heads/master 10f3843ad -> de2f5c0ab


http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/webapp/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/startup.properties b/webapp/src/test/resources/startup.properties
index bc88534..3544f0a 100644
--- a/webapp/src/test/resources/startup.properties
+++ b/webapp/src/test/resources/startup.properties
@@ -32,7 +32,7 @@
 *.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
                         org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
                         org.apache.falcon.service.ProcessSubscriberService,\
-                        org.apache.falcon.state.store.service.FalconJPAService,\
+                        org.apache.falcon.service.FalconJPAService,\
                         org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.rerun.service.RetryService,\
                         org.apache.falcon.rerun.service.LateRunService,\


[2/3] falcon git commit: FALCON-1865 Persist Feed sla data to database

Posted by aj...@apache.org.
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
index 29bd7ba..b5a2569 100644
--- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
@@ -17,7 +17,7 @@
  */
 package org.apache.falcon.service;
 
-import org.apache.commons.io.IOUtils;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Pair;
 import org.apache.falcon.entity.EntityUtil;
@@ -31,6 +31,9 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Sla;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
+import org.apache.falcon.persistence.MonitoredFeedsBean;
+import org.apache.falcon.persistence.PendingInstanceBean;
 import org.apache.falcon.resource.SchedulableEntityInstance;
 import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.StartupProperties;
@@ -38,24 +41,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
+import java.util.ArrayList;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -66,6 +60,8 @@ import java.util.concurrent.TimeUnit;
 public final class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService {
     private static final Logger LOG = LoggerFactory.getLogger("FeedSLA");
 
+    private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore();
+
     private static final String ONE_HOUR = String.valueOf(60 * 60 * 1000);
 
     private static final int ONE_MS = 1;
@@ -88,29 +84,10 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
     private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
 
     /**
-     * Feeds to be monitored.
-     */
-    protected Set<String> monitoredFeeds;
-
-
-    /**
-     * Map<Pair<feedName, clusterName>, Set<instanceTime> to store
-     * each missing instance of a feed.
-     */
-    protected Map<Pair<String, String>, BlockingQueue<Date>> pendingInstances;
-
-
-    /**
      * Used to store the last time when pending instances were checked for SLA.
      */
     private Date lastCheckedAt;
 
-
-    /**
-     * Used to store last time when the state was serialized to the store.
-     */
-    private Date lastSerializedAt;
-
     /**
      * Frequency in seconds of "status check" for pending feed instances.
      */
@@ -156,7 +133,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
                     if (currentClusters.contains(cluster.getName())) {
                         if (FeedHelper.getSLA(cluster, feed) != null) {
                             LOG.debug("Adding feed:{} for monitoring", feed.getName());
-                            monitoredFeeds.add(feed.getName());
+                            MONITORING_JDBC_STATE_STORE.putMonitoredFeed(feed.getName());
                         }
                     }
                 }
@@ -173,8 +150,8 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
                 Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
                 for (Cluster cluster : feed.getClusters().getClusters()) {
                     if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
-                        monitoredFeeds.remove(feed.getName());
-                        pendingInstances.remove(new Pair<>(feed.getName(), cluster.getName()));
+                        MONITORING_JDBC_STATE_STORE.deleteMonitoringFeed(feed.getName());
+                        MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName());
                     }
                 }
             }
@@ -212,7 +189,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
                 }
 
                 for (String clusterName : slaRemovedClusters) {
-                    pendingInstances.remove(new Pair<>(newFeed.getName(), clusterName));
+                    MONITORING_JDBC_STATE_STORE.deletePendingInstances(newFeed.getName(), clusterName);
                 }
             }
         }
@@ -247,32 +224,21 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
         String size = StartupProperties.get().getProperty("feed.sla.queue.size", "288");
         queueSize = Integer.parseInt(size);
 
-        try {
-            if (fileSystem.exists(filePath)) {
-                deserialize(filePath);
-            } else {
-                LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString());
-                initializeService();
-            }
-        } catch (IOException e) {
-            throw new FalconException("Couldn't check the existence of " + filePath, e);
-        }
+        LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString());
+        initializeService();
+
         ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
         executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS);
     }
 
-    @Override
-    public void destroy() throws FalconException {
-        serializeState(); // store the state of monitoring service to the disk.
-    }
-
-    public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime) {
+    public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime)
+        throws FalconException {
         LOG.info("Removing {} feed's instance {} in cluster {} from pendingSLA", feedName,
                 clusterName, nominalTime);
-        Pair<String, String> feedCluster = new Pair<>(feedName, clusterName);
+        List<Date> instances = (MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName));
         // Slas for feeds not having sla tag are not stored.
-        if (pendingInstances.get(feedCluster) != null) {
-            pendingInstances.get(feedCluster).remove(nominalTime);
+        if (CollectionUtils.isEmpty(instances)){
+            MONITORING_JDBC_STATE_STORE.deletePendingInstance(feedName, clusterName, nominalTime);
         }
     }
 
@@ -290,13 +256,17 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
         }
     }
 
+    @Override
+    public void destroy() throws FalconException {
+    }
+
     //Periodically update status of pending instances, add new instances and take backup.
     private class Monitor implements Runnable {
 
         @Override
         public void run() {
             try {
-                if (!monitoredFeeds.isEmpty()) {
+                if (MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed().size() > 0) {
                     checkPendingInstanceAvailability();
 
                     // add Instances from last checked time to 10 minutes from now(some buffer for status check)
@@ -304,12 +274,6 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
                     Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis);
                     addNewPendingFeedInstances(lastCheckedAt, newCheckPoint);
                     lastCheckedAt = newCheckPoint;
-
-                    //take backup
-                    if (now.getTime() - lastSerializedAt.getTime() > serializationFrequencyMillis) {
-                        serializeState();
-                        lastSerializedAt = new Date();
-                    }
                 }
             } catch (Throwable e) {
                 LOG.error("Feed SLA monitoring failed: ", e);
@@ -320,14 +284,18 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
 
     void addNewPendingFeedInstances(Date from, Date to) throws FalconException {
         Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
-        for (String feedName : monitoredFeeds) {
+        List<MonitoredFeedsBean> feedsBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed();
+        for(MonitoredFeedsBean monitoredFeedsBean : feedsBeanList) {
+            String feedName = monitoredFeedsBean.getFeedName();
             Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
             for (Cluster feedCluster : feed.getClusters().getClusters()) {
                 if (currentClusters.contains(feedCluster.getName())) {
                     Date nextInstanceTime = from;
                     Pair<String, String> key = new Pair<>(feed.getName(), feedCluster.getName());
-                    BlockingQueue<Date> instances = pendingInstances.get(key);
-                    if (instances == null) {
+                    BlockingQueue<Date> instances = new LinkedBlockingQueue<>(
+                            MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, feedCluster.getName()));
+                    if (CollectionUtils.isEmpty(MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName,
+                            feedCluster.getName()))) {
                         instances = new LinkedBlockingQueue<>(queueSize);
                         Date feedStartTime = feedCluster.getValidity().getStart();
                         Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed, feedCluster);
@@ -357,7 +325,9 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
                         nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
                         nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime);
                     }
-                    pendingInstances.put(key, instances);
+                    for(Date date:instances){
+                        MONITORING_JDBC_STATE_STORE.putPendingInstances(feed.getName(), feedCluster.getName(), date);
+                    }
                 }
             }
         }
@@ -368,11 +338,14 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
      * Checks the availability of all the pendingInstances and removes the ones which have become available.
      */
     private void checkPendingInstanceAvailability() throws FalconException {
-        for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> entry: pendingInstances.entrySet()) {
-            for (Date date : entry.getValue()) {
-                boolean status = checkFeedInstanceAvailability(entry.getKey().first, entry.getKey().second, date);
+        for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){
+            for (Date date : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getFeedName(),
+                    pendingInstanceBean.getClusterName())) {
+                boolean status = checkFeedInstanceAvailability(pendingInstanceBean.getFeedName(),
+                        pendingInstanceBean.getClusterName(), date);
                 if (status) {
-                    pendingInstances.get(entry.getKey()).remove(date);
+                    MONITORING_JDBC_STATE_STORE.deletePendingInstance(pendingInstanceBean.getFeedName(),
+                            pendingInstanceBean.getClusterName(), date);
                 }
             }
         }
@@ -402,79 +375,9 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
         return false;
     }
 
-    private void serializeState() throws FalconException{
-        LOG.info("Saving context to: [{}]", storePath);
-
-        //create a temporary file and rename it.
-        Path tmp = new Path(storePath , "tmp");
-        ObjectOutputStream oos = null;
-        try {
-            OutputStream out = fileSystem.create(tmp);
-            oos = new ObjectOutputStream(out);
-            Map<String, Object> state = new HashMap<>();
-            state.put("lastSerializedAt", lastSerializedAt.getTime());
-            state.put("lastCheckedAt", lastCheckedAt.getTime());
-            state.put("pendingInstances", pendingInstances);
-            oos.writeObject(state);
-            fileSystem.rename(tmp, filePath);
-        } catch (IOException e) {
-            throw new FalconException("Error serializing context to : " + storePath.toUri(),  e);
-        } finally {
-            IOUtils.closeQuietly(oos);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private void deserialize(Path path) throws FalconException {
-        try {
-            Map<String, Object> state = deserializeInternal(path);
-            pendingInstances = new ConcurrentHashMap<>();
-            Map<Pair<String, String>, BlockingQueue<Date>> pendingInstancesCopy =
-                    (Map<Pair<String, String>, BlockingQueue<Date>>) state.get("pendingInstances");
-            // queue size can change during restarts, hence copy
-            for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> entry : pendingInstancesCopy.entrySet()) {
-                BlockingQueue<Date> value = new LinkedBlockingQueue<>(queueSize);
-                BlockingQueue<Date> oldValue = entry.getValue();
-                LOG.debug("Number of old instances:{}, new queue size:{}", oldValue.size(), queueSize);
-                while (!oldValue.isEmpty()) {
-                    Date instance = oldValue.remove();
-                    if (value.size() == queueSize) { // if full
-                        LOG.debug("Deserialization: Removing value={} for <feed,cluster>={}", value.peek(),
-                            entry.getKey());
-                        value.remove();
-                    }
-                    LOG.debug("Deserialization Adding: key={} to <feed,cluster>={}", entry.getKey(), instance);
-                    value.add(instance);
-                }
-                pendingInstances.put(entry.getKey(), value);
-            }
-            lastCheckedAt = new Date((Long) state.get("lastCheckedAt"));
-            lastSerializedAt = new Date((Long) state.get("lastSerializedAt"));
-            monitoredFeeds = new ConcurrentHashSet<>(); // will be populated on the onLoad of entities.
-            LOG.debug("Restored the service from old state.");
-        } catch (IOException | ClassNotFoundException e) {
-            throw new FalconException("Couldn't deserialize the old state", e);
-        }
-    }
 
     protected void initializeService() {
-        pendingInstances = new ConcurrentHashMap<>();
         lastCheckedAt = new Date();
-        lastSerializedAt = new Date();
-        monitoredFeeds = new ConcurrentHashSet<>();
-    }
-
-    @SuppressWarnings("unchecked")
-    private Map<String, Object> deserializeInternal(Path path) throws IOException, ClassNotFoundException {
-        Map<String, Object> state;
-        InputStream in = fileSystem.open(path);
-        ObjectInputStream ois = new ObjectInputStream(in);
-        try {
-            state = (Map<String, Object>) ois.readObject();
-        } finally {
-            IOUtils.closeQuietly(ois);
-        }
-        return state;
     }
 
     /**
@@ -492,13 +395,16 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
     public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(Date start, Date end)
         throws FalconException {
         Set<SchedulableEntityInstance> result = new HashSet<>();
-        for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> feedInstances : pendingInstances.entrySet()) {
-            Pair<String, String> feedClusterPair = feedInstances.getKey();
+        for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){
+            Pair<String, String> feedClusterPair = new Pair<>(pendingInstanceBean.getFeedName(),
+                    pendingInstanceBean.getClusterName());
             Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first);
             Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
             Sla sla = FeedHelper.getSLA(cluster, feed);
             if (sla != null) {
-                Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end, feedInstances.getValue());
+                Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end,
+                        new LinkedBlockingQueue<Date>(MONITORING_JDBC_STATE_STORE.getNominalInstances(
+                                pendingInstanceBean.getFeedName(), pendingInstanceBean.getClusterName())));
                 for (Pair<Date, String> status : slaStatus){
                     SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first,
                             feedClusterPair.second, status.first, EntityType.FEED);
@@ -525,7 +431,8 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
 
         Set<SchedulableEntityInstance> result = new HashSet<>();
         Pair<String, String> feedClusterPair = new Pair<>(feedName, clusterName);
-        BlockingQueue<Date> missingInstances = pendingInstances.get(feedClusterPair);
+        BlockingQueue<Date> missingInstances = new LinkedBlockingQueue<>(MONITORING_JDBC_STATE_STORE.
+                getNominalInstances(feedName, clusterName));
         Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
         Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
         Sla sla = FeedHelper.getSLA(cluster, feed);

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
new file mode 100644
index 0000000..aa32167
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.jdbc;
+
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.service.FalconJPAService;
+import org.apache.falcon.tools.FalconStateStoreDBCLI;
+import org.apache.falcon.util.StateStoreProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.util.Date;
+import java.util.Random;
+
+/**
+ * Unit test for MonitoringJdbcStateStore.
+ */
+
+public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
+    private static final String DB_BASE_DIR = "target/test-data/persistancedb";
+    protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
+    protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
+    protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
+    protected LocalFileSystem fs = new LocalFileSystem();
+
+    private static Random randomValGenerator = new Random();
+    private static FalconJPAService falconJPAService = FalconJPAService.get();
+
+    protected int execDBCLICommands(String[] args) {
+        return new FalconStateStoreDBCLI().run(args);
+    }
+
+    public void createDB(String file) {
+        File sqlFile = new File(file);
+        String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
+        int result = execDBCLICommands(argsCreate);
+        Assert.assertEquals(0, result);
+        Assert.assertTrue(sqlFile.exists());
+
+    }
+
+    @BeforeClass
+    public void setup() throws Exception{
+        StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
+        Configuration localConf = new Configuration();
+        fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
+        fs.mkdirs(new Path(DB_BASE_DIR));
+        createDB(DB_SQL_FILE);
+        falconJPAService.init();
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.conf = dfsCluster.getConf();
+    }
+
+    @Test
+    public void testInsertRetrieveAndUpdate() throws Exception {
+
+        MonitoringJdbcStateStore monitoringJdbcStateStore = new MonitoringJdbcStateStore();
+        monitoringJdbcStateStore.putMonitoredFeed("test_feed1");
+        monitoringJdbcStateStore.putMonitoredFeed("test_feed2");
+        Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredFeed("test_feed1").getFeedName());
+        Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredFeed().size(), 2);
+
+        monitoringJdbcStateStore.deleteMonitoringFeed("test_feed1");
+        monitoringJdbcStateStore.deleteMonitoringFeed("test_feed2");
+        Date dateOne =  SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
+        Date dateTwo =  SchemaHelper.parseDateUTC("2015-11-20T01:00Z");
+        monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateOne);
+        monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateTwo);
+
+        Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 2);
+        monitoringJdbcStateStore.deletePendingInstance("test_feed1", "test_cluster", dateOne);
+        Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 1);
+        monitoringJdbcStateStore.deletePendingInstances("test_feed1", "test_cluster");
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
index 90eec4d..b739037 100644
--- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
@@ -121,40 +121,6 @@ public class FeedSLAMonitoringTest extends AbstractTestBase {
         AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", null, "*");
     }
 
-    @Test
-    public void  testMakeFeedInstanceAvailable() {
-        Date instanceDate = SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
-        Date nextInstanceDate = SchemaHelper.parseDateUTC("2015-11-20T01:00Z");
-        Pair<String, String> feedCluster = new Pair<>("testFeed", "testCluster");
-
-        BlockingQueue<Date> missingInstances = new LinkedBlockingQueue<>();
-        missingInstances.add(instanceDate);
-        missingInstances.add(nextInstanceDate);
-
-        FeedSLAMonitoringService.get().initializeService();
-        FeedSLAMonitoringService.get().pendingInstances.put(feedCluster, missingInstances);
-        FeedSLAMonitoringService.get().makeFeedInstanceAvailable("testFeed", "testCluster", instanceDate);
-
-        Assert.assertEquals(FeedSLAMonitoringService.get().pendingInstances.get(feedCluster).size(), 1);
-    }
-
-    @Test
-    public void testEndDateCheck() throws Exception {
-        Cluster cluster = publishCluster();
-        publishFeed(cluster, "hours(1)", "2015-11-20 00:00 UTC", "2015-11-20 05:00 UTC");
-        Pair<String, String> feedCluster = new Pair<>(FEED_NAME, CLUSTER_NAME);
-
-        FeedSLAMonitoringService service = FeedSLAMonitoringService.get();
-        service.initializeService();
-        service.queueSize = 100;
-        service.monitoredFeeds.add(FEED_NAME);
-        Date from = SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
-        Date to = SchemaHelper.parseDateUTC("2015-11-25T00:00Z");
-        service.addNewPendingFeedInstances(from, to);
-        // check that instances after feed's end date are not added.
-        Assert.assertEquals(service.pendingInstances.get(feedCluster).size(), 5);
-    }
-
     private Cluster publishCluster() throws FalconException {
         Cluster cluster = new Cluster();
         cluster.setName(CLUSTER_NAME);

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/pom.xml
----------------------------------------------------------------------
diff --git a/scheduler/pom.xml b/scheduler/pom.xml
index dc006a1..6cb1c0d 100644
--- a/scheduler/pom.xml
+++ b/scheduler/pom.xml
@@ -95,26 +95,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.apache.openjpa</groupId>
-            <artifactId>openjpa-jdbc</artifactId>
-            <version>${openjpa.version}</version>
-            <scope>compile</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.openjpa</groupId>
-            <artifactId>openjpa-persistence-jdbc</artifactId>
-            <version>${openjpa.version}</version>
-            <scope>compile</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>javax.validation</groupId>
-            <artifactId>validation-api</artifactId>
-            <version>${javax-validation.version}</version>
-        </dependency>
-
-        <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>
@@ -169,27 +149,7 @@
                 </configuration>
             </plugin>
 
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-antrun-plugin</artifactId>
-                <version>1.8</version>
-                <executions>
-                    <execution>
-                        <phase>process-classes</phase>
-                        <configuration>
-                            <tasks>
-                                <taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask" classpathref="maven.compile.classpath"/>
-                                <openjpac>
-                                    <classpath refid="maven.compile.classpath"/>
-                                </openjpac>
-                            </tasks>
-                        </configuration>
-                        <goals>
-                            <goal>run</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
+
             <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-dependency-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
index 194819e..3384186 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
@@ -27,6 +27,8 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.execution.ExecutionInstance;
 import org.apache.falcon.execution.ProcessExecutionInstance;
+import org.apache.falcon.persistence.EntityBean;
+import org.apache.falcon.persistence.InstanceBean;
 import org.apache.falcon.predicate.Predicate;
 import org.apache.falcon.state.EntityID;
 import org.apache.falcon.state.EntityState;

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
deleted file mode 100644
index 37fb0cb..0000000
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.state.store.jdbc;
-
-import org.apache.openjpa.persistence.jdbc.Index;
-
-import javax.persistence.Basic;
-import javax.persistence.CascadeType;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.Id;
-import javax.persistence.NamedQueries;
-import javax.persistence.NamedQuery;
-import javax.persistence.OneToMany;
-import javax.persistence.Table;
-import javax.validation.constraints.NotNull;
-import java.util.List;
-//SUSPEND CHECKSTYLE CHECK  LineLengthCheck
-/**
- * Entity object which will be stored in Data Base.
- */
-@Entity
-@NamedQueries({
-        @NamedQuery(name = "GET_ENTITY", query = "select OBJECT(a) from EntityBean a where a.id = :id"),
-        @NamedQuery(name = "GET_ENTITY_FOR_STATE", query = "select OBJECT(a) from EntityBean a where a.state = :state"),
-        @NamedQuery(name = "UPDATE_ENTITY", query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type where a.id = :id"),
-        @NamedQuery(name = "GET_ENTITIES_FOR_TYPE", query = "select OBJECT(a) from EntityBean a where a.type = :type"),
-        @NamedQuery(name = "GET_ENTITIES", query = "select OBJECT(a) from EntityBean a"),
-        @NamedQuery(name = "DELETE_ENTITY", query = "delete from EntityBean a where a.id = :id"),
-        @NamedQuery(name = "DELETE_ENTITIES", query = "delete from EntityBean")})
-//RESUME CHECKSTYLE CHECK  LineLengthCheck
-@Table(name = "ENTITIES")
-public class EntityBean {
-    @NotNull
-    @Id
-    private String id;
-
-    @Basic
-    @NotNull
-    @Column(name = "name")
-    private String name;
-
-
-    @Basic
-    @Index
-    @NotNull
-    @Column(name = "type")
-    private String type;
-
-    @Basic
-    @Index
-    @NotNull
-    @Column(name = "current_state")
-    private String state;
-
-    @OneToMany(cascade= CascadeType.REMOVE, mappedBy="entityBean")
-    private List<InstanceBean> instanceBeans;
-
-    public EntityBean() {
-    }
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getType() {
-        return type;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    public String getState() {
-        return state;
-    }
-
-    public void setState(String state) {
-        this.state = state;
-    }
-
-    public List<InstanceBean> getInstanceBeans() {
-        return instanceBeans;
-    }
-
-    public void setInstanceBeans(List<InstanceBean> instanceBeans) {
-        this.instanceBeans = instanceBeans;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
deleted file mode 100644
index dffb116..0000000
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.state.store.jdbc;
-
-import org.apache.openjpa.persistence.jdbc.ForeignKey;
-import org.apache.openjpa.persistence.jdbc.ForeignKeyAction;
-import org.apache.openjpa.persistence.jdbc.Index;
-
-import javax.persistence.Basic;
-import javax.persistence.CascadeType;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.Id;
-import javax.persistence.Lob;
-import javax.persistence.ManyToOne;
-import javax.persistence.NamedQueries;
-import javax.persistence.NamedQuery;
-import javax.persistence.Table;
-import javax.validation.constraints.NotNull;
-import java.sql.Timestamp;
-
-//SUSPEND CHECKSTYLE CHECK LineLengthCheck
-/**
- * Instance State which will be stored in DB.
- */
-@Entity
-@NamedQueries({
-        @NamedQuery(name = "GET_INSTANCE", query = "select OBJECT(a) from InstanceBean a where a.id = :id"),
-        @NamedQuery(name = "GET_INSTANCE_FOR_EXTERNAL_ID", query = "select OBJECT(a) from InstanceBean a where a.externalID = :externalID"),
-        @NamedQuery(name = "DELETE_INSTANCE", query = "delete from InstanceBean a where a.id = :id"),
-        @NamedQuery(name = "DELETE_INSTANCE_FOR_ENTITY", query = "delete from InstanceBean a where a.entityId = :entityId"),
-        @NamedQuery(name = "UPDATE_INSTANCE", query = "update InstanceBean a set a.cluster = :cluster, a.externalID = :externalID, a.instanceTime = :instanceTime, a.creationTime = :creationTime, a.actualEndTime = :actualEndTime, a.currentState = :currentState, a.actualStartTime = :actualStartTime, a.instanceSequence = :instanceSequence, a.awaitedPredicates = :awaitedPredicates, a.properties = :properties where a.id = :id"),
-        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster"),
-        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState)"),
-        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"),
-        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"),
-        @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"),
-        @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a"),
-        @NamedQuery(name = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE", query = "select a.currentState, COUNT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.instanceTime >= :startTime AND a.instanceTime < :endTime GROUP BY a.currentState")
-})
-//RESUME CHECKSTYLE CHECK  LineLengthCheck
-@Table(name = "INSTANCES")
-public class InstanceBean {
-
-    @Id
-    @NotNull
-    private String id;
-
-    @Basic
-    @Index
-    @NotNull
-    @Column(name = "entity_id")
-    private String entityId;
-
-    @Basic
-    @Index
-    @NotNull
-    @Column(name = "cluster")
-    private String cluster;
-
-    @Basic
-    @Index
-    @Column(name = "external_id")
-    private String externalID;
-
-    @Basic
-    @Index
-    @Column(name = "instance_time")
-    private Timestamp instanceTime;
-
-    @Basic
-    @Index
-    @NotNull
-    @Column(name = "creation_time")
-    private Timestamp creationTime;
-
-    @Basic
-    @Column(name = "actual_start_time")
-    private Timestamp actualStartTime;
-
-    @Basic
-    @Column(name = "actual_end_time")
-    private Timestamp actualEndTime;
-
-    @Basic
-    @Index
-    @NotNull
-    @Column(name = "current_state")
-    private String currentState;
-
-    @Basic
-    @Index
-    @NotNull
-    @Column(name = "instance_sequence")
-    private Integer instanceSequence;
-
-    @ForeignKey(deleteAction= ForeignKeyAction.CASCADE)
-    @ManyToOne(cascade= CascadeType.REMOVE)
-    private EntityBean entityBean;
-
-
-    @Column(name = "awaited_predicates")
-    @Lob
-    private byte[] awaitedPredicates;
-
-    @Column(name = "properties")
-    @Lob
-    private byte[] properties;
-
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public String getCluster() {
-        return cluster;
-    }
-
-    public void setCluster(String cluster) {
-        this.cluster = cluster;
-    }
-
-    public String getExternalID() {
-        return externalID;
-    }
-
-    public void setExternalID(String externalID) {
-        this.externalID = externalID;
-    }
-
-    public Timestamp getInstanceTime() {
-        return instanceTime;
-    }
-
-    public void setInstanceTime(Timestamp instanceTime) {
-        this.instanceTime = instanceTime;
-    }
-
-    public Timestamp getCreationTime() {
-        return creationTime;
-    }
-
-    public void setCreationTime(Timestamp creationTime) {
-        this.creationTime = creationTime;
-    }
-
-    public Timestamp getActualStartTime() {
-        return actualStartTime;
-    }
-
-    public void setActualStartTime(Timestamp actualStartTime) {
-        this.actualStartTime = actualStartTime;
-    }
-
-    public Timestamp getActualEndTime() {
-        return actualEndTime;
-    }
-
-    public void setActualEndTime(Timestamp actualEndTime) {
-        this.actualEndTime = actualEndTime;
-    }
-
-    public String getCurrentState() {
-        return currentState;
-    }
-
-    public void setCurrentState(String currentState) {
-        this.currentState = currentState;
-    }
-
-    public byte[] getAwaitedPredicates() {
-        return awaitedPredicates;
-    }
-
-    public void setAwaitedPredicates(byte[] awaitedPredicates) {
-        this.awaitedPredicates = awaitedPredicates;
-    }
-
-    public Integer getInstanceSequence() {
-        return instanceSequence;
-    }
-
-    public void setInstanceSequence(Integer instanceSequence) {
-        this.instanceSequence = instanceSequence;
-    }
-
-    public String getEntityId() {
-        return entityId;
-    }
-
-    public void setEntityId(String entityId) {
-        this.entityId = entityId;
-    }
-
-    public byte[] getProperties() {
-        return properties;
-    }
-
-    public void setProperties(byte[] properties) {
-        this.properties = properties;
-    }
-
-    public EntityBean getEntityBean() {
-        return entityBean;
-    }
-
-    public void setEntityBean(EntityBean entityBean) {
-        this.entityBean = entityBean;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
index 1c07286..d2bb8c8 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
@@ -22,6 +22,8 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.persistence.EntityBean;
+import org.apache.falcon.persistence.InstanceBean;
 import org.apache.falcon.state.EntityClusterID;
 import org.apache.falcon.state.EntityID;
 import org.apache.falcon.state.EntityState;
@@ -30,7 +32,7 @@ import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.store.AbstractStateStore;
 import org.apache.falcon.state.store.StateStore;
-import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.service.FalconJPAService;
 import org.apache.falcon.util.StateStoreProperties;
 import org.joda.time.DateTime;
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java b/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
deleted file mode 100644
index 027a8ef..0000000
--- a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.state.store.service;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.service.FalconService;
-import org.apache.falcon.state.store.jdbc.EntityBean;
-import org.apache.falcon.state.store.jdbc.InstanceBean;
-import org.apache.falcon.util.StateStoreProperties;
-import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.persistence.EntityManager;
-import javax.persistence.EntityManagerFactory;
-import javax.persistence.Persistence;
-import java.text.MessageFormat;
-import java.util.Properties;
-
-/**
- * Service that manages JPA.
- */
-public final class FalconJPAService implements FalconService {
-
-    private static final Logger LOG = LoggerFactory.getLogger(FalconJPAService.class);
-    public static final String PREFIX = "falcon.statestore.";
-
-    public static final String DB_SCHEMA = PREFIX + "schema.name";
-    public static final String URL = PREFIX + "jdbc.url";
-    public static final String DRIVER = PREFIX + "jdbc.driver";
-    public static final String USERNAME = PREFIX + "jdbc.username";
-    public static final String PASSWORD = PREFIX + "jdbc.password";
-    public static final String CONN_DATA_SOURCE = PREFIX + "connection.data.source";
-    public static final String CONN_PROPERTIES = PREFIX + "connection.properties";
-    public static final String MAX_ACTIVE_CONN = PREFIX + "pool.max.active.conn";
-    public static final String CREATE_DB_SCHEMA = PREFIX + "create.db.schema";
-    public static final String VALIDATE_DB_CONN = PREFIX + "validate.db.connection";
-    public static final String VALIDATE_DB_CONN_EVICTION_INTERVAL = PREFIX + "validate.db.connection.eviction.interval";
-    public static final String VALIDATE_DB_CONN_EVICTION_NUM = PREFIX + "validate.db.connection.eviction.num";
-
-    private EntityManagerFactory entityManagerFactory;
-    // Persistent Unit which is defined in persistence.xml
-    private String persistenceUnit;
-    private static final FalconJPAService FALCON_JPA_SERVICE = new FalconJPAService();
-
-    private FalconJPAService() {
-    }
-
-    public static FalconJPAService get() {
-        return FALCON_JPA_SERVICE;
-    }
-
-    public EntityManagerFactory getEntityManagerFactory() {
-        return entityManagerFactory;
-    }
-
-    public void setPersistenceUnit(String dbType) {
-        if (StringUtils.isEmpty(dbType)) {
-            throw new IllegalArgumentException(" DB type cannot be null or empty");
-        }
-        dbType = dbType.split(":")[0];
-        this.persistenceUnit = "falcon-" + dbType;
-    }
-
-    @Override
-    public String getName() {
-        return this.getClass().getSimpleName();
-    }
-
-    @Override
-    public void init() throws FalconException {
-        Properties props = getPropsforStore();
-        entityManagerFactory = Persistence.
-                createEntityManagerFactory(persistenceUnit, props);
-        EntityManager entityManager = getEntityManager();
-        entityManager.find(EntityBean.class, 1);
-        entityManager.find(InstanceBean.class, 1);
-        LOG.info("All entities initialized");
-
-        // need to use a pseudo no-op transaction so all entities, datasource
-        // and connection pool are initialized one time only
-        entityManager.getTransaction().begin();
-        OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) entityManagerFactory;
-        // Mask the password with '***'
-        String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,");
-        LOG.info("JPA configuration: {0}", logMsg);
-        entityManager.getTransaction().commit();
-        entityManager.close();
-    }
-
-    private Properties getPropsforStore() throws FalconException {
-        String dbSchema = StateStoreProperties.get().getProperty(DB_SCHEMA);
-        String url = StateStoreProperties.get().getProperty(URL);
-        String driver = StateStoreProperties.get().getProperty(DRIVER);
-        String user = StateStoreProperties.get().getProperty(USERNAME);
-        String password = StateStoreProperties.get().getProperty(PASSWORD).trim();
-        String maxConn = StateStoreProperties.get().getProperty(MAX_ACTIVE_CONN).trim();
-        String dataSource = StateStoreProperties.get().getProperty(CONN_DATA_SOURCE);
-        String connPropsConfig = StateStoreProperties.get().getProperty(CONN_PROPERTIES);
-        boolean autoSchemaCreation = Boolean.parseBoolean(StateStoreProperties.get().getProperty(CREATE_DB_SCHEMA,
-                "false"));
-        boolean validateDbConn = Boolean.parseBoolean(StateStoreProperties.get().getProperty(VALIDATE_DB_CONN, "true"));
-        String evictionInterval = StateStoreProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_INTERVAL).trim();
-        String evictionNum = StateStoreProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_NUM).trim();
-
-        if (!url.startsWith("jdbc:")) {
-            throw new FalconException("invalid JDBC URL, must start with 'jdbc:'" + url);
-        }
-        String dbType = url.substring("jdbc:".length());
-        if (dbType.indexOf(":") <= 0) {
-            throw new FalconException("invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'" + url);
-        }
-        setPersistenceUnit(dbType);
-        String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}";
-        connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn);
-        Properties props = new Properties();
-        if (autoSchemaCreation) {
-            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
-            props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
-        } else if (validateDbConn) {
-            // validation can be done only if the schema already exist, else a
-            // connection cannot be obtained to create the schema.
-            String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval;
-            String num = "numTestsPerEvictionRun=" + evictionNum;
-            connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num;
-            connProps += ",ValidationQuery=select 1";
-            connProps = MessageFormat.format(connProps, dbSchema);
-        } else {
-            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
-        }
-        if (connPropsConfig != null) {
-            connProps += "," + connPropsConfig;
-        }
-        props.setProperty("openjpa.ConnectionProperties", connProps);
-        props.setProperty("openjpa.ConnectionDriverName", dataSource);
-        return props;
-    }
-
-    @Override
-    public void destroy() throws FalconException {
-        if (entityManagerFactory.isOpen()) {
-            entityManagerFactory.close();
-        }
-    }
-
-
-    /**
-     * Return an EntityManager. Used by the StoreService.
-     *
-     * @return an entity manager
-     */
-    public EntityManager getEntityManager() {
-        return getEntityManagerFactory().createEntityManager();
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
deleted file mode 100644
index 6de9f7d..0000000
--- a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
+++ /dev/null
@@ -1,436 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.tools;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.falcon.cli.CLIParser;
-import org.apache.falcon.state.store.service.FalconJPAService;
-import org.apache.falcon.util.BuildProperties;
-import org.apache.falcon.util.StateStoreProperties;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.PrintWriter;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Command Line utility for Table Creation, Update.
- */
-public class FalconStateStoreDBCLI {
-    public static final String HELP_CMD = "help";
-    public static final String VERSION_CMD = "version";
-    public static final String CREATE_CMD = "create";
-    public static final String SQL_FILE_OPT = "sqlfile";
-    public static final String RUN_OPT = "run";
-    public static final String UPGRADE_CMD = "upgrade";
-
-    // Represents whether DB instance exists or not.
-    private boolean instanceExists;
-    private static final String[] FALCON_HELP =
-    {"Falcon DB initialization tool currently supports Derby DB/ Mysql/ PostgreSQL"};
-
-    public static void main(String[] args) {
-        new FalconStateStoreDBCLI().run(args);
-    }
-
-    public FalconStateStoreDBCLI() {
-        instanceExists = false;
-    }
-
-    protected Options getOptions() {
-        Option sqlfile = new Option(SQL_FILE_OPT, true,
-                "Generate SQL script instead of creating/upgrading the DB schema");
-        Option run = new Option(RUN_OPT, false, "Confirmation option regarding DB schema creation/upgrade");
-        Options options = new Options();
-        options.addOption(sqlfile);
-        options.addOption(run);
-        return options;
-    }
-
-    public synchronized int run(String[] args) {
-        if (instanceExists) {
-            throw new IllegalStateException("CLI instance already used");
-        }
-        instanceExists = true;
-
-        CLIParser parser = new CLIParser("falcondb", FALCON_HELP);
-        parser.addCommand(HELP_CMD, "", "Display usage for all commands or specified command", new Options(), false);
-        parser.addCommand(VERSION_CMD, "", "Show Falcon DB version information", new Options(), false);
-        parser.addCommand(CREATE_CMD, "", "Create Falcon DB schema", getOptions(), false);
-        parser.addCommand(UPGRADE_CMD, "", "Upgrade Falcon DB schema", getOptions(), false);
-
-        try {
-            CLIParser.Command command = parser.parse(args);
-            if (command.getName().equals(HELP_CMD)) {
-                parser.showHelp();
-            } else if (command.getName().equals(VERSION_CMD)) {
-                showVersion();
-            } else {
-                if (!command.getCommandLine().hasOption(SQL_FILE_OPT)
-                        && !command.getCommandLine().hasOption(RUN_OPT)) {
-                    throw new Exception("'-sqlfile <FILE>' or '-run' options must be specified");
-                }
-                CommandLine commandLine = command.getCommandLine();
-                String sqlFile = (commandLine.hasOption(SQL_FILE_OPT))
-                        ? commandLine.getOptionValue(SQL_FILE_OPT)
-                        : File.createTempFile("falcondb-", ".sql").getAbsolutePath();
-                boolean run = commandLine.hasOption(RUN_OPT);
-                if (command.getName().equals(CREATE_CMD)) {
-                    createDB(sqlFile, run);
-                } else if (command.getName().equals(UPGRADE_CMD)) {
-                    upgradeDB(sqlFile, run);
-                }
-                System.out.println("The SQL commands have been written to: " + sqlFile);
-                if (!run) {
-                    System.out.println("WARN: The SQL commands have NOT been executed, you must use the '-run' option");
-                }
-            }
-            return 0;
-        } catch (ParseException ex) {
-            System.err.println("Invalid sub-command: " + ex.getMessage());
-            System.err.println();
-            System.err.println(parser.shortHelp());
-            return 1;
-        } catch (Exception ex) {
-            System.err.println();
-            System.err.println("Error: " + ex.getMessage());
-            System.err.println();
-            System.err.println("Stack trace for the error was (for debug purposes):");
-            System.err.println("--------------------------------------");
-            ex.printStackTrace(System.err);
-            System.err.println("--------------------------------------");
-            System.err.println();
-            return 1;
-        }
-    }
-
-    private void upgradeDB(String sqlFile, boolean run) throws Exception {
-        validateConnection();
-        if (!checkDBExists()) {
-            throw new Exception("Falcon DB doesn't exist");
-        }
-        String falconVersion = BuildProperties.get().getProperty("project.version");
-        String dbVersion = getFalconDBVersion();
-        if (dbVersion.compareTo(falconVersion) >= 0) {
-            System.out.println("Falcon DB already upgraded to Falcon version '" + falconVersion + "'");
-            return;
-        }
-
-        createUpgradeDB(sqlFile, run, false);
-        upgradeFalconDBVersion(sqlFile, run, falconVersion);
-
-        // any post upgrade tasks
-        if (run) {
-            System.out.println("Falcon DB has been upgraded to Falcon version '" + falconVersion + "'");
-        }
-    }
-
-
-    private void upgradeFalconDBVersion(String sqlFile, boolean run, String version) throws Exception {
-        String updateDBVersion = "update FALCON_DB_PROPS set data='" + version + "' where name='db.version'";
-        PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
-        writer.println();
-        writer.println(updateDBVersion);
-        writer.close();
-        System.out.println("Upgrade db.version in FALCON_DB_PROPS table to " + version);
-        if (run) {
-            Connection conn = createConnection();
-            Statement st = null;
-            try {
-                conn.setAutoCommit(true);
-                st = conn.createStatement();
-                st.executeUpdate(updateDBVersion);
-                st.close();
-            } catch (Exception ex) {
-                throw new Exception("Could not upgrade db.version in FALCON_DB_PROPS table: " + ex.toString(), ex);
-            } finally {
-                closeStatement(st);
-                conn.close();
-            }
-        }
-        System.out.println("DONE");
-    }
-
-    private static final String GET_FALCON_DB_VERSION = "select data from FALCON_DB_PROPS where name = 'db.version'";
-
-    private String getFalconDBVersion() throws Exception {
-        String version;
-        System.out.println("Get Falcon DB version");
-        Connection conn = createConnection();
-        Statement st = null;
-        ResultSet rs = null;
-        try {
-            st = conn.createStatement();
-            rs = st.executeQuery(GET_FALCON_DB_VERSION);
-            if (rs.next()) {
-                version = rs.getString(1);
-            } else {
-                throw new Exception("ERROR: Could not find Falcon DB 'db.version' in FALCON_DB_PROPS table");
-            }
-        } catch (Exception ex) {
-            throw new Exception("ERROR: Could not query FALCON_DB_PROPS table: " + ex.toString(), ex);
-        } finally {
-            closeResultSet(rs);
-            closeStatement(st);
-            conn.close();
-        }
-        System.out.println("DONE");
-        return version;
-    }
-
-
-    private Map<String, String> getJdbcConf() throws Exception {
-        Map<String, String> jdbcConf = new HashMap<String, String>();
-        jdbcConf.put("driver", StateStoreProperties.get().getProperty(FalconJPAService.DRIVER));
-        String url = StateStoreProperties.get().getProperty(FalconJPAService.URL);
-        jdbcConf.put("url", url);
-        jdbcConf.put("user", StateStoreProperties.get().getProperty(FalconJPAService.USERNAME));
-        jdbcConf.put("password", StateStoreProperties.get().getProperty(FalconJPAService.PASSWORD));
-        String dbType = url.substring("jdbc:".length());
-        if (dbType.indexOf(":") <= 0) {
-            throw new RuntimeException("Invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
-        }
-        dbType = dbType.substring(0, dbType.indexOf(":"));
-        jdbcConf.put("dbtype", dbType);
-        return jdbcConf;
-    }
-
-    private String[] createMappingToolArguments(String sqlFile) throws Exception {
-        Map<String, String> conf = getJdbcConf();
-        List<String> args = new ArrayList<String>();
-        args.add("-schemaAction");
-        args.add("add");
-        args.add("-p");
-        args.add("persistence.xml#falcon-" + conf.get("dbtype"));
-        args.add("-connectionDriverName");
-        args.add(conf.get("driver"));
-        args.add("-connectionURL");
-        args.add(conf.get("url"));
-        args.add("-connectionUserName");
-        args.add(conf.get("user"));
-        args.add("-connectionPassword");
-        args.add(conf.get("password"));
-        if (sqlFile != null) {
-            args.add("-sqlFile");
-            args.add(sqlFile);
-        }
-        args.add("-indexes");
-        args.add("true");
-        args.add("org.apache.falcon.state.store.jdbc.EntityBean");
-        args.add("org.apache.falcon.state.store.jdbc.InstanceBean");
-        return args.toArray(new String[args.size()]);
-    }
-
-    private void createDB(String sqlFile, boolean run) throws Exception {
-        validateConnection();
-        if (checkDBExists()) {
-            return;
-        }
-
-        verifyFalconPropsTable(false);
-        createUpgradeDB(sqlFile, run, true);
-        createFalconPropsTable(sqlFile, run, BuildProperties.get().getProperty("project.version"));
-        if (run) {
-            System.out.println("Falcon DB has been created for Falcon version '"
-                    + BuildProperties.get().getProperty("project.version") + "'");
-        }
-    }
-
-    private static final String CREATE_FALCON_DB_PROPS =
-            "create table FALCON_DB_PROPS (name varchar(100), data varchar(100))";
-
-    private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception {
-        String insertDbVerion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";
-
-        PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
-        writer.println();
-        writer.println(CREATE_FALCON_DB_PROPS);
-        writer.println(insertDbVerion);
-        writer.close();
-        System.out.println("Create FALCON_DB_PROPS table");
-        if (run) {
-            Connection conn = createConnection();
-            Statement st = null;
-            try {
-                conn.setAutoCommit(true);
-                st = conn.createStatement();
-                st.executeUpdate(CREATE_FALCON_DB_PROPS);
-                st.executeUpdate(insertDbVerion);
-                st.close();
-            } catch (Exception ex) {
-                closeStatement(st);
-                throw new Exception("Could not create FALCON_DB_PROPS table: " + ex.toString(), ex);
-            } finally {
-                conn.close();
-            }
-        }
-        System.out.println("DONE");
-    }
-
-    private static final String FALCON_DB_PROPS_EXISTS = "select count(*) from FALCON_DB_PROPS";
-
-    private boolean verifyFalconPropsTable(boolean exists) throws Exception {
-        System.out.println((exists) ? "Check FALCON_DB_PROPS table exists"
-                : "Checking FALCON_DB_PROPS table does not exist");
-        boolean tableExists;
-        Connection conn = createConnection();
-        Statement st = null;
-        ResultSet rs = null;
-        try {
-            st = conn.createStatement();
-            rs = st.executeQuery(FALCON_DB_PROPS_EXISTS);
-            rs.next();
-            tableExists = true;
-        } catch (Exception ex) {
-            tableExists = false;
-        } finally {
-            closeResultSet(rs);
-            closeStatement(st);
-            conn.close();
-        }
-        if (tableExists != exists) {
-            throw new Exception("FALCON_DB_PROPS_TABLE table " + ((exists) ? "does not exist" : "exists"));
-        }
-        System.out.println("DONE");
-        return tableExists;
-    }
-
-    private void closeResultSet(ResultSet rs) {
-        try {
-            if (rs != null) {
-                rs.close();
-            }
-        } catch (Exception e) {
-            System.out.println("Unable to close ResultSet " + rs);
-        }
-    }
-
-    private void closeStatement(Statement st) throws Exception {
-        try {
-            if (st != null) {
-                st.close();
-            }
-        } catch (Exception e) {
-            System.out.println("Unable to close SQL Statement " + st);
-            throw new Exception(e);
-        }
-    }
-
-    private Connection createConnection() throws Exception {
-        Map<String, String> conf = getJdbcConf();
-        Class.forName(conf.get("driver")).newInstance();
-        return DriverManager.getConnection(conf.get("url"), conf.get("user"), conf.get("password"));
-    }
-
-    private void validateConnection() throws Exception {
-        System.out.println("Validating DB Connection");
-        try {
-            createConnection().close();
-            System.out.println("DONE");
-        } catch (Exception ex) {
-            throw new Exception("Could not connect to the database: " + ex.toString(), ex);
-        }
-    }
-
-    private static final String ENTITY_STATUS_QUERY =
-            "select count(*) from ENTITIES where current_state IN ('RUNNING', 'SUSPENDED')";
-    private static final String INSTANCE_STATUS_QUERY =
-            "select count(*) from INSTANCES where current_state IN ('RUNNING', 'SUSPENDED')";
-
-    private boolean checkDBExists() throws Exception {
-        boolean schemaExists;
-        Connection conn = createConnection();
-        ResultSet rs =  null;
-        Statement st = null;
-        try {
-            st = conn.createStatement();
-            rs = st.executeQuery(ENTITY_STATUS_QUERY);
-            rs.next();
-            schemaExists = true;
-        } catch (Exception ex) {
-            schemaExists = false;
-        } finally {
-            closeResultSet(rs);
-            closeStatement(st);
-            conn.close();
-        }
-        System.out.println("DB schema " + ((schemaExists) ? "exists" : "does not exist"));
-        return schemaExists;
-    }
-
-    private void createUpgradeDB(String sqlFile, boolean run, boolean create) throws Exception {
-        System.out.println((create) ? "Create SQL schema" : "Upgrade SQL schema");
-        String[] args = createMappingToolArguments(sqlFile);
-        org.apache.openjpa.jdbc.meta.MappingTool.main(args);
-        if (run) {
-            args = createMappingToolArguments(null);
-            org.apache.openjpa.jdbc.meta.MappingTool.main(args);
-        }
-        System.out.println("DONE");
-    }
-
-    private void showVersion() throws Exception {
-        System.out.println("Falcon Server version: "
-                + BuildProperties.get().getProperty("project.version"));
-        validateConnection();
-        if (!checkDBExists()) {
-            throw new Exception("Falcon DB doesn't exist");
-        }
-        try {
-            verifyFalconPropsTable(true);
-        } catch (Exception ex) {
-            throw new Exception("ERROR: It seems this Falcon DB was never upgraded with the 'falcondb' tool");
-        }
-        showFalconPropsInfo();
-    }
-
-    private static final String GET_FALCON_PROPS_INFO = "select name, data from FALCON_DB_PROPS order by name";
-
-    private void showFalconPropsInfo() throws Exception {
-        Connection conn = createConnection();
-        Statement st = null;
-        ResultSet rs = null;
-        try {
-            System.out.println("Falcon DB Version Information");
-            System.out.println("--------------------------------------");
-            st = conn.createStatement();
-            rs = st.executeQuery(GET_FALCON_PROPS_INFO);
-            while (rs.next()) {
-                System.out.println(rs.getString(1) + ": " + rs.getString(2));
-            }
-            System.out.println("--------------------------------------");
-        } catch (Exception ex) {
-            throw new Exception("ERROR querying FALCON_DB_PROPS table: " + ex.toString(), ex);
-        } finally {
-            closeResultSet(rs);
-            closeStatement(st);
-            conn.close();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/scheduler/src/main/resources/META-INF/persistence.xml b/scheduler/src/main/resources/META-INF/persistence.xml
deleted file mode 100644
index c2ef794..0000000
--- a/scheduler/src/main/resources/META-INF/persistence.xml
+++ /dev/null
@@ -1,104 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<persistence xmlns="http://java.sun.com/xml/ns/persistence"
-             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-             version="1.0">
-
-    <persistence-unit name="falcon-derby" transaction-type="RESOURCE_LOCAL">
-        <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
-
-        <class>org.apache.falcon.state.store.jdbc.EntityBean</class>
-        <class>org.apache.falcon.state.store.jdbc.InstanceBean</class>
-
-        <properties>
-            <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
-
-            <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
-
-            <property name="openjpa.MetaDataFactory"
-                      value="jpa(Types=org.apache.falcon.state.store.EntityBean;
-                org.apache.falcon.state.store.InstanceBean)"></property>
-
-            <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
-            <property name="openjpa.LockManager" value="pessimistic"/>
-            <property name="openjpa.ReadLockLevel" value="read"/>
-            <property name="openjpa.WriteLockLevel" value="write"/>
-            <property name="openjpa.jdbc.TransactionIsolation" value="read-committed"/> <!--CUSTOM-->
-            <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/>
-            <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/>
-            <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
-            <property name="openjpa.Log" value="log4j"/>
-        </properties>
-    </persistence-unit>
-
-    <persistence-unit name="falcon-mysql" transaction-type="RESOURCE_LOCAL">
-        <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
-
-        <class>org.apache.falcon.state.store.jdbc.EntityBean</class>
-        <class>org.apache.falcon.state.store.jdbc.InstanceBean</class>
-
-        <properties>
-            <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
-
-            <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
-
-            <property name="openjpa.MetaDataFactory"
-                      value="jpa(Types=org.apache.falcon.state.store.EntityBean;
-                org.apache.falcon.state.store.InstanceBean)"></property>
-
-            <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
-            <property name="openjpa.LockManager" value="pessimistic"/>
-            <property name="openjpa.ReadLockLevel" value="read"/>
-            <property name="openjpa.WriteLockLevel" value="write"/>
-            <property name="openjpa.jdbc.TransactionIsolation" value="repeatable-read"/> <!--CUSTOM-->
-            <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/>
-            <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/>
-            <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
-            <property name="openjpa.Log" value="log4j"/>
-        </properties>
-    </persistence-unit>
-
-    <persistence-unit name="falcon-postgresql" transaction-type="RESOURCE_LOCAL">
-        <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
-
-        <class>org.apache.falcon.state.store.jdbc.EntityBean</class>
-        <class>org.apache.falcon.state.store.jdbc.InstanceBean</class>
-
-        <properties>
-            <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
-
-            <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
-
-            <property name="openjpa.MetaDataFactory"
-                      value="jpa(Types=org.apache.falcon.state.store.EntityBean;
-                org.apache.falcon.state.store.InstanceBean)"></property>
-
-            <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
-            <property name="openjpa.LockManager" value="pessimistic"/>
-            <property name="openjpa.ReadLockLevel" value="read"/>
-            <property name="openjpa.WriteLockLevel" value="write"/>
-            <property name="openjpa.jdbc.TransactionIsolation" value="repeatable-read"/> <!--CUSTOM-->
-            <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/>
-            <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/>
-            <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
-            <property name="openjpa.Log" value="log4j"/>
-        </properties>
-    </persistence-unit>
-
-</persistence>

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
index 417ec3e..437c5f5 100644
--- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
@@ -45,7 +45,7 @@ import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.store.AbstractStateStore;
 import org.apache.falcon.state.store.StateStore;
-import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.service.FalconJPAService;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.engine.DAGEngine;
 import org.apache.falcon.workflow.engine.DAGEngineFactory;

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
index 155be69..cd99049 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
@@ -18,7 +18,7 @@
 package org.apache.falcon.state;
 
 import org.apache.falcon.entity.AbstractTestBase;
-import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.service.FalconJPAService;
 import org.apache.falcon.tools.FalconStateStoreDBCLI;
 import org.apache.falcon.util.StateStoreProperties;
 import org.apache.hadoop.conf.Configuration;

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java b/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
index ecd5293..3e186dd 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
@@ -19,7 +19,7 @@ package org.apache.falcon.state.service;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.state.AbstractSchedulerTestBase;
-import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.service.FalconJPAService;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
index d597e27..bf5c142 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
@@ -44,7 +44,7 @@ import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.store.jdbc.BeanMapperUtil;
 import org.apache.falcon.state.store.jdbc.JDBCStateStore;
 import org.apache.falcon.state.store.StateStore;
-import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.service.FalconJPAService;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.engine.DAGEngine;
 import org.apache.falcon.workflow.engine.DAGEngineFactory;

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/startup.properties b/scheduler/src/test/resources/startup.properties
index 7160bb2..6216b70 100644
--- a/scheduler/src/test/resources/startup.properties
+++ b/scheduler/src/test/resources/startup.properties
@@ -41,7 +41,7 @@
                         org.apache.falcon.notification.service.impl.AlarmService,\
                         org.apache.falcon.notification.service.impl.DataAvailabilityService,\
                         org.apache.falcon.execution.FalconExecutionService,\
-                        org.apache.falcon.state.store.service.FalconJPAService
+                        org.apache.falcon.service.FalconJPAService
 
 ##### Falcon Configuration Store Change listeners #####
 *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
@@ -121,7 +121,8 @@ debug.libext.process.paths=${falcon.libext}
 *.falcon.http.authentication.simple.anonymous.allowed=false
 
 # Indicates the Kerberos principal to be used for HTTP endpoint.
-# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
+# The principal MUST
+# start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
 *.falcon.http.authentication.kerberos.principal=
 
 # Location of the keytab file with the credentials for the HTTP principal.

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/resources/statestore.properties
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/statestore.properties b/scheduler/src/test/resources/statestore.properties
index 2ae642f..e7a08fc 100644
--- a/scheduler/src/test/resources/statestore.properties
+++ b/scheduler/src/test/resources/statestore.properties
@@ -18,7 +18,7 @@
 
 
 *.domain=debug
-######## StateStore Properties #####
+######### StateStore Properties #####
 *.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
 *.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
 *.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/src/build/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml
index 78f2fd0..a6766df 100644
--- a/src/build/findbugs-exclude.xml
+++ b/src/build/findbugs-exclude.xml
@@ -38,16 +38,37 @@
     </Match>
 
     <Match>
-        <Class name="org.apache.falcon.state.store.jdbc.EntityBean" />
+        <Class name="org.apache.falcon.persistence.EntityBean" />
         <Bug pattern="NP_BOOLEAN_RETURN_NULL" />
     </Match>
 
     <Match>
-        <Class name="org.apache.falcon.state.store.jdbc.InstanceBean" />
+        <Class name="org.apache.falcon.persistence.InstanceBean" />
         <Bug pattern="NP_BOOLEAN_RETURN_NULL" />
     </Match>
 
     <Match>
+        <Class name="org.apache.falcon.persistence.PendingInstanceBean" />
+        <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" />
+    </Match>
+
+    <!--<Match>-->
+        <!--<Class name="org.apache.falcon.persistence.PendingInstanceBean" />-->
+        <!--<Bug pattern="UWF_UNWRITTEN_FIELD" />-->
+    <!--</Match>-->
+
+
+    <Match>
+        <Class name="org.apache.falcon.persistence.MonitoredFeedsBean" />
+        <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" />
+    </Match>
+
+    <!--<Match>-->
+        <!--<Class name="org.apache.falcon.persistence.MonitoredFeedsBean" />-->
+        <!--<Bug pattern="UWF_UNWRITTEN_FIELD" />-->
+    <!--</Match>-->
+
+    <Match>
         <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT" />
     </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index f23337b..3601e22 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -59,7 +59,7 @@
 #                        org.apache.falcon.service.ProcessSubscriberService,\
 #                        org.apache.falcon.service.FeedSLAMonitoringService,\
 #                        org.apache.falcon.service.LifecyclePolicyMap,\
-#                        org.apache.falcon.state.store.service.FalconJPAService,\
+#                        org.apache.falcon.service.FalconJPAService,\
 #                        org.apache.falcon.entity.store.ConfigurationStore,\
 #                        org.apache.falcon.rerun.service.RetryService,\
 #                        org.apache.falcon.rerun.service.LateRunService,\

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/unit/pom.xml
----------------------------------------------------------------------
diff --git a/unit/pom.xml b/unit/pom.xml
index 7e5b073..f1ef463 100644
--- a/unit/pom.xml
+++ b/unit/pom.xml
@@ -44,6 +44,16 @@
         <dependency>
             <groupId>org.apache.oozie</groupId>
             <artifactId>oozie-core</artifactId>
+            <exclusions>
+                <exclusion>
+            <groupId>org.apache.openjpa</groupId>
+                    <artifactId>openjpa-jdbc</artifactId>
+                    </exclusion>
+                <exclusion>
+                    <groupId>org.apache.openjpa</groupId>
+                    <artifactId>openjpa-persistence</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/unit/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/unit/src/main/resources/startup.properties b/unit/src/main/resources/startup.properties
index 4576e0b..4dfea31 100644
--- a/unit/src/main/resources/startup.properties
+++ b/unit/src/main/resources/startup.properties
@@ -33,6 +33,7 @@
 *.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
                         org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
                         org.apache.falcon.service.ProcessSubscriberService,\
+                        org.apache.falcon.service.FalconJPAService,\
                         org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.rerun.service.RetryService,\
                         org.apache.falcon.rerun.service.LateRunService,\

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index aaf2b37..9b1ff2a 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -133,6 +133,7 @@ public class TestFalconUnit extends FalconUnitTestBase {
             ParseException, InterruptedException {
         // submit cluster and feeds
         submitClusterAndFeeds();
+
         APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
         assertStatus(result);
         createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME);

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
index 175833a..1bd4f45 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
@@ -24,7 +24,7 @@ import org.apache.falcon.client.FalconCLIException;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.state.AbstractSchedulerTestBase;
-import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.service.FalconJPAService;
 import org.apache.falcon.unit.FalconUnitTestBase;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.util.StateStoreProperties;


[3/3] falcon git commit: FALCON-1865 Persist Feed sla data to database

Posted by aj...@apache.org.
FALCON-1865 Persist Feed sla data to database

Author: Praveen Adlakha <ad...@gmail.com>

Reviewers: Ajay Yadava <aj...@apache.org>

Closes #77 from PraveenAdlakha/feed_alert


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/de2f5c0a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/de2f5c0a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/de2f5c0a

Branch: refs/heads/master
Commit: de2f5c0ab1c26b8d198d067e066c579a86bce737
Parents: 10f3843
Author: Praveen Adlakha <ad...@gmail.com>
Authored: Mon Mar 28 19:08:56 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Mar 28 19:08:56 2016 +0530

----------------------------------------------------------------------
 common/pom.xml                                  |  41 ++
 .../apache/falcon/persistence/EntityBean.java   | 117 +++++
 .../apache/falcon/persistence/InstanceBean.java | 229 ++++++++++
 .../falcon/persistence/MonitoredFeedsBean.java  |  73 ++++
 .../falcon/persistence/PendingInstanceBean.java |  98 +++++
 .../persistence/PersistenceConstants.java       |  35 ++
 .../persistence/ResultNotFoundException.java    |  31 ++
 .../apache/falcon/service/FalconJPAService.java | 170 +++++++
 .../falcon/tools/FalconStateStoreDBCLI.java     | 438 +++++++++++++++++++
 .../src/main/resources/META-INF/persistence.xml | 113 +++++
 common/src/main/resources/startup.properties    |   7 +-
 .../src/main/resources/statestore.credentials   |   4 +-
 common/src/main/resources/statestore.properties |  20 +-
 docs/src/site/twiki/FalconNativeScheduler.twiki |   2 +-
 .../falcon/jdbc/MonitoringJdbcStateStore.java   | 175 ++++++++
 .../service/FeedSLAMonitoringService.java       | 191 +++-----
 .../jdbc/MonitoringJdbcStateStoreTest.java      |  97 ++++
 .../falcon/service/FeedSLAMonitoringTest.java   |  34 --
 scheduler/pom.xml                               |  42 +-
 .../falcon/state/store/jdbc/BeanMapperUtil.java |   2 +
 .../falcon/state/store/jdbc/EntityBean.java     | 117 -----
 .../falcon/state/store/jdbc/InstanceBean.java   | 229 ----------
 .../falcon/state/store/jdbc/JDBCStateStore.java |   4 +-
 .../state/store/service/FalconJPAService.java   | 171 --------
 .../falcon/tools/FalconStateStoreDBCLI.java     | 436 ------------------
 .../src/main/resources/META-INF/persistence.xml | 104 -----
 .../execution/FalconExecutionServiceTest.java   |   2 +-
 .../falcon/state/AbstractSchedulerTestBase.java |   2 +-
 .../state/service/TestFalconJPAService.java     |   2 +-
 .../state/service/store/TestJDBCStateStore.java |   2 +-
 scheduler/src/test/resources/startup.properties |   5 +-
 .../src/test/resources/statestore.properties    |   2 +-
 src/build/findbugs-exclude.xml                  |  25 +-
 src/conf/startup.properties                     |   2 +-
 unit/pom.xml                                    |  10 +
 unit/src/main/resources/startup.properties      |   1 +
 .../org/apache/falcon/unit/TestFalconUnit.java  |   1 +
 .../AbstractSchedulerManagerJerseyIT.java       |   2 +-
 webapp/src/test/resources/startup.properties    |   2 +-
 39 files changed, 1744 insertions(+), 1294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index df28f9b..c54f9d8 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -187,6 +187,26 @@
             <groupId>com.thinkaurelius.titan</groupId>
             <artifactId>titan-berkeleyje</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.openjpa</groupId>
+            <artifactId>openjpa-jdbc</artifactId>
+            <version>${openjpa.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.openjpa</groupId>
+            <artifactId>openjpa-persistence-jdbc</artifactId>
+            <version>${openjpa.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>javax.validation</groupId>
+            <artifactId>validation-api</artifactId>
+            <version>${javax-validation.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
@@ -216,6 +236,27 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <phase>process-classes</phase>
+                        <configuration>
+                            <tasks>
+                                <taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask" classpathref="maven.compile.classpath"/>
+                                <openjpac>
+                                    <classpath refid="maven.compile.classpath"/>
+                                </openjpac>
+                            </tasks>
+                        </configuration>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/EntityBean.java b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
new file mode 100644
index 0000000..5c94fa4
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.OneToMany;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+import java.util.List;
+//SUSPEND CHECKSTYLE CHECK  LineLengthCheck
+/**
+ * Entity object which will be stored in the database.
+ */
+@Entity
+@NamedQueries({
+        @NamedQuery(name = "GET_ENTITY", query = "select OBJECT(a) from EntityBean a where a.id = :id"),
+        @NamedQuery(name = "GET_ENTITY_FOR_STATE", query = "select OBJECT(a) from EntityBean a where a.state = :state"),
+        @NamedQuery(name = "UPDATE_ENTITY", query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type where a.id = :id"),
+        @NamedQuery(name = "GET_ENTITIES_FOR_TYPE", query = "select OBJECT(a) from EntityBean a where a.type = :type"),
+        @NamedQuery(name = "GET_ENTITIES", query = "select OBJECT(a) from EntityBean a"),
+        @NamedQuery(name = "DELETE_ENTITY", query = "delete from EntityBean a where a.id = :id"),
+        @NamedQuery(name = "DELETE_ENTITIES", query = "delete from EntityBean")})
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+@Table(name = "ENTITIES")
+public class EntityBean {
+    @NotNull
+    @Id
+    private String id;
+
+    @Basic
+    @NotNull
+    @Column(name = "name")
+    private String name;
+
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "type")
+    private String type;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "current_state")
+    private String state;
+
+    @OneToMany(cascade= CascadeType.REMOVE, mappedBy="entityBean")
+    private List<InstanceBean> instanceBeans;
+
+    public EntityBean() {
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getState() {
+        return state;
+    }
+
+    public void setState(String state) {
+        this.state = state;
+    }
+
+    public List<InstanceBean> getInstanceBeans() {
+        return instanceBeans;
+    }
+
+    public void setInstanceBeans(List<InstanceBean> instanceBeans) {
+        this.instanceBeans = instanceBeans;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java
new file mode 100644
index 0000000..b7e10f1
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import org.apache.openjpa.persistence.jdbc.ForeignKey;
+import org.apache.openjpa.persistence.jdbc.ForeignKeyAction;
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Lob;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+import java.sql.Timestamp;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * Instance State which will be stored in DB.
+ */
+@Entity
+@NamedQueries({
+        @NamedQuery(name = "GET_INSTANCE", query = "select OBJECT(a) from InstanceBean a where a.id = :id"),
+        @NamedQuery(name = "GET_INSTANCE_FOR_EXTERNAL_ID", query = "select OBJECT(a) from InstanceBean a where a.externalID = :externalID"),
+        @NamedQuery(name = "DELETE_INSTANCE", query = "delete from InstanceBean a where a.id = :id"),
+        @NamedQuery(name = "DELETE_INSTANCE_FOR_ENTITY", query = "delete from InstanceBean a where a.entityId = :entityId"),
+        @NamedQuery(name = "UPDATE_INSTANCE", query = "update InstanceBean a set a.cluster = :cluster, a.externalID = :externalID, a.instanceTime = :instanceTime, a.creationTime = :creationTime, a.actualEndTime = :actualEndTime, a.currentState = :currentState, a.actualStartTime = :actualStartTime, a.instanceSequence = :instanceSequence, a.awaitedPredicates = :awaitedPredicates, a.properties = :properties where a.id = :id"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState)"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"),
+        @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"),
+        @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a"),
+        @NamedQuery(name = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE", query = "select a.currentState, COUNT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.instanceTime >= :startTime AND a.instanceTime < :endTime GROUP BY a.currentState")
+})
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+@Table(name = "INSTANCES")
+public class InstanceBean {
+
+    @Id
+    @NotNull
+    private String id;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "entity_id")
+    private String entityId;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "cluster")
+    private String cluster;
+
+    @Basic
+    @Index
+    @Column(name = "external_id")
+    private String externalID;
+
+    @Basic
+    @Index
+    @Column(name = "instance_time")
+    private Timestamp instanceTime;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "creation_time")
+    private Timestamp creationTime;
+
+    @Basic
+    @Column(name = "actual_start_time")
+    private Timestamp actualStartTime;
+
+    @Basic
+    @Column(name = "actual_end_time")
+    private Timestamp actualEndTime;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "current_state")
+    private String currentState;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "instance_sequence")
+    private Integer instanceSequence;
+
+    @ForeignKey(deleteAction= ForeignKeyAction.CASCADE)
+    @ManyToOne(cascade= CascadeType.REMOVE)
+    private EntityBean entityBean;
+
+
+    @Column(name = "awaited_predicates")
+    @Lob
+    private byte[] awaitedPredicates;
+
+    @Column(name = "properties")
+    @Lob
+    private byte[] properties;
+
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getCluster() {
+        return cluster;
+    }
+
+    public void setCluster(String cluster) {
+        this.cluster = cluster;
+    }
+
+    public String getExternalID() {
+        return externalID;
+    }
+
+    public void setExternalID(String externalID) {
+        this.externalID = externalID;
+    }
+
+    public Timestamp getInstanceTime() {
+        return instanceTime;
+    }
+
+    public void setInstanceTime(Timestamp instanceTime) {
+        this.instanceTime = instanceTime;
+    }
+
+    public Timestamp getCreationTime() {
+        return creationTime;
+    }
+
+    public void setCreationTime(Timestamp creationTime) {
+        this.creationTime = creationTime;
+    }
+
+    public Timestamp getActualStartTime() {
+        return actualStartTime;
+    }
+
+    public void setActualStartTime(Timestamp actualStartTime) {
+        this.actualStartTime = actualStartTime;
+    }
+
+    public Timestamp getActualEndTime() {
+        return actualEndTime;
+    }
+
+    public void setActualEndTime(Timestamp actualEndTime) {
+        this.actualEndTime = actualEndTime;
+    }
+
+    public String getCurrentState() {
+        return currentState;
+    }
+
+    public void setCurrentState(String currentState) {
+        this.currentState = currentState;
+    }
+
+    public byte[] getAwaitedPredicates() {
+        return awaitedPredicates;
+    }
+
+    public void setAwaitedPredicates(byte[] awaitedPredicates) {
+        this.awaitedPredicates = awaitedPredicates;
+    }
+
+    public Integer getInstanceSequence() {
+        return instanceSequence;
+    }
+
+    public void setInstanceSequence(Integer instanceSequence) {
+        this.instanceSequence = instanceSequence;
+    }
+
+    public String getEntityId() {
+        return entityId;
+    }
+
+    public void setEntityId(String entityId) {
+        this.entityId = entityId;
+    }
+
+    public byte[] getProperties() {
+        return properties;
+    }
+
+    public void setProperties(byte[] properties) {
+        this.properties = properties;
+    }
+
+    public EntityBean getEntityBean() {
+        return entityBean;
+    }
+
+    public void setEntityBean(EntityBean entityBean) {
+        this.entityBean = entityBean;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
new file mode 100644
index 0000000..2b48569
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import javax.persistence.Entity;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Column;
+import javax.persistence.Basic;
+import javax.validation.constraints.NotNull;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+* The feeds that are to be monitored will be stored in the db.
+* */
+
+@Entity
+@NamedQueries({
+        @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from "
+                + "MonitoredFeedsBean a where a.feedName = :feedName"),
+        @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredFeedsBean "
+                + "a where a.feedName = :feedName"),
+        @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) "
+                + "from MonitoredFeedsBean a")
+})
+@Table(name="MONITORED_FEEDS")
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+public class MonitoredFeedsBean {
+    @NotNull
+    @GeneratedValue(strategy = GenerationType.AUTO)
+    @Id
+    private String id;
+
+    @Basic
+    @NotNull
+    @Column(name = "feed_name")
+    private String feedName;
+
+    public String getFeedName() {
+        return feedName;
+    }
+
+    public void setFeedName(String feedName) {
+        this.feedName = feedName;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
new file mode 100644
index 0000000..038244a
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import javax.persistence.Entity;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.GenerationType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.Id;
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.validation.constraints.NotNull;
+import java.util.Date;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+* The feed instances to be monitored will be stored in the db.
+* */
+@Entity
+@NamedQueries({
+        @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"),
+        @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"),
+        @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
+        @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
+        @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select  OBJECT(a) from PendingInstanceBean a ")
+})
+@Table(name = "PENDING_INSTANCES")
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+public class PendingInstanceBean {
+    @NotNull
+    @GeneratedValue(strategy = GenerationType.AUTO)
+    @Id
+    private String id;
+
+    @Basic
+    @NotNull
+    @Column(name = "feed_name")
+    private String feedName;
+
+    @Basic
+    @NotNull
+    @Column(name = "cluster_name")
+    private String clusterName;
+
+    @Basic
+    @NotNull
+    @Column(name = "nominal_time")
+    private Date nominalTime;
+
+    public Date getNominalTime() {
+        return nominalTime;
+    }
+
+    public void setNominalTime(Date nominalTime) {
+        this.nominalTime = nominalTime;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getFeedName() {
+        return feedName;
+    }
+
+    public void setFeedName(String feedName) {
+        this.feedName = feedName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
new file mode 100644
index 0000000..511270e
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+/**
+ * The names of queries to be used as constants across packages.
+ */
+
+public final class PersistenceConstants {
+    private PersistenceConstants(){
+
+    }
+    public static final String  GET_MONITERED_INSTANCE = "GET_MONITERED_INSTANCE";
+    public static final String DELETE_MONITORED_INSTANCES = "DELETE_MONITORED_INSTANCES";
+    public static final String GET_ALL_MONITORING_FEEDS = "GET_ALL_MONITORING_FEEDS";
+    public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES";
+    public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES";
+    public static final String DELETE_ALL_INSTANCES_FOR_FEED = "DELETE_ALL_INSTANCES_FOR_FEED";
+    public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES";
+    public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES";
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java b/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java
new file mode 100644
index 0000000..c368d2c
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.persistence;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Exception to be thrown by the bean classes.
+ */
+public class ResultNotFoundException extends FalconException {
+
+    public ResultNotFoundException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/service/FalconJPAService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/FalconJPAService.java b/common/src/main/java/org/apache/falcon/service/FalconJPAService.java
new file mode 100644
index 0000000..73fde33
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/service/FalconJPAService.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.service;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.persistence.EntityBean;
+import org.apache.falcon.persistence.InstanceBean;
+import org.apache.falcon.util.StateStoreProperties;
+import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import java.text.MessageFormat;
+import java.util.Properties;
+
+/**
+ * Service that manages JPA.
+ */
+public final class FalconJPAService implements FalconService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FalconJPAService.class);
+    public static final String PREFIX = "falcon.statestore.";
+
+    public static final String DB_SCHEMA = PREFIX + "schema.name";
+    public static final String URL = PREFIX + "jdbc.url";
+    public static final String DRIVER = PREFIX + "jdbc.driver";
+    public static final String USERNAME = PREFIX + "jdbc.username";
+    public static final String PASSWORD = PREFIX + "jdbc.password";
+    public static final String CONN_DATA_SOURCE = PREFIX + "connection.data.source";
+    public static final String CONN_PROPERTIES = PREFIX + "connection.properties";
+    public static final String MAX_ACTIVE_CONN = PREFIX + "pool.max.active.conn";
+    public static final String CREATE_DB_SCHEMA = PREFIX + "create.db.schema";
+    public static final String VALIDATE_DB_CONN = PREFIX + "validate.db.connection";
+    public static final String VALIDATE_DB_CONN_EVICTION_INTERVAL = PREFIX + "validate.db.connection.eviction.interval";
+    public static final String VALIDATE_DB_CONN_EVICTION_NUM = PREFIX + "validate.db.connection.eviction.num";
+
+    private EntityManagerFactory entityManagerFactory;
+    // Persistence unit, which is defined in persistence.xml
+    private String persistenceUnit;
+    private static final FalconJPAService FALCON_JPA_SERVICE = new FalconJPAService();
+
+    private FalconJPAService() {
+    }
+
+    public static FalconJPAService get() {
+        return FALCON_JPA_SERVICE;
+    }
+
+    public EntityManagerFactory getEntityManagerFactory() {
+        return entityManagerFactory;
+    }
+
+    public void setPersistenceUnit(String dbType) {
+        if (StringUtils.isEmpty(dbType)) {
+            throw new IllegalArgumentException(" DB type cannot be null or empty");
+        }
+        dbType = dbType.split(":")[0];
+        this.persistenceUnit = "falcon-" + dbType;
+    }
+
+    @Override
+    public String getName() {
+        return this.getClass().getSimpleName();
+    }
+
+    @Override
+    public void init() throws FalconException {
+        Properties props = getPropsforStore();
+        entityManagerFactory = Persistence.
+                createEntityManagerFactory(persistenceUnit, props);
+        EntityManager entityManager = getEntityManager();
+        entityManager.find(EntityBean.class, 1);
+        entityManager.find(InstanceBean.class, 1);
+        LOG.info("All entities initialized");
+
+        // need to use a pseudo no-op transaction so all entities, datasource
+        // and connection pool are initialized one time only
+        entityManager.getTransaction().begin();
+        OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) entityManagerFactory;
+        // Mask the password with '***'
+        String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,");
+        LOG.info("JPA configuration: {0}", logMsg);
+        entityManager.getTransaction().commit();
+        entityManager.close();
+    }
+
+    private Properties getPropsforStore() throws FalconException {
+        String dbSchema = StateStoreProperties.get().getProperty(DB_SCHEMA);
+        String url = StateStoreProperties.get().getProperty(URL);
+        String driver = StateStoreProperties.get().getProperty(DRIVER);
+        String user = StateStoreProperties.get().getProperty(USERNAME);
+        String password = StateStoreProperties.get().getProperty(PASSWORD).trim();
+        String maxConn = StateStoreProperties.get().getProperty(MAX_ACTIVE_CONN).trim();
+        String dataSource = StateStoreProperties.get().getProperty(CONN_DATA_SOURCE);
+        String connPropsConfig = StateStoreProperties.get().getProperty(CONN_PROPERTIES);
+        boolean autoSchemaCreation = Boolean.parseBoolean(StateStoreProperties.get().getProperty(CREATE_DB_SCHEMA,
+                "false"));
+        boolean validateDbConn = Boolean.parseBoolean(StateStoreProperties.get().getProperty(VALIDATE_DB_CONN, "true"));
+        String evictionInterval = StateStoreProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_INTERVAL).trim();
+        String evictionNum = StateStoreProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_NUM).trim();
+
+        if (!url.startsWith("jdbc:")) {
+            throw new FalconException("invalid JDBC URL, must start with 'jdbc:'" + url);
+        }
+        String dbType = url.substring("jdbc:".length());
+        if (dbType.indexOf(":") <= 0) {
+            throw new FalconException("invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'" + url);
+        }
+        setPersistenceUnit(dbType);
+        String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}";
+        connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn);
+        Properties props = new Properties();
+        if (autoSchemaCreation) {
+            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
+            props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
+        } else if (validateDbConn) {
+            // validation can be done only if the schema already exist, else a
+            // connection cannot be obtained to create the schema.
+            String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval;
+            String num = "numTestsPerEvictionRun=" + evictionNum;
+            connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num;
+            connProps += ",ValidationQuery=select 1";
+            connProps = MessageFormat.format(connProps, dbSchema);
+        } else {
+            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
+        }
+        if (connPropsConfig != null) {
+            connProps += "," + connPropsConfig;
+        }
+        props.setProperty("openjpa.ConnectionProperties", connProps);
+        props.setProperty("openjpa.ConnectionDriverName", dataSource);
+        return props;
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        if (entityManagerFactory.isOpen()) {
+            entityManagerFactory.close();
+        }
+    }
+
+
+    /**
+     * Return an EntityManager. Used by the StoreService.
+     *
+     * @return an entity manager
+     */
+    public EntityManager getEntityManager() {
+        return getEntityManagerFactory().createEntityManager();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
new file mode 100644
index 0000000..df8194c
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.tools;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.falcon.cli.CLIParser;
+import org.apache.falcon.service.FalconJPAService;
+import org.apache.falcon.util.BuildProperties;
+import org.apache.falcon.util.StateStoreProperties;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Command Line utility for Table Creation, Update.
+ */
+public class FalconStateStoreDBCLI {
+    public static final String HELP_CMD = "help";
+    public static final String VERSION_CMD = "version";
+    public static final String CREATE_CMD = "create";
+    public static final String SQL_FILE_OPT = "sqlfile";
+    public static final String RUN_OPT = "run";
+    public static final String UPGRADE_CMD = "upgrade";
+
+    // Tracks whether run() has already been invoked on this CLI instance (an instance is single-use).
+    private boolean instanceExists;
+    private static final String[] FALCON_HELP =
+    {"Falcon DB initialization tool currently supports Derby DB/ Mysql/ PostgreSQL"};
+
+    public static void main(String[] args) {
+        new FalconStateStoreDBCLI().run(args);
+    }
+
+    /**
+     * Creates a CLI instance that has not been run yet.
+     */
+    public FalconStateStoreDBCLI() {
+        this.instanceExists = false;
+    }
+
+    /**
+     * Builds the options accepted by the create and upgrade sub-commands.
+     *
+     * @return options holding the sqlfile option (takes a value) and the run flag
+     */
+    protected Options getOptions() {
+        Options cliOptions = new Options();
+        cliOptions.addOption(new Option(SQL_FILE_OPT, true,
+                "Generate SQL script instead of creating/upgrading the DB schema"));
+        cliOptions.addOption(new Option(RUN_OPT, false,
+                "Confirmation option regarding DB schema creation/upgrade"));
+        return cliOptions;
+    }
+
+    /**
+     * Parses the arguments and executes the selected sub-command (help, version, create or upgrade).
+     * An instance can be run only once.
+     *
+     * @param args command line arguments
+     * @return 0 on success, 1 if the arguments could not be parsed or the command failed
+     * @throws IllegalStateException if this instance has already been run
+     */
+    public synchronized int run(String[] args) {
+        if (instanceExists) {
+            throw new IllegalStateException("CLI instance already used");
+        }
+        instanceExists = true;
+
+        CLIParser parser = new CLIParser("falcondb", FALCON_HELP);
+        parser.addCommand(HELP_CMD, "", "Display usage for all commands or specified command", new Options(), false);
+        parser.addCommand(VERSION_CMD, "", "Show Falcon DB version information", new Options(), false);
+        parser.addCommand(CREATE_CMD, "", "Create Falcon DB schema", getOptions(), false);
+        parser.addCommand(UPGRADE_CMD, "", "Upgrade Falcon DB schema", getOptions(), false);
+
+        try {
+            CLIParser.Command command = parser.parse(args);
+            String commandName = command.getName();
+
+            // Informational commands need no further option handling.
+            if (commandName.equals(HELP_CMD)) {
+                parser.showHelp();
+                return 0;
+            }
+            if (commandName.equals(VERSION_CMD)) {
+                showVersion();
+                return 0;
+            }
+
+            CommandLine commandLine = command.getCommandLine();
+            boolean sqlFileGiven = commandLine.hasOption(SQL_FILE_OPT);
+            boolean run = commandLine.hasOption(RUN_OPT);
+            if (!sqlFileGiven && !run) {
+                throw new Exception("'-sqlfile <FILE>' or '-run' options must be specified");
+            }
+
+            // Without an explicit file the generated SQL goes to a temporary file.
+            String sqlFile;
+            if (sqlFileGiven) {
+                sqlFile = commandLine.getOptionValue(SQL_FILE_OPT);
+            } else {
+                sqlFile = File.createTempFile("falcondb-", ".sql").getAbsolutePath();
+            }
+
+            if (commandName.equals(CREATE_CMD)) {
+                createDB(sqlFile, run);
+            } else if (commandName.equals(UPGRADE_CMD)) {
+                upgradeDB(sqlFile, run);
+            }
+            System.out.println("The SQL commands have been written to: " + sqlFile);
+            if (!run) {
+                System.out.println("WARN: The SQL commands have NOT been executed, you must use the '-run' option");
+            }
+            return 0;
+        } catch (ParseException ex) {
+            System.err.println("Invalid sub-command: " + ex.getMessage());
+            System.err.println();
+            System.err.println(parser.shortHelp());
+            return 1;
+        } catch (Exception ex) {
+            System.err.println();
+            System.err.println("Error: " + ex.getMessage());
+            System.err.println();
+            System.err.println("Stack trace for the error was (for debug purposes):");
+            System.err.println("--------------------------------------");
+            ex.printStackTrace(System.err);
+            System.err.println("--------------------------------------");
+            System.err.println();
+            return 1;
+        }
+    }
+
+    /**
+     * Upgrades the Falcon DB schema to the version of this Falcon build, unless the version
+     * recorded in the DB is already equal to or newer than the build version.
+     *
+     * @param sqlFile file the generated SQL statements are written to
+     * @param run whether the statements are also executed against the DB
+     * @throws Exception if the DB cannot be reached, does not exist, or a version cannot be determined
+     */
+    private void upgradeDB(String sqlFile, boolean run) throws Exception {
+        validateConnection();
+        if (!checkDBExists()) {
+            throw new Exception("Falcon DB doesn't exist");
+        }
+        String falconVersion = BuildProperties.get().getProperty("project.version");
+        String dbVersion = getFalconDBVersion();
+        if (falconVersion == null || dbVersion == null) {
+            throw new Exception("Could not determine versions to compare: Falcon version '" + falconVersion
+                    + "', DB version '" + dbVersion + "'");
+        }
+        // Compare component-wise: a plain String comparison would order "0.10" before "0.9".
+        if (compareVersions(dbVersion, falconVersion) >= 0) {
+            System.out.println("Falcon DB already upgraded to Falcon version '" + falconVersion + "'");
+            return;
+        }
+
+        createUpgradeDB(sqlFile, run, false);
+        upgradeFalconDBVersion(sqlFile, run, falconVersion);
+
+        // any post upgrade tasks
+        if (run) {
+            System.out.println("Falcon DB has been upgraded to Falcon version '" + falconVersion + "'");
+        }
+    }
+
+    /**
+     * Compares two version strings component by component, splitting on '.' and '-'.
+     * Components that are both numeric are compared as numbers; any other pair is compared as
+     * text. A missing component is treated as an empty string, so a version that is a prefix
+     * of the other orders first.
+     *
+     * @param left first version, not null
+     * @param right second version, not null
+     * @return a negative value, zero or a positive value if left is older than, equal to or newer than right
+     */
+    private static int compareVersions(String left, String right) {
+        String[] leftParts = left.split("[.-]");
+        String[] rightParts = right.split("[.-]");
+        int count = Math.max(leftParts.length, rightParts.length);
+        for (int i = 0; i < count; i++) {
+            String leftPart = (i < leftParts.length) ? leftParts[i] : "";
+            String rightPart = (i < rightParts.length) ? rightParts[i] : "";
+            int result;
+            // The digit bound keeps Long.parseLong from overflowing.
+            if (leftPart.matches("\\d{1,18}") && rightPart.matches("\\d{1,18}")) {
+                long leftNumber = Long.parseLong(leftPart);
+                long rightNumber = Long.parseLong(rightPart);
+                result = (leftNumber < rightNumber) ? -1 : ((leftNumber == rightNumber) ? 0 : 1);
+            } else {
+                result = leftPart.compareTo(rightPart);
+            }
+            if (result != 0) {
+                return result;
+            }
+        }
+        return 0;
+    }
+
+
+    /**
+     * Appends the statement that records the new db.version to the SQL file and, if requested,
+     * executes it against the DB.
+     *
+     * @param sqlFile file the statement is appended to
+     * @param run whether the statement is also executed
+     * @param version version to store in FALCON_DB_PROPS
+     * @throws Exception if the file cannot be opened or the update fails
+     */
+    private void upgradeFalconDBVersion(String sqlFile, boolean run, String version) throws Exception {
+        String updateDBVersion = "update FALCON_DB_PROPS set data='" + version + "' where name='db.version'";
+        PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
+        writer.println();
+        writer.println(updateDBVersion);
+        writer.close();
+        System.out.println("Upgrade db.version in FALCON_DB_PROPS table to " + version);
+        if (run) {
+            Connection conn = createConnection();
+            Statement st = null;
+            try {
+                conn.setAutoCommit(true);
+                st = conn.createStatement();
+                st.executeUpdate(updateDBVersion);
+            } catch (Exception ex) {
+                throw new Exception("Could not upgrade db.version in FALCON_DB_PROPS table: " + ex.toString(), ex);
+            } finally {
+                // closeStatement may throw; the connection must be released regardless.
+                try {
+                    closeStatement(st);
+                } finally {
+                    conn.close();
+                }
+            }
+        }
+        System.out.println("DONE");
+    }
+
+    private static final String GET_FALCON_DB_VERSION = "select data from FALCON_DB_PROPS where name = 'db.version'";
+
+    /**
+     * Reads the 'db.version' entry from the FALCON_DB_PROPS table.
+     *
+     * @return the value stored for db.version
+     * @throws Exception if the table cannot be queried or holds no db.version row
+     */
+    private String getFalconDBVersion() throws Exception {
+        String version = null;
+        boolean found = false;
+        System.out.println("Get Falcon DB version");
+        Connection conn = createConnection();
+        Statement st = null;
+        ResultSet rs = null;
+        try {
+            st = conn.createStatement();
+            rs = st.executeQuery(GET_FALCON_DB_VERSION);
+            if (rs.next()) {
+                version = rs.getString(1);
+                found = true;
+            }
+        } catch (Exception ex) {
+            throw new Exception("ERROR: Could not query FALCON_DB_PROPS table: " + ex.toString(), ex);
+        } finally {
+            closeResultSet(rs);
+            // closeStatement may throw; the connection must be released regardless.
+            try {
+                closeStatement(st);
+            } finally {
+                conn.close();
+            }
+        }
+        // Reported outside the try block so it is not re-wrapped as a query failure.
+        if (!found) {
+            throw new Exception("ERROR: Could not find Falcon DB 'db.version' in FALCON_DB_PROPS table");
+        }
+        System.out.println("DONE");
+        return version;
+    }
+
+
+    /**
+     * Collects the JDBC settings of the state store and derives the DB vendor from the URL.
+     *
+     * @return map with the keys driver, url, user, password and dbtype; dbtype is the text
+     *         between "jdbc:" and the following ':' of the URL
+     * @throws RuntimeException if the URL is missing or is not of the form jdbc:[VENDOR]:...
+     * @throws Exception declared for callers; not thrown directly by this method
+     */
+    private Map<String, String> getJdbcConf() throws Exception {
+        Map<String, String> jdbcConf = new HashMap<String, String>();
+        jdbcConf.put("driver", StateStoreProperties.get().getProperty(FalconJPAService.DRIVER));
+        String url = StateStoreProperties.get().getProperty(FalconJPAService.URL);
+        jdbcConf.put("url", url);
+        jdbcConf.put("user", StateStoreProperties.get().getProperty(FalconJPAService.USERNAME));
+        jdbcConf.put("password", StateStoreProperties.get().getProperty(FalconJPAService.PASSWORD));
+
+        // Reject a missing or malformed URL with a clear message instead of failing with a
+        // NullPointerException or StringIndexOutOfBoundsException.
+        String prefix = "jdbc:";
+        if (url == null || !url.startsWith(prefix)) {
+            throw new RuntimeException("Invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
+        }
+        int vendorEnd = url.indexOf(':', prefix.length());
+        if (vendorEnd <= prefix.length()) {
+            // no ':' after the vendor, or the vendor part is empty
+            throw new RuntimeException("Invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
+        }
+        jdbcConf.put("dbtype", url.substring(prefix.length(), vendorEnd));
+        return jdbcConf;
+    }
+
+    /**
+     * Assembles the argument list handed to the OpenJPA MappingTool.
+     *
+     * @param sqlFile file the tool should write the SQL to; when null no -sqlFile argument is added
+     * @return the arguments, ending with the persistent class names
+     * @throws Exception if the JDBC configuration cannot be read
+     */
+    private String[] createMappingToolArguments(String sqlFile) throws Exception {
+        Map<String, String> jdbc = getJdbcConf();
+        List<String> toolArgs = new ArrayList<String>(java.util.Arrays.asList(
+                "-schemaAction", "add",
+                "-p", "persistence.xml#falcon-" + jdbc.get("dbtype"),
+                "-connectionDriverName", jdbc.get("driver"),
+                "-connectionURL", jdbc.get("url"),
+                "-connectionUserName", jdbc.get("user"),
+                "-connectionPassword", jdbc.get("password")));
+        if (sqlFile != null) {
+            toolArgs.add("-sqlFile");
+            toolArgs.add(sqlFile);
+        }
+        toolArgs.add("-indexes");
+        toolArgs.add("true");
+        // persistent classes the schema is generated for
+        toolArgs.add("org.apache.falcon.persistence.EntityBean");
+        toolArgs.add("org.apache.falcon.persistence.InstanceBean");
+        toolArgs.add("org.apache.falcon.persistence.PendingInstanceBean");
+        toolArgs.add("org.apache.falcon.persistence.MonitoredFeedsBean");
+        return toolArgs.toArray(new String[toolArgs.size()]);
+    }
+
+    /**
+     * Creates the Falcon DB schema and the FALCON_DB_PROPS table. Does nothing beyond the
+     * connection check when the schema already exists.
+     *
+     * @param sqlFile file the generated SQL statements are written to
+     * @param run whether the statements are also executed against the DB
+     * @throws Exception if the DB cannot be reached or FALCON_DB_PROPS unexpectedly exists
+     */
+    private void createDB(String sqlFile, boolean run) throws Exception {
+        validateConnection();
+        if (!checkDBExists()) {
+            verifyFalconPropsTable(false);
+            createUpgradeDB(sqlFile, run, true);
+            String projectVersion = BuildProperties.get().getProperty("project.version");
+            createFalconPropsTable(sqlFile, run, projectVersion);
+            if (run) {
+                System.out.println("Falcon DB has been created for Falcon version '"
+                        + projectVersion + "'");
+            }
+        }
+    }
+
+    private static final String CREATE_FALCON_DB_PROPS =
+            "create table FALCON_DB_PROPS (name varchar(100), data varchar(100))";
+
+    /**
+     * Appends the statements that create FALCON_DB_PROPS and record db.version to the SQL
+     * file and, if requested, executes them against the DB.
+     *
+     * @param sqlFile file the statements are appended to
+     * @param run whether the statements are also executed
+     * @param version version to store as db.version
+     * @throws Exception if the file cannot be opened or the statements fail
+     */
+    private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception {
+        String insertDbVersion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";
+
+        PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
+        writer.println();
+        writer.println(CREATE_FALCON_DB_PROPS);
+        writer.println(insertDbVersion);
+        writer.close();
+        System.out.println("Create FALCON_DB_PROPS table");
+        if (run) {
+            Connection conn = createConnection();
+            Statement st = null;
+            try {
+                conn.setAutoCommit(true);
+                st = conn.createStatement();
+                st.executeUpdate(CREATE_FALCON_DB_PROPS);
+                st.executeUpdate(insertDbVersion);
+            } catch (Exception ex) {
+                throw new Exception("Could not create FALCON_DB_PROPS table: " + ex.toString(), ex);
+            } finally {
+                // Close the statement on every path; a failure here no longer hides the original
+                // error raised in the catch block, and the connection is always released.
+                try {
+                    closeStatement(st);
+                } finally {
+                    conn.close();
+                }
+            }
+        }
+        System.out.println("DONE");
+    }
+
+    private static final String FALCON_DB_PROPS_EXISTS = "select count(*) from FALCON_DB_PROPS";
+
+    /**
+     * Checks that the presence of the FALCON_DB_PROPS table matches the expectation.
+     *
+     * @param exists true if the table is expected to exist, false if it is expected to be absent
+     * @return whether the table exists (always equal to {@code exists} on normal return)
+     * @throws Exception if the actual state differs from the expectation or no connection is available
+     */
+    private boolean verifyFalconPropsTable(boolean exists) throws Exception {
+        System.out.println((exists) ? "Check FALCON_DB_PROPS table exists"
+                : "Checking FALCON_DB_PROPS table does not exist");
+        boolean tableExists;
+        Connection conn = createConnection();
+        Statement st = null;
+        ResultSet rs = null;
+        try {
+            st = conn.createStatement();
+            rs = st.executeQuery(FALCON_DB_PROPS_EXISTS);
+            rs.next();
+            tableExists = true;
+        } catch (Exception ex) {
+            // Deliberate: a failing probe query is taken to mean the table is absent.
+            tableExists = false;
+        } finally {
+            closeResultSet(rs);
+            // closeStatement may throw; the connection must be released regardless.
+            try {
+                closeStatement(st);
+            } finally {
+                conn.close();
+            }
+        }
+        if (tableExists != exists) {
+            throw new Exception("FALCON_DB_PROPS table " + ((exists) ? "does not exist" : "exists"));
+        }
+        System.out.println("DONE");
+        return tableExists;
+    }
+
+    /**
+     * Closes the given result set, if any. A failure to close is reported on standard output
+     * and otherwise ignored.
+     *
+     * @param rs result set to close, may be null
+     */
+    private void closeResultSet(ResultSet rs) {
+        if (rs == null) {
+            return;
+        }
+        try {
+            rs.close();
+        } catch (Exception e) {
+            System.out.println("Unable to close ResultSet " + rs);
+        }
+    }
+
+    /**
+     * Closes the given statement, if any. A failure to close is reported on standard output
+     * and rethrown wrapped in an Exception.
+     *
+     * @param st statement to close, may be null
+     * @throws Exception if closing the statement fails
+     */
+    private void closeStatement(Statement st) throws Exception {
+        if (st == null) {
+            return;
+        }
+        try {
+            st.close();
+        } catch (Exception e) {
+            System.out.println("Unable to close SQL Statement " + st);
+            throw new Exception(e);
+        }
+    }
+
+    /**
+     * Opens a new JDBC connection using the configured driver, URL, user and password.
+     *
+     * @return a new connection; the caller is responsible for closing it
+     * @throws Exception if the configuration is invalid, the driver cannot be instantiated or
+     *                   the connection cannot be established
+     */
+    private Connection createConnection() throws Exception {
+        Map<String, String> jdbc = getJdbcConf();
+        String driverClass = jdbc.get("driver");
+        // instantiate the driver class before asking DriverManager for a connection
+        Class.forName(driverClass).newInstance();
+        String url = jdbc.get("url");
+        String user = jdbc.get("user");
+        String password = jdbc.get("password");
+        return DriverManager.getConnection(url, user, password);
+    }
+
+    /**
+     * Verifies that a connection to the DB can be opened and closed.
+     *
+     * @throws Exception if the connection cannot be opened or closed
+     */
+    private void validateConnection() throws Exception {
+        System.out.println("Validating DB Connection");
+        try {
+            Connection probe = createConnection();
+            probe.close();
+            System.out.println("DONE");
+        } catch (Exception ex) {
+            throw new Exception("Could not connect to the database: " + ex.toString(), ex);
+        }
+    }
+
+    private static final String ENTITY_STATUS_QUERY =
+            "select count(*) from ENTITIES where current_state IN ('RUNNING', 'SUSPENDED')";
+    private static final String INSTANCE_STATUS_QUERY =
+            "select count(*) from INSTANCES where current_state IN ('RUNNING', 'SUSPENDED')";
+
+    /**
+     * Determines whether the Falcon schema exists by running a probe query against the
+     * ENTITIES table.
+     *
+     * @return true if the probe query succeeds, false if it fails
+     * @throws Exception if no connection can be opened or the statement/connection cannot be closed
+     */
+    private boolean checkDBExists() throws Exception {
+        boolean schemaExists;
+        Connection conn = createConnection();
+        ResultSet rs =  null;
+        Statement st = null;
+        try {
+            st = conn.createStatement();
+            rs = st.executeQuery(ENTITY_STATUS_QUERY);
+            rs.next();
+            schemaExists = true;
+        } catch (Exception ex) {
+            // Deliberate: a failing probe query is taken to mean the schema is absent.
+            schemaExists = false;
+        } finally {
+            closeResultSet(rs);
+            // closeStatement may throw; the connection must be released regardless.
+            try {
+                closeStatement(st);
+            } finally {
+                conn.close();
+            }
+        }
+        System.out.println("DB schema " + ((schemaExists) ? "exists" : "does not exist"));
+        return schemaExists;
+    }
+
+    /**
+     * Invokes the OpenJPA MappingTool with the SQL file as its target and, when {@code run}
+     * is set, invokes it a second time without a SQL file.
+     *
+     * @param sqlFile file passed to the first MappingTool invocation
+     * @param run whether the second invocation (without a SQL file) is performed
+     * @param create selects the progress message only: create versus upgrade
+     * @throws Exception if the tool arguments cannot be built or the tool fails
+     */
+    private void createUpgradeDB(String sqlFile, boolean run, boolean create) throws Exception {
+        String action = (create) ? "Create SQL schema" : "Upgrade SQL schema";
+        System.out.println(action);
+        org.apache.openjpa.jdbc.meta.MappingTool.main(createMappingToolArguments(sqlFile));
+        if (run) {
+            org.apache.openjpa.jdbc.meta.MappingTool.main(createMappingToolArguments(null));
+        }
+        System.out.println("DONE");
+    }
+
+    /**
+     * Prints the Falcon server version followed by the version information stored in the DB.
+     *
+     * @throws Exception if the DB cannot be reached, the schema does not exist or the
+     *                   FALCON_DB_PROPS table is missing
+     */
+    private void showVersion() throws Exception {
+        System.out.println("Falcon Server version: "
+                + BuildProperties.get().getProperty("project.version"));
+        validateConnection();
+        if (!checkDBExists()) {
+            throw new Exception("Falcon DB doesn't exist");
+        }
+        try {
+            verifyFalconPropsTable(true);
+        } catch (Exception ex) {
+            // keep the underlying failure as the cause so it shows up in the stack trace
+            throw new Exception("ERROR: It seems this Falcon DB was never upgraded with the 'falcondb' tool", ex);
+        }
+        showFalconPropsInfo();
+    }
+
+    private static final String GET_FALCON_PROPS_INFO = "select name, data from FALCON_DB_PROPS order by name";
+
+    /**
+     * Prints every name/data pair of the FALCON_DB_PROPS table, ordered by name.
+     *
+     * @throws Exception if the table cannot be queried or the statement/connection cannot be closed
+     */
+    private void showFalconPropsInfo() throws Exception {
+        Connection conn = createConnection();
+        Statement st = null;
+        ResultSet rs = null;
+        try {
+            System.out.println("Falcon DB Version Information");
+            System.out.println("--------------------------------------");
+            st = conn.createStatement();
+            rs = st.executeQuery(GET_FALCON_PROPS_INFO);
+            while (rs.next()) {
+                System.out.println(rs.getString(1) + ": " + rs.getString(2));
+            }
+            System.out.println("--------------------------------------");
+        } catch (Exception ex) {
+            throw new Exception("ERROR querying FALCON_DB_PROPS table: " + ex.toString(), ex);
+        } finally {
+            closeResultSet(rs);
+            // closeStatement may throw; the connection must be released regardless.
+            try {
+                closeStatement(st);
+            } finally {
+                conn.close();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml
new file mode 100644
index 0000000..4c9388c
--- /dev/null
+++ b/common/src/main/resources/META-INF/persistence.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<persistence xmlns="http://java.sun.com/xml/ns/persistence"
+             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+             version="1.0">
+
+    <persistence-unit name="falcon-derby" transaction-type="RESOURCE_LOCAL">
+        <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+
+        <class>org.apache.falcon.persistence.EntityBean</class>
+        <class>org.apache.falcon.persistence.InstanceBean</class>
+        <class>org.apache.falcon.persistence.PendingInstanceBean</class>
+        <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
+
+        <properties>
+            <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
+
+            <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
+
+            <property name="openjpa.MetaDataFactory"
+                      value="jpa(Types=org.apache.falcon.persistence.EntityBean;
+                org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
+                org.apache.falcon.persistence.MonitoredFeedsBean)"></property>
+
+            <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
+            <property name="openjpa.LockManager" value="pessimistic"/>
+            <property name="openjpa.ReadLockLevel" value="read"/>
+            <property name="openjpa.WriteLockLevel" value="write"/>
+            <property name="openjpa.jdbc.TransactionIsolation" value="read-committed"/> <!--CUSTOM-->
+            <!-- Single declaration: a second property with the same name would replace this one. -->
+            <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50,TimestampTypeName=TIMESTAMP"/>
+            <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
+            <property name="openjpa.Log" value="log4j"/>
+        </properties>
+    </persistence-unit>
+
+    <persistence-unit name="falcon-mysql" transaction-type="RESOURCE_LOCAL">
+        <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+
+        <class>org.apache.falcon.persistence.EntityBean</class>
+        <class>org.apache.falcon.persistence.InstanceBean</class>
+        <class>org.apache.falcon.persistence.PendingInstanceBean</class>
+        <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
+
+        <properties>
+            <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
+
+            <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
+
+            <property name="openjpa.MetaDataFactory"
+                      value="jpa(Types=org.apache.falcon.persistence.EntityBean;
+                org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
+                org.apache.falcon.persistence.MonitoredFeedsBean)"></property>
+
+            <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
+            <property name="openjpa.LockManager" value="pessimistic"/>
+            <property name="openjpa.ReadLockLevel" value="read"/>
+            <property name="openjpa.WriteLockLevel" value="write"/>
+            <property name="openjpa.jdbc.TransactionIsolation" value="repeatable-read"/> <!--CUSTOM-->
+            <!-- Single declaration: a second property with the same name would replace this one. -->
+            <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50,TimestampTypeName=TIMESTAMP"/>
+            <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
+            <property name="openjpa.Log" value="log4j"/>
+        </properties>
+    </persistence-unit>
+
+    <persistence-unit name="falcon-postgresql" transaction-type="RESOURCE_LOCAL">
+        <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+
+        <class>org.apache.falcon.persistence.EntityBean</class>
+        <class>org.apache.falcon.persistence.InstanceBean</class>
+        <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
+        <class>org.apache.falcon.persistence.PendingInstanceBean</class>
+
+        <properties>
+            <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
+
+            <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
+
+            <property name="openjpa.MetaDataFactory"
+                      value="jpa(Types=org.apache.falcon.persistence.EntityBean;
+                org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
+                org.apache.falcon.persistence.MonitoredFeedsBean)"></property>
+
+            <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
+            <property name="openjpa.LockManager" value="pessimistic"/>
+            <property name="openjpa.ReadLockLevel" value="read"/>
+            <property name="openjpa.WriteLockLevel" value="write"/>
+            <property name="openjpa.jdbc.TransactionIsolation" value="repeatable-read"/> <!--CUSTOM-->
+            <!-- Single declaration: a second property with the same name would replace this one. -->
+            <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50,TimestampTypeName=TIMESTAMP"/>
+            <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
+            <property name="openjpa.Log" value="log4j"/>
+        </properties>
+    </persistence-unit>
+
+</persistence>

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 81d3da1..87a74bf 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -42,7 +42,8 @@
                         org.apache.falcon.metadata.MetadataMappingService,\
                         org.apache.falcon.service.LogCleanupService,\
                         org.apache.falcon.service.GroupsService,\
-                        org.apache.falcon.service.ProxyUserService
+                        org.apache.falcon.service.ProxyUserService,\
+                        org.apache.falcon.service.FalconJPAService
 ## Add if you want to use Falcon Azure integration ##
 #                        org.apache.falcon.adfservice.ADFProviderService
 ## If you wish to use Falcon native scheduler add the commented out services below to application.services ##
@@ -51,7 +52,7 @@
 #                        org.apache.falcon.notification.service.impl.AlarmService,\
 #                        org.apache.falcon.notification.service.impl.DataAvailabilityService,\
 #                        org.apache.falcon.execution.FalconExecutionService,\
-#                        org.apache.falcon.state.store.service.FalconJPAService
+
 
 
 # List of Lifecycle policies configured.
@@ -305,4 +306,4 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
 ## Creates Falcon DB.
 ## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP.
 ## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
-#*.falcon.statestore.create.db.schema=true
\ No newline at end of file
+#*.falcon.statestore.create.db.schema=true

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/statestore.credentials
----------------------------------------------------------------------
diff --git a/common/src/main/resources/statestore.credentials b/common/src/main/resources/statestore.credentials
index 86c32a1..b0e4196 100644
--- a/common/src/main/resources/statestore.credentials
+++ b/common/src/main/resources/statestore.credentials
@@ -18,5 +18,5 @@
 
 
 ######### StateStore Credentials #####
-#*.falcon.statestore.jdbc.username=sa
-#*.falcon.statestore.jdbc.password=
\ No newline at end of file
+*.falcon.statestore.jdbc.username=sa
+*.falcon.statestore.jdbc.password=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/statestore.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/statestore.properties b/common/src/main/resources/statestore.properties
index 44e79b3..7686426 100644
--- a/common/src/main/resources/statestore.properties
+++ b/common/src/main/resources/statestore.properties
@@ -42,4 +42,22 @@
 ## Creates Falcon DB.
 ## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP.
 ## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
-#*.falcon.statestore.create.db.schema=true
\ No newline at end of file
+#*.falcon.statestore.create.db.schema=true
+
+
+######## StateStore Properties #####
+*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true
+*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+# Maximum number of active connections that can be allocated from this pool at the same time.
+*.falcon.statestore.pool.max.active.conn=10
+*.falcon.statestore.connection.properties=
+# Indicates the interval (in milliseconds) between eviction runs.
+*.falcon.statestore.validate.db.connection.eviction.interval=300000
+# The number of objects to examine during each run of the idle object evictor thread.
+*.falcon.statestore.validate.db.connection.eviction.num=10
+# Creates Falcon DB.
+# If set to true, the DB schema is created if it does not exist. If the DB schema already exists, this is a no-op.
+# If set to false, the DB schema is not created. If the DB schema does not exist, startup fails.
+*.falcon.statestore.create.db.schema=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/docs/src/site/twiki/FalconNativeScheduler.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconNativeScheduler.twiki b/docs/src/site/twiki/FalconNativeScheduler.twiki
index 9ffc5e9..1f51739 100644
--- a/docs/src/site/twiki/FalconNativeScheduler.twiki
+++ b/docs/src/site/twiki/FalconNativeScheduler.twiki
@@ -29,7 +29,7 @@ You can enable native scheduler by making changes to __$FALCON_HOME/conf/startup
                         org.apache.falcon.service.ProcessSubscriberService,\
                         org.apache.falcon.service.FeedSLAMonitoringService,\
                         org.apache.falcon.service.LifecyclePolicyMap,\
-                        org.apache.falcon.state.store.service.FalconJPAService,\
+                        org.apache.falcon.service.FalconJPAService,\
                         org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.rerun.service.RetryService,\
                         org.apache.falcon.rerun.service.LateRunService,\

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
new file mode 100644
index 0000000..39e2562
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.jdbc;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.falcon.persistence.MonitoredFeedsBean;
+import org.apache.falcon.persistence.PendingInstanceBean;
+import org.apache.falcon.persistence.PersistenceConstants;
+import org.apache.falcon.persistence.ResultNotFoundException;
+import org.apache.falcon.service.FalconJPAService;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * State store for monitored feeds ({@link MonitoredFeedsBean}) and pending feed
+ * instances ({@link PendingInstanceBean}), backed by the JPA named queries declared
+ * in {@link PersistenceConstants}.
+ *
+ * Every public method obtains its own {@link EntityManager} from
+ * {@link FalconJPAService} and closes it before returning, on both the success
+ * and the failure path. Methods that modify data run inside a single transaction
+ * that is committed on success and rolled back if anything fails.
+ */
+public class MonitoringJdbcStateStore {
+
+    /**
+     * Obtains a new entity manager from the Falcon JPA service.
+     *
+     * @return an entity manager that the caller is responsible for closing
+     */
+    private EntityManager getEntityManager() {
+        return FalconJPAService.get().getEntityManager();
+    }
+
+    /**
+     * Persists a new monitored-feed record for the given feed.
+     *
+     * @param feedName name of the feed to start monitoring
+     */
+    public void putMonitoredFeed(String feedName) {
+        MonitoredFeedsBean monitoredFeedsBean = new MonitoredFeedsBean();
+        monitoredFeedsBean.setFeedName(feedName);
+
+        EntityManager entityManager = getEntityManager();
+        try {
+            beginTransaction(entityManager);
+            entityManager.persist(monitoredFeedsBean);
+            commitTransaction(entityManager);
+        } finally {
+            rollbackIfActiveAndClose(entityManager);
+        }
+    }
+
+    /**
+     * Looks up the monitored-feed record for the given feed.
+     *
+     * @param feedName name of the feed to look up
+     * @return the first matching record, or {@code null} if the query returns no rows
+     */
+    public MonitoredFeedsBean getMonitoredFeed(String feedName) {
+        EntityManager entityManager = getEntityManager();
+        try {
+            Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITERED_INSTANCE);
+            q.setParameter("feedName", feedName);
+            List result = q.getResultList();
+            if (result.isEmpty()) {
+                return null;
+            }
+            return (MonitoredFeedsBean) result.get(0);
+        } finally {
+            // Query construction and execution are inside the try so that a failure there cannot leak the manager.
+            entityManager.close();
+        }
+    }
+
+    /**
+     * Executes the named delete query for monitored-feed records of the given feed.
+     *
+     * @param feedName name of the feed whose monitoring record(s) are deleted
+     */
+    public void deleteMonitoringFeed(String feedName) {
+        EntityManager entityManager = getEntityManager();
+        try {
+            beginTransaction(entityManager);
+            Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_INSTANCES);
+            q.setParameter("feedName", feedName);
+            q.executeUpdate();
+            commitTransaction(entityManager);
+        } finally {
+            rollbackIfActiveAndClose(entityManager);
+        }
+    }
+
+    /**
+     * Returns every monitored-feed record.
+     *
+     * @return a non-empty list of monitored feeds
+     * @throws ResultNotFoundException if the query returns no rows
+     */
+    public List<MonitoredFeedsBean> getAllMonitoredFeed() throws ResultNotFoundException {
+        EntityManager entityManager = getEntityManager();
+        try {
+            Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_FEEDS);
+            // Named queries are untyped; the element type is determined by the query definition.
+            @SuppressWarnings("unchecked")
+            List<MonitoredFeedsBean> result = q.getResultList();
+            if (result.isEmpty()) {
+                throw new ResultNotFoundException("No Feed has been scheduled for monitoring.");
+            }
+            return result;
+        } finally {
+            entityManager.close();
+        }
+    }
+
+    /**
+     * Executes the named delete query for the pending instance identified by feed,
+     * cluster and nominal time.
+     *
+     * @param feedName    name of the feed
+     * @param clusterName name of the cluster
+     * @param nominalTime nominal time of the instance to delete
+     */
+    public void deletePendingInstance(String feedName, String clusterName, Date nominalTime) {
+        EntityManager entityManager = getEntityManager();
+        try {
+            beginTransaction(entityManager);
+            Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES);
+            q.setParameter("feedName", feedName);
+            q.setParameter("clusterName", clusterName);
+            q.setParameter("nominalTime", nominalTime);
+            q.executeUpdate();
+            commitTransaction(entityManager);
+        } finally {
+            rollbackIfActiveAndClose(entityManager);
+        }
+    }
+
+    /**
+     * Executes the named delete query for all pending instances of a feed on a cluster.
+     *
+     * @param feedName    name of the feed
+     * @param clusterName name of the cluster
+     */
+    public void deletePendingInstances(String feedName, String clusterName) {
+        EntityManager entityManager = getEntityManager();
+        try {
+            beginTransaction(entityManager);
+            Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED);
+            q.setParameter("feedName", feedName);
+            q.setParameter("clusterName", clusterName);
+            q.executeUpdate();
+            commitTransaction(entityManager);
+        } finally {
+            rollbackIfActiveAndClose(entityManager);
+        }
+    }
+
+    /**
+     * Persists a new pending-instance record.
+     *
+     * @param feed        name of the feed
+     * @param clusterName name of the cluster
+     * @param nominalTime nominal time of the pending instance
+     */
+    public void putPendingInstances(String feed, String clusterName, Date nominalTime) {
+        PendingInstanceBean pendingInstanceBean = new PendingInstanceBean();
+        pendingInstanceBean.setFeedName(feed);
+        pendingInstanceBean.setClusterName(clusterName);
+        pendingInstanceBean.setNominalTime(nominalTime);
+
+        EntityManager entityManager = getEntityManager();
+        try {
+            beginTransaction(entityManager);
+            entityManager.persist(pendingInstanceBean);
+            commitTransaction(entityManager);
+        } finally {
+            rollbackIfActiveAndClose(entityManager);
+        }
+    }
+
+    /**
+     * Returns the nominal times of the pending instances of a feed on a cluster.
+     *
+     * @param feedName    name of the feed
+     * @param clusterName name of the cluster
+     * @return a non-empty list of nominal times
+     * @throws ResultNotFoundException if the query returns no rows
+     */
+    public List<Date> getNominalInstances(String feedName, String clusterName) throws ResultNotFoundException {
+        EntityManager entityManager = getEntityManager();
+        try {
+            Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES);
+            q.setParameter("feedName", feedName);
+            q.setParameter("clusterName", clusterName);
+            // Named queries are untyped; the element type is determined by the query definition.
+            @SuppressWarnings("unchecked")
+            List<Date> result = q.getResultList();
+            if (CollectionUtils.isEmpty(result)) {
+                throw new ResultNotFoundException(feedName + " with " + clusterName + " Not Found");
+            }
+            return result;
+        } finally {
+            entityManager.close();
+        }
+    }
+
+    /**
+     * Returns every pending-instance record.
+     *
+     * @return the list of pending instances, or {@code null} if there are none
+     */
+    public List<PendingInstanceBean> getAllInstances() {
+        EntityManager entityManager = getEntityManager();
+        try {
+            Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES);
+            // Named queries are untyped; the element type is determined by the query definition.
+            @SuppressWarnings("unchecked")
+            List<PendingInstanceBean> result = q.getResultList();
+            if (CollectionUtils.isEmpty(result)) {
+                // Callers receive null rather than an empty list; kept for backward compatibility.
+                return null;
+            }
+            return result;
+        } finally {
+            entityManager.close();
+        }
+    }
+
+    /**
+     * Begins a transaction on the given entity manager.
+     */
+    private void beginTransaction(EntityManager entityManager) {
+        entityManager.getTransaction().begin();
+    }
+
+    /**
+     * Commits the current transaction of the given entity manager. Closing is left
+     * to {@link #rollbackIfActiveAndClose(EntityManager)} so that it also happens
+     * when the commit itself fails.
+     */
+    private void commitTransaction(EntityManager entityManager) {
+        entityManager.getTransaction().commit();
+    }
+
+    /**
+     * Cleanup step for transactional methods, meant to be called from {@code finally}.
+     * If the transaction is still active (the work or the commit failed) it is rolled
+     * back instead of committed, so the original exception is not masked by a failing
+     * commit; the entity manager is closed in every case.
+     */
+    private void rollbackIfActiveAndClose(EntityManager entityManager) {
+        try {
+            if (entityManager.getTransaction().isActive()) {
+                entityManager.getTransaction().rollback();
+            }
+        } finally {
+            entityManager.close();
+        }
+    }
+
+}