Posted to commits@falcon.apache.org by pa...@apache.org on 2015/11/26 11:28:25 UTC

[1/4] falcon git commit: FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri)

Repository: falcon
Updated Branches:
  refs/heads/master f982f86c7 -> 2bf90130d


http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/startup.properties b/scheduler/src/test/resources/startup.properties
new file mode 100644
index 0000000..2e938ee
--- /dev/null
+++ b/scheduler/src/test/resources/startup.properties
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+*.domain=debug
+
+######### Implementation classes #########
+## DONT MODIFY UNLESS SURE ABOUT CHANGE ##
+
+*.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine
+*.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder
+*.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder
+*.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager
+*.ConfigSyncService.impl=org.apache.falcon.resource.ConfigSyncService
+*.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager
+*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
+
+##### Falcon Services #####
+*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
+                        org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
+                        org.apache.falcon.service.ProcessSubscriberService,\
+                        org.apache.falcon.entity.store.ConfigurationStore,\
+                        org.apache.falcon.rerun.service.RetryService,\
+                        org.apache.falcon.rerun.service.LateRunService,\
+                        org.apache.falcon.notification.service.impl.JobCompletionService,\
+                        org.apache.falcon.notification.service.impl.SchedulerService,\
+                        org.apache.falcon.notification.service.impl.AlarmService,\
+                        org.apache.falcon.notification.service.impl.DataAvailabilityService,\
+                        org.apache.falcon.execution.FalconExecutionService,\
+                        org.apache.falcon.state.store.service.FalconJPAService
+
+##### Falcon Configuration Store Change listeners #####
+*.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
+                        org.apache.falcon.entity.ColoClusterRelation,\
+                        org.apache.falcon.group.FeedGroupMap,\
+                        org.apache.falcon.entity.store.FeedLocationStore
+
+##### JMS MQ Broker Implementation class #####
+*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
+
+
+######### System startup parameters #########
+
+# Location to store user entity configurations
+debug.config.store.uri=file://${user.dir}/target/store
+debug.config.store.persist=false
+debug.config.oozie.conf.uri=${user.dir}/target/oozie
+debug.system.lib.location=${system.lib.location}
+debug.broker.url=vm://localhost
+debug.retry.recorder.path=${user.dir}/target/retry
+debug.libext.feed.retention.paths=${falcon.libext}
+debug.libext.feed.replication.paths=${falcon.libext}
+debug.libext.process.paths=${falcon.libext}
+
+*.falcon.cleanup.service.frequency=minutes(5)
+
+
+######### Properties for configuring JMS provider - activemq #########
+# Default Active MQ url
+*.broker.url=tcp://localhost:61616
+
+# default time-to-live for a JMS message 3 days (time in minutes)
+*.broker.ttlInMins=4320
+*.entity.topic=FALCON.ENTITY.TOPIC
+*.max.retry.failure.count=1
+*.retry.recorder.path=${user.dir}/logs/retry
+
+######### Properties for configuring iMon client and metric #########
+*.internal.queue.size=1000
+
+
+##### List of shared libraries for Falcon workflows #####
+*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3
+
+######### Authentication Properties #########
+
+# Authentication type must be specified: simple|kerberos
+*.falcon.authentication.type=simple
+
+##### Service Configuration
+
+# Indicates the Kerberos principal to be used in Falcon Service.
+*.falcon.service.authentication.kerberos.principal=
+
+# Location of the keytab file with the credentials for the Service principal.
+*.falcon.service.authentication.kerberos.keytab=
+
+# name node principal to talk to config store
+*.dfs.namenode.kerberos.principal=
+
+##### SPNEGO Configuration
+
+# Authentication type must be specified: simple|kerberos|<class>
+# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility
+*.falcon.http.authentication.type=simple
+
+# Indicates how long (in seconds) an authentication token is valid before it has to be renewed.
+*.falcon.http.authentication.token.validity=36000
+
+# The signature secret for signing the authentication tokens.
+*.falcon.http.authentication.signature.secret=falcon
+
+# The domain to use for the HTTP cookie that stores the authentication token.
+*.falcon.http.authentication.cookie.domain=
+
+# Indicates if anonymous requests are allowed when using 'simple' authentication.
+*.falcon.http.authentication.simple.anonymous.allowed=false
+
+# Indicates the Kerberos principal to be used for HTTP endpoint.
+# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
+*.falcon.http.authentication.kerberos.principal=
+
+# Location of the keytab file with the credentials for the HTTP principal.
+*.falcon.http.authentication.kerberos.keytab=
+
+# The kerberos name rules are used to resolve kerberos principal names; refer to Hadoop's KerberosName for more details.
+*.falcon.http.authentication.kerberos.name.rules=DEFAULT
+
+# Comma separated list of black listed users
+*.falcon.http.authentication.blacklisted.users=
+
+
+######## StateStore Properties #####
+*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true
+*.falcon.statestore.jdbc.username=sa
+*.falcon.statestore.jdbc.password=
+*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+# Maximum number of active connections that can be allocated from this pool at the same time.
+*.falcon.statestore.pool.max.active.conn=10
+*.falcon.statestore.connection.properties=
+# Indicates the interval (in milliseconds) between eviction runs.
+*.falcon.statestore.validate.db.connection.eviction.interval=300000
+# The number of objects to examine during each run of the idle object evictor thread.
+*.falcon.statestore.validate.db.connection.eviction.num=10
+# Creates Falcon DB.
+# If set to true, it creates the DB schema if it does not exist; if the DB schema already exists, this is a NOP.
+# If set to false, it does not create the DB schema; if the DB schema does not exist, startup fails.
+*.falcon.statestore.create.db.schema=true
\ No newline at end of file

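Note: the Derby settings above are only exercised once FalconJPAService (added later in this patch) is initialized. A minimal bootstrap sketch, assuming these test properties are on the classpath (hypothetical snippet, not part of the patch):

    import org.apache.falcon.state.store.service.FalconJPAService;

    public class StateStoreBootstrapSketch {
        public static void main(String[] args) throws Exception {
            FalconJPAService jpaService = FalconJPAService.get();
            // init() reads the *.falcon.statestore.* properties via StartupProperties,
            // builds the OpenJPA EntityManagerFactory, and, because create.db.schema=true,
            // lets OpenJPA create the Derby schema under target/test-data on first use.
            jpaService.init();
            // ... exercise the state store ...
            jpaService.destroy();
        }
    }
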
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/src/bin/falcon-db.sh
----------------------------------------------------------------------
diff --git a/src/bin/falcon-db.sh b/src/bin/falcon-db.sh
new file mode 100644
index 0000000..415fd5d
--- /dev/null
+++ b/src/bin/falcon-db.sh
@@ -0,0 +1,49 @@
+#!/bin/bash
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License. See accompanying LICENSE file.
+#
+
+# resolve links - $0 may be a softlink
+PRG="${0}"
+
+while [ -h "${PRG}" ]; do
+  ls=`ls -ld "${PRG}"`
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    PRG=`dirname "${PRG}"`/"$link"
+  fi
+done
+
+BASEDIR=`dirname ${PRG}`
+BASEDIR=`cd ${BASEDIR}/..;pwd`
+
+. ${BASEDIR}/bin/falcon-config.sh 'server' falcon
+
+if test -z ${JAVA_HOME}
+then
+    JAVA_BIN=java
+else
+    JAVA_BIN=${JAVA_HOME}/bin/java
+fi
+
+while [[ ${1} =~ ^\-D ]]; do
+  JAVA_PROPERTIES="${JAVA_PROPERTIES} ${1}"
+  shift
+done
+
+# set the client class path
+FALCONCPPATH="$FALCONCPPATH:${BASEDIR}/client/lib/*"
+
+${JAVA_BIN} ${JAVA_PROPERTIES} -cp ${FALCONCPPATH} org.apache.falcon.tools.FalconStateStoreDBCLI "${@}"

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index ce6e91f..5ddba01 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -57,7 +57,8 @@
 #                        org.apache.falcon.notification.service.impl.SchedulerService,\
 #                        org.apache.falcon.notification.service.impl.AlarmService,\
 #                        org.apache.falcon.notification.service.impl.DataAvailabilityService,\
-#                        org.apache.falcon.execution.FalconExecutionService
+#                        org.apache.falcon.execution.FalconExecutionService,\
+#                        org.apache.falcon.state.store.service.FalconJPAService
 
 ##### Prism Services #####
 prism.application.services=org.apache.falcon.service.LifecyclePolicyMap,\
@@ -262,3 +263,22 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 # Setting monitoring plugin, if SMTP parameters is defined
 #*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\
 #                     org.apache.falcon.plugin.EmailNotificationPlugin
+
+######### StateStore Properties #####
+#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+#*.falcon.statestore.jdbc.url=jdbc:derby:data/statestore.db;create=true
+#*.falcon.statestore.jdbc.username=sa
+#*.falcon.statestore.jdbc.password=
+#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+## Maximum number of active connections that can be allocated from this pool at the same time.
+#*.falcon.statestore.pool.max.active.conn=10
+#*.falcon.statestore.connection.properties=
+## Indicates the interval (in milliseconds) between eviction runs.
+#*.falcon.statestore.validate.db.connection.eviction.interval=300000
+## The number of objects to examine during each run of the idle object evictor thread.
+#*.falcon.statestore.validate.db.connection.eviction.num=10
+## Creates Falcon DB.
+## If set to true, it creates the DB schema if it does not exist; if the DB schema already exists, this is a NOP.
+## If set to false, it does not create the DB schema; if the DB schema does not exist, startup fails.
+#*.falcon.statestore.create.db.schema=true

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/src/main/assemblies/distributed-package.xml
----------------------------------------------------------------------
diff --git a/src/main/assemblies/distributed-package.xml b/src/main/assemblies/distributed-package.xml
index 794eaef..ebd1745 100644
--- a/src/main/assemblies/distributed-package.xml
+++ b/src/main/assemblies/distributed-package.xml
@@ -41,6 +41,11 @@
         </fileSet>
 
         <fileSet>
+            <directory>scheduler/target/dependency</directory>
+            <outputDirectory>client/lib</outputDirectory>
+        </fileSet>
+
+        <fileSet>
             <directory>oozie-el-extensions/target/dependency</directory>
             <outputDirectory>oozie/libext</outputDirectory>
         </fileSet>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/src/main/assemblies/standalone-package.xml
----------------------------------------------------------------------
diff --git a/src/main/assemblies/standalone-package.xml b/src/main/assemblies/standalone-package.xml
index fcff8d7..b88aec3 100644
--- a/src/main/assemblies/standalone-package.xml
+++ b/src/main/assemblies/standalone-package.xml
@@ -40,6 +40,11 @@
         </fileSet>
 
         <fileSet>
+            <directory>scheduler/target/dependency</directory>
+            <outputDirectory>client/lib</outputDirectory>
+        </fileSet>
+
+        <fileSet>
             <directory>oozie-el-extensions/target/dependency</directory>
             <outputDirectory>oozie/libext</outputDirectory>
         </fileSet>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/unit/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/unit/src/main/resources/startup.properties b/unit/src/main/resources/startup.properties
index fe6f430..4576e0b 100644
--- a/unit/src/main/resources/startup.properties
+++ b/unit/src/main/resources/startup.properties
@@ -127,3 +127,21 @@ debug.libext.process.paths=${falcon.libext}
 # Comma separated list of black listed users
 *.falcon.http.authentication.blacklisted.users=
 
+######### StateStore Properties #####
+#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+#*.falcon.statestore.jdbc.url=jdbc:derby:data/statestore.db;create=true
+#*.falcon.statestore.jdbc.username=sa
+#*.falcon.statestore.jdbc.password=
+#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+## Maximum number of active connections that can be allocated from this pool at the same time.
+#*.falcon.statestore.pool.max.active.conn=10
+#*.falcon.statestore.connection.properties=
+## Indicates the interval (in milliseconds) between eviction runs.
+#*.falcon.statestore.validate.db.connection.eviction.interval=300000
+## The number of objects to examine during each run of the idle object evictor thread.
+#*.falcon.statestore.validate.db.connection.eviction.num=10
+## Creates Falcon DB.
+## If set to true, it creates the DB schema if it does not exist; if the DB schema already exists, this is a NOP.
+## If set to false, it does not create the DB schema; if the DB schema does not exist, startup fails.
+#*.falcon.statestore.create.db.schema=true


[2/4] falcon git commit: FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri)

Posted by pa...@apache.org.
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java b/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
new file mode 100644
index 0000000..72d1aba
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store.service;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.service.FalconService;
+import org.apache.falcon.state.store.jdbc.EntityBean;
+import org.apache.falcon.state.store.jdbc.InstanceBean;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import java.text.MessageFormat;
+import java.util.Properties;
+
+/**
+ * Service that manages JPA.
+ */
+public final class FalconJPAService implements FalconService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FalconJPAService.class);
+    public static final String PREFIX = "falcon.statestore.";
+
+    public static final String DB_SCHEMA = PREFIX + "schema.name";
+    public static final String URL = PREFIX + "jdbc.url";
+    public static final String DRIVER = PREFIX + "jdbc.driver";
+    public static final String USERNAME = PREFIX + "jdbc.username";
+    public static final String PASSWORD = PREFIX + "jdbc.password";
+    public static final String CONN_DATA_SOURCE = PREFIX + "connection.data.source";
+    public static final String CONN_PROPERTIES = PREFIX + "connection.properties";
+    public static final String MAX_ACTIVE_CONN = PREFIX + "pool.max.active.conn";
+    public static final String CREATE_DB_SCHEMA = PREFIX + "create.db.schema";
+    public static final String VALIDATE_DB_CONN = PREFIX + "validate.db.connection";
+    public static final String VALIDATE_DB_CONN_EVICTION_INTERVAL = PREFIX + "validate.db.connection.eviction.interval";
+    public static final String VALIDATE_DB_CONN_EVICTION_NUM = PREFIX + "validate.db.connection.eviction.num";
+
+    private EntityManagerFactory entityManagerFactory;
+    // Persistent Unit which is defined in persistence.xml
+    private String persistenceUnit;
+    private static final FalconJPAService FALCON_JPA_SERVICE = new FalconJPAService();
+
+    private FalconJPAService() {
+    }
+
+    public static FalconJPAService get() {
+        return FALCON_JPA_SERVICE;
+    }
+
+    public EntityManagerFactory getEntityManagerFactory() {
+        return entityManagerFactory;
+    }
+
+    public void setPersistenceUnit(String dbType) {
+        if (StringUtils.isEmpty(dbType)) {
+            throw new IllegalArgumentException(" DB type cannot be null or empty");
+        }
+        dbType = dbType.split(":")[0];
+        this.persistenceUnit = "falcon-" + dbType;
+    }
+
+    @Override
+    public String getName() {
+        return this.getClass().getSimpleName();
+    }
+
+    @Override
+    public void init() throws FalconException {
+        Properties props = getPropsforStore();
+        entityManagerFactory = Persistence.
+                createEntityManagerFactory(persistenceUnit, props);
+        EntityManager entityManager = getEntityManager();
+        entityManager.find(EntityBean.class, 1);
+        entityManager.find(InstanceBean.class, 1);
+        LOG.info("All entities initialized");
+
+        // need to use a pseudo no-op transaction so all entities, datasource
+        // and connection pool are initialized one time only
+        entityManager.getTransaction().begin();
+        OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) entityManagerFactory;
+        // Mask the password with '***'
+        String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,");
+        LOG.info("JPA configuration: {0}", logMsg);
+        entityManager.getTransaction().commit();
+        entityManager.close();
+    }
+
+    private Properties getPropsforStore() throws FalconException {
+        String dbSchema = StartupProperties.get().getProperty(DB_SCHEMA);
+        String url = StartupProperties.get().getProperty(URL);
+        String driver = StartupProperties.get().getProperty(DRIVER);
+        String user = StartupProperties.get().getProperty(USERNAME);
+        String password = StartupProperties.get().getProperty(PASSWORD).trim();
+        String maxConn = StartupProperties.get().getProperty(MAX_ACTIVE_CONN).trim();
+        String dataSource = StartupProperties.get().getProperty(CONN_DATA_SOURCE);
+        String connPropsConfig = StartupProperties.get().getProperty(CONN_PROPERTIES);
+        boolean autoSchemaCreation = Boolean.parseBoolean(StartupProperties.get().getProperty(CREATE_DB_SCHEMA,
+                "false"));
+        boolean validateDbConn = Boolean.parseBoolean(StartupProperties.get().getProperty(VALIDATE_DB_CONN, "true"));
+        String evictionInterval = StartupProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_INTERVAL).trim();
+        String evictionNum = StartupProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_NUM).trim();
+
+        if (!url.startsWith("jdbc:")) {
+            throw new FalconException("invalid JDBC URL, must start with 'jdbc:'" + url);
+        }
+        String dbType = url.substring("jdbc:".length());
+        if (dbType.indexOf(":") <= 0) {
+            throw new FalconException("invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'" + url);
+        }
+        setPersistenceUnit(dbType);
+        String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}";
+        connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn);
+        Properties props = new Properties();
+        if (autoSchemaCreation) {
+            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
+            props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
+        } else if (validateDbConn) {
+            // validation can be done only if the schema already exists, else a
+            // connection cannot be obtained to create the schema.
+            String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval;
+            String num = "numTestsPerEvictionRun=" + evictionNum;
+            connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num;
+            connProps += ",ValidationQuery=select count(*) from ENTITIES";
+            connProps = MessageFormat.format(connProps, dbSchema);
+        } else {
+            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
+        }
+        if (connPropsConfig != null) {
+            connProps += "," + connPropsConfig;
+        }
+        props.setProperty("openjpa.ConnectionProperties", connProps);
+        props.setProperty("openjpa.ConnectionDriverName", dataSource);
+        return props;
+    }
+
+    @Override
+    public void destroy() throws FalconException {
+        if (entityManagerFactory.isOpen()) {
+            entityManagerFactory.close();
+        }
+    }
+
+
+    /**
+     * Return an EntityManager. Used by the StoreService.
+     *
+     * @return an entity manager
+     */
+    public EntityManager getEntityManager() {
+        return getEntityManagerFactory().createEntityManager();
+    }
+}

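A minimal usage sketch for the service above (hypothetical caller; error handling trimmed, and the key type for EntityBean depends on its @Id, which is not shown in this patch):

    import javax.persistence.EntityManager;

    import org.apache.falcon.state.store.jdbc.EntityBean;
    import org.apache.falcon.state.store.service.FalconJPAService;

    public class EntityManagerUsageSketch {
        public static EntityBean findEntity(Object key) {
            // Each call to getEntityManager() creates a fresh EntityManager from the shared factory.
            EntityManager em = FalconJPAService.get().getEntityManager();
            try {
                em.getTransaction().begin();
                EntityBean bean = em.find(EntityBean.class, key);   // primary-key lookup
                em.getTransaction().commit();
                return bean;
            } finally {
                em.close();
            }
        }
    }
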
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
new file mode 100644
index 0000000..381b0b3
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.tools;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.falcon.cli.CLIParser;
+import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.util.BuildProperties;
+import org.apache.falcon.util.StartupProperties;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Command Line utility for Table Creation, Update.
+ */
+public class FalconStateStoreDBCLI {
+    public static final String HELP_CMD = "help";
+    public static final String VERSION_CMD = "version";
+    public static final String CREATE_CMD = "create";
+    public static final String SQL_FILE_OPT = "sqlfile";
+    public static final String RUN_OPT = "run";
+    public static final String UPGRADE_CMD = "upgrade";
+
+    // Represents whether DB instance exists or not.
+    private boolean instanceExists;
+    private static final String[] FALCON_HELP = {"Falcon DB initialization tool currently supports Derby DB"};
+
+    public static void main(String[] args) {
+        new FalconStateStoreDBCLI().run(args);
+    }
+
+    public FalconStateStoreDBCLI() {
+        instanceExists = false;
+    }
+
+    protected Options getOptions() {
+        Option sqlfile = new Option(SQL_FILE_OPT, true,
+                "Generate SQL script instead of creating/upgrading the DB schema");
+        Option run = new Option(RUN_OPT, false, "Confirmation option regarding DB schema creation/upgrade");
+        Options options = new Options();
+        options.addOption(sqlfile);
+        options.addOption(run);
+        return options;
+    }
+
+    public synchronized int run(String[] args) {
+        if (instanceExists) {
+            throw new IllegalStateException("CLI instance already used");
+        }
+        instanceExists = true;
+
+        CLIParser parser = new CLIParser("falcondb", FALCON_HELP);
+        parser.addCommand(HELP_CMD, "", "Display usage for all commands or specified command", new Options(), false);
+        parser.addCommand(VERSION_CMD, "", "Show Falcon DB version information", new Options(), false);
+        parser.addCommand(CREATE_CMD, "", "Create Falcon DB schema", getOptions(), false);
+        parser.addCommand(UPGRADE_CMD, "", "Upgrade Falcon DB schema", getOptions(), false);
+
+        try {
+            CLIParser.Command command = parser.parse(args);
+            if (command.getName().equals(HELP_CMD)) {
+                parser.showHelp();
+            } else if (command.getName().equals(VERSION_CMD)) {
+                showVersion();
+            } else {
+                if (!command.getCommandLine().hasOption(SQL_FILE_OPT)
+                        && !command.getCommandLine().hasOption(RUN_OPT)) {
+                    throw new Exception("'-sqlfile <FILE>' or '-run' options must be specified");
+                }
+                CommandLine commandLine = command.getCommandLine();
+                String sqlFile = (commandLine.hasOption(SQL_FILE_OPT))
+                        ? commandLine.getOptionValue(SQL_FILE_OPT)
+                        : File.createTempFile("falcondb-", ".sql").getAbsolutePath();
+                boolean run = commandLine.hasOption(RUN_OPT);
+                if (command.getName().equals(CREATE_CMD)) {
+                    createDB(sqlFile, run);
+                } else if (command.getName().equals(UPGRADE_CMD)) {
+                    upgradeDB(sqlFile, run);
+                }
+                System.out.println("The SQL commands have been written to: " + sqlFile);
+                if (!run) {
+                    System.out.println("WARN: The SQL commands have NOT been executed, you must use the '-run' option");
+                }
+            }
+            return 0;
+        } catch (ParseException ex) {
+            System.err.println("Invalid sub-command: " + ex.getMessage());
+            System.err.println();
+            System.err.println(parser.shortHelp());
+            return 1;
+        } catch (Exception ex) {
+            System.err.println();
+            System.err.println("Error: " + ex.getMessage());
+            System.err.println();
+            System.err.println("Stack trace for the error was (for debug purposes):");
+            System.err.println("--------------------------------------");
+            ex.printStackTrace(System.err);
+            System.err.println("--------------------------------------");
+            System.err.println();
+            return 1;
+        }
+    }
+
+    private void upgradeDB(String sqlFile, boolean run) throws Exception {
+        validateConnection();
+        if (!checkDBExists()) {
+            throw new Exception("Falcon DB doesn't exist");
+        }
+        String falconVersion = BuildProperties.get().getProperty("project.version");
+        String dbVersion = getFalconDBVersion();
+        if (dbVersion.compareTo(falconVersion) >= 0) {
+            System.out.println("Falcon DB already upgraded to Falcon version '" + falconVersion + "'");
+            return;
+        }
+
+        createUpgradeDB(sqlFile, run, false);
+        upgradeFalconDBVersion(sqlFile, run, falconVersion);
+
+        // any post upgrade tasks
+        if (run) {
+            System.out.println("Falcon DB has been upgraded to Falcon version '" + falconVersion + "'");
+        }
+    }
+
+
+    private void upgradeFalconDBVersion(String sqlFile, boolean run, String version) throws Exception {
+        String updateDBVersion = "update FALCON_DB_PROPS set data='" + version + "' where name='db.version'";
+        PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
+        writer.println();
+        writer.println(updateDBVersion);
+        writer.close();
+        System.out.println("Upgrade db.version in FALCON_DB_PROPS table to " + version);
+        if (run) {
+            Connection conn = createConnection();
+            Statement st = null;
+            try {
+                conn.setAutoCommit(true);
+                st = conn.createStatement();
+                st.executeUpdate(updateDBVersion);
+                st.close();
+            } catch (Exception ex) {
+                throw new Exception("Could not upgrade db.version in FALCON_DB_PROPS table: " + ex.toString(), ex);
+            } finally {
+                closeStatement(st);
+                conn.close();
+            }
+        }
+        System.out.println("DONE");
+    }
+
+    private static final String GET_FALCON_DB_VERSION = "select data from FALCON_DB_PROPS where name = 'db.version'";
+
+    private String getFalconDBVersion() throws Exception {
+        String version;
+        System.out.println("Get Falcon DB version");
+        Connection conn = createConnection();
+        Statement st = null;
+        ResultSet rs = null;
+        try {
+            st = conn.createStatement();
+            rs = st.executeQuery(GET_FALCON_DB_VERSION);
+            if (rs.next()) {
+                version = rs.getString(1);
+            } else {
+                throw new Exception("ERROR: Could not find Falcon DB 'db.version' in FALCON_DB_PROPS table");
+            }
+        } catch (Exception ex) {
+            throw new Exception("ERROR: Could not query FALCON_DB_PROPS table: " + ex.toString(), ex);
+        } finally {
+            closeResultSet(rs);
+            closeStatement(st);
+            conn.close();
+        }
+        System.out.println("DONE");
+        return version;
+    }
+
+
+    private Map<String, String> getJdbcConf() throws Exception {
+        Map<String, String> jdbcConf = new HashMap<String, String>();
+        jdbcConf.put("driver", StartupProperties.get().getProperty(FalconJPAService.DRIVER));
+        String url = StartupProperties.get().getProperty(FalconJPAService.URL);
+        jdbcConf.put("url", url);
+        jdbcConf.put("user", StartupProperties.get().getProperty(FalconJPAService.USERNAME));
+        jdbcConf.put("password", StartupProperties.get().getProperty(FalconJPAService.PASSWORD));
+        String dbType = url.substring("jdbc:".length());
+        if (dbType.indexOf(":") <= 0) {
+            throw new RuntimeException("Invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
+        }
+        dbType = dbType.substring(0, dbType.indexOf(":"));
+        jdbcConf.put("dbtype", dbType);
+        return jdbcConf;
+    }
+
+    private String[] createMappingToolArguments(String sqlFile) throws Exception {
+        Map<String, String> conf = getJdbcConf();
+        List<String> args = new ArrayList<String>();
+        args.add("-schemaAction");
+        args.add("add");
+        args.add("-p");
+        args.add("persistence.xml#falcon-" + conf.get("dbtype"));
+        args.add("-connectionDriverName");
+        args.add(conf.get("driver"));
+        args.add("-connectionURL");
+        args.add(conf.get("url"));
+        args.add("-connectionUserName");
+        args.add(conf.get("user"));
+        args.add("-connectionPassword");
+        args.add(conf.get("password"));
+        if (sqlFile != null) {
+            args.add("-sqlFile");
+            args.add(sqlFile);
+        }
+        args.add("-indexes");
+        args.add("true");
+        args.add("org.apache.falcon.state.store.jdbc.EntityBean");
+        args.add("org.apache.falcon.state.store.jdbc.InstanceBean");
+        return args.toArray(new String[args.size()]);
+    }
+
+    private void createDB(String sqlFile, boolean run) throws Exception {
+        validateConnection();
+        if (checkDBExists()) {
+            return;
+        }
+
+        verifyFalconPropsTable(false);
+        createUpgradeDB(sqlFile, run, true);
+        createFalconPropsTable(sqlFile, run, BuildProperties.get().getProperty("project.version"));
+        if (run) {
+            System.out.println("Falcon DB has been created for Falcon version '"
+                    + BuildProperties.get().getProperty("project.version") + "'");
+        }
+    }
+
+    private static final String CREATE_FALCON_DB_PROPS =
+            "create table FALCON_DB_PROPS (name varchar(100), data varchar(100))";
+
+    private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception {
+        String insertDbVersion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";
+
+        PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
+        writer.println();
+        writer.println(CREATE_FALCON_DB_PROPS);
+        writer.println(insertDbVersion);
+        writer.close();
+        System.out.println("Create FALCON_DB_PROPS table");
+        if (run) {
+            Connection conn = createConnection();
+            Statement st = null;
+            try {
+                conn.setAutoCommit(true);
+                st = conn.createStatement();
+                st.executeUpdate(CREATE_FALCON_DB_PROPS);
+                st.executeUpdate(insertDbVersion);
+                st.close();
+            } catch (Exception ex) {
+                closeStatement(st);
+                throw new Exception("Could not create FALCON_DB_PROPS table: " + ex.toString(), ex);
+            } finally {
+                conn.close();
+            }
+        }
+        System.out.println("DONE");
+    }
+
+    private static final String FALCON_DB_PROPS_EXISTS = "select count(*) from FALCON_DB_PROPS";
+
+    private boolean verifyFalconPropsTable(boolean exists) throws Exception {
+        System.out.println((exists) ? "Check FALCON_DB_PROPS table exists"
+                : "Checking FALCON_DB_PROPS table does not exist");
+        boolean tableExists;
+        Connection conn = createConnection();
+        Statement st = null;
+        ResultSet rs = null;
+        try {
+            st = conn.createStatement();
+            rs = st.executeQuery(FALCON_DB_PROPS_EXISTS);
+            rs.next();
+            tableExists = true;
+        } catch (Exception ex) {
+            tableExists = false;
+        } finally {
+            closeResultSet(rs);
+            closeStatement(st);
+            conn.close();
+        }
+        if (tableExists != exists) {
+            throw new Exception("FALCON_DB_PROPS_TABLE table " + ((exists) ? "does not exist" : "exists"));
+        }
+        System.out.println("DONE");
+        return tableExists;
+    }
+
+    private void closeResultSet(ResultSet rs) {
+        try {
+            if (rs != null) {
+                rs.close();
+            }
+        } catch (Exception e) {
+            System.out.println("Unable to close ResultSet " + rs);
+        }
+    }
+
+    private void closeStatement(Statement st) throws Exception {
+        try {
+            if (st != null) {
+                st.close();
+            }
+        } catch (Exception e) {
+            System.out.println("Unable to close SQL Statement " + st);
+            throw new Exception(e);
+        }
+    }
+
+    private Connection createConnection() throws Exception {
+        Map<String, String> conf = getJdbcConf();
+        Class.forName(conf.get("driver")).newInstance();
+        return DriverManager.getConnection(conf.get("url"), conf.get("user"), conf.get("password"));
+    }
+
+    private void validateConnection() throws Exception {
+        System.out.println("Validating DB Connection");
+        try {
+            createConnection().close();
+            System.out.println("DONE");
+        } catch (Exception ex) {
+            throw new Exception("Could not connect to the database: " + ex.toString(), ex);
+        }
+    }
+
+    private static final String ENTITY_STATUS_QUERY =
+            "select count(*) from ENTITIES where current_state IN ('RUNNING', 'SUSPENDED')";
+    private static final String INSTANCE_STATUS_QUERY =
+            "select count(*) from INSTANCES where current_state IN ('RUNNING', 'SUSPENDED')";
+
+    private boolean checkDBExists() throws Exception {
+        boolean schemaExists;
+        Connection conn = createConnection();
+        ResultSet rs =  null;
+        Statement st = null;
+        try {
+            st = conn.createStatement();
+            rs = st.executeQuery(ENTITY_STATUS_QUERY);
+            rs.next();
+            schemaExists = true;
+        } catch (Exception ex) {
+            schemaExists = false;
+        } finally {
+            closeResultSet(rs);
+            closeStatement(st);
+            conn.close();
+        }
+        System.out.println("DB schema " + ((schemaExists) ? "exists" : "does not exist"));
+        return schemaExists;
+    }
+
+    private void createUpgradeDB(String sqlFile, boolean run, boolean create) throws Exception {
+        System.out.println((create) ? "Create SQL schema" : "Upgrade SQL schema");
+        String[] args = createMappingToolArguments(sqlFile);
+        org.apache.openjpa.jdbc.meta.MappingTool.main(args);
+        if (run) {
+            args = createMappingToolArguments(null);
+            org.apache.openjpa.jdbc.meta.MappingTool.main(args);
+        }
+        System.out.println("DONE");
+    }
+
+    private void showVersion() throws Exception {
+        System.out.println("Falcon Server version: "
+                + BuildProperties.get().getProperty("project.version"));
+        validateConnection();
+        if (!checkDBExists()) {
+            throw new Exception("Falcon DB doesn't exist");
+        }
+        try {
+            verifyFalconPropsTable(true);
+        } catch (Exception ex) {
+            throw new Exception("ERROR: It seems this Falcon DB was never upgraded with the 'falcondb' tool");
+        }
+        showFalconPropsInfo();
+    }
+
+    private static final String GET_FALCON_PROPS_INFO = "select name, data from FALCON_DB_PROPS order by name";
+
+    private void showFalconPropsInfo() throws Exception {
+        Connection conn = createConnection();
+        Statement st = null;
+        ResultSet rs = null;
+        try {
+            System.out.println("Falcon DB Version Information");
+            System.out.println("--------------------------------------");
+            st = conn.createStatement();
+            rs = st.executeQuery(GET_FALCON_PROPS_INFO);
+            while (rs.next()) {
+                System.out.println(rs.getString(1) + ": " + rs.getString(2));
+            }
+            System.out.println("--------------------------------------");
+        } catch (Exception ex) {
+            throw new Exception("ERROR querying FALCON_DB_PROPS table: " + ex.toString(), ex);
+        } finally {
+            closeResultSet(rs);
+            closeStatement(st);
+            conn.close();
+        }
+    }
+
+}

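For reference, a sketch of driving the tool above programmatically, which is effectively what the new bin/falcon-db.sh wrapper does when it launches this class (hypothetical arguments; '/tmp/falcon-statestore.sql' is illustrative):

    import org.apache.falcon.tools.FalconStateStoreDBCLI;

    public class FalconDbCliSketch {
        public static void main(String[] args) {
            // 'create' generates the schema DDL into the given -sqlfile; adding '-run'
            // also executes it against the JDBC URL configured in startup.properties.
            int ret = new FalconStateStoreDBCLI().run(
                    new String[]{"create", "-sqlfile", "/tmp/falcon-statestore.sql", "-run"});
            System.exit(ret);
        }
    }
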
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/scheduler/src/main/resources/META-INF/persistence.xml b/scheduler/src/main/resources/META-INF/persistence.xml
new file mode 100644
index 0000000..86558de
--- /dev/null
+++ b/scheduler/src/main/resources/META-INF/persistence.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<persistence xmlns="http://java.sun.com/xml/ns/persistence"
+             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+             version="1.0">
+
+    <persistence-unit name="falcon-derby" transaction-type="RESOURCE_LOCAL">
+        <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+
+        <class>org.apache.falcon.state.store.jdbc.EntityBean</class>
+        <class>org.apache.falcon.state.store.jdbc.InstanceBean</class>
+
+        <properties>
+            <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
+
+            <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
+
+            <property name="openjpa.MetaDataFactory"
+                      value="jpa(Types=org.apache.falcon.state.store.EntityBean;
+                org.apache.falcon.state.store.InstanceBean)"></property>
+
+            <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
+            <property name="openjpa.LockManager" value="pessimistic"/>
+            <property name="openjpa.ReadLockLevel" value="read"/>
+            <property name="openjpa.WriteLockLevel" value="write"/>
+            <property name="openjpa.jdbc.TransactionIsolation" value="read-committed"/> <!--CUSTOM-->
+            <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/>
+            <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/>
+            <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
+            <property name="openjpa.Log" value="log4j"/>
+        </properties>
+    </persistence-unit>
+
+</persistence>

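The 'falcon-derby' unit name above is not hard-coded elsewhere: FalconJPAService.setPersistenceUnit() derives it from the vendor segment of the configured JDBC URL. A tiny sketch of that mapping (illustrative values only):

    public class PersistenceUnitNameSketch {
        public static void main(String[] args) {
            String url = "jdbc:derby:data/statestore.db;create=true"; // *.falcon.statestore.jdbc.url
            String dbType = url.substring("jdbc:".length());          // "derby:data/statestore.db;create=true"
            dbType = dbType.split(":")[0];                             // "derby"
            System.out.println("falcon-" + dbType);                    // "falcon-derby", the unit defined above
        }
    }
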
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/resources/falcon-buildinfo.properties
----------------------------------------------------------------------
diff --git a/scheduler/src/main/resources/falcon-buildinfo.properties b/scheduler/src/main/resources/falcon-buildinfo.properties
new file mode 100644
index 0000000..5a7cb82
--- /dev/null
+++ b/scheduler/src/main/resources/falcon-buildinfo.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######################
+*.domain=all
+
+*.build.user=${user.name}
+*.build.epoch=${timestamp}
+*.project.version=${pom.version}
+*.build.version=${pom.version}-r${buildNumber}
+*.vc.revision=${buildNumber}
+*.vc.source.url=${scm.connection}
+######################

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
index bff92c9..2a9fbce 100644
--- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
@@ -19,7 +19,6 @@ package org.apache.falcon.execution;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.AbstractTestBase;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
@@ -36,6 +35,7 @@ import org.apache.falcon.notification.service.impl.DataAvailabilityService;
 import org.apache.falcon.notification.service.impl.JobCompletionService;
 import org.apache.falcon.notification.service.impl.SchedulerService;
 import org.apache.falcon.service.Services;
+import org.apache.falcon.state.AbstractSchedulerTestBase;
 import org.apache.falcon.state.EntityClusterID;
 import org.apache.falcon.state.EntityID;
 import org.apache.falcon.state.EntityState;
@@ -43,7 +43,8 @@ import org.apache.falcon.state.ID;
 import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.store.AbstractStateStore;
-import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.apache.falcon.state.store.service.FalconJPAService;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.engine.DAGEngine;
 import org.apache.falcon.workflow.engine.DAGEngineFactory;
@@ -59,6 +60,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -68,32 +70,38 @@ import java.util.Iterator;
 /**
  * Tests the API of FalconExecution Service and in turn the FalconExecutionService.get()s.
  */
-public class FalconExecutionServiceTest extends AbstractTestBase {
+public class FalconExecutionServiceTest extends AbstractSchedulerTestBase {
 
-    private InMemoryStateStore stateStore = null;
+    private StateStore stateStore = null;
     private AlarmService mockTimeService;
     private DataAvailabilityService mockDataService;
     private SchedulerService mockSchedulerService;
     private JobCompletionService mockCompletionService;
     private DAGEngine dagEngine;
     private int instanceCount = 0;
+    private static FalconJPAService falconJPAService = FalconJPAService.get();
 
     @BeforeClass
     public void init() throws Exception {
         this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
         this.conf = dfsCluster.getConf();
         setupServices();
+        super.setup();
+        createDB(DB_SQL_FILE);
+        falconJPAService.init();
         setupConfigStore();
     }
 
     @AfterClass
-    public void tearDown() {
+    public void tearDown() throws FalconException, IOException {
+        super.cleanup();
         this.dfsCluster.shutdown();
+        falconJPAService.destroy();
     }
 
     // State store is set up to sync with Config Store. That gets tested too.
     public void setupConfigStore() throws Exception {
-        stateStore = (InMemoryStateStore) AbstractStateStore.get();
+        stateStore = AbstractStateStore.get();
         getStore().registerListener(stateStore);
         storeEntity(EntityType.CLUSTER, "testCluster");
         storeEntity(EntityType.FEED, "clicksFeed");
@@ -160,6 +168,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         FalconExecutionService.get().onEvent(event);
 
         // Ensure the instance is ready for execution
+        instance = stateStore.getExecutionInstance(new InstanceID(instance.getInstance()));
         Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.READY);
 
         // Simulate a scheduled notification
@@ -211,12 +220,15 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         FalconExecutionService.get().onEvent(event);
 
         // One in ready and one in waiting. Both should be suspended.
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY);
         Assert.assertEquals(instance1.getInstance().getAwaitingPredicates().size(), 0);
         Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.WAITING);
 
         FalconExecutionService.get().suspend(process);
 
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+        instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUSPENDED);
         Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.SUSPENDED);
         Mockito.verify(mockDataService).unregister(FalconExecutionService.get(),
@@ -225,7 +237,11 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
                 instance2.getInstance().getId());
         Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), executorID);
 
+        Mockito.verify(mockDataService).unregister(FalconExecutionService.get(), executorID);
+
         FalconExecutionService.get().resume(process);
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+        instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY);
         Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.WAITING);
 
@@ -237,17 +253,22 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         FalconExecutionService.get().onEvent(event);
 
         // One in running and the other in ready. Both should be suspended
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
         Mockito.when(dagEngine.isScheduled(instance1.getInstance())).thenReturn(true);
+        instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
         Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
 
         FalconExecutionService.get().suspend(process);
 
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+        instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUSPENDED);
         Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.SUSPENDED);
 
         FalconExecutionService.get().resume(process);
-
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+        instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
         Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
 
@@ -255,6 +276,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instance1.getInstance());
         FalconExecutionService.get().onEvent(event);
 
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUCCEEDED);
     }
 
@@ -294,6 +316,9 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         FalconExecutionService.get().onEvent(event);
 
         // One in ready, one in waiting and one running.
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+        instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
+        instance3 = stateStore.getExecutionInstance(new InstanceID(instance3.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
         Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
         Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING);
@@ -329,6 +354,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
 
         FalconExecutionService.get().onEvent(dataEvent);
 
+        instanceState = stateStore.getExecutionInstance(new InstanceID(instanceState.getInstance()));
         Assert.assertEquals(instanceState.getCurrentState(), InstanceState.STATE.TIMED_OUT);
     }
 
@@ -390,6 +416,9 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         FalconExecutionService.get().onEvent(event);
 
         // One in ready, one in waiting and one running.
+        instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+        instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
+        instance3 = stateStore.getExecutionInstance(new InstanceID(instance3.getInstance()));
         Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
         Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
         Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING);
@@ -442,7 +471,9 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         EntityID processID = new EntityID(process);
 
         // Store couple of instances in store
-        stateStore.getEntity(processID).setCurrentState(EntityState.STATE.SCHEDULED);
+        EntityState entityState = stateStore.getEntity(processID);
+        entityState.setCurrentState(EntityState.STATE.SCHEDULED);
+        stateStore.updateEntity(entityState);
         ProcessExecutionInstance instance1 = new ProcessExecutionInstance(process,
                 new DateTime(System.currentTimeMillis() - 60 * 60 * 1000), clusterName);
         InstanceState instanceState1 = new InstanceState(instance1);
@@ -459,11 +490,13 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         // Simulate a scheduled notification. This should cause the reload from state store
         Event event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instanceState2.getInstance());
         FalconExecutionService.get().onEvent(event);
+        instanceState2 = stateStore.getExecutionInstance(new InstanceID(instanceState2.getInstance()));
         Assert.assertEquals(instanceState2.getCurrentState(), InstanceState.STATE.RUNNING);
 
         // Simulate a Job completion notification and ensure the instance resumes from where it left
         event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instanceState1.getInstance());
         FalconExecutionService.get().onEvent(event);
+        instanceState1 = stateStore.getExecutionInstance(new InstanceID(instanceState1.getInstance()));
         Assert.assertEquals(instanceState1.getCurrentState(), InstanceState.STATE.SUCCEEDED);
     }
 
@@ -500,6 +533,8 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
             Assert.fail("Exception expected.");
         } catch (Exception e) {
             // One instance must fail and the other not
+            instanceState1 = stateStore.getExecutionInstance(new InstanceID(instanceState1.getInstance()));
+            instanceState2 = stateStore.getExecutionInstance(new InstanceID(instanceState2.getInstance()));
             Assert.assertEquals(instanceState2.getCurrentState(), state);
             Assert.assertEquals(instanceState1.getCurrentState(), InstanceState.STATE.RUNNING);
         }
@@ -508,6 +543,8 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
         ((MockDAGEngine)dagEngine).removeFailInstance(instance1);
         m.invoke(FalconExecutionService.get(), process);
 
+        instanceState1 = stateStore.getExecutionInstance(new InstanceID(instanceState1.getInstance()));
+        instanceState2 = stateStore.getExecutionInstance(new InstanceID(instanceState2.getInstance()));
         // Both instances must be in expected state.
         Assert.assertEquals(instanceState2.getCurrentState(), state);
         Assert.assertEquals(instanceState1.getCurrentState(), state);

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
index 001f466..c43ccf0 100644
--- a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
@@ -40,7 +40,7 @@ import org.apache.falcon.state.ID;
 import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.state.InstanceState;
 import org.apache.falcon.state.store.AbstractStateStore;
-import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.state.store.StateStore;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.engine.DAGEngine;
 import org.apache.falcon.workflow.engine.DAGEngineFactory;
@@ -63,10 +63,10 @@ import static org.apache.falcon.state.InstanceState.STATE;
  */
 public class SchedulerServiceTest extends AbstractTestBase {
 
-    private SchedulerService scheduler = Mockito.spy(new SchedulerService());
+    private SchedulerService scheduler;
     private NotificationHandler handler;
     private static String cluster = "testCluster";
-    private static InMemoryStateStore stateStore = (InMemoryStateStore) AbstractStateStore.get();
+    private static StateStore stateStore;
     private static DAGEngine mockDagEngine;
     private static Process process;
     private volatile boolean failed = false;
@@ -79,6 +79,10 @@ public class SchedulerServiceTest extends AbstractTestBase {
 
     @BeforeClass
     public void init() throws Exception {
+        StartupProperties.get().setProperty("falcon.state.store.impl",
+                "org.apache.falcon.state.store.InMemoryStateStore");
+        stateStore = AbstractStateStore.get();
+        scheduler = Mockito.spy(new SchedulerService());
         this.dfsCluster = EmbeddedCluster.newCluster(cluster);
         this.conf = dfsCluster.getConf();
         setupConfigStore();
@@ -97,6 +101,7 @@ public class SchedulerServiceTest extends AbstractTestBase {
         scheduler.init();
         StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName());
         mockDagEngine =  DAGEngineFactory.getDAGEngine("testCluster");
+
     }
 
     @AfterClass
@@ -199,7 +204,7 @@ public class SchedulerServiceTest extends AbstractTestBase {
                 WorkflowJob.Status.SUCCEEDED, DateTime.now()));
         // Dependency now satisfied. Now, the first instance should get scheduled after retry delay.
         Thread.sleep(100);
-        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
new file mode 100644
index 0000000..48c1426
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.tools.FalconStateStoreDBCLI;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * TestBase for tests in scheduler.
+ */
+public class AbstractSchedulerTestBase extends AbstractTestBase {
+    private static final String DB_BASE_DIR = "target/test-data/falcondb";
+    protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
+    protected static String url = "jdbc:derby:" + dbLocation + ";create=true";
+    protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
+    protected static final String DB_UPDATE_SQL_FILE = DB_BASE_DIR + File.separator + "update.sql";
+    protected LocalFileSystem fs = new LocalFileSystem();
+
+    public void setup() throws Exception {
+        StartupProperties.get();
+        StartupProperties.get().setProperty(FalconJPAService.URL, url);
+        Configuration localConf = new Configuration();
+        fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
+        fs.mkdirs(new Path(DB_BASE_DIR));
+    }
+
+    public void cleanup() throws IOException {
+        cleanupDB();
+    }
+
+    private void cleanupDB() throws IOException {
+        fs.delete(new Path(DB_BASE_DIR), true);
+    }
+
+    protected void createDB(String file) {
+        File sqlFile = new File(file);
+        String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
+        int result = execDBCLICommands(argsCreate);
+        Assert.assertEquals(0, result);
+        Assert.assertTrue(sqlFile.exists());
+
+    }
+
+    protected int execDBCLICommands(String[] args) {
+        return new FalconStateStoreDBCLI().run(args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
index 2f32b43..6676754 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
@@ -18,55 +18,72 @@
 package org.apache.falcon.state;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.exception.InvalidStateTransitionException;
+import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.state.store.AbstractStateStore;
-import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.util.StartupProperties;
 import org.mockito.Mockito;
 import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 /**
  * Tests to ensure entity state changes happen correctly.
  */
-public class EntityStateServiceTest {
+public class EntityStateServiceTest extends AbstractSchedulerTestBase {
 
     private EntityStateChangeHandler listener = Mockito.mock(EntityStateChangeHandler.class);
 
-    @BeforeMethod
-    public void setUp() {
-        ((InMemoryStateStore) AbstractStateStore.get()).clear();
+    @BeforeClass
+    public void setup() throws Exception {
+        StartupProperties.get().setProperty("falcon.state.store.impl",
+                "org.apache.falcon.state.store.InMemoryStateStore");
+        super.setup();
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.conf = dfsCluster.getConf();
+    }
+
+    @AfterMethod
+    public void setUp() throws StateStoreException {
+        AbstractStateStore.get().clear();
     }
 
     // Tests a schedulable entity's lifecycle : Submit -> run -> suspend -> resume
     @Test
-    public void testLifeCycle() throws FalconException {
+    public void testLifeCycle() throws Exception {
         Process mockEntity = new Process();
         mockEntity.setName("test");
-
+        storeEntity(EntityType.PROCESS, "test");
         StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUBMIT, listener);
         EntityState entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next();
         Mockito.verify(listener).onSubmit(mockEntity);
         Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SUBMITTED));
         StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SCHEDULE, listener);
         Mockito.verify(listener).onSchedule(mockEntity);
+        entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next();
         Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SCHEDULED));
         StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUSPEND, listener);
         Mockito.verify(listener).onSuspend(mockEntity);
+        entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next();
         Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SUSPENDED));
         StateService.get().handleStateChange(mockEntity, EntityState.EVENT.RESUME, listener);
         Mockito.verify(listener).onResume(mockEntity);
+        entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next();
         Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SCHEDULED));
     }
 
     @Test
-    public void testInvalidTransitions() throws FalconException {
+    public void testInvalidTransitions() throws Exception {
         Feed mockEntity = new Feed();
         mockEntity.setName("test");
+        storeEntity(EntityType.FEED, "test");
         StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUBMIT, listener);
         // Attempt suspending a submitted entity
         try {
@@ -99,10 +116,10 @@ public class EntityStateServiceTest {
 
     @Test(dataProvider = "state_and_events")
     public void testIdempotency(EntityState.STATE state, EntityState.EVENT event)
-        throws InvalidStateTransitionException {
+        throws Exception {
         Process mockEntity = new Process();
         mockEntity.setName("test");
-
+        storeEntity(EntityType.PROCESS, "test");
         EntityState entityState = new EntityState(mockEntity).setCurrentState(state);
         entityState.nextTransition(event);
         Assert.assertEquals(entityState.getCurrentState(), state);

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
index 43c3c54..f0ae7b2 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
@@ -23,11 +23,12 @@ import org.apache.falcon.exception.InvalidStateTransitionException;
 import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.execution.ProcessExecutionInstance;
 import org.apache.falcon.state.store.AbstractStateStore;
-import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.util.StartupProperties;
 import org.joda.time.DateTime;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -40,6 +41,12 @@ public class InstanceStateServiceTest {
     private InstanceStateChangeHandler listener = Mockito.mock(InstanceStateChangeHandler.class);
     private ProcessExecutionInstance mockInstance;
 
+    @BeforeClass
+    public void init() {
+        StartupProperties.get().setProperty("falcon.state.store.impl",
+                "org.apache.falcon.state.store.InMemoryStateStore");
+    }
+
     @BeforeMethod
     public void setup() {
         Process testProcess = new Process();
@@ -47,13 +54,14 @@ public class InstanceStateServiceTest {
         // Setup new mocks so we can verify the no. of invocations
         mockInstance = Mockito.mock(ProcessExecutionInstance.class);
         Mockito.when(mockInstance.getEntity()).thenReturn(testProcess);
+        Mockito.when(mockInstance.getCreationTime()).thenReturn(DateTime.now());
         Mockito.when(mockInstance.getInstanceTime()).thenReturn(DateTime.now());
         Mockito.when(mockInstance.getCluster()).thenReturn("testCluster");
     }
 
     @AfterMethod
-    public void tearDown() {
-        ((InMemoryStateStore) AbstractStateStore.get()).clear();
+    public void tearDown() throws StateStoreException {
+        AbstractStateStore.get().clear();
     }
 
     // Tests an entity instance's lifecycle : Trigger -> waiting -> ready -> running
@@ -67,18 +75,28 @@ public class InstanceStateServiceTest {
         Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.WAITING));
         StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.CONDITIONS_MET, listener);
         Mockito.verify(listener).onConditionsMet(mockInstance);
+        instanceFromStore = AbstractStateStore.get()
+                .getExecutionInstance(new InstanceID(mockInstance));
         Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.READY));
         StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SCHEDULE, listener);
         Mockito.verify(listener).onSchedule(mockInstance);
+        instanceFromStore = AbstractStateStore.get()
+                .getExecutionInstance(new InstanceID(mockInstance));
         Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.RUNNING));
         StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SUSPEND, listener);
         Mockito.verify(listener).onSuspend(mockInstance);
+        instanceFromStore = AbstractStateStore.get()
+                .getExecutionInstance(new InstanceID(mockInstance));
         Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.SUSPENDED));
         StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.RESUME_RUNNING, listener);
         Mockito.verify(listener).onResume(mockInstance);
+        instanceFromStore = AbstractStateStore.get()
+                .getExecutionInstance(new InstanceID(mockInstance));
         Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.RUNNING));
         StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SUCCEED, listener);
         Mockito.verify(listener).onSuccess(mockInstance);
+        instanceFromStore = AbstractStateStore.get()
+                .getExecutionInstance(new InstanceID(mockInstance));
         Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.SUCCEEDED));
         Assert.assertEquals(AbstractStateStore.get().getAllEntities().size(), 0);
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java b/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
new file mode 100644
index 0000000..ecd5293
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.state.AbstractSchedulerTestBase;
+import org.apache.falcon.state.store.service.FalconJPAService;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.persistence.EntityManager;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Test cases for FalconJPAService.
+ */
+public class TestFalconJPAService extends AbstractSchedulerTestBase {
+
+    private static FalconJPAService falconJPAService = FalconJPAService.get();
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        super.setup();
+        createDB(DB_SQL_FILE);
+    }
+
+    @Test
+    public void testService() throws FalconException {
+        // initialize it
+        falconJPAService.init();
+        EntityManager entityManager = falconJPAService.getEntityManager();
+        Map<String, Object> props = entityManager.getProperties();
+        Assert.assertNotNull(props);
+        entityManager.close();
+    }
+
+    @AfterClass
+    public void tearDown() throws FalconException, IOException {
+        falconJPAService.destroy();
+        super.cleanup();
+    }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
new file mode 100644
index 0000000..6d5bd49
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.service.store;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.FalconExecutionService;
+import org.apache.falcon.execution.MockDAGEngine;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.impl.AlarmService;
+import org.apache.falcon.notification.service.impl.DataAvailabilityService;
+import org.apache.falcon.notification.service.impl.JobCompletionService;
+import org.apache.falcon.notification.service.impl.SchedulerService;
+import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.state.AbstractSchedulerTestBase;
+import org.apache.falcon.state.EntityClusterID;
+import org.apache.falcon.state.EntityID;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.jdbc.BeanMapperUtil;
+import org.apache.falcon.state.store.jdbc.JDBCStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.engine.DAGEngine;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.falcon.workflow.engine.OozieDAGEngine;
+import org.joda.time.DateTime;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Test cases for JDBCStateStore.
+ */
+public class TestJDBCStateStore extends AbstractSchedulerTestBase {
+    private static StateStore stateStore = JDBCStateStore.get();
+    private static Random randomValGenerator = new Random();
+    private static FalconJPAService falconJPAService = FalconJPAService.get();
+    private AlarmService mockTimeService;
+    private DataAvailabilityService mockDataService;
+    private SchedulerService mockSchedulerService;
+    private JobCompletionService mockCompletionService;
+    private DAGEngine dagEngine;
+
+    @BeforeClass
+    public void setup() throws Exception {
+        super.setup();
+        createDB(DB_SQL_FILE);
+        falconJPAService.init();
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.conf = dfsCluster.getConf();
+        registerServices();
+    }
+
+    private void registerServices() throws FalconException {
+        mockTimeService = Mockito.mock(AlarmService.class);
+        Mockito.when(mockTimeService.getName()).thenReturn("AlarmService");
+        Mockito.when(mockTimeService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+                Mockito.any(ID.class))).thenCallRealMethod();
+        mockDataService = Mockito.mock(DataAvailabilityService.class);
+        Mockito.when(mockDataService.getName()).thenReturn("DataAvailabilityService");
+        Mockito.when(mockDataService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+                Mockito.any(ID.class))).thenCallRealMethod();
+        dagEngine = Mockito.mock(OozieDAGEngine.class);
+        Mockito.doNothing().when(dagEngine).resume(Mockito.any(ExecutionInstance.class));
+        mockSchedulerService = Mockito.mock(SchedulerService.class);
+        Mockito.when(mockSchedulerService.getName()).thenReturn("JobSchedulerService");
+        StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName());
+        StartupProperties.get().setProperty("execution.service.impl", FalconExecutionService.class.getName());
+        dagEngine = Mockito.spy(DAGEngineFactory.getDAGEngine("testCluster"));
+        Mockito.when(mockSchedulerService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+                Mockito.any(ID.class))).thenCallRealMethod();
+        mockCompletionService = Mockito.mock(JobCompletionService.class);
+        Mockito.when(mockCompletionService.getName()).thenReturn("JobCompletionService");
+        Mockito.when(mockCompletionService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+                Mockito.any(ID.class))).thenCallRealMethod();
+        Services.get().register(mockTimeService);
+        Services.get().register(mockDataService);
+        Services.get().register(mockSchedulerService);
+        Services.get().register(mockCompletionService);
+    }
+
+
+    @Test
+    public void testInsertRetrieveAndUpdate() throws Exception {
+        EntityState entityState = getEntityState(EntityType.PROCESS, "process");
+        stateStore.putEntity(entityState);
+        EntityID entityID = new EntityID(entityState.getEntity());
+        EntityState actualEntityState = stateStore.getEntity(entityID);
+        Assert.assertEquals(actualEntityState.getEntity(), entityState.getEntity());
+        Assert.assertEquals(actualEntityState.getCurrentState(), entityState.getCurrentState());
+        try {
+            stateStore.putEntity(entityState);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException e) {
+            //no op
+        }
+
+        entityState.setCurrentState(EntityState.STATE.SCHEDULED);
+        stateStore.updateEntity(entityState);
+        actualEntityState = stateStore.getEntity(entityID);
+        Assert.assertEquals(actualEntityState.getEntity(), entityState.getEntity());
+        Assert.assertEquals(actualEntityState.getCurrentState(), entityState.getCurrentState());
+
+        stateStore.deleteEntity(entityID);
+        boolean entityExists = stateStore.entityExists(entityID);
+        Assert.assertEquals(entityExists, false);
+
+        try {
+            stateStore.getEntity(entityID);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException e){
+            // no op
+        }
+
+        try {
+            stateStore.updateEntity(entityState);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException e) {
+            // no op
+        }
+
+        try {
+            stateStore.deleteEntity(entityID);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException e){
+            // no op
+        }
+    }
+
+
+    @Test
+    public void testGetEntities() throws Exception {
+        EntityState entityState1 = getEntityState(EntityType.PROCESS, "process1");
+        EntityState entityState2 = getEntityState(EntityType.PROCESS, "process2");
+        EntityState entityState3 = getEntityState(EntityType.FEED, "feed1");
+
+        Collection<EntityState> result = stateStore.getAllEntities();
+        Assert.assertEquals(result.size(), 0);
+
+        stateStore.putEntity(entityState1);
+        stateStore.putEntity(entityState2);
+        stateStore.putEntity(entityState3);
+
+        result = stateStore.getAllEntities();
+        Assert.assertEquals(result.size(), 3);
+
+        Collection<Entity> entities = stateStore.getEntities(EntityState.STATE.SUBMITTED);
+        Assert.assertEquals(entities.size(), 3);
+    }
+
+
+    @Test
+    public void testInstanceInsertionAndUpdate() throws Exception {
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        storeEntity(EntityType.FEED, "clicksFeed");
+        storeEntity(EntityType.FEED, "clicksSummary");
+        EntityState entityState = getEntityState(EntityType.PROCESS, "process");
+        ExecutionInstance executionInstance = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                System.currentTimeMillis(), "cluster", System.currentTimeMillis());
+        InstanceState instanceState = new InstanceState(executionInstance);
+        initInstanceState(instanceState);
+        stateStore.putExecutionInstance(instanceState);
+        InstanceID instanceID = new InstanceID(instanceState.getInstance());
+        InstanceState actualInstanceState = stateStore.getExecutionInstance(instanceID);
+        Assert.assertEquals(actualInstanceState, instanceState);
+
+        instanceState.setCurrentState(InstanceState.STATE.RUNNING);
+        Predicate predicate = new Predicate(Predicate.TYPE.DATA);
+        instanceState.getInstance().getAwaitingPredicates().add(predicate);
+
+        stateStore.updateExecutionInstance(instanceState);
+        actualInstanceState = stateStore.getExecutionInstance(instanceID);
+        Assert.assertEquals(actualInstanceState, instanceState);
+
+        try {
+            stateStore.putExecutionInstance(instanceState);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException e) {
+            // no op
+        }
+
+        stateStore.deleteExecutionInstance(instanceID);
+
+        try {
+            stateStore.getExecutionInstance(instanceID);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException e) {
+            // no op
+        }
+
+        try {
+            stateStore.deleteExecutionInstance(instanceID);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException e) {
+            // no op
+        }
+
+        try {
+            stateStore.updateExecutionInstance(instanceState);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException e) {
+            // no op
+        }
+    }
+
+
+    @Test
+    public void testBulkInstanceOperations() throws Exception {
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        storeEntity(EntityType.FEED, "clicksFeed");
+        storeEntity(EntityType.FEED, "clicksSummary");
+        EntityState entityState = getEntityState(EntityType.PROCESS, "process1");
+        ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                System.currentTimeMillis() - 60000, "cluster1", System.currentTimeMillis() - 60000);
+        InstanceState instanceState1 = new InstanceState(processExecutionInstance1);
+        instanceState1.setCurrentState(InstanceState.STATE.READY);
+
+        ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                System.currentTimeMillis(), "cluster1", System.currentTimeMillis());
+        InstanceState instanceState2 = new InstanceState(processExecutionInstance2);
+        instanceState2.setCurrentState(InstanceState.STATE.RUNNING);
+
+        ExecutionInstance processExecutionInstance3 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                System.currentTimeMillis(), "cluster2", System.currentTimeMillis());
+        InstanceState instanceState3 = new InstanceState(processExecutionInstance3);
+        instanceState3.setCurrentState(InstanceState.STATE.READY);
+
+        stateStore.putExecutionInstance(instanceState1);
+        stateStore.putExecutionInstance(instanceState2);
+        stateStore.putExecutionInstance(instanceState3);
+
+        Collection<InstanceState> actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(),
+                "cluster1");
+        Assert.assertEquals(actualInstances.size(), 2);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState1);
+        Assert.assertEquals(actualInstances.toArray()[1], instanceState2);
+
+        actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(),
+                "cluster2");
+        Assert.assertEquals(actualInstances.size(), 1);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState3);
+
+        List<InstanceState.STATE> states = new ArrayList<>();
+        states.add(InstanceState.STATE.READY);
+
+        actualInstances = stateStore.getExecutionInstances(entityState.getEntity(), "cluster1", states);
+        Assert.assertEquals(actualInstances.size(), 1);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState1);
+
+        EntityClusterID entityClusterID = new EntityClusterID(entityState.getEntity(), "testCluster");
+        actualInstances = stateStore.getExecutionInstances(entityClusterID, states);
+        Assert.assertEquals(actualInstances.size(), 2);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState1);
+        Assert.assertEquals(actualInstances.toArray()[1], instanceState3);
+
+        states.add(InstanceState.STATE.RUNNING);
+        actualInstances = stateStore.getExecutionInstances(entityState.getEntity(), "cluster1", states);
+        Assert.assertEquals(actualInstances.size(), 2);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState1);
+        Assert.assertEquals(actualInstances.toArray()[1], instanceState2);
+
+        InstanceState lastInstanceState = stateStore.getLastExecutionInstance(entityState.getEntity(), "cluster1");
+        Assert.assertEquals(lastInstanceState, instanceState2);
+
+
+        InstanceID instanceKey = new InstanceID(instanceState3.getInstance());
+        stateStore.deleteExecutionInstance(instanceKey);
+
+        actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster2");
+        Assert.assertEquals(actualInstances.size(), 0);
+
+        actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster1");
+        Assert.assertEquals(actualInstances.size(), 2);
+
+        stateStore.putExecutionInstance(instanceState3);
+
+        actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster2");
+        Assert.assertEquals(actualInstances.size(), 1);
+
+        stateStore.deleteExecutionInstances(entityClusterID.getEntityID());
+        actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster1");
+        Assert.assertEquals(actualInstances.size(), 0);
+
+        actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster2");
+        Assert.assertEquals(actualInstances.size(), 0);
+
+    }
+
+
+    @Test
+    public void testGetExecutionInstancesWithRange() throws Exception {
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        storeEntity(EntityType.FEED, "clicksFeed");
+        storeEntity(EntityType.FEED, "clicksSummary");
+
+        long instance1Time = System.currentTimeMillis() - 180000;
+        long instance2Time = System.currentTimeMillis();
+        EntityState entityState = getEntityState(EntityType.PROCESS, "process1");
+        ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                instance1Time, "cluster1", instance1Time);
+        InstanceState instanceState1 = new InstanceState(processExecutionInstance1);
+        instanceState1.setCurrentState(InstanceState.STATE.RUNNING);
+
+        ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                instance2Time, "cluster1", instance2Time);
+        InstanceState instanceState2 = new InstanceState(processExecutionInstance2);
+        instanceState2.setCurrentState(InstanceState.STATE.RUNNING);
+
+        stateStore.putExecutionInstance(instanceState1);
+        stateStore.putExecutionInstance(instanceState2);
+
+        List<InstanceState.STATE> states = new ArrayList<>();
+        states.add(InstanceState.STATE.RUNNING);
+
+        Collection<InstanceState> actualInstances = stateStore.getExecutionInstances(entityState.getEntity(),
+                "cluster1", states, new DateTime(instance1Time), new DateTime(instance1Time + 60000));
+        Assert.assertEquals(1, actualInstances.size());
+        Assert.assertEquals(instanceState1, actualInstances.toArray()[0]);
+
+        actualInstances = stateStore.getExecutionInstances(entityState.getEntity(),
+                "cluster1", states, new DateTime(instance2Time), new DateTime(instance2Time + 60000));
+        Assert.assertEquals(1, actualInstances.size());
+        Assert.assertEquals(instanceState2, actualInstances.toArray()[0]);
+
+    }
+
+
+    private void initInstanceState(InstanceState instanceState) {
+        instanceState.setCurrentState(InstanceState.STATE.READY);
+        instanceState.getInstance().setExternalID(RandomStringUtils.randomNumeric(6));
+        instanceState.getInstance().setInstanceSequence(randomValGenerator.nextInt());
+        instanceState.getInstance().setActualStart(new DateTime(System.currentTimeMillis()));
+        instanceState.getInstance().setActualEnd(new DateTime(System.currentTimeMillis()));
+        List<Predicate> predicates = new ArrayList<>();
+        Predicate predicate = new Predicate(Predicate.TYPE.JOB_COMPLETION);
+        predicates.add(predicate);
+        instanceState.getInstance().setAwaitingPredicates(predicates);
+    }
+
+    private EntityState getEntityState(EntityType entityType, String name) throws Exception {
+        storeEntity(entityType, name);
+        Entity entity = getStore().get(entityType, name);
+        Assert.assertNotNull(entity);
+        return new EntityState(entity);
+    }
+
+    @AfterTest
+    public void cleanUpTables() throws StateStoreException {
+        stateStore.deleteEntities();
+        stateStore.deleteExecutionInstances();
+    }
+
+    @AfterClass
+    public void cleanup() throws IOException {
+        super.cleanup();
+    }
+}
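
For orientation, and not part of the patch itself: the JDBC-backed store exercised above is selected through the same "falcon.state.store.impl" startup property the other tests set, and AbstractStateStore.get() returns whichever implementation that property names. Below is a minimal, hypothetical sketch of that selection path, assuming the Derby schema has already been created and FalconJPAService initialized, much as TestJDBCStateStore.setup() does; the entity used is purely illustrative.

    // Sketch only: wire up the JDBC state store and round-trip one entity state.
    import org.apache.falcon.entity.v0.process.Process;
    import org.apache.falcon.state.EntityID;
    import org.apache.falcon.state.EntityState;
    import org.apache.falcon.state.store.AbstractStateStore;
    import org.apache.falcon.state.store.StateStore;
    import org.apache.falcon.state.store.service.FalconJPAService;
    import org.apache.falcon.util.StartupProperties;

    public class StateStoreSelectionSketch {
        public static void main(String[] args) throws Exception {
            // Point Falcon at the JDBC-backed implementation before the store is first resolved.
            StartupProperties.get().setProperty("falcon.state.store.impl",
                    "org.apache.falcon.state.store.jdbc.JDBCStateStore");
            FalconJPAService.get().init();

            StateStore stateStore = AbstractStateStore.get();

            // Hypothetical, minimal process entity; a real deployment would use an entity
            // that has been submitted to the configuration store.
            Process process = new Process();
            process.setName("sample-process");

            stateStore.putEntity(new EntityState(process));   // newly stored entities start as SUBMITTED
            EntityState fromStore = stateStore.getEntity(new EntityID(process));
            System.out.println(fromStore.getCurrentState());
        }
    }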

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/tools/TestFalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/tools/TestFalconStateStoreDBCLI.java b/scheduler/src/test/java/org/apache/falcon/tools/TestFalconStateStoreDBCLI.java
new file mode 100644
index 0000000..8a42830
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/tools/TestFalconStateStoreDBCLI.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.tools;
+
+
+import org.apache.falcon.state.AbstractSchedulerTestBase;
+import org.apache.falcon.util.BuildProperties;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+
+/**
+ * Tests for DB operations tool.
+ */
+public class TestFalconStateStoreDBCLI extends AbstractSchedulerTestBase {
+
+    @BeforeClass
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    @AfterClass
+    public void cleanup() throws IOException {
+        super.cleanup();
+    }
+
+
+    @Test
+    public void testFalconDBCLI() throws Exception {
+        File sqlFile = new File(DB_SQL_FILE);
+        String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
+        int result = execDBCLICommands(argsCreate);
+        Assert.assertEquals(0, result);
+        Assert.assertTrue(sqlFile.exists());
+
+        ByteArrayOutputStream data = new ByteArrayOutputStream();
+        PrintStream oldOut = System.out;
+        try {
+            // show versions
+            System.setOut(new PrintStream(data));
+            String[] argsVersion = { "version" };
+            Assert.assertEquals(0, execDBCLICommands(argsVersion));
+            Assert.assertTrue(data.toString().contains("db.version: "
+                    + BuildProperties.get().getProperty("project.version")));
+            // show help information
+            data.reset();
+            String[] argsHelp = { "help" };
+            Assert.assertEquals(0, execDBCLICommands(argsHelp));
+            Assert.assertTrue(data.toString().contains("falcondb create <OPTIONS> : Create Falcon DB schema"));
+            Assert.assertTrue(data.toString().contains("falcondb upgrade <OPTIONS> : Upgrade Falcon DB schema"));
+            // try run invalid command
+            data.reset();
+            String[] argsInvalidCommand = { "invalidCommand" };
+            Assert.assertEquals(1, execDBCLICommands(argsInvalidCommand));
+        } finally {
+            System.setOut(oldOut);
+        }
+        // generate an upgrade script
+        File update = new File(DB_UPDATE_SQL_FILE);
+
+        String[] argsUpgrade = { "upgrade", "-sqlfile", update.getAbsolutePath(), "-run" };
+        BuildProperties.get().setProperty("project.version", "99999-SNAPSHOT");
+        Assert.assertEquals(0, execDBCLICommands(argsUpgrade));
+
+        Assert.assertTrue(update.exists());
+    }
+
+}


[3/4] falcon git commit: FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri)

Posted by pa...@apache.org.
FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/6d313855
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/6d313855
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/6d313855

Branch: refs/heads/master
Commit: 6d3138559e8395ac1140f10197fcd10badf64883
Parents: 6676032
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Thu Nov 26 15:56:14 2015 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Thu Nov 26 15:56:14 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../src/main/resources/falcon/checkstyle.xml    |   6 +-
 .../main/resources/falcon/findbugs-exclude.xml  |  15 +
 common/src/main/resources/startup.properties    |  29 ++
 pom.xml                                         |   4 +
 scheduler/pom.xml                               |  84 ++++
 .../falcon/execution/ExecutionInstance.java     |  41 +-
 .../execution/FalconExecutionService.java       |  24 +-
 .../execution/ProcessExecutionInstance.java     |  60 ++-
 .../org/apache/falcon/predicate/Predicate.java  |  50 ++-
 .../org/apache/falcon/state/EntityState.java    |  30 ++
 .../org/apache/falcon/state/InstanceID.java     |  19 +
 .../org/apache/falcon/state/InstanceState.java  |  32 ++
 .../org/apache/falcon/state/StateService.java   |   1 +
 .../falcon/state/store/AbstractStateStore.java  |   2 +-
 .../falcon/state/store/EntityStateStore.java    |  14 +-
 .../falcon/state/store/InMemoryStateStore.java  |  22 +-
 .../falcon/state/store/InstanceStateStore.java  |  18 +-
 .../apache/falcon/state/store/StateStore.java   |   6 +-
 .../falcon/state/store/jdbc/BeanMapperUtil.java | 271 ++++++++++++
 .../falcon/state/store/jdbc/EntityBean.java     | 104 +++++
 .../falcon/state/store/jdbc/InstanceBean.java   | 199 +++++++++
 .../falcon/state/store/jdbc/JDBCStateStore.java | 416 ++++++++++++++++++
 .../state/store/service/FalconJPAService.java   | 171 ++++++++
 .../falcon/tools/FalconStateStoreDBCLI.java     | 435 +++++++++++++++++++
 .../src/main/resources/META-INF/persistence.xml |  50 +++
 .../main/resources/falcon-buildinfo.properties  |  28 ++
 .../execution/FalconExecutionServiceTest.java   |  53 ++-
 .../service/SchedulerServiceTest.java           |  13 +-
 .../falcon/state/AbstractSchedulerTestBase.java |  71 +++
 .../falcon/state/EntityStateServiceTest.java    |  39 +-
 .../falcon/state/InstanceStateServiceTest.java  |  24 +-
 .../state/service/TestFalconJPAService.java     |  64 +++
 .../state/service/store/TestJDBCStateStore.java | 397 +++++++++++++++++
 .../falcon/tools/TestFalconStateStoreDBCLI.java |  89 ++++
 scheduler/src/test/resources/startup.properties | 154 +++++++
 src/bin/falcon-db.sh                            |  49 +++
 src/conf/startup.properties                     |  22 +-
 src/main/assemblies/distributed-package.xml     |   5 +
 src/main/assemblies/standalone-package.xml      |   5 +
 unit/src/main/resources/startup.properties      |  18 +
 41 files changed, 3071 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 13d3439..31a2566 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri via Pallavi Rao)
+
     FALCON-1588 Add ability to provide the path for recipe files in command line(Peeyush Bishnoi via Ajay Yadava)
 
     FALCON-1573 Supply user-defined properties to Oozie workflows during schedule(Daniel Del Castillo via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/checkstyle/src/main/resources/falcon/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/src/main/resources/falcon/checkstyle.xml b/checkstyle/src/main/resources/falcon/checkstyle.xml
index 2130e73..292a0a3 100644
--- a/checkstyle/src/main/resources/falcon/checkstyle.xml
+++ b/checkstyle/src/main/resources/falcon/checkstyle.xml
@@ -230,9 +230,9 @@
 
     <!-- allow warnings to be suppressed -->
     <module name="SuppressionCommentFilter">
-        <property name="offCommentFormat" value="SUSPEND CHECKSTYLE CHECK ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName"/>
-        <property name="onCommentFormat" value="RESUME CHECKSTYLE CHECK ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName"/>
-        <property name="checkFormat" value="ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName"/>
+        <property name="offCommentFormat" value="SUSPEND CHECKSTYLE CHECK ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName|LineLengthCheck"/>
+        <property name="onCommentFormat" value="RESUME CHECKSTYLE CHECK ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName|LineLengthCheck"/>
+        <property name="checkFormat" value="ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName|LineLengthCheck"/>
     </module>
 
 </module>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/checkstyle/src/main/resources/falcon/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/checkstyle/src/main/resources/falcon/findbugs-exclude.xml b/checkstyle/src/main/resources/falcon/findbugs-exclude.xml
index 0a7580d..e1a5a2e 100644
--- a/checkstyle/src/main/resources/falcon/findbugs-exclude.xml
+++ b/checkstyle/src/main/resources/falcon/findbugs-exclude.xml
@@ -31,4 +31,19 @@
     <Match>
         <Bug pattern="DM_DEFAULT_ENCODING" />
     </Match>
+
+    <Match>
+        <Class name="org.apache.falcon.tools.FalconStateStoreDBCLI" />
+        <Bug pattern="SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE" />
+    </Match>
+
+    <Match>
+        <Class name="org.apache.falcon.state.store.jdbc.EntityBean" />
+        <Bug pattern="NP_BOOLEAN_RETURN_NULL" />
+    </Match>
+
+    <Match>
+        <Class name="org.apache.falcon.state.store.jdbc.InstanceBean" />
+        <Bug pattern="NP_BOOLEAN_RETURN_NULL" />
+    </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index cc5212a..3f1ed03 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -43,6 +43,14 @@
                         org.apache.falcon.service.LogCleanupService,\
                         org.apache.falcon.service.GroupsService,\
                         org.apache.falcon.service.ProxyUserService
+## If you wish to use the Falcon native scheduler, add the commented-out services below to application.services ##
+#                        org.apache.falcon.notification.service.impl.JobCompletionService,\
+#                        org.apache.falcon.notification.service.impl.SchedulerService,\
+#                        org.apache.falcon.notification.service.impl.AlarmService,\
+#                        org.apache.falcon.notification.service.impl.DataAvailabilityService,\
+#                        org.apache.falcon.execution.FalconExecutionService,\
+#                        org.apache.falcon.state.store.service.FalconJPAService
+
 
 # List of Lifecycle policies configured.
 *.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
@@ -55,6 +63,8 @@
                         org.apache.falcon.entity.store.FeedLocationStore,\
                         org.apache.falcon.service.FeedSLAMonitoringService,\
                         org.apache.falcon.service.SharedLibraryHostingService
+## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ##
+#                       org.apache.falcon.state.store.jdbc.JDBCStateStore
 
 ##### JMS MQ Broker Implementation class #####
 *.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
@@ -247,3 +257,22 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
 # Setting monitoring plugin, if SMTP parameters is defined
 #*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\
 #                     org.apache.falcon.plugin.EmailNotificationPlugin
+
+######### StateStore Properties #####
+#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+#*.falcon.statestore.jdbc.url=jdbc:derby:data/statestore.db;create=true
+#*.falcon.statestore.jdbc.username=sa
+#*.falcon.statestore.jdbc.password=
+#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+## Maximum number of active connections that can be allocated from this pool at the same time.
+#*.falcon.statestore.pool.max.active.conn=10
+#*.falcon.statestore.connection.properties=
+## Indicates the interval (in milliseconds) between eviction runs.
+#*.falcon.statestore.validate.db.connection.eviction.interval=300000
+## The number of objects to examine during each run of the idle object evictor thread.
+#*.falcon.statestore.validate.db.connection.eviction.num=10
+## Creates Falcon DB.
+## If set to true, it creates the DB schema if it does not exist; if the DB schema already exists, this is a no-op.
+## If set to false, it does not create the DB schema; if the DB schema does not exist, start up fails.
+#*.falcon.statestore.create.db.schema=true
\ No newline at end of file
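
As a point of reference, an uncommented configuration enabling the embedded Derby store is a small variation on the commented defaults above (a sketch only; adjust the JDBC URL, credentials and pool settings per deployment):

    *.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
    *.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
    *.falcon.statestore.jdbc.url=jdbc:derby:data/statestore.db;create=true
    *.falcon.statestore.jdbc.username=sa
    *.falcon.statestore.jdbc.password=
    *.falcon.statestore.pool.max.active.conn=10
    *.falcon.statestore.create.db.schema=true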

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fad8902..678c87c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,10 @@
         <quartz.version>2.2.1</quartz.version>
         <joda.version>2.8.2</joda.version>
         <mockito.version>1.9.5</mockito.version>
+        <openjpa.version>2.4.0</openjpa.version>
+        <javax-validation.version>1.0.0.GA</javax-validation.version>
+        <derby.version>10.10.1.1</derby.version>
+        <commons-dbcp.version>1.4</commons-dbcp.version>
         <internal.maven.repo>file:///tmp/falcontemprepo</internal.maven.repo>
         <excluded.test.groups>exhaustive</excluded.test.groups>
     </properties>

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/pom.xml
----------------------------------------------------------------------
diff --git a/scheduler/pom.xml b/scheduler/pom.xml
index 20a91d2..336997d 100644
--- a/scheduler/pom.xml
+++ b/scheduler/pom.xml
@@ -88,6 +88,33 @@
 	</dependency>
 
         <dependency>
+            <groupId>org.apache.openjpa</groupId>
+            <artifactId>openjpa-persistence</artifactId>
+            <version>${openjpa.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.openjpa</groupId>
+            <artifactId>openjpa-jdbc</artifactId>
+            <version>${openjpa.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.openjpa</groupId>
+            <artifactId>openjpa-persistence-jdbc</artifactId>
+            <version>${openjpa.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>javax.validation</groupId>
+            <artifactId>validation-api</artifactId>
+            <version>${javax-validation.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.testng</groupId>
             <artifactId>testng</artifactId>
         </dependency>
@@ -98,11 +125,18 @@
             <version>${mockito.version}</version>
             <scope>test</scope>
         </dependency>
+
         <dependency>
             <groupId>joda-time</groupId>
             <artifactId>joda-time</artifactId>
             <version>${joda.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derby</artifactId>
+            <version>${derby.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
@@ -115,6 +149,56 @@
                     <target>1.7</target>
                 </configuration>
             </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <phase>process-classes</phase>
+                        <configuration>
+                            <tasks>
+                                <taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask" classpathref="maven.compile.classpath"/>
+                                <openjpac>
+                                    <classpath refid="maven.compile.classpath"/>
+                                </openjpac>
+                            </tasks>
+                        </configuration>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>2.8</version>
+                <executions>
+                    <execution>
+                        <id>copy-dependencies</id>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+                        <configuration>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>org.apache.derby</groupId>
+                                    <artifactId>derby</artifactId>
+                                </artifactItem>
+                                <artifactItem>
+                                    <groupId>commons-dbcp</groupId>
+                                    <artifactId>commons-dbcp</artifactId>
+                                    <version>${commons-dbcp.version}</version>
+                                </artifactItem>
+                            </artifactItems>
+                            <outputDirectory>${project.build.directory}/dependency</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
         </plugins>
     </build>
 </project>
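
The maven-antrun execution above runs OpenJPA's PCEnhancer over the compiled classes in the process-classes phase, so the JPA beans added later in this patch are bytecode-enhanced at build time instead of relying on OpenJPA's less predictable runtime enhancement. A rough Java illustration (hypothetical check, not part of this patch): an enhanced class can be recognised because the enhancer makes it implement OpenJPA's PersistenceCapable interface.

    import org.apache.falcon.state.store.jdbc.EntityBean;
    import org.apache.openjpa.enhance.PersistenceCapable;

    public final class EnhancementCheck {
        public static void main(String[] args) {
            // If the PCEnhancer ran during the build, EntityBean implements PersistenceCapable.
            boolean enhanced = PersistenceCapable.class.isAssignableFrom(EntityBean.class);
            System.out.println("EntityBean enhanced at build time: " + enhanced);
        }
    }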

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
index 2d6b67d..5f96d3f 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
@@ -26,7 +26,6 @@ import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
 import java.util.List;
-import java.util.TimeZone;
 
 /**
  * Represents an execution instance of an entity.
@@ -38,20 +37,31 @@ public abstract class ExecutionInstance implements NotificationHandler {
     // External ID is the ID used to identify the Job submitted to the DAG Engine, as returned by the DAG Engine.
     // For example, for Oozie this would be the workflow Id.
     private String externalID;
+    // Time at which instance has to be run.
     private final DateTime instanceTime;
+    // Time at which instance is created.
     private final DateTime creationTime;
     private DateTime actualStart;
     private DateTime actualEnd;
-    private static final DateTimeZone UTC = DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"));
+    protected static final DateTimeZone UTC = DateTimeZone.UTC;
 
     /**
-     * @param instanceTime
+     * @param instanceTime Time at which instance has to be run.
      * @param cluster
+     * @param creationTime Time at which the instance was created.
      */
-    public ExecutionInstance(DateTime instanceTime, String cluster) {
+    public ExecutionInstance(DateTime instanceTime, String cluster, DateTime creationTime) {
         this.instanceTime = new DateTime(instanceTime, UTC);
         this.cluster = cluster;
-        this.creationTime = DateTime.now(UTC);
+        this.creationTime = new DateTime(creationTime, UTC);
+    }
+
+    /**
+     * @param instanceTime
+     * @param cluster
+     */
+    public ExecutionInstance(DateTime instanceTime, String cluster) {
+        this(instanceTime, cluster, DateTime.now());
     }
 
     /**
@@ -92,7 +102,7 @@ public abstract class ExecutionInstance implements NotificationHandler {
     public abstract Entity getEntity();
 
     /**
-     * @return - The nominal time of the instance.
+     * @return - The instance time, i.e. the time at which the instance is to be run.
      */
     public DateTime getInstanceTime() {
         return instanceTime;
@@ -138,16 +148,31 @@ public abstract class ExecutionInstance implements NotificationHandler {
         this.actualEnd = actualEnd;
     }
 
-
+    /**
+     * Creation time of the instance.
+     * @return - The time at which the instance was created.
+     */
     public DateTime getCreationTime() {
         return creationTime;
     }
 
     /**
+     * Set the gating conditions on which this instance is waiting before it is scheduled for execution.
+     * @param predicates
+     */
+    public abstract void setAwaitingPredicates(List<Predicate> predicates);
+
+    /**
      * @return - The gating conditions on which this instance is waiting before it is scheduled for execution.
      * @throws FalconException
      */
-    public abstract List<Predicate> getAwaitingPredicates() throws FalconException;
+    public abstract List<Predicate> getAwaitingPredicates();
+
+    /**
+     * Sets the sequential numerical id of the instance.
+     */
+    public abstract void setInstanceSequence(int sequence);
+
 
     /**
      * Suspends the instance if it is in one of the active states, waiting, ready or running.

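A side note on the constructors above: wrapping an incoming time as new DateTime(time, UTC) keeps the underlying instant and only normalizes the time zone, so instanceTime and creationTime comparisons behave the same regardless of the caller's default zone. A standalone joda-time sketch of that behaviour (illustration only, no Falcon classes involved):

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;

    public final class UtcNormalization {
        public static void main(String[] args) {
            DateTime local = DateTime.now();                          // caller's default zone
            DateTime utc = new DateTime(local, DateTimeZone.UTC);     // same instant, UTC zone
            System.out.println(local.getMillis() == utc.getMillis()); // true: instant preserved
            System.out.println(utc.getZone());                        // UTC
        }
    }
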
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
index b48a65b..b6741a4 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
@@ -22,6 +22,7 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.notification.service.event.Event;
 import org.apache.falcon.service.FalconService;
 import org.apache.falcon.state.EntityClusterID;
@@ -58,17 +59,22 @@ public final class FalconExecutionService implements FalconService, EntityStateC
     public void init() {
         LOG.debug("State store instance being used : {}", AbstractStateStore.get());
         // Initialize all executors from store
-        for (Entity entity : AbstractStateStore.get().getEntities(EntityState.STATE.SCHEDULED)) {
-            try {
-                for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
-                    EntityExecutor executor = createEntityExecutor(entity, cluster);
-                    executors.put(new EntityClusterID(entity, cluster), executor);
-                    executor.schedule();
+        try {
+            for (Entity entity : AbstractStateStore.get().getEntities(EntityState.STATE.SCHEDULED)) {
+                try {
+                    for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+                        EntityExecutor executor = createEntityExecutor(entity, cluster);
+                        executors.put(new EntityClusterID(entity, cluster), executor);
+                        executor.schedule();
+                    }
+                } catch (FalconException e) {
+                    LOG.error("Unable to load entity : " + entity.getName(), e);
+                    throw new RuntimeException(e);
                 }
-            } catch (FalconException e) {
-                LOG.error("Unable to load entity : " + entity.getName(), e);
-                throw new RuntimeException(e);
             }
+        } catch (StateStoreException e) {
+            LOG.error("Unable to get Entities from State Store ", e);
+            throw new RuntimeException(e);
         }
         // TODO : During migration, the state store itself may not have been completely bootstrapped.
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
index 434f168..cff4a73 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
@@ -49,6 +49,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
+
 /**
  * Represents an execution instance of a process.
  * Responsible for user actions such as suspend, resume, kill on individual instances.
@@ -57,7 +58,7 @@ import java.util.List;
 public class ProcessExecutionInstance extends ExecutionInstance {
     private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutionInstance.class);
     private final Process process;
-    private List<Predicate> awaitedPredicates = new ArrayList<Predicate>();
+    private List<Predicate> awaitedPredicates = new ArrayList<>();
     private DAGEngine dagEngine = null;
     private boolean hasTimedOut = false;
     private InstanceID id;
@@ -72,8 +73,9 @@ public class ProcessExecutionInstance extends ExecutionInstance {
      * @param cluster
      * @throws FalconException
      */
-    public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster) throws FalconException {
-        super(instanceTime, cluster);
+    public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster,
+                                    DateTime creationTime) throws FalconException {
+        super(instanceTime, cluster, creationTime);
         this.process = process;
         this.id = new InstanceID(process, cluster, getInstanceTime());
         computeInstanceSequence();
@@ -81,7 +83,18 @@ public class ProcessExecutionInstance extends ExecutionInstance {
         registerForNotifications(false);
     }
 
-    // Computes the instance number based on the nominal time.
+    /**
+     *
+     * @param process
+     * @param instanceTime
+     * @param cluster
+     * @throws FalconException
+     */
+    public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster) throws FalconException {
+        this(process, instanceTime, cluster, DateTime.now(UTC));
+    }
+
+    // Computes the instance number based on the instance time.
     // Method can be extended to assign instance numbers for non-time based instances.
     private void computeInstanceSequence() {
         for (Cluster processCluster : process.getClusters().getClusters()) {
@@ -225,11 +238,21 @@ public class ProcessExecutionInstance extends ExecutionInstance {
     }
 
     @Override
-    public List<Predicate> getAwaitingPredicates() throws FalconException {
+    public void setAwaitingPredicates(List<Predicate> predicates) {
+        this.awaitedPredicates = predicates;
+    }
+
+    @Override
+    public List<Predicate> getAwaitingPredicates() {
         return awaitedPredicates;
     }
 
     @Override
+    public void setInstanceSequence(int sequence) {
+        this.instanceSequence = sequence;
+    }
+
+    @Override
     public void suspend() throws FalconException {
         if (getExternalID() != null) {
             dagEngine.suspend(this);
@@ -242,7 +265,7 @@ public class ProcessExecutionInstance extends ExecutionInstance {
         // Was already scheduled on the DAGEngine, so resume on DAGEngine if suspended
         if (getExternalID() != null) {
             dagEngine.resume(this);
-        } else if (awaitedPredicates.size() != 0) {
+        } else if (awaitedPredicates != null && !awaitedPredicates.isEmpty()) {
             // Evaluate any remaining predicates
             registerForNotifications(true);
         }
@@ -271,6 +294,31 @@ public class ProcessExecutionInstance extends ExecutionInstance {
     }
 
     @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || !o.getClass().equals(this.getClass())) {
+            return false;
+        }
+
+        ProcessExecutionInstance processExecutionInstance = (ProcessExecutionInstance) o;
+
+        return  this.getId().equals(processExecutionInstance.getId())
+                && Predicate.isEqualAwaitingPredicates(this.getAwaitingPredicates(),
+                    processExecutionInstance.getAwaitingPredicates())
+                && this.getInstanceSequence() == (processExecutionInstance.getInstanceSequence());
+    }
+
+    @Override
+    public int hashCode() {
+        int result = id != null ? id.hashCode() : 0;
+        result = 31 * result + (awaitedPredicates != null ? awaitedPredicates.hashCode() : 0);
+        result = 31 * result + instanceSequence;
+        return result;
+    }
+
+    @Override
     public void destroy() throws FalconException {
         NotificationServicesRegistry.unregister(executionService, getId());
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
index fb4c8c9..164fb0e 100644
--- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
+++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
@@ -27,14 +27,19 @@ import org.apache.falcon.notification.service.event.TimeElapsedEvent;
 import org.apache.falcon.state.ID;
 
 import java.io.Serializable;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 /**
  * Represents the gating condition for which an instance is waiting before it is scheduled.
  * This will be serialized and stored in state store.
  */
 public class Predicate implements Serializable {
+
     /**
      * Type of predicate, currently data and time are supported.
      */
@@ -47,7 +52,10 @@ public class Predicate implements Serializable {
     private final TYPE type;
 
     // A key-value pair of clauses that need make this predicate.
-    private Map<String, Comparable> clauses = new HashMap<String, Comparable>();
+    private Map<String, Comparable> clauses = new TreeMap<>();
+
+    // Id for a predicate used for comparison.
+    private String id;
 
     // A generic "any" object that can be used when a particular key is allowed to have any value.
     public static final Comparable<? extends Serializable> ANY = new Any();
@@ -59,6 +67,10 @@ public class Predicate implements Serializable {
         return type;
     }
 
+    public String getId() {
+        return id;
+    }
+
     /**
      * @param key
      * @return the value corresponding to the key
@@ -106,6 +118,7 @@ public class Predicate implements Serializable {
      */
     public Predicate(TYPE type) {
         this.type = type;
+        this.id = this.type + String.valueOf(System.currentTimeMillis());
     }
 
     /**
@@ -120,7 +133,7 @@ public class Predicate implements Serializable {
      * @param rhs - The value in the key-value pair of a clause
      * @return This instance
      */
-    public Predicate addClause(String lhs, Comparable<? extends Serializable> rhs) {
+    Predicate addClause(String lhs, Comparable<? extends Serializable> rhs) {
         clauses.put(lhs, rhs);
         return this;
     }
@@ -217,4 +230,35 @@ public class Predicate implements Serializable {
             return super.hashCode();
         }
     }
+
+    public static boolean isEqualAwaitingPredicates(List<Predicate> thisAwaitingPredicates,
+                                              List<Predicate> otherAwaitingPredicates) {
+        if (thisAwaitingPredicates == null && otherAwaitingPredicates == null) {
+            return true;
+        } else if (thisAwaitingPredicates != null && otherAwaitingPredicates != null) {
+            if (thisAwaitingPredicates.size() != otherAwaitingPredicates.size()) {
+                return false;
+            }
+            Collections.sort(thisAwaitingPredicates, new PredicateComparator());
+            Collections.sort(otherAwaitingPredicates, new PredicateComparator());
+
+            Iterator<Predicate> thisIterator = thisAwaitingPredicates.iterator();
+            Iterator<Predicate> otherIterator = otherAwaitingPredicates.iterator();
+
+            while (thisIterator.hasNext()) {
+                if (!thisIterator.next().evaluate(otherIterator.next())) {
+                    return false;
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
+    static class PredicateComparator implements Serializable, Comparator<Predicate> {
+        @Override
+        public int compare(Predicate o1, Predicate o2) {
+            return o1.getId().compareTo(o2.getId());
+        }
+    }
 }
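
isEqualAwaitingPredicates above boils down to a small pattern: null checks, a size check, sorting both lists with the same comparator, then a pairwise walk. A self-contained sketch of the same pattern over plain strings (illustration only, not Falcon API):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    public final class PairwiseListEquality {
        static boolean equalIgnoringOrder(List<String> a, List<String> b) {
            if (a == null && b == null) {
                return true;
            }
            if (a == null || b == null || a.size() != b.size()) {
                return false;
            }
            Collections.sort(a);                      // same ordering applied to both lists
            Collections.sort(b);
            Iterator<String> ia = a.iterator();
            Iterator<String> ib = b.iterator();
            while (ia.hasNext()) {
                if (!ia.next().equals(ib.next())) {   // element-by-element comparison
                    return false;
                }
            }
            return true;
        }

        public static void main(String[] args) {
            System.out.println(equalIgnoringOrder(
                    Arrays.asList("data", "time"), Arrays.asList("time", "data"))); // true
        }
    }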

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
index 15aea9a..f44f174 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
@@ -130,4 +130,34 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState.
     public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
         return currentState.nextTransition(event);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        EntityState other = (EntityState) o;
+
+        if (this.getCurrentState() != null ? !this.getCurrentState().equals(other.getCurrentState())
+                : other.getCurrentState() != null) {
+            return false;
+        }
+
+        if (this.getEntity() != null ? !this.getEntity().equals(other.getEntity())
+                : other.getEntity() != null) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = currentState != null ? currentState.hashCode() : 0;
+        result = 31 * result + (entity != null ? entity.hashCode() : 0);
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java
index a722be9..72cfc33 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java
@@ -80,4 +80,23 @@ public class InstanceID extends ID {
     public EntityClusterID getEntityClusterID() {
         return new EntityClusterID(entityType, entityName, clusterName);
     }
+
+    public static EntityType getEntityType(String id) {
+        if (id == null) {
+            return null;
+        }
+        String[] values = id.split(KEY_SEPARATOR);
+        String entityType = values[0];
+        return EntityType.valueOf(entityType);
+    }
+
+    public static String getEntityName(String id) {
+        if (id == null) {
+            return null;
+        }
+        String[] values = id.split(KEY_SEPARATOR);
+        String entityName = values[1];
+        return entityName;
+    }
+
 }
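
The two helpers above simply split the flat id on the ID key separator and pick out the leading fields. A toy illustration (the separator value and the id layout below are assumptions made for this sketch only):

    public final class IdParsing {
        private static final String KEY_SEPARATOR = "/"; // assumed value, for illustration only

        public static void main(String[] args) {
            String id = "PROCESS/sample-process/corp-cluster/2015-11-26T00:00Z"; // hypothetical key
            String[] values = id.split(KEY_SEPARATOR);
            System.out.println("type = " + values[0] + ", name = " + values[1]);
        }
    }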

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
index ada9d2b..7f2bda9 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
@@ -252,4 +252,36 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance
     public String toString() {
         return instance.getId().toString() + "STATE: " + currentState.toString();
     }
+
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        InstanceState other = (InstanceState) o;
+
+        if (this.getCurrentState() != null ? !this.getCurrentState().equals(other.getCurrentState())
+                : other.getCurrentState() != null) {
+            return false;
+        }
+
+        if (this.getInstance() != null ? !this.getInstance().equals(other.getInstance())
+                : other.getInstance() != null) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = currentState != null ? currentState.hashCode() : 0;
+        result = 31 * result + (instance != null ? instance.hashCode() : 0);
+        return result;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/StateService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
index c1671ac..c702cc3 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/StateService.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
@@ -136,6 +136,7 @@ public final class StateService {
             InstanceState instanceState = stateStore.getExecutionInstance(id);
             InstanceState.STATE newState = instanceState.nextTransition(event);
             callbackHandler(instance, event, handler);
+            instanceState = new InstanceState(instance);
             instanceState.setCurrentState(newState);
             stateStore.updateExecutionInstance(instanceState);
             LOG.debug("State of instance: {} changed to: {} as a result of event: {}.", id,

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
index e36f85c..2d576e5 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
@@ -79,7 +79,7 @@ public abstract class AbstractStateStore implements StateStore, ConfigurationCha
      */
     public static synchronized StateStore get() {
         if (stateStore == null) {
-            String storeImpl = StartupProperties.get().getProperty("state.store.impl",
+            String storeImpl = StartupProperties.get().getProperty("falcon.state.store.impl",
                     "org.apache.falcon.state.store.InMemoryStateStore");
             try {
                 stateStore = ReflectionUtils.getInstanceByClassName(storeImpl);

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
index 113f4c5..75a315f 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
@@ -45,18 +45,18 @@ public interface EntityStateStore {
      * @param entityId
      * @return true, if entity exists in store.
      */
-    boolean entityExists(EntityID entityId);
+    boolean entityExists(EntityID entityId) throws StateStoreException;
 
     /**
      * @param state
      * @return Entities in a given state.
      */
-    Collection<Entity> getEntities(EntityState.STATE state);
+    Collection<Entity> getEntities(EntityState.STATE state) throws StateStoreException;
 
     /**
      * @return All Entities in the store.
      */
-    Collection<EntityState> getAllEntities();
+    Collection<EntityState> getAllEntities() throws StateStoreException;
 
     /**
      * Update an existing entity with the new values.
@@ -73,4 +73,12 @@ public interface EntityStateStore {
      * @throws StateStoreException
      */
     void deleteEntity(EntityID entityId) throws StateStoreException;
+
+
+    /**
+     * Removes all entities and their instances from the store.
+     *
+     * @throws StateStoreException
+     */
+    void deleteEntities() throws StateStoreException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
index 52b3bb8..7ab996a 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
@@ -50,8 +50,7 @@ public final class InMemoryStateStore extends AbstractStateStore {
 
     private static final StateStore STORE = new InMemoryStateStore();
 
-    private InMemoryStateStore() {
-    }
+    private InMemoryStateStore() {}
 
     public static StateStore get() {
         return STORE;
@@ -114,6 +113,11 @@ public final class InMemoryStateStore extends AbstractStateStore {
     }
 
     @Override
+    public void deleteEntities() throws StateStoreException {
+        entityStates.clear();
+    }
+
+    @Override
     public void putExecutionInstance(InstanceState instanceState) throws StateStoreException {
         String key = new InstanceID(instanceState.getInstance()).getKey();
         if (instanceStates.containsKey(key)) {
@@ -223,6 +227,20 @@ public final class InMemoryStateStore extends AbstractStateStore {
         }
     }
 
+    @Override
+    public void deleteExecutionInstances() {
+        instanceStates.clear();
+    }
+
+    @Override
+    public void deleteExecutionInstance(InstanceID instanceID) throws StateStoreException {
+        if (!instanceStates.containsKey(instanceID.toString())) {
+            throw new StateStoreException("Instance with key, " + instanceID.toString() + " does not exist.");
+        }
+        instanceStates.remove(instanceID.toString());
+    }
+
+    @Override
     public void clear() {
         entityStates.clear();
         instanceStates.clear();

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
index 483d9e6..f1d1931 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
@@ -104,12 +104,26 @@ public interface InstanceStateStore {
      * @param instanceId
      * @return true, if instance exists.
      */
-    boolean executionInstanceExists(InstanceID instanceId);
+    boolean executionInstanceExists(InstanceID instanceId) throws StateStoreException;
 
     /**
      * Delete instances of a given entity.
      *
      * @param entityId
      */
-    void deleteExecutionInstances(EntityID entityId);
+    void deleteExecutionInstances(EntityID entityId) throws StateStoreException;
+
+
+    /**
+     * Delete an instance based on ID.
+     *
+     * @param instanceID
+     * @throws StateStoreException
+     */
+    void deleteExecutionInstance(InstanceID instanceID) throws StateStoreException;
+
+    /**
+     * Delete all instances.
+     */
+    void deleteExecutionInstances();
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
index f595c26..592e1fb 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
@@ -17,11 +17,15 @@
  */
 package org.apache.falcon.state.store;
 
+import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.service.ConfigurationChangeListener;
 
 /**
  * Interface that combines entity, instance store APIs and also config change listener's.
  */
 public interface StateStore extends ConfigurationChangeListener, EntityStateStore, InstanceStateStore {
-
+    /**
+     * Deletes all entities and instances.
+     */
+    void clear() throws StateStoreException;
 }
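
A minimal sketch of how a caller, for example a test teardown, might use the new bulk-delete API (assuming the in-memory store; the JDBC store later in this patch restricts clear() to debug mode):

    import org.apache.falcon.state.store.InMemoryStateStore;
    import org.apache.falcon.state.store.StateStore;

    public final class StateStoreReset {
        public static void main(String[] args) throws Exception {
            StateStore store = InMemoryStateStore.get();
            store.clear(); // removes all entities and all execution instances
        }
    }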

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
new file mode 100644
index 0000000..4bee269
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store.jdbc;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.ProcessExecutionInstance;
+import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.state.EntityID;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.InstanceID;
+import org.apache.falcon.state.InstanceState;
+import org.joda.time.DateTime;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Mapping util for Persistent Store.
+ */
+public final class BeanMapperUtil {
+    private BeanMapperUtil() {
+    }
+
+    /**
+     * Converts an EntityState to an EntityBean which will be stored in the DB.
+     * @param entityState
+     * @return
+     */
+    public static EntityBean convertToEntityBean(EntityState entityState) {
+        EntityBean entityBean = new EntityBean();
+        Entity entity = entityState.getEntity();
+        String id = new EntityID(entity).getKey();
+        entityBean.setId(id);
+        entityBean.setName(entity.getName());
+        entityBean.setState(entityState.getCurrentState().toString());
+        entityBean.setType(entity.getEntityType().toString());
+        return entityBean;
+    }
+
+    /**
+     * Converts an EntityBean from the database to an EntityState.
+     * @param entityBean
+     * @return
+     * @throws StateStoreException
+     */
+    public static EntityState convertToEntityState(EntityBean entityBean) throws StateStoreException {
+        try {
+            Entity entity = EntityUtil.getEntity(entityBean.getType(), entityBean.getName());
+            EntityState entityState = new EntityState(entity);
+            entityState.setCurrentState(EntityState.STATE.valueOf(entityBean.getState()));
+            return entityState;
+        } catch (FalconException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    /**
+     * Converts a list of EntityBeans from the database to EntityStates.
+     * @param entityBeans
+     * @return
+     * @throws StateStoreException
+     */
+    public static Collection<EntityState> convertToEntityState(Collection<EntityBean> entityBeans)
+        throws StateStoreException {
+        List<EntityState> entityStates = new ArrayList<>();
+        if (entityBeans != null && !entityBeans.isEmpty()) {
+            for (EntityBean entityBean : entityBeans) {
+                entityStates.add(convertToEntityState(entityBean));
+            }
+        }
+        return entityStates;
+    }
+
+    /**
+     * Converts a list of EntityBeans from the database to Entities.
+     * @param entityBeans
+     * @return
+     * @throws StateStoreException
+     */
+    public static Collection<Entity> convertToEntities(Collection<EntityBean> entityBeans) throws StateStoreException {
+        List<Entity> entities = new ArrayList<>();
+        try {
+            if (entityBeans != null && !entityBeans.isEmpty()) {
+                for (EntityBean entityBean : entityBeans) {
+                    Entity entity = EntityUtil.getEntity(entityBean.getType(), entityBean.getName());
+                    entities.add(entity);
+                }
+            }
+            return entities;
+        } catch (FalconException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    /**
+     * Converts an entity's execution instance (InstanceState) to an InstanceBean for the DB.
+     * @param instanceState
+     * @return
+     * @throws StateStoreException
+     * @throws IOException
+     */
+    public static InstanceBean convertToInstanceBean(InstanceState instanceState) throws StateStoreException,
+            IOException {
+        InstanceBean instanceBean = new InstanceBean();
+        ExecutionInstance instance = instanceState.getInstance();
+        if (instance.getActualEnd() != null) {
+            instanceBean.setActualEndTime(new Timestamp(instance.getActualEnd().getMillis()));
+        }
+        if (instance.getActualStart() != null) {
+            instanceBean.setActualStartTime(new Timestamp(instance.getActualStart().getMillis()));
+        }
+        if (instanceState.getCurrentState() != null) {
+            instanceBean.setCurrentState(instanceState.getCurrentState().toString());
+        }
+        if (instance.getExternalID() != null) {
+            instanceBean.setExternalID(instanceState.getInstance().getExternalID());
+        }
+
+        instanceBean.setCluster(instance.getCluster());
+        instanceBean.setCreationTime(new Timestamp(instance.getCreationTime().getMillis()));
+        instanceBean.setId(instance.getId().toString());
+        instanceBean.setInstanceTime(new Timestamp(instance.getInstanceTime().getMillis()));
+        instanceBean.setEntityId(new InstanceID(instance).getEntityID().toString());
+
+        instanceBean.setInstanceSequence(instance.getInstanceSequence());
+        if (instance.getAwaitingPredicates() != null && !instance.getAwaitingPredicates().isEmpty()) {
+            ObjectOutputStream out = null;
+            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+            try {
+                out = new ObjectOutputStream(byteArrayOutputStream);
+                out.writeInt(instance.getAwaitingPredicates().size());
+                for (Predicate predicate : instance.getAwaitingPredicates()) {
+                    out.writeObject(predicate);
+                }
+                instanceBean.setAwaitedPredicates(byteArrayOutputStream.toByteArray());
+            } finally {
+                IOUtils.closeQuietly(out);
+            }
+        }
+        return instanceBean;
+    }
+
+    /**
+     * Converts an instance entry from the DB to an InstanceState.
+     * @param instanceBean
+     * @return
+     * @throws StateStoreException
+     * @throws IOException
+     */
+    public static InstanceState convertToInstanceState(InstanceBean instanceBean) throws StateStoreException,
+            IOException {
+        EntityType entityType = InstanceID.getEntityType(instanceBean.getId());
+        ExecutionInstance executionInstance = getExecutionInstance(entityType, instanceBean);
+        if (instanceBean.getActualEndTime() != null) {
+            executionInstance.setActualEnd(new DateTime(instanceBean.getActualEndTime().getTime()));
+        }
+        if (instanceBean.getActualStartTime() != null) {
+            executionInstance.setActualStart(new DateTime(instanceBean.getActualStartTime().getTime()));
+        }
+        executionInstance.setExternalID(instanceBean.getExternalID());
+        executionInstance.setInstanceSequence(instanceBean.getInstanceSequence());
+
+        byte[] result = instanceBean.getAwaitedPredicates();
+        List<Predicate> predicates = new ArrayList<>();
+        if (result != null && result.length != 0) {
+            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(result);
+            ObjectInputStream in = null;
+            try {
+                in = new ObjectInputStream(byteArrayInputStream);
+                int length = in.readInt();
+                for (int i = 0; i < length; i++) {
+                    Predicate predicate = (Predicate) in.readObject();
+                    predicates.add(predicate);
+                }
+            } catch (ClassNotFoundException e) {
+                throw new IOException(e);
+            } finally {
+                IOUtils.closeQuietly(in);
+            }
+        }
+        executionInstance.setAwaitingPredicates(predicates);
+        InstanceState instanceState = new InstanceState(executionInstance);
+        instanceState.setCurrentState(InstanceState.STATE.valueOf(instanceBean.getCurrentState()));
+        return instanceState;
+    }
+
+    /**
+     * Converts a list of instance entries from the DB to InstanceStates.
+     * @param instanceBeanList
+     * @return
+     * @throws StateStoreException
+     * @throws IOException
+     */
+    public static Collection<InstanceState> convertToInstanceState(List<InstanceBean> instanceBeanList)
+        throws StateStoreException, IOException {
+        List<InstanceState> instanceStates = new ArrayList<>();
+        for (InstanceBean instanceBean : instanceBeanList) {
+            instanceStates.add(convertToInstanceState(instanceBean));
+        }
+        return instanceStates;
+    }
+
+    private static ExecutionInstance getExecutionInstance(EntityType entityType,
+                                                          InstanceBean instanceBean) throws StateStoreException {
+        try {
+            Entity entity = EntityUtil.getEntity(entityType, InstanceID.getEntityName(instanceBean.getId()));
+            return getExecutionInstance(entityType, entity, instanceBean.getInstanceTime().getTime(),
+                    instanceBean.getCluster(), instanceBean.getCreationTime().getTime());
+        } catch (FalconException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    public static ExecutionInstance getExecutionInstance(EntityType entityType, Entity entity, long instanceTime,
+                                                         String cluster, long creationTime) throws StateStoreException {
+        if (entityType == EntityType.PROCESS) {
+            try {
+                return new ProcessExecutionInstance((org.apache.falcon.entity.v0.process.Process) entity,
+                        new DateTime(instanceTime), cluster, new DateTime(creationTime));
+            } catch (FalconException e) {
+                throw new StateStoreException(e);
+            }
+        } else {
+            throw new UnsupportedOperationException("Not supported for entity type " + entityType.toString());
+        }
+    }
+
+
+    public static byte[] getAwaitedPredicates(InstanceState instanceState) throws IOException {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        ObjectOutputStream out = null;
+        try {
+            out = new ObjectOutputStream(byteArrayOutputStream);
+            out.writeInt(instanceState.getInstance().getAwaitingPredicates().size());
+            for (Predicate predicate : instanceState.getInstance().getAwaitingPredicates()) {
+                out.writeObject(predicate);
+            }
+            return byteArrayOutputStream.toByteArray();
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+}
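
BeanMapperUtil above writes the awaited predicates as a length-prefixed Java serialization blob: writeInt of the count followed by one writeObject per predicate, mirrored on read. A standalone round-trip sketch of that encoding using plain strings (illustration only; the real code stores Predicate objects):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public final class LengthPrefixedBlob {
        static byte[] write(List<String> items) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeInt(items.size());           // length prefix
                for (String item : items) {
                    out.writeObject(item);            // one record per element
                }
            }
            return bytes.toByteArray();
        }

        static List<String> read(byte[] blob) throws IOException, ClassNotFoundException {
            List<String> items = new ArrayList<>();
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(blob))) {
                int count = in.readInt();
                for (int i = 0; i < count; i++) {
                    items.add((String) in.readObject());
                }
            }
            return items;
        }

        public static void main(String[] args) throws Exception {
            System.out.println(read(write(Arrays.asList("data-predicate", "time-predicate"))));
        }
    }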

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
new file mode 100644
index 0000000..03ada39
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store.jdbc;
+
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+//SUSPEND CHECKSTYLE CHECK  LineLengthCheck
+/**
+ * Entity object which will be stored in Data Base.
+ */
+@Entity
+@NamedQueries({
+        @NamedQuery(name = "GET_ENTITY", query = "select OBJECT(a) from EntityBean a where a.id = :id"),
+        @NamedQuery(name = "GET_ENTITY_FOR_STATE", query = "select OBJECT(a) from EntityBean a where a.state = :state"),
+        @NamedQuery(name = "UPDATE_ENTITY", query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type where a.id = :id"),
+        @NamedQuery(name = "GET_ENTITIES_FOR_TYPE", query = "select OBJECT(a) from EntityBean a where a.type = :type"),
+        @NamedQuery(name = "GET_ENTITIES", query = "select OBJECT(a) from EntityBean a"),
+        @NamedQuery(name = "DELETE_ENTITY", query = "delete from EntityBean a where a.id = :id"),
+        @NamedQuery(name = "DELETE_ENTITIES", query = "delete from EntityBean")})
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+@Table(name = "ENTITIES")
+public class EntityBean {
+    @NotNull
+    @Id
+    private String id;
+
+    @Basic
+    @NotNull
+    @Column(name = "name")
+    private String name;
+
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "type")
+    private String type;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "current_state")
+    private String state;
+
+    public EntityBean() {
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getState() {
+        return state;
+    }
+
+    public void setState(String state) {
+        this.state = state;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
new file mode 100644
index 0000000..0e3dfa9
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store.jdbc;
+
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Lob;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+import java.sql.Timestamp;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * Instance State which will be stored in DB.
+ */
+@Entity
+@NamedQueries({
+        @NamedQuery(name = "GET_INSTANCE", query = "select OBJECT(a) from InstanceBean a where a.id = :id"),
+        @NamedQuery(name = "DELETE_INSTANCE", query = "delete from InstanceBean a where a.id = :id"),
+        @NamedQuery(name = "DELETE_INSTANCE_FOR_ENTITY", query = "delete from InstanceBean a where a.entityId = :entityId"),
+        @NamedQuery(name = "UPDATE_INSTANCE", query = "update InstanceBean a set a.cluster = :cluster, a.externalID = :externalID, a.instanceTime = :instanceTime, a.creationTime = :creationTime, a.actualEndTime = :actualEndTime, a.currentState = :currentState, a.actualStartTime = :actualStartTime, a.instanceSequence = :instanceSequence, a.awaitedPredicates = :awaitedPredicates where a.id = :id"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState)"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"),
+        @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"),
+        @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"),
+        @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a")
+})
+//RESUME CHECKSTYLE CHECK  LineLengthCheck
+@Table(name = "INSTANCES")
+public class InstanceBean {
+
+    @Id
+    @NotNull
+    private String id;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "entity_id")
+    private String entityId;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "cluster")
+    private String cluster;
+
+    @Basic
+    @Index
+    @Column(name = "external_id")
+    private String externalID;
+
+    @Basic
+    @Index
+    @Column(name = "instance_time")
+    private Timestamp instanceTime;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "creation_time")
+    private Timestamp creationTime;
+
+    @Basic
+    @Column(name = "actual_start_time")
+    private Timestamp actualStartTime;
+
+    @Basic
+    @Column(name = "actual_end_time")
+    private Timestamp actualEndTime;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "current_state")
+    private String currentState;
+
+    @Basic
+    @Index
+    @NotNull
+    @Column(name = "instance_sequence")
+    private Integer instanceSequence;
+
+
+    @Column(name = "awaited_predicates", columnDefinition = "BLOB")
+    @Lob
+    private byte[] awaitedPredicates;
+
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getCluster() {
+        return cluster;
+    }
+
+    public void setCluster(String cluster) {
+        this.cluster = cluster;
+    }
+
+    public String getExternalID() {
+        return externalID;
+    }
+
+    public void setExternalID(String externalID) {
+        this.externalID = externalID;
+    }
+
+    public Timestamp getInstanceTime() {
+        return instanceTime;
+    }
+
+    public void setInstanceTime(Timestamp instanceTime) {
+        this.instanceTime = instanceTime;
+    }
+
+    public Timestamp getCreationTime() {
+        return creationTime;
+    }
+
+    public void setCreationTime(Timestamp creationTime) {
+        this.creationTime = creationTime;
+    }
+
+    public Timestamp getActualStartTime() {
+        return actualStartTime;
+    }
+
+    public void setActualStartTime(Timestamp actualStartTime) {
+        this.actualStartTime = actualStartTime;
+    }
+
+    public Timestamp getActualEndTime() {
+        return actualEndTime;
+    }
+
+    public void setActualEndTime(Timestamp actualEndTime) {
+        this.actualEndTime = actualEndTime;
+    }
+
+    public String getCurrentState() {
+        return currentState;
+    }
+
+    public void setCurrentState(String currentState) {
+        this.currentState = currentState;
+    }
+
+    public byte[] getAwaitedPredicates() {
+        return awaitedPredicates;
+    }
+
+    public void setAwaitedPredicates(byte[] awaitedPredicates) {
+        this.awaitedPredicates = awaitedPredicates;
+    }
+
+    public Integer getInstanceSequence() {
+        return instanceSequence;
+    }
+
+    public void setInstanceSequence(Integer instanceSequence) {
+        this.instanceSequence = instanceSequence;
+    }
+
+    public String getEntityId() {
+        return entityId;
+    }
+
+    public void setEntityId(String entityId) {
+        this.entityId = entityId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
new file mode 100644
index 0000000..ca65b94
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store.jdbc;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.state.EntityClusterID;
+import org.apache.falcon.state.EntityID;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.util.StartupProperties;
+import org.joda.time.DateTime;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Persistent Data Store for Entities and Instances.
+ */
+public final class JDBCStateStore extends AbstractStateStore {
+
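+    // Singleton: callers obtain the store via get() instead of constructing it directly.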
+    private static final StateStore STORE = new JDBCStateStore();
+    private static final String DEBUG = "debug";
+
+    private JDBCStateStore() {}
+
+    public static StateStore get() {
+        return STORE;
+    }
+
+    @Override
+    public void clear() throws StateStoreException {
+        if (!isModeDebug()) {
+            throw new UnsupportedOperationException("Clear Method not supported");
+        }
+        deleteExecutionInstances();
+        deleteEntities();
+    }
+
+    @Override
+    public void putEntity(EntityState entityState) throws StateStoreException {
+        EntityID entityID = new EntityID(entityState.getEntity());
+        String key = entityID.getKey();
+        if (entityExists(entityID)) {
+            throw new StateStoreException("Entity with key, " + key + " already exists.");
+        }
+        EntityBean entityBean = BeanMapperUtil.convertToEntityBean(entityState);
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        entityManager.persist(entityBean);
+        commitAndCloseTransaction(entityManager);
+    }
+
+
+    @Override
+    public EntityState getEntity(EntityID entityID) throws StateStoreException {
+        EntityState entityState = getEntityByKey(entityID);
+        if (entityState == null) {
+            throw new StateStoreException("Entity with key, " + entityID + " does not exist.");
+        }
+        return entityState;
+    }
+
+    private EntityState getEntityByKey(EntityID id) throws StateStoreException {
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_ENTITY");
+        q.setParameter("id", id.getKey());
+        List result = q.getResultList();
+        // Close the EntityManager before inspecting the result so it is not leaked on the empty path.
+        entityManager.close();
+        if (result.isEmpty()) {
+            return null;
+        }
+        return BeanMapperUtil.convertToEntityState((EntityBean) result.get(0));
+    }
+
+    @Override
+    public boolean entityExists(EntityID entityID) throws StateStoreException {
+        return getEntityByKey(entityID) != null;
+    }
+
+    @Override
+    public Collection<Entity> getEntities(EntityState.STATE state) throws StateStoreException {
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_ENTITY_FOR_STATE");
+        q.setParameter("state", state.toString());
+        List result = q.getResultList();
+        entityManager.close();
+        return BeanMapperUtil.convertToEntities(result);
+    }
+
+    @Override
+    public Collection<EntityState> getAllEntities() throws StateStoreException {
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_ENTITIES");
+        List result = q.getResultList();
+        entityManager.close();
+        return BeanMapperUtil.convertToEntityState(result);
+    }
+
+    @Override
+    public void updateEntity(EntityState entityState) throws StateStoreException {
+        EntityID entityID = new EntityID(entityState.getEntity());
+        if (!entityExists(entityID)) {
+            throw new StateStoreException("Entity with key, " + entityID + " doesn't exists.");
+        }
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery("UPDATE_ENTITY");
+        q.setParameter("id", entityID.getKey());
+        if (entityState.getCurrentState() != null) {
+            q.setParameter("state", entityState.getCurrentState().toString());
+        }
+        q.setParameter("type", entityState.getEntity().getEntityType().toString());
+        q.setParameter("name", entityState.getEntity().getName());
+        q.executeUpdate();
+        commitAndCloseTransaction(entityManager);
+    }
+
+    @Override
+    public void deleteEntity(EntityID entityID) throws StateStoreException {
+        if (!entityExists(entityID)) {
+            throw new StateStoreException("Entity with key, " + entityID.getKey() + " does not exist.");
+        }
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery("DELETE_ENTITY");
+        q.setParameter("id", entityID.getKey());
+        q.executeUpdate();
+        commitAndCloseTransaction(entityManager);
+    }
+
+    @Override
+    public void deleteEntities() throws StateStoreException {
+        if (!isModeDebug()) {
+            throw new UnsupportedOperationException("Delete Entities Table not supported");
+        }
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery("DELETE_ENTITIES");
+        q.executeUpdate();
+        commitAndCloseTransaction(entityManager);
+    }
+
+    @Override
+    public void putExecutionInstance(InstanceState instanceState) throws StateStoreException {
+        InstanceID instanceID = new InstanceID(instanceState.getInstance());
+        if (executionInstanceExists(instanceID)) {
+            throw new StateStoreException("Instance with key, " + instanceID + " already exists.");
+        }
+        try {
+            InstanceBean instanceBean = BeanMapperUtil.convertToInstanceBean(instanceState);
+            EntityManager entityManager = getEntityManager();
+            beginTransaction(entityManager);
+            entityManager.persist(instanceBean);
+            commitAndCloseTransaction(entityManager);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    @Override
+    public InstanceState getExecutionInstance(InstanceID instanceId) throws StateStoreException {
+        InstanceState instanceState = getExecutionInstanceByKey(instanceId);
+        if (instanceState == null) {
+            throw new StateStoreException("Instance with key, " + instanceId.toString() + " does not exist.");
+        }
+        return instanceState;
+    }
+
+    private InstanceState getExecutionInstanceByKey(ID instanceKey) throws StateStoreException {
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_INSTANCE");
+        q.setParameter("id", instanceKey.toString());
+        List result = q.getResultList();
+        entityManager.close();
+        if (result.isEmpty()) {
+            return null;
+        }
+        try {
+            InstanceBean instanceBean = (InstanceBean) result.get(0);
+            return BeanMapperUtil.convertToInstanceState(instanceBean);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    @Override
+    public void updateExecutionInstance(InstanceState instanceState) throws StateStoreException {
+        InstanceID id = new InstanceID(instanceState.getInstance());
+        String key = id.toString();
+        if (!executionInstanceExists(id)) {
+            throw new StateStoreException("Instance with key, " + key + " does not exist.");
+        }
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery("UPDATE_INSTANCE");
+        ExecutionInstance instance = instanceState.getInstance();
+        q.setParameter("id", key);
+        q.setParameter("cluster", instance.getCluster());
+        q.setParameter("externalID", instance.getExternalID());
+        q.setParameter("instanceTime", new Timestamp(instance.getInstanceTime().getMillis()));
+        q.setParameter("creationTime", new Timestamp(instance.getCreationTime().getMillis()));
+        if (instance.getActualEnd() != null) {
+            q.setParameter("actualEndTime", new Timestamp(instance.getActualEnd().getMillis()));
+        }
+        q.setParameter("currentState", instanceState.getCurrentState().toString());
+        if (instance.getActualStart() != null) {
+            q.setParameter("actualStartTime", new Timestamp(instance.getActualStart().getMillis()));
+        }
+        q.setParameter("instanceSequence", instance.getInstanceSequence());
+        if (instance.getAwaitingPredicates() != null
+                && !instance.getAwaitingPredicates().isEmpty()) {
+            try {
+                q.setParameter("awaitedPredicates", BeanMapperUtil.getAwaitedPredicates(instanceState));
+            } catch (IOException e) {
+                throw new StateStoreException(e);
+            }
+        }
+        q.executeUpdate();
+        commitAndCloseTransaction(entityManager);
+    }
+
+    @Override
+    public Collection<InstanceState> getAllExecutionInstances(Entity entity, String cluster)
+        throws StateStoreException {
+        EntityClusterID id = new EntityClusterID(entity, cluster);
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_CLUSTER");
+        q.setParameter("entityId", id.getEntityID().getKey());
+        q.setParameter("cluster", cluster);
+        List result = q.getResultList();
+        entityManager.close();
+        try {
+            return BeanMapperUtil.convertToInstanceState(result);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    @Override
+    public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+                                                           Collection<InstanceState.STATE> states)
+        throws StateStoreException {
+        EntityClusterID entityClusterID = new EntityClusterID(entity, cluster);
+        String entityKey = entityClusterID.getEntityID().getKey();
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES");
+        q.setParameter("entityId", entityKey);
+        q.setParameter("cluster", cluster);
+        List<String> instanceStates = new ArrayList<>();
+        for (InstanceState.STATE state : states) {
+            instanceStates.add(state.toString());
+        }
+        q.setParameter("currentState", instanceStates);
+        List result = q.getResultList();
+        entityManager.close();
+        try {
+            return BeanMapperUtil.convertToInstanceState(result);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    @Override
+    public Collection<InstanceState> getExecutionInstances(EntityClusterID id,
+                                                           Collection<InstanceState.STATE> states)
+        throws StateStoreException {
+        String entityKey = id.getEntityID().getKey();
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_FOR_STATES");
+        q.setParameter("entityId", entityKey);
+        List<String> instanceStates = new ArrayList<>();
+        for (InstanceState.STATE state : states) {
+            instanceStates.add(state.toString());
+        }
+        q.setParameter("currentState", instanceStates);
+        List result = q.getResultList();
+        entityManager.close();
+        try {
+            return BeanMapperUtil.convertToInstanceState(result);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    @Override
+    public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+                                                           Collection<InstanceState.STATE> states, DateTime start,
+                                                           DateTime end) throws StateStoreException {
+        String entityKey = new EntityClusterID(entity, cluster).getEntityID().getKey();
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_FOR_STATES_WITH_RANGE");
+        q.setParameter("entityId", entityKey);
+        List<String> instanceStates = new ArrayList<>();
+        for (InstanceState.STATE state : states) {
+            instanceStates.add(state.toString());
+        }
+        q.setParameter("currentState", instanceStates);
+        q.setParameter("startTime", new Timestamp(start.getMillis()));
+        q.setParameter("endTime", new Timestamp(end.getMillis()));
+        List result = q.getResultList();
+        entityManager.close();
+        try {
+            return BeanMapperUtil.convertToInstanceState(result);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    @Override
+    public InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException {
+        String key = new EntityClusterID(entity, cluster).getEntityID().getKey();
+        EntityManager entityManager = getEntityManager();
+        Query q = entityManager.createNamedQuery("GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER");
+        q.setParameter("entityId", key);
+        q.setParameter("cluster", cluster);
+        q.setMaxResults(1);
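+        // Assumes the GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER query orders instances newest-first.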
+        List result = q.getResultList();
+        entityManager.close();
+        if (!result.isEmpty()) {
+            try {
+                return BeanMapperUtil.convertToInstanceState((InstanceBean) result.get(0));
+            } catch (IOException e) {
+                throw new StateStoreException(e);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public boolean executionInstanceExists(InstanceID instanceKey) throws StateStoreException {
+        return getExecutionInstanceByKey(instanceKey) != null;
+    }
+
+    @Override
+    public void deleteExecutionInstance(InstanceID instanceID) throws StateStoreException {
+        String instanceKey = instanceID.toString();
+        if (!executionInstanceExists(instanceID)) {
+            throw new StateStoreException("Instance with key, " + instanceKey + " does not exist.");
+        }
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery("DELETE_INSTANCE");
+        q.setParameter("id", instanceKey);
+        q.executeUpdate();
+        commitAndCloseTransaction(entityManager);
+    }
+
+    @Override
+    public void deleteExecutionInstances(EntityID entityID) {
+        String entityKey = entityID.getKey();
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery("DELETE_INSTANCE_FOR_ENTITY");
+        q.setParameter("entityId", entityKey);
+        q.executeUpdate();
+        commitAndCloseTransaction(entityManager);
+    }
+
+    @Override
+    public void deleteExecutionInstances() {
+        if (!isModeDebug()) {
+            throw new UnsupportedOperationException("Delete Instances Table not supported");
+        }
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query q = entityManager.createNamedQuery("DELETE_INSTANCES_TABLE");
+        q.executeUpdate();
+        commitAndCloseTransaction(entityManager);
+    }
+
+    // Debug mode is enabled only for test cases.
+    private boolean isModeDebug() {
+        return DEBUG.equals(StartupProperties.get().getProperty("domain"));
+    }
+
+    private void commitAndCloseTransaction(EntityManager entityManager) {
+        entityManager.getTransaction().commit();
+        entityManager.close();
+    }
+
+    private void beginTransaction(EntityManager entityManager) {
+        entityManager.getTransaction().begin();
+    }
+
+    private EntityManager getEntityManager() {
+        return FalconJPAService.get().getEntityManager();
+    }
+
+}
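
A minimal usage sketch of the new store (assuming entity is a parsed Falcon Entity and state is
its corresponding EntityState; entity, state and roundTrip are placeholders for this illustration,
not part of the patch):

    // Sketch only: persists, re-reads and returns an entity's state through the JDBC-backed store.
    EntityState roundTrip(Entity entity, EntityState state) throws StateStoreException {
        StateStore store = JDBCStateStore.get();
        EntityID id = new EntityID(entity);
        if (!store.entityExists(id)) {
            store.putEntity(state);       // persists an EntityBean row via JPA
        }
        return store.getEntity(id);       // throws StateStoreException if the key is absent
    }

Every call obtains its EntityManager from FalconJPAService.get(), so FalconJPAService must be
initialized before the store is used.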


[4/4] falcon git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/falcon

Posted by pa...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/falcon


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2bf90130
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2bf90130
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2bf90130

Branch: refs/heads/master
Commit: 2bf90130df482e5501ba40b36a58dade8df08e64
Parents: 6d31385 f982f86
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Thu Nov 26 15:56:49 2015 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Thu Nov 26 15:56:49 2015 +0530

----------------------------------------------------------------------
 .reviewboardrc | 24 ++++++++++++++++++++++++
 CHANGES.txt    |  2 ++
 2 files changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/2bf90130/CHANGES.txt
----------------------------------------------------------------------