You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2016/03/28 15:39:26 UTC

[3/3] falcon git commit: FALCON-1865 Persist Feed sla data to database

FALCON-1865 Persist Feed sla data to database

Author: Praveen Adlakha <ad...@gmail.com>

Reviewers: Ajay Yadava <aj...@apache.org>

Closes #77 from PraveenAdlakha/feed_alert


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/de2f5c0a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/de2f5c0a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/de2f5c0a

Branch: refs/heads/master
Commit: de2f5c0ab1c26b8d198d067e066c579a86bce737
Parents: 10f3843
Author: Praveen Adlakha <ad...@gmail.com>
Authored: Mon Mar 28 19:08:56 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Mar 28 19:08:56 2016 +0530

----------------------------------------------------------------------
 common/pom.xml                                  |  41 ++
 .../apache/falcon/persistence/EntityBean.java   | 117 +++++
 .../apache/falcon/persistence/InstanceBean.java | 229 ++++++++++
 .../falcon/persistence/MonitoredFeedsBean.java  |  73 ++++
 .../falcon/persistence/PendingInstanceBean.java |  98 +++++
 .../persistence/PersistenceConstants.java       |  35 ++
 .../persistence/ResultNotFoundException.java    |  31 ++
 .../apache/falcon/service/FalconJPAService.java | 170 +++++++
 .../falcon/tools/FalconStateStoreDBCLI.java     | 438 +++++++++++++++++++
 .../src/main/resources/META-INF/persistence.xml | 113 +++++
 common/src/main/resources/startup.properties    |   7 +-
 .../src/main/resources/statestore.credentials   |   4 +-
 common/src/main/resources/statestore.properties |  20 +-
 docs/src/site/twiki/FalconNativeScheduler.twiki |   2 +-
 .../falcon/jdbc/MonitoringJdbcStateStore.java   | 175 ++++++++
 .../service/FeedSLAMonitoringService.java       | 191 +++-----
 .../jdbc/MonitoringJdbcStateStoreTest.java      |  97 ++++
 .../falcon/service/FeedSLAMonitoringTest.java   |  34 --
 scheduler/pom.xml                               |  42 +-
 .../falcon/state/store/jdbc/BeanMapperUtil.java |   2 +
 .../falcon/state/store/jdbc/EntityBean.java     | 117 -----
 .../falcon/state/store/jdbc/InstanceBean.java   | 229 ----------
 .../falcon/state/store/jdbc/JDBCStateStore.java |   4 +-
 .../state/store/service/FalconJPAService.java   | 171 --------
 .../falcon/tools/FalconStateStoreDBCLI.java     | 436 ------------------
 .../src/main/resources/META-INF/persistence.xml | 104 -----
 .../execution/FalconExecutionServiceTest.java   |   2 +-
 .../falcon/state/AbstractSchedulerTestBase.java |   2 +-
 .../state/service/TestFalconJPAService.java     |   2 +-
 .../state/service/store/TestJDBCStateStore.java |   2 +-
 scheduler/src/test/resources/startup.properties |   5 +-
 .../src/test/resources/statestore.properties    |   2 +-
 src/build/findbugs-exclude.xml                  |  25 +-
 src/conf/startup.properties                     |   2 +-
 unit/pom.xml                                    |  10 +
 unit/src/main/resources/startup.properties      |   1 +
 .../org/apache/falcon/unit/TestFalconUnit.java  |   1 +
 .../AbstractSchedulerManagerJerseyIT.java       |   2 +-
 webapp/src/test/resources/startup.properties    |   2 +-
 39 files changed, 1744 insertions(+), 1294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index df28f9b..c54f9d8 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -187,6 +187,26 @@
             <groupId>com.thinkaurelius.titan</groupId>
             <artifactId>titan-berkeleyje</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.openjpa</groupId>
+            <artifactId>openjpa-jdbc</artifactId>
+            <version>${openjpa.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.openjpa</groupId>
+            <artifactId>openjpa-persistence-jdbc</artifactId>
+            <version>${openjpa.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>javax.validation</groupId>
+            <artifactId>validation-api</artifactId>
+            <version>${javax-validation.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
@@ -216,6 +236,27 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <phase>process-classes</phase>
+                        <configuration>
+                            <tasks>
+                                <taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask" classpathref="maven.compile.classpath"/>
+                                <openjpac>
+                                    <classpath refid="maven.compile.classpath"/>
+                                </openjpac>
+                            </tasks>
+                        </configuration>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/EntityBean.java b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
new file mode 100644
index 0000000..5c94fa4
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.OneToMany;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+import java.util.List;
+//SUSPEND CHECKSTYLE CHECK  LineLengthCheck
+/**
+ * Entity object which will be stored in the database.
+ */
+@Entity
+@NamedQueries({
+        @NamedQuery(name = "GET_ENTITY", query = "select OBJECT(a) from EntityBean a where a.id = :id"),
+        @NamedQuery(name = "GET_ENTITY_FOR_STATE", query = "select OBJECT(a) from EntityBean a where a.state = :state"),
+        @NamedQuery(name = "UPDATE_ENTITY", query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type where a.id = :id"),
+        @NamedQuery(name = "GET_ENTITIES_FOR_TYPE", query = "select OBJECT(a) from EntityBean a where a.type = :type"),
+        @NamedQuery(name = "GET_ENTITIES", query = "select OBJECT(a) from EntityBean a"),
+        @NamedQuery(name = "DELETE_ENTITY", query = "delete from EntityBean a where a.id = :id"),
+        @NamedQuery(name = "DELETE_ENTITIES", query = "delete from EntityBean")})
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+@Table(name = "ENTITIES")
+public class EntityBean {
+    @NotNull
+    @Id
+    private String id;
+
+    @Basic
+    @NotNull
+    @Column(name = "name")
+    private String name;
+
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "type")
+    private String type;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "current_state")
+    private String state;
+
+    @OneToMany(cascade= CascadeType.REMOVE, mappedBy="entityBean")
+    private List<InstanceBean> instanceBeans;
+
+    public EntityBean() {
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getState() {
+        return state;
+    }
+
+    public void setState(String state) {
+        this.state = state;
+    }
+
+    public List<InstanceBean> getInstanceBeans() {
+        return instanceBeans;
+    }
+
+    public void setInstanceBeans(List<InstanceBean> instanceBeans) {
+        this.instanceBeans = instanceBeans;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java
new file mode 100644
index 0000000..b7e10f1
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import org.apache.openjpa.persistence.jdbc.ForeignKey;
+import org.apache.openjpa.persistence.jdbc.ForeignKeyAction;
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Lob;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+import java.sql.Timestamp;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * Instance State which will be stored in DB.
+ */
+@Entity
+@NamedQueries({
+        @NamedQuery(name = "GET_INSTANCE", query = "select OBJECT(a) from InstanceBean a where a.id = :id"),
+        @NamedQuery(name = "GET_INSTANCE_FOR_EXTERNAL_ID", query = "select OBJECT(a) from InstanceBean a where a.externalID = :externalID"),
+        @NamedQuery(name = "DELETE_INSTANCE", query = "delete from InstanceBean a where a.id = :id"),
+        @NamedQuery(name = "DELETE_INSTANCE_FOR_ENTITY", query = "delete from InstanceBean a where a.entityId = :entityId"),
+        @NamedQuery(name = "UPDATE_INSTANCE", query = "update InstanceBean a set a.cluster = :cluster, a.externalID = :externalID, a.instanceTime = :instanceTime, a.creationTime = :creationTime, a.actualEndTime = :actualEndTime, a.currentState = :currentState, a.actualStartTime = :actualStartTime, a.instanceSequence = :instanceSequence, a.awaitedPredicates = :awaitedPredicates, a.properties = :properties where a.id = :id"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState)"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"),
+        @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"),
+        @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a"),
+        @NamedQuery(name = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE", query = "select a.currentState, COUNT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.instanceTime >= :startTime AND a.instanceTime < :endTime GROUP BY a.currentState")
+})
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+@Table(name = "INSTANCES")
+public class InstanceBean {
+
+    @Id
+    @NotNull
+    private String id;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "entity_id")
+    private String entityId;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "cluster")
+    private String cluster;
+
+    @Basic
+    @Index
+    @Column(name = "external_id")
+    private String externalID;
+
+    @Basic
+    @Index
+    @Column(name = "instance_time")
+    private Timestamp instanceTime;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "creation_time")
+    private Timestamp creationTime;
+
+    @Basic
+    @Column(name = "actual_start_time")
+    private Timestamp actualStartTime;
+
+    @Basic
+    @Column(name = "actual_end_time")
+    private Timestamp actualEndTime;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "current_state")
+    private String currentState;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "instance_sequence")
+    private Integer instanceSequence;
+
+    @ForeignKey(deleteAction= ForeignKeyAction.CASCADE)
+    @ManyToOne(cascade= CascadeType.REMOVE)
+    private EntityBean entityBean;
+
+
+    @Column(name = "awaited_predicates")
+    @Lob
+    private byte[] awaitedPredicates;
+
+    @Column(name = "properties")
+    @Lob
+    private byte[] properties;
+
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getCluster() {
+        return cluster;
+    }
+
+    public void setCluster(String cluster) {
+        this.cluster = cluster;
+    }
+
+    public String getExternalID() {
+        return externalID;
+    }
+
+    public void setExternalID(String externalID) {
+        this.externalID = externalID;
+    }
+
+    public Timestamp getInstanceTime() {
+        return instanceTime;
+    }
+
+    public void setInstanceTime(Timestamp instanceTime) {
+        this.instanceTime = instanceTime;
+    }
+
+    public Timestamp getCreationTime() {
+        return creationTime;
+    }
+
+    public void setCreationTime(Timestamp creationTime) {
+        this.creationTime = creationTime;
+    }
+
+    public Timestamp getActualStartTime() {
+        return actualStartTime;
+    }
+
+    public void setActualStartTime(Timestamp actualStartTime) {
+        this.actualStartTime = actualStartTime;
+    }
+
+    public Timestamp getActualEndTime() {
+        return actualEndTime;
+    }
+
+    public void setActualEndTime(Timestamp actualEndTime) {
+        this.actualEndTime = actualEndTime;
+    }
+
+    public String getCurrentState() {
+        return currentState;
+    }
+
+    public void setCurrentState(String currentState) {
+        this.currentState = currentState;
+    }
+
+    public byte[] getAwaitedPredicates() {
+        return awaitedPredicates;
+    }
+
+    public void setAwaitedPredicates(byte[] awaitedPredicates) {
+        this.awaitedPredicates = awaitedPredicates;
+    }
+
+    public Integer getInstanceSequence() {
+        return instanceSequence;
+    }
+
+    public void setInstanceSequence(Integer instanceSequence) {
+        this.instanceSequence = instanceSequence;
+    }
+
+    public String getEntityId() {
+        return entityId;
+    }
+
+    public void setEntityId(String entityId) {
+        this.entityId = entityId;
+    }
+
+    public byte[] getProperties() {
+        return properties;
+    }
+
+    public void setProperties(byte[] properties) {
+        this.properties = properties;
+    }
+
+    public EntityBean getEntityBean() {
+        return entityBean;
+    }
+
+    public void setEntityBean(EntityBean entityBean) {
+        this.entityBean = entityBean;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
new file mode 100644
index 0000000..2b48569
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import javax.persistence.Entity;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Column;
+import javax.persistence.Basic;
+import javax.validation.constraints.NotNull;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+* The feeds that are to be monitored will be stored in the db.
+* */
+
+@Entity
+@NamedQueries({
+        @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from "
+                + "MonitoredFeedsBean a where a.feedName = :feedName"),
+        @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredFeedsBean "
+                + "a where a.feedName = :feedName"),
+        @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) "
+                + "from MonitoredFeedsBean a")
+})
+@Table(name="MONITORED_FEEDS")
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+public class MonitoredFeedsBean {
+    @NotNull
+    @GeneratedValue(strategy = GenerationType.AUTO)
+    @Id
+    private String id;
+
+    @Basic
+    @NotNull
+    @Column(name = "feed_name")
+    private String feedName;
+
+    public String getFeedName() {
+        return feedName;
+    }
+
+    public void setFeedName(String feedName) {
+        this.feedName = feedName;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
new file mode 100644
index 0000000..038244a
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import javax.persistence.Entity;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.GenerationType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.Id;
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.validation.constraints.NotNull;
+import java.util.Date;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+* The feed instances to be monitored will be stored in the db.
+* */
+@Entity
+@NamedQueries({
+        @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"),
+        @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"),
+        @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
+        @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
+        @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select  OBJECT(a) from PendingInstanceBean a ")
+})
+@Table(name = "PENDING_INSTANCES")
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+public class PendingInstanceBean {
+    @NotNull
+    @GeneratedValue(strategy = GenerationType.AUTO)
+    @Id
+    private String id;
+
+    @Basic
+    @NotNull
+    @Column(name = "feed_name")
+    private String feedName;
+
+    @Basic
+    @NotNull
+    @Column(name = "cluster_name")
+    private String clusterName;
+
+    @Basic
+    @NotNull
+    @Column(name = "nominal_time")
+    private Date nominalTime;
+
+    public Date getNominalTime() {
+        return nominalTime;
+    }
+
+    public void setNominalTime(Date nominalTime) {
+        this.nominalTime = nominalTime;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getFeedName() {
+        return feedName;
+    }
+
+    public void setFeedName(String feedName) {
+        this.feedName = feedName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
new file mode 100644
index 0000000..511270e
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+/**
+ * The names of queries to be used as constants across the packages.
+ */
+
+public final class PersistenceConstants {
+    private PersistenceConstants(){
+
+    }
+    public static final String  GET_MONITERED_INSTANCE = "GET_MONITERED_INSTANCE";
+    public static final String DELETE_MONITORED_INSTANCES = "DELETE_MONITORED_INSTANCES";
+    public static final String GET_ALL_MONITORING_FEEDS = "GET_ALL_MONITORING_FEEDS";
+    public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES";
+    public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES";
+    public static final String DELETE_ALL_INSTANCES_FOR_FEED = "DELETE_ALL_INSTANCES_FOR_FEED";
+    public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES";
+    public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES";
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java b/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java
new file mode 100644
index 0000000..c368d2c
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.persistence;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Exception to be thrown by the bean classes.
+ */
+public class ResultNotFoundException extends FalconException {
+
+    public ResultNotFoundException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/service/FalconJPAService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/FalconJPAService.java b/common/src/main/java/org/apache/falcon/service/FalconJPAService.java
new file mode 100644
index 0000000..73fde33
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/service/FalconJPAService.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.service;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.persistence.EntityBean;
+import org.apache.falcon.persistence.InstanceBean;
+import org.apache.falcon.util.StateStoreProperties;
+import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import java.text.MessageFormat;
+import java.util.Properties;
+
+/**
+ * Service that manages JPA.
+ */
+public final class FalconJPAService implements FalconService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FalconJPAService.class);
+    public static final String PREFIX = "falcon.statestore.";
+
+    public static final String DB_SCHEMA = PREFIX + "schema.name";
+    public static final String URL = PREFIX + "jdbc.url";
+    public static final String DRIVER = PREFIX + "jdbc.driver";
+    public static final String USERNAME = PREFIX + "jdbc.username";
+    public static final String PASSWORD = PREFIX + "jdbc.password";
+    public static final String CONN_DATA_SOURCE = PREFIX + "connection.data.source";
+    public static final String CONN_PROPERTIES = PREFIX + "connection.properties";
+    public static final String MAX_ACTIVE_CONN = PREFIX + "pool.max.active.conn";
+    public static final String CREATE_DB_SCHEMA = PREFIX + "create.db.schema";
+    public static final String VALIDATE_DB_CONN = PREFIX + "validate.db.connection";
+    public static final String VALIDATE_DB_CONN_EVICTION_INTERVAL = PREFIX + "validate.db.connection.eviction.interval";
+    public static final String VALIDATE_DB_CONN_EVICTION_NUM = PREFIX + "validate.db.connection.eviction.num";
+
+    private EntityManagerFactory entityManagerFactory;
+    // Persistent Unit which is defined in persistence.xml
+    private String persistenceUnit;
+    private static final FalconJPAService FALCON_JPA_SERVICE = new FalconJPAService();
+
+    private FalconJPAService() {
+    }
+
+    public static FalconJPAService get() {
+        return FALCON_JPA_SERVICE;
+    }
+
+    public EntityManagerFactory getEntityManagerFactory() {
+        return entityManagerFactory;
+    }
+
+    /**
+     * Selects the persistence unit to use, named "falcon-" followed by the part
+     * of {@code dbType} that precedes the first ':'.
+     *
+     * @param dbType database vendor, optionally followed by ':' and the rest of a JDBC URL
+     * @throws IllegalArgumentException if {@code dbType} is null or empty
+     */
+    public void setPersistenceUnit(String dbType) {
+        if (StringUtils.isEmpty(dbType)) {
+            throw new IllegalArgumentException(" DB type cannot be null or empty");
+        }
+        // only the leading vendor token is relevant for the unit name
+        final String vendor = dbType.split(":")[0];
+        this.persistenceUnit = "falcon-" + vendor;
+    }
+
+    @Override
+    public String getName() {
+        return this.getClass().getSimpleName();
+    }
+
+    /**
+     * Creates the entity manager factory for the persistence unit chosen by
+     * {@link #getPropsforStore()}, touches the entity classes once and logs the
+     * effective connection properties with the password masked.
+     *
+     * @throws FalconException if the state store configuration is invalid
+     */
+    @Override
+    public void init() throws FalconException {
+        Properties props = getPropsforStore();
+        entityManagerFactory = Persistence.
+                createEntityManagerFactory(persistenceUnit, props);
+        EntityManager entityManager = getEntityManager();
+        entityManager.find(EntityBean.class, 1);
+        entityManager.find(InstanceBean.class, 1);
+        LOG.info("All entities initialized");
+
+        // need to use a pseudo no-op transaction so all entities, datasource
+        // and connection pool are initialized one time only
+        entityManager.getTransaction().begin();
+        OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) entityManagerFactory;
+        // Mask the password with '***'
+        String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,");
+        // SLF4J substitutes "{}" only; the MessageFormat-style "{0}" would be logged literally
+        // and the configuration would never appear in the log.
+        LOG.info("JPA configuration: {}", logMsg);
+        entityManager.getTransaction().commit();
+        entityManager.close();
+    }
+
+    /**
+     * Builds the OpenJPA property overrides for the state store from
+     * {@link StateStoreProperties} and selects the persistence unit from the
+     * vendor part of the JDBC URL via {@link #setPersistenceUnit(String)}.
+     *
+     * @return properties containing openjpa.ConnectionProperties, plus
+     *         openjpa.ConnectionDriverName when a data source is configured and
+     *         openjpa.jdbc.SynchronizeMappings when schema creation is enabled
+     * @throws FalconException if a required property is missing or the JDBC URL is malformed
+     */
+    private Properties getPropsforStore() throws FalconException {
+        String url = getRequiredProperty(URL);
+        String driver = StateStoreProperties.get().getProperty(DRIVER);
+        String user = StateStoreProperties.get().getProperty(USERNAME);
+        String password = getRequiredProperty(PASSWORD).trim();
+        String maxConn = getRequiredProperty(MAX_ACTIVE_CONN).trim();
+        String dataSource = StateStoreProperties.get().getProperty(CONN_DATA_SOURCE);
+        String connPropsConfig = StateStoreProperties.get().getProperty(CONN_PROPERTIES);
+        boolean autoSchemaCreation = Boolean.parseBoolean(StateStoreProperties.get().getProperty(CREATE_DB_SCHEMA,
+                "false"));
+        boolean validateDbConn = Boolean.parseBoolean(StateStoreProperties.get().getProperty(VALIDATE_DB_CONN, "true"));
+
+        if (!url.startsWith("jdbc:")) {
+            throw new FalconException("invalid JDBC URL, must start with 'jdbc:': " + url);
+        }
+        String dbType = url.substring("jdbc:".length());
+        if (dbType.indexOf(":") <= 0) {
+            throw new FalconException("invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...': " + url);
+        }
+        setPersistenceUnit(dbType);
+        String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}";
+        connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn);
+        Properties props = new Properties();
+        if (autoSchemaCreation) {
+            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
+            props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
+        } else if (validateDbConn) {
+            // validation can be done only if the schema already exist, else a
+            // connection cannot be obtained to create the schema.
+            // The eviction settings are only needed on this path, so they are only required here.
+            String interval = "timeBetweenEvictionRunsMillis="
+                    + getRequiredProperty(VALIDATE_DB_CONN_EVICTION_INTERVAL).trim();
+            String num = "numTestsPerEvictionRun=" + getRequiredProperty(VALIDATE_DB_CONN_EVICTION_NUM).trim();
+            connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num;
+            connProps += ",ValidationQuery=select 1";
+            // The string is NOT passed through MessageFormat a second time: it has no
+            // placeholders left, and a second pass would strip single quotes or fail on
+            // braces that are part of the already substituted URL or password.
+        } else {
+            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
+        }
+        if (connPropsConfig != null) {
+            connProps += "," + connPropsConfig;
+        }
+        props.setProperty("openjpa.ConnectionProperties", connProps);
+        // Properties rejects null values; without an override the value from persistence.xml applies.
+        if (dataSource != null) {
+            props.setProperty("openjpa.ConnectionDriverName", dataSource);
+        }
+        return props;
+    }
+
+    /**
+     * Reads a state store property that must be present.
+     *
+     * @param name property key
+     * @return the configured value, never null
+     * @throws FalconException naming the key when the property is not set
+     */
+    private static String getRequiredProperty(String name) throws FalconException {
+        String value = StateStoreProperties.get().getProperty(name);
+        if (value == null) {
+            throw new FalconException("Missing required state store property: " + name);
+        }
+        return value;
+    }
+
+    /**
+     * Closes the entity manager factory if one was created and is still open.
+     */
+    @Override
+    public void destroy() throws FalconException {
+        // the factory is only assigned in init(); guard against destroy() without a completed init()
+        if (entityManagerFactory != null && entityManagerFactory.isOpen()) {
+            entityManagerFactory.close();
+        }
+    }
+
+
+    /**
+     * Return an EntityManager. Used by the StoreService.
+     *
+     * @return an entity manager
+     */
+    public EntityManager getEntityManager() {
+        return getEntityManagerFactory().createEntityManager();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
new file mode 100644
index 0000000..df8194c
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.tools;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.falcon.cli.CLIParser;
+import org.apache.falcon.service.FalconJPAService;
+import org.apache.falcon.util.BuildProperties;
+import org.apache.falcon.util.StateStoreProperties;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Command Line utility for Table Creation, Update.
+ */
+public class FalconStateStoreDBCLI {
+    public static final String HELP_CMD = "help";
+    public static final String VERSION_CMD = "version";
+    public static final String CREATE_CMD = "create";
+    public static final String SQL_FILE_OPT = "sqlfile";
+    public static final String RUN_OPT = "run";
+    public static final String UPGRADE_CMD = "upgrade";
+
+    // Tracks whether this CLI object has already been used; run() rejects a second invocation.
+    private boolean instanceExists;
+    private static final String[] FALCON_HELP =
+    {"Falcon DB initialization tool currently supports Derby DB/ Mysql/ PostgreSQL"};
+
+    public static void main(String[] args) {
+        new FalconStateStoreDBCLI().run(args);
+    }
+
+    public FalconStateStoreDBCLI() {
+        instanceExists = false;
+    }
+
+    protected Options getOptions() {
+        Option sqlfile = new Option(SQL_FILE_OPT, true,
+                "Generate SQL script instead of creating/upgrading the DB schema");
+        Option run = new Option(RUN_OPT, false, "Confirmation option regarding DB schema creation/upgrade");
+        Options options = new Options();
+        options.addOption(sqlfile);
+        options.addOption(run);
+        return options;
+    }
+
+    public synchronized int run(String[] args) {
+        if (instanceExists) {
+            throw new IllegalStateException("CLI instance already used");
+        }
+        instanceExists = true;
+
+        CLIParser parser = new CLIParser("falcondb", FALCON_HELP);
+        parser.addCommand(HELP_CMD, "", "Display usage for all commands or specified command", new Options(), false);
+        parser.addCommand(VERSION_CMD, "", "Show Falcon DB version information", new Options(), false);
+        parser.addCommand(CREATE_CMD, "", "Create Falcon DB schema", getOptions(), false);
+        parser.addCommand(UPGRADE_CMD, "", "Upgrade Falcon DB schema", getOptions(), false);
+
+        try {
+            CLIParser.Command command = parser.parse(args);
+            if (command.getName().equals(HELP_CMD)) {
+                parser.showHelp();
+            } else if (command.getName().equals(VERSION_CMD)) {
+                showVersion();
+            } else {
+                if (!command.getCommandLine().hasOption(SQL_FILE_OPT)
+                        && !command.getCommandLine().hasOption(RUN_OPT)) {
+                    throw new Exception("'-sqlfile <FILE>' or '-run' options must be specified");
+                }
+                CommandLine commandLine = command.getCommandLine();
+                String sqlFile = (commandLine.hasOption(SQL_FILE_OPT))
+                        ? commandLine.getOptionValue(SQL_FILE_OPT)
+                        : File.createTempFile("falcondb-", ".sql").getAbsolutePath();
+                boolean run = commandLine.hasOption(RUN_OPT);
+                if (command.getName().equals(CREATE_CMD)) {
+                    createDB(sqlFile, run);
+                } else if (command.getName().equals(UPGRADE_CMD)) {
+                    upgradeDB(sqlFile, run);
+                }
+                System.out.println("The SQL commands have been written to: " + sqlFile);
+                if (!run) {
+                    System.out.println("WARN: The SQL commands have NOT been executed, you must use the '-run' option");
+                }
+            }
+            return 0;
+        } catch (ParseException ex) {
+            System.err.println("Invalid sub-command: " + ex.getMessage());
+            System.err.println();
+            System.err.println(parser.shortHelp());
+            return 1;
+        } catch (Exception ex) {
+            System.err.println();
+            System.err.println("Error: " + ex.getMessage());
+            System.err.println();
+            System.err.println("Stack trace for the error was (for debug purposes):");
+            System.err.println("--------------------------------------");
+            ex.printStackTrace(System.err);
+            System.err.println("--------------------------------------");
+            System.err.println();
+            return 1;
+        }
+    }
+
+    /**
+     * Upgrades the Falcon DB schema to the version of this build, unless the
+     * version recorded in FALCON_DB_PROPS is already the same or newer.
+     *
+     * @param sqlFile file the generated SQL statements are written to
+     * @param run whether the statements are also executed against the database
+     * @throws Exception if the database cannot be reached, does not exist or the upgrade fails
+     */
+    private void upgradeDB(String sqlFile, boolean run) throws Exception {
+        validateConnection();
+        if (!checkDBExists()) {
+            throw new Exception("Falcon DB doesn't exist");
+        }
+        String falconVersion = BuildProperties.get().getProperty("project.version");
+        String dbVersion = getFalconDBVersion();
+        // Compare segment by segment: a plain String.compareTo() orders "0.9" after "0.10"
+        // and would wrongly report the DB as already upgraded.
+        if (compareVersions(dbVersion, falconVersion) >= 0) {
+            System.out.println("Falcon DB already upgraded to Falcon version '" + falconVersion + "'");
+            return;
+        }
+
+        createUpgradeDB(sqlFile, run, false);
+        upgradeFalconDBVersion(sqlFile, run, falconVersion);
+
+        // any post upgrade tasks
+        if (run) {
+            System.out.println("Falcon DB has been upgraded to Falcon version '" + falconVersion + "'");
+        }
+    }
+
+    /**
+     * Compares two version strings split on '.' and '-'. Segments that are purely
+     * numeric on both sides are compared as numbers, any other pair is compared as
+     * text; a missing segment counts as "0".
+     *
+     * @return a negative value, zero or a positive value when {@code left} is older
+     *         than, equal to or newer than {@code right}
+     */
+    private static int compareVersions(String left, String right) {
+        String[] leftParts = left.trim().split("[.-]");
+        String[] rightParts = right.trim().split("[.-]");
+        int count = Math.max(leftParts.length, rightParts.length);
+        for (int i = 0; i < count; i++) {
+            String l = (i < leftParts.length) ? leftParts[i] : "0";
+            String r = (i < rightParts.length) ? rightParts[i] : "0";
+            int result;
+            if (l.matches("\\d+") && r.matches("\\d+")) {
+                result = new java.math.BigInteger(l).compareTo(new java.math.BigInteger(r));
+            } else {
+                result = l.compareTo(r);
+            }
+            if (result != 0) {
+                return result;
+            }
+        }
+        return 0;
+    }
+
+
+    private void upgradeFalconDBVersion(String sqlFile, boolean run, String version) throws Exception {
+        String updateDBVersion = "update FALCON_DB_PROPS set data='" + version + "' where name='db.version'";
+        PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
+        writer.println();
+        writer.println(updateDBVersion);
+        writer.close();
+        System.out.println("Upgrade db.version in FALCON_DB_PROPS table to " + version);
+        if (run) {
+            Connection conn = createConnection();
+            Statement st = null;
+            try {
+                conn.setAutoCommit(true);
+                st = conn.createStatement();
+                st.executeUpdate(updateDBVersion);
+                st.close();
+            } catch (Exception ex) {
+                throw new Exception("Could not upgrade db.version in FALCON_DB_PROPS table: " + ex.toString(), ex);
+            } finally {
+                closeStatement(st);
+                conn.close();
+            }
+        }
+        System.out.println("DONE");
+    }
+
+    private static final String GET_FALCON_DB_VERSION = "select data from FALCON_DB_PROPS where name = 'db.version'";
+
+    private String getFalconDBVersion() throws Exception {
+        String version;
+        System.out.println("Get Falcon DB version");
+        Connection conn = createConnection();
+        Statement st = null;
+        ResultSet rs = null;
+        try {
+            st = conn.createStatement();
+            rs = st.executeQuery(GET_FALCON_DB_VERSION);
+            if (rs.next()) {
+                version = rs.getString(1);
+            } else {
+                throw new Exception("ERROR: Could not find Falcon DB 'db.version' in FALCON_DB_PROPS table");
+            }
+        } catch (Exception ex) {
+            throw new Exception("ERROR: Could not query FALCON_DB_PROPS table: " + ex.toString(), ex);
+        } finally {
+            closeResultSet(rs);
+            closeStatement(st);
+            conn.close();
+        }
+        System.out.println("DONE");
+        return version;
+    }
+
+
+    /**
+     * Reads the JDBC settings of the state store and derives the database vendor
+     * from the JDBC URL.
+     *
+     * @return map with the keys "driver", "url", "user", "password" and "dbtype"
+     * @throws RuntimeException if the URL is missing, does not start with "jdbc:"
+     *         or has no vendor part
+     */
+    private Map<String, String> getJdbcConf() throws Exception {
+        String url = StateStoreProperties.get().getProperty(FalconJPAService.URL);
+        // Same prefix rule FalconJPAService applies; without it substring() below would fail
+        // or silently pick a wrong vendor.
+        if (url == null || !url.startsWith("jdbc:")) {
+            throw new RuntimeException("Invalid JDBC URL, must start with 'jdbc:': " + url);
+        }
+        String dbType = url.substring("jdbc:".length());
+        int vendorEnd = dbType.indexOf(":");
+        if (vendorEnd <= 0) {
+            throw new RuntimeException("Invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
+        }
+        Map<String, String> jdbcConf = new HashMap<String, String>();
+        jdbcConf.put("driver", StateStoreProperties.get().getProperty(FalconJPAService.DRIVER));
+        jdbcConf.put("url", url);
+        jdbcConf.put("user", StateStoreProperties.get().getProperty(FalconJPAService.USERNAME));
+        jdbcConf.put("password", StateStoreProperties.get().getProperty(FalconJPAService.PASSWORD));
+        jdbcConf.put("dbtype", dbType.substring(0, vendorEnd));
+        return jdbcConf;
+    }
+
+    private String[] createMappingToolArguments(String sqlFile) throws Exception {
+        Map<String, String> conf = getJdbcConf();
+        List<String> args = new ArrayList<String>();
+        args.add("-schemaAction");
+        args.add("add");
+        args.add("-p");
+        args.add("persistence.xml#falcon-" + conf.get("dbtype"));
+        args.add("-connectionDriverName");
+        args.add(conf.get("driver"));
+        args.add("-connectionURL");
+        args.add(conf.get("url"));
+        args.add("-connectionUserName");
+        args.add(conf.get("user"));
+        args.add("-connectionPassword");
+        args.add(conf.get("password"));
+        if (sqlFile != null) {
+            args.add("-sqlFile");
+            args.add(sqlFile);
+        }
+        args.add("-indexes");
+        args.add("true");
+        args.add("org.apache.falcon.persistence.EntityBean");
+        args.add("org.apache.falcon.persistence.InstanceBean");
+        args.add("org.apache.falcon.persistence.PendingInstanceBean");
+        args.add("org.apache.falcon.persistence.MonitoredFeedsBean");
+        return args.toArray(new String[args.size()]);
+    }
+
+    private void createDB(String sqlFile, boolean run) throws Exception {
+        validateConnection();
+        if (checkDBExists()) {
+            return;
+        }
+
+        verifyFalconPropsTable(false);
+        createUpgradeDB(sqlFile, run, true);
+        createFalconPropsTable(sqlFile, run, BuildProperties.get().getProperty("project.version"));
+        if (run) {
+            System.out.println("Falcon DB has been created for Falcon version '"
+                    + BuildProperties.get().getProperty("project.version") + "'");
+        }
+    }
+
+    private static final String CREATE_FALCON_DB_PROPS =
+            "create table FALCON_DB_PROPS (name varchar(100), data varchar(100))";
+
+    private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception {
+        String insertDbVerion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";
+
+        PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
+        writer.println();
+        writer.println(CREATE_FALCON_DB_PROPS);
+        writer.println(insertDbVerion);
+        writer.close();
+        System.out.println("Create FALCON_DB_PROPS table");
+        if (run) {
+            Connection conn = createConnection();
+            Statement st = null;
+            try {
+                conn.setAutoCommit(true);
+                st = conn.createStatement();
+                st.executeUpdate(CREATE_FALCON_DB_PROPS);
+                st.executeUpdate(insertDbVerion);
+                st.close();
+            } catch (Exception ex) {
+                closeStatement(st);
+                throw new Exception("Could not create FALCON_DB_PROPS table: " + ex.toString(), ex);
+            } finally {
+                conn.close();
+            }
+        }
+        System.out.println("DONE");
+    }
+
+    private static final String FALCON_DB_PROPS_EXISTS = "select count(*) from FALCON_DB_PROPS";
+
+    /**
+     * Checks that the presence of the FALCON_DB_PROPS table matches the expectation.
+     *
+     * @param exists true if the table is expected to exist, false if it is expected to be absent
+     * @return whether the table exists (always equal to {@code exists} when the method returns)
+     * @throws Exception if the actual state differs from the expected one, or the
+     *         connection cannot be created or closed
+     */
+    private boolean verifyFalconPropsTable(boolean exists) throws Exception {
+        System.out.println((exists) ? "Check FALCON_DB_PROPS table exists"
+                : "Checking FALCON_DB_PROPS table does not exist");
+        boolean tableExists;
+        Connection conn = createConnection();
+        Statement st = null;
+        ResultSet rs = null;
+        try {
+            st = conn.createStatement();
+            rs = st.executeQuery(FALCON_DB_PROPS_EXISTS);
+            rs.next();
+            tableExists = true;
+        } catch (Exception ex) {
+            // any failure of the probe query is interpreted as "table is missing"
+            tableExists = false;
+        } finally {
+            closeResultSet(rs);
+            closeStatement(st);
+            conn.close();
+        }
+        if (tableExists != exists) {
+            throw new Exception("FALCON_DB_PROPS_TABLE table " + ((exists) ? "does not exist" : "exists"));
+        }
+        System.out.println("DONE");
+        return tableExists;
+    }
+
+    private void closeResultSet(ResultSet rs) {
+        try {
+            if (rs != null) {
+                rs.close();
+            }
+        } catch (Exception e) {
+            System.out.println("Unable to close ResultSet " + rs);
+        }
+    }
+
+    /**
+     * Closes the given statement on a best-effort basis, like {@link #closeResultSet(ResultSet)}.
+     * A failure is reported on stdout and not propagated: callers invoke this from
+     * finally blocks right before closing the connection, so throwing here would skip
+     * that close and hide the exception that is already in flight.
+     *
+     * @param st statement to close, may be null
+     * @throws Exception never thrown by this implementation; kept in the signature for existing callers
+     */
+    private void closeStatement(Statement st) throws Exception {
+        try {
+            if (st != null) {
+                st.close();
+            }
+        } catch (Exception e) {
+            System.out.println("Unable to close SQL Statement " + st);
+        }
+    }
+
+    private Connection createConnection() throws Exception {
+        Map<String, String> conf = getJdbcConf();
+        Class.forName(conf.get("driver")).newInstance();
+        return DriverManager.getConnection(conf.get("url"), conf.get("user"), conf.get("password"));
+    }
+
+    private void validateConnection() throws Exception {
+        System.out.println("Validating DB Connection");
+        try {
+            createConnection().close();
+            System.out.println("DONE");
+        } catch (Exception ex) {
+            throw new Exception("Could not connect to the database: " + ex.toString(), ex);
+        }
+    }
+
+    private static final String ENTITY_STATUS_QUERY =
+            "select count(*) from ENTITIES where current_state IN ('RUNNING', 'SUSPENDED')";
+    private static final String INSTANCE_STATUS_QUERY =
+            "select count(*) from INSTANCES where current_state IN ('RUNNING', 'SUSPENDED')";
+
+    /**
+     * Probes for the Falcon schema by running ENTITY_STATUS_QUERY against the
+     * ENTITIES table and prints the outcome.
+     *
+     * @return true if the query could be executed, false if it failed for any reason
+     * @throws Exception if the connection cannot be created or closed
+     */
+    private boolean checkDBExists() throws Exception {
+        boolean schemaExists;
+        Connection conn = createConnection();
+        ResultSet rs =  null;
+        Statement st = null;
+        try {
+            st = conn.createStatement();
+            rs = st.executeQuery(ENTITY_STATUS_QUERY);
+            rs.next();
+            schemaExists = true;
+        } catch (Exception ex) {
+            // any failure of the probe query is interpreted as "schema is missing"
+            schemaExists = false;
+        } finally {
+            closeResultSet(rs);
+            closeStatement(st);
+            conn.close();
+        }
+        System.out.println("DB schema " + ((schemaExists) ? "exists" : "does not exist"));
+        return schemaExists;
+    }
+
+    private void createUpgradeDB(String sqlFile, boolean run, boolean create) throws Exception {
+        System.out.println((create) ? "Create SQL schema" : "Upgrade SQL schema");
+        String[] args = createMappingToolArguments(sqlFile);
+        org.apache.openjpa.jdbc.meta.MappingTool.main(args);
+        if (run) {
+            args = createMappingToolArguments(null);
+            org.apache.openjpa.jdbc.meta.MappingTool.main(args);
+        }
+        System.out.println("DONE");
+    }
+
+    /**
+     * Prints the Falcon build version followed by the contents of the
+     * FALCON_DB_PROPS table.
+     *
+     * @throws Exception if the database cannot be reached, the schema does not exist
+     *         or the FALCON_DB_PROPS table is missing
+     */
+    private void showVersion() throws Exception {
+        System.out.println("Falcon Server version: "
+                + BuildProperties.get().getProperty("project.version"));
+        validateConnection();
+        if (!checkDBExists()) {
+            throw new Exception("Falcon DB doesn't exist");
+        }
+        try {
+            verifyFalconPropsTable(true);
+        } catch (Exception ex) {
+            // keep the original failure as the cause so the printed stack trace shows why the check failed
+            throw new Exception("ERROR: It seems this Falcon DB was never upgraded with the 'falcondb' tool", ex);
+        }
+        showFalconPropsInfo();
+    }
+
+    private static final String GET_FALCON_PROPS_INFO = "select name, data from FALCON_DB_PROPS order by name";
+
+    private void showFalconPropsInfo() throws Exception {
+        Connection conn = createConnection();
+        Statement st = null;
+        ResultSet rs = null;
+        try {
+            System.out.println("Falcon DB Version Information");
+            System.out.println("--------------------------------------");
+            st = conn.createStatement();
+            rs = st.executeQuery(GET_FALCON_PROPS_INFO);
+            while (rs.next()) {
+                System.out.println(rs.getString(1) + ": " + rs.getString(2));
+            }
+            System.out.println("--------------------------------------");
+        } catch (Exception ex) {
+            throw new Exception("ERROR querying FALCON_DB_PROPS table: " + ex.toString(), ex);
+        } finally {
+            closeResultSet(rs);
+            closeStatement(st);
+            conn.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml
new file mode 100644
index 0000000..4c9388c
--- /dev/null
+++ b/common/src/main/resources/META-INF/persistence.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<persistence xmlns="http://java.sun.com/xml/ns/persistence"
+             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+             version="1.0">
+
+    <persistence-unit name="falcon-derby" transaction-type="RESOURCE_LOCAL">
+        <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+
+        <class>org.apache.falcon.persistence.EntityBean</class>
+        <class>org.apache.falcon.persistence.InstanceBean</class>
+        <class>org.apache.falcon.persistence.PendingInstanceBean</class>
+        <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
+
+        <properties>
+            <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
+
+            <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by FalconJPAService at init time -->
+
+            <property name="openjpa.MetaDataFactory"
+                      value="jpa(Types=org.apache.falcon.persistence.EntityBean;
+                org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
+                org.apache.falcon.persistence.MonitoredFeedsBean)"></property>
+
+            <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
+            <property name="openjpa.LockManager" value="pessimistic"/>
+            <property name="openjpa.ReadLockLevel" value="read"/>
+            <property name="openjpa.WriteLockLevel" value="write"/>
+            <property name="openjpa.jdbc.TransactionIsolation" value="read-committed"/> <!--CUSTOM-->
+            <!-- Both dictionary settings must share one property: a repeated property name keeps only the last value. -->
+            <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50,TimestampTypeName=TIMESTAMP"/>
+            <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
+            <property name="openjpa.Log" value="log4j"/>
+        </properties>
+    </persistence-unit>
+
+    <persistence-unit name="falcon-mysql" transaction-type="RESOURCE_LOCAL">
+        <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+
+        <class>org.apache.falcon.persistence.EntityBean</class>
+        <class>org.apache.falcon.persistence.InstanceBean</class>
+        <class>org.apache.falcon.persistence.PendingInstanceBean</class>
+        <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
+
+        <properties>
+            <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
+
+            <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by FalconJPAService at init time -->
+
+            <property name="openjpa.MetaDataFactory"
+                      value="jpa(Types=org.apache.falcon.persistence.EntityBean;
+                org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
+                org.apache.falcon.persistence.MonitoredFeedsBean)"></property>
+
+            <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
+            <property name="openjpa.LockManager" value="pessimistic"/>
+            <property name="openjpa.ReadLockLevel" value="read"/>
+            <property name="openjpa.WriteLockLevel" value="write"/>
+            <property name="openjpa.jdbc.TransactionIsolation" value="repeatable-read"/> <!--CUSTOM-->
+            <!-- Both dictionary settings must share one property: a repeated property name keeps only the last value. -->
+            <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50,TimestampTypeName=TIMESTAMP"/>
+            <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
+            <property name="openjpa.Log" value="log4j"/>
+        </properties>
+    </persistence-unit>
+
+    <persistence-unit name="falcon-postgresql" transaction-type="RESOURCE_LOCAL">
+        <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+
+        <class>org.apache.falcon.persistence.EntityBean</class>
+        <class>org.apache.falcon.persistence.InstanceBean</class>
+        <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
+        <class>org.apache.falcon.persistence.PendingInstanceBean</class>
+
+        <properties>
+            <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
+
+            <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by FalconJPAService at init time -->
+
+            <property name="openjpa.MetaDataFactory"
+                      value="jpa(Types=org.apache.falcon.persistence.EntityBean;
+                org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
+                org.apache.falcon.persistence.MonitoredFeedsBean)"></property>
+
+            <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
+            <property name="openjpa.LockManager" value="pessimistic"/>
+            <property name="openjpa.ReadLockLevel" value="read"/>
+            <property name="openjpa.WriteLockLevel" value="write"/>
+            <property name="openjpa.jdbc.TransactionIsolation" value="repeatable-read"/> <!--CUSTOM-->
+            <!-- Both dictionary settings must share one property: a repeated property name keeps only the last value. -->
+            <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50,TimestampTypeName=TIMESTAMP"/>
+            <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
+            <property name="openjpa.Log" value="log4j"/>
+        </properties>
+    </persistence-unit>
+
+</persistence>

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 81d3da1..87a74bf 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -42,7 +42,8 @@
                         org.apache.falcon.metadata.MetadataMappingService,\
                         org.apache.falcon.service.LogCleanupService,\
                         org.apache.falcon.service.GroupsService,\
-                        org.apache.falcon.service.ProxyUserService
+                        org.apache.falcon.service.ProxyUserService,\
+                        org.apache.falcon.service.FalconJPAService
 ## Add if you want to use Falcon Azure integration ##
 #                        org.apache.falcon.adfservice.ADFProviderService
 ## If you wish to use Falcon native scheduler add the commented out services below to application.services ##
@@ -51,7 +52,7 @@
 #                        org.apache.falcon.notification.service.impl.AlarmService,\
 #                        org.apache.falcon.notification.service.impl.DataAvailabilityService,\
 #                        org.apache.falcon.execution.FalconExecutionService,\
-#                        org.apache.falcon.state.store.service.FalconJPAService
+
 
 
 # List of Lifecycle policies configured.
@@ -305,4 +306,4 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
 ## Creates Falcon DB.
 ## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP.
 ## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
-#*.falcon.statestore.create.db.schema=true
\ No newline at end of file
+#*.falcon.statestore.create.db.schema=true

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/statestore.credentials
----------------------------------------------------------------------
diff --git a/common/src/main/resources/statestore.credentials b/common/src/main/resources/statestore.credentials
index 86c32a1..b0e4196 100644
--- a/common/src/main/resources/statestore.credentials
+++ b/common/src/main/resources/statestore.credentials
@@ -18,5 +18,5 @@
 
 
 ######### StateStore Credentials #####
-#*.falcon.statestore.jdbc.username=sa
-#*.falcon.statestore.jdbc.password=
\ No newline at end of file
+*.falcon.statestore.jdbc.username=sa
+*.falcon.statestore.jdbc.password=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/statestore.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/statestore.properties b/common/src/main/resources/statestore.properties
index 44e79b3..7686426 100644
--- a/common/src/main/resources/statestore.properties
+++ b/common/src/main/resources/statestore.properties
@@ -42,4 +42,22 @@
 ## Creates Falcon DB.
 ## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP.
 ## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
-#*.falcon.statestore.create.db.schema=true
\ No newline at end of file
+#*.falcon.statestore.create.db.schema=true
+
+
+######## StateStore Properties #####
+*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true
+*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+# Maximum number of active connections that can be allocated from this pool at the same time.
+*.falcon.statestore.pool.max.active.conn=10
+*.falcon.statestore.connection.properties=
+# Indicates the interval (in milliseconds) between eviction runs.
+*.falcon.statestore.validate.db.connection.eviction.interval=300000
+# The number of objects to examine during each run of the idle object evictor thread.
+*.falcon.statestore.validate.db.connection.eviction.num=10
+# Creates Falcon DB.
+# If set to true, it creates the DB schema if it does not exist. If the DB schema already exists, this is a no-op.
+# If set to false, it does not create the DB schema. If the DB schema does not exist, startup fails.
+*.falcon.statestore.create.db.schema=true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/docs/src/site/twiki/FalconNativeScheduler.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconNativeScheduler.twiki b/docs/src/site/twiki/FalconNativeScheduler.twiki
index 9ffc5e9..1f51739 100644
--- a/docs/src/site/twiki/FalconNativeScheduler.twiki
+++ b/docs/src/site/twiki/FalconNativeScheduler.twiki
@@ -29,7 +29,7 @@ You can enable native scheduler by making changes to __$FALCON_HOME/conf/startup
                         org.apache.falcon.service.ProcessSubscriberService,\
                         org.apache.falcon.service.FeedSLAMonitoringService,\
                         org.apache.falcon.service.LifecyclePolicyMap,\
-                        org.apache.falcon.state.store.service.FalconJPAService,\
+                        org.apache.falcon.service.FalconJPAService,\
                         org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.rerun.service.RetryService,\
                         org.apache.falcon.rerun.service.LateRunService,\

http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
new file mode 100644
index 0000000..39e2562
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.jdbc;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.falcon.persistence.MonitoredFeedsBean;
+import org.apache.falcon.persistence.PendingInstanceBean;
+import org.apache.falcon.persistence.PersistenceConstants;
+import org.apache.falcon.persistence.ResultNotFoundException;
+import org.apache.falcon.service.FalconJPAService;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.util.Date;
+import java.util.List;
+
+/**
+* StateStore for MonitoringFeeds and PendingFeedInstances.
+*/
+
+public class MonitoringJdbcStateStore {
+
+    private EntityManager getEntityManager() {
+        return FalconJPAService.get().getEntityManager();
+    }
+
+
+    public void putMonitoredFeed(String feedName){
+
+        MonitoredFeedsBean monitoredFeedsBean = new MonitoredFeedsBean();
+        monitoredFeedsBean.setFeedName(feedName);
+        EntityManager entityManager = getEntityManager();
+        try {
+            beginTransaction(entityManager);
+            entityManager.persist(monitoredFeedsBean);
+        } finally {
+            commitAndCloseTransaction(entityManager);
+        }
+    }
+
+    public MonitoredFeedsBean getMonitoredFeed(String feedName){
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITERED_INSTANCE);
+        q.setParameter("feedName", feedName);
+        List result = q.getResultList();
+        try {
+            if (result.isEmpty()) {
+                return null;
+            }
+        } finally {
+            entityManager.close();
+        }
+        return ((MonitoredFeedsBean)result.get(0));
+    }
+
+    public void deleteMonitoringFeed(String feedName) {
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_INSTANCES);
+        q.setParameter("feedName", feedName);
+        try{
+            q.executeUpdate();
+        } finally {
+            commitAndCloseTransaction(entityManager);
+        }
+    }
+
+    public List<MonitoredFeedsBean> getAllMonitoredFeed() throws ResultNotFoundException{
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_FEEDS);
+        List result = q.getResultList();
+        try{
+            if (result.isEmpty()) {
+                throw new ResultNotFoundException("No Feed has been scheduled for monitoring.");
+            }
+        } finally {
+            entityManager.close();
+        }
+        return result;
+    }
+
+    public void deletePendingInstance(String feedName, String clusterName , Date nominalTime){
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES);
+        q.setParameter("feedName", feedName);
+        q.setParameter("clusterName", clusterName);
+        q.setParameter("nominalTime", nominalTime);
+        try{
+            q.executeUpdate();
+        } finally {
+            commitAndCloseTransaction(entityManager);
+        }
+    }
+
+    public void deletePendingInstances(String feedName, String clusterName){
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED);
+        q.setParameter("feedName", feedName);
+        q.setParameter("clusterName", clusterName);
+        try{
+            q.executeUpdate();
+        } finally {
+            commitAndCloseTransaction(entityManager);
+        }
+    }
+
+    public void putPendingInstances(String feed, String clusterName, Date nominalTime){
+        EntityManager entityManager = getEntityManager();
+        PendingInstanceBean pendingInstanceBean = new PendingInstanceBean();
+        pendingInstanceBean.setFeedName(feed);
+        pendingInstanceBean.setClusterName(clusterName);
+        pendingInstanceBean.setNominalTime(nominalTime);
+
+        beginTransaction(entityManager);
+        entityManager.persist(pendingInstanceBean);
+        commitAndCloseTransaction(entityManager);
+    }
+
+    public List<Date> getNominalInstances(String feedName, String clusterName) throws ResultNotFoundException{
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES);
+        q.setParameter("feedName", feedName);
+        q.setParameter("clusterName", clusterName);
+        List result = q.getResultList();
+        try{
+            if (CollectionUtils.isEmpty(result)) {
+                throw new ResultNotFoundException(feedName + " with " + clusterName + "Not Found");
+            }
+        } finally {
+            entityManager.close();
+        }
+        return result;
+    }
+    public List<PendingInstanceBean> getAllInstances(){
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES);
+        List result = q.getResultList();
+
+        try {
+            if (CollectionUtils.isEmpty(result)) {
+                return null;
+            }
+        } finally{
+            entityManager.close();
+        }
+        return result;
+    }
+
+    private void commitAndCloseTransaction(EntityManager entityManager) {
+        entityManager.getTransaction().commit();
+        entityManager.close();
+    }
+
+    private void beginTransaction(EntityManager entityManager) {
+        entityManager.getTransaction().begin();
+    }
+
+}