You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2015/11/26 11:28:25 UTC
[1/4] falcon git commit: FALCON-1234 State Store for instances
scheduled by Falcon (Pavan Kolamuri)
Repository: falcon
Updated Branches:
refs/heads/master f982f86c7 -> 2bf90130d
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/startup.properties b/scheduler/src/test/resources/startup.properties
new file mode 100644
index 0000000..2e938ee
--- /dev/null
+++ b/scheduler/src/test/resources/startup.properties
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+*.domain=debug
+
+######### Implementation classes #########
+## DONT MODIFY UNLESS SURE ABOUT CHANGE ##
+
+*.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine
+*.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder
+*.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder
+*.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager
+*.ConfigSyncService.impl=org.apache.falcon.resource.ConfigSyncService
+*.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager
+*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
+
+##### Falcon Services #####
+*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
+ org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
+ org.apache.falcon.service.ProcessSubscriberService,\
+ org.apache.falcon.entity.store.ConfigurationStore,\
+ org.apache.falcon.rerun.service.RetryService,\
+ org.apache.falcon.rerun.service.LateRunService,\
+ org.apache.falcon.notification.service.impl.JobCompletionService,\
+ org.apache.falcon.notification.service.impl.SchedulerService,\
+ org.apache.falcon.notification.service.impl.AlarmService,\
+ org.apache.falcon.notification.service.impl.DataAvailabilityService,\
+ org.apache.falcon.execution.FalconExecutionService,\
+ org.apache.falcon.state.store.service.FalconJPAService
+
+##### Falcon Configuration Store Change listeners #####
+*.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
+ org.apache.falcon.entity.ColoClusterRelation,\
+ org.apache.falcon.group.FeedGroupMap,\
+ org.apache.falcon.entity.store.FeedLocationStore
+
+##### JMS MQ Broker Implementation class #####
+*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
+
+
+######### System startup parameters #########
+
+# Location to store user entity configurations
+debug.config.store.uri=file://${user.dir}/target/store
+debug.config.store.persist=false
+debug.config.oozie.conf.uri=${user.dir}/target/oozie
+debug.system.lib.location=${system.lib.location}
+debug.broker.url=vm://localhost
+debug.retry.recorder.path=${user.dir}/target/retry
+debug.libext.feed.retention.paths=${falcon.libext}
+debug.libext.feed.replication.paths=${falcon.libext}
+debug.libext.process.paths=${falcon.libext}
+
+*.falcon.cleanup.service.frequency=minutes(5)
+
+
+######### Properties for configuring JMS provider - activemq #########
+# Default Active MQ url
+*.broker.url=tcp://localhost:61616
+
+# default time-to-live for a JMS message 3 days (time in minutes)
+*.broker.ttlInMins=4320
+*.entity.topic=FALCON.ENTITY.TOPIC
+*.max.retry.failure.count=1
+*.retry.recorder.path=${user.dir}/logs/retry
+
+######### Properties for configuring iMon client and metric #########
+*.internal.queue.size=1000
+
+
+##### List of shared libraries for Falcon workflows #####
+*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3
+
+######### Authentication Properties #########
+
+# Authentication type must be specified: simple|kerberos
+*.falcon.authentication.type=simple
+
+##### Service Configuration
+
+# Indicates the Kerberos principal to be used in Falcon Service.
+*.falcon.service.authentication.kerberos.principal=
+
+# Location of the keytab file with the credentials for the Service principal.
+*.falcon.service.authentication.kerberos.keytab=
+
+# name node principal to talk to config store
+*.dfs.namenode.kerberos.principal=
+
+##### SPNEGO Configuration
+
+# Authentication type must be specified: simple|kerberos|<class>
+# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility
+*.falcon.http.authentication.type=simple
+
+# Indicates how long (in seconds) an authentication token is valid before it has to be renewed.
+*.falcon.http.authentication.token.validity=36000
+
+# The signature secret for signing the authentication tokens.
+*.falcon.http.authentication.signature.secret=falcon
+
+# The domain to use for the HTTP cookie that stores the authentication token.
+*.falcon.http.authentication.cookie.domain=
+
+# Indicates if anonymous requests are allowed when using 'simple' authentication.
+*.falcon.http.authentication.simple.anonymous.allowed=false
+
+# Indicates the Kerberos principal to be used for HTTP endpoint.
+# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
+*.falcon.http.authentication.kerberos.principal=
+
+# Location of the keytab file with the credentials for the HTTP principal.
+*.falcon.http.authentication.kerberos.keytab=
+
+# The Kerberos name rules used to resolve Kerberos principal names; refer to Hadoop's KerberosName for more details.
+*.falcon.http.authentication.kerberos.name.rules=DEFAULT
+
+# Comma separated list of black listed users
+*.falcon.http.authentication.blacklisted.users=
+
+
+######## StateStore Properties #####
+*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true
+*.falcon.statestore.jdbc.username=sa
+*.falcon.statestore.jdbc.password=
+*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+# Maximum number of active connections that can be allocated from this pool at the same time.
+*.falcon.statestore.pool.max.active.conn=10
+*.falcon.statestore.connection.properties=
+# Indicates the interval (in milliseconds) between eviction runs.
+*.falcon.statestore.validate.db.connection.eviction.interval=300000
+# The number of objects to examine during each run of the idle object evictor thread.
+*.falcon.statestore.validate.db.connection.eviction.num=10
+# Creates Falcon DB.
+# If set to true, the DB schema is created if it does not exist; if the schema already exists, this is a no-op.
+# If set to false, the DB schema is not created; startup fails if the schema does not exist.
+*.falcon.statestore.create.db.schema=true
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/src/bin/falcon-db.sh
----------------------------------------------------------------------
diff --git a/src/bin/falcon-db.sh b/src/bin/falcon-db.sh
new file mode 100644
index 0000000..415fd5d
--- /dev/null
+++ b/src/bin/falcon-db.sh
@@ -0,0 +1,49 @@
+#!/bin/bash
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License. See accompanying LICENSE file.
+#
+
+# Resolve links - $0 may be a softlink. Follow the chain until PRG names the
+# real script, so BASEDIR is derived from the actual install location.
+PRG="${0}"
+
+while [ -h "${PRG}" ]; do
+  ls=$(ls -ld "${PRG}")
+  link=$(expr "$ls" : '.*-> \(.*\)$')
+  if expr "$link" : '/.*' > /dev/null; then
+    # Absolute link target: use it as-is.
+    PRG="$link"
+  else
+    # Relative link target: resolve it against the directory holding the link.
+    PRG="$(dirname "${PRG}")/$link"
+  fi
+done
+
+# BASEDIR is the parent of the directory containing this script.
+# All expansions are quoted so install paths containing spaces keep working.
+BASEDIR=$(dirname "${PRG}")
+BASEDIR=$(cd "${BASEDIR}/.." && pwd)
+
+# Source the shared configuration script.
+# NOTE(review): presumably this is what defines FALCONCPPATH - confirm in falcon-config.sh.
+. "${BASEDIR}/bin/falcon-config.sh" 'server' falcon
+
+# Use the JVM from JAVA_HOME when it is set, otherwise whatever 'java' is on the PATH.
+if test -z "${JAVA_HOME}"
+then
+  JAVA_BIN=java
+else
+  JAVA_BIN="${JAVA_HOME}/bin/java"
+fi
+
+# Leading -D arguments are JVM system properties; collect them and strip them
+# from the argument list that is passed on to the CLI.
+while [[ ${1} =~ ^\-D ]]; do
+  JAVA_PROPERTIES="${JAVA_PROPERTIES} ${1}"
+  shift
+done
+
+# set the client class path
+FALCONCPPATH="${FALCONCPPATH}:${BASEDIR}/client/lib/*"
+
+# JAVA_PROPERTIES is intentionally left unquoted so each -D option becomes its own word;
+# the class path is quoted so the trailing '*' reaches the JVM instead of being globbed.
+"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${FALCONCPPATH}" org.apache.falcon.tools.FalconStateStoreDBCLI "${@}"
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index ce6e91f..5ddba01 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -57,7 +57,8 @@
# org.apache.falcon.notification.service.impl.SchedulerService,\
# org.apache.falcon.notification.service.impl.AlarmService,\
# org.apache.falcon.notification.service.impl.DataAvailabilityService,\
-# org.apache.falcon.execution.FalconExecutionService
+# org.apache.falcon.execution.FalconExecutionService,\
+# org.apache.falcon.state.store.service.FalconJPAService
##### Prism Services #####
prism.application.services=org.apache.falcon.service.LifecyclePolicyMap,\
@@ -262,3 +263,22 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
# Setting monitoring plugin, if SMTP parameters is defined
#*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\
# org.apache.falcon.plugin.EmailNotificationPlugin
+
+######### StateStore Properties #####
+#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+#*.falcon.statestore.jdbc.url=jdbc:derby:data/statestore.db;create=true
+#*.falcon.statestore.jdbc.username=sa
+#*.falcon.statestore.jdbc.password=
+#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+## Maximum number of active connections that can be allocated from this pool at the same time.
+#*.falcon.statestore.pool.max.active.conn=10
+#*.falcon.statestore.connection.properties=
+## Indicates the interval (in milliseconds) between eviction runs.
+#*.falcon.statestore.validate.db.connection.eviction.interval=300000
+## The number of objects to examine during each run of the idle object evictor thread.
+#*.falcon.statestore.validate.db.connection.eviction.num=10
+## Creates Falcon DB.
+## If set to true, the DB schema is created if it does not exist; if the schema already exists, this is a no-op.
+## If set to false, the DB schema is not created; startup fails if the schema does not exist.
+#*.falcon.statestore.create.db.schema=true
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/src/main/assemblies/distributed-package.xml
----------------------------------------------------------------------
diff --git a/src/main/assemblies/distributed-package.xml b/src/main/assemblies/distributed-package.xml
index 794eaef..ebd1745 100644
--- a/src/main/assemblies/distributed-package.xml
+++ b/src/main/assemblies/distributed-package.xml
@@ -41,6 +41,11 @@
</fileSet>
<fileSet>
+ <directory>scheduler/target/dependency</directory>
+ <outputDirectory>client/lib</outputDirectory>
+ </fileSet>
+
+ <fileSet>
<directory>oozie-el-extensions/target/dependency</directory>
<outputDirectory>oozie/libext</outputDirectory>
</fileSet>
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/src/main/assemblies/standalone-package.xml
----------------------------------------------------------------------
diff --git a/src/main/assemblies/standalone-package.xml b/src/main/assemblies/standalone-package.xml
index fcff8d7..b88aec3 100644
--- a/src/main/assemblies/standalone-package.xml
+++ b/src/main/assemblies/standalone-package.xml
@@ -40,6 +40,11 @@
</fileSet>
<fileSet>
+ <directory>scheduler/target/dependency</directory>
+ <outputDirectory>client/lib</outputDirectory>
+ </fileSet>
+
+ <fileSet>
<directory>oozie-el-extensions/target/dependency</directory>
<outputDirectory>oozie/libext</outputDirectory>
</fileSet>
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/unit/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/unit/src/main/resources/startup.properties b/unit/src/main/resources/startup.properties
index fe6f430..4576e0b 100644
--- a/unit/src/main/resources/startup.properties
+++ b/unit/src/main/resources/startup.properties
@@ -127,3 +127,21 @@ debug.libext.process.paths=${falcon.libext}
# Comma separated list of black listed users
*.falcon.http.authentication.blacklisted.users=
+######### StateStore Properties #####
+#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+#*.falcon.statestore.jdbc.url=jdbc:derby:data/statestore.db;create=true
+#*.falcon.statestore.jdbc.username=sa
+#*.falcon.statestore.jdbc.password=
+#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+## Maximum number of active connections that can be allocated from this pool at the same time.
+#*.falcon.statestore.pool.max.active.conn=10
+#*.falcon.statestore.connection.properties=
+## Indicates the interval (in milliseconds) between eviction runs.
+#*.falcon.statestore.validate.db.connection.eviction.interval=300000
+## The number of objects to examine during each run of the idle object evictor thread.
+#*.falcon.statestore.validate.db.connection.eviction.num=10
+## Creates Falcon DB.
+## If set to true, the DB schema is created if it does not exist; if the schema already exists, this is a no-op.
+## If set to false, the DB schema is not created; startup fails if the schema does not exist.
+#*.falcon.statestore.create.db.schema=true
[2/4] falcon git commit: FALCON-1234 State Store for instances
scheduled by Falcon (Pavan Kolamuri)
Posted by pa...@apache.org.
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java b/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
new file mode 100644
index 0000000..72d1aba
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store.service;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.service.FalconService;
+import org.apache.falcon.state.store.jdbc.EntityBean;
+import org.apache.falcon.state.store.jdbc.InstanceBean;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import java.text.MessageFormat;
+import java.util.Properties;
+
+/**
+ * Service that manages JPA.
+ *
+ * <p>{@link #init()} reads the {@code falcon.statestore.*} startup properties, derives the
+ * persistence unit name ({@code falcon-<vendor>}) from the JDBC URL and creates the
+ * {@link EntityManagerFactory} that {@link #getEntityManager()} hands entity managers out from.
+ */
+public final class FalconJPAService implements FalconService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FalconJPAService.class);
+    public static final String PREFIX = "falcon.statestore.";
+
+    public static final String DB_SCHEMA = PREFIX + "schema.name";
+    public static final String URL = PREFIX + "jdbc.url";
+    public static final String DRIVER = PREFIX + "jdbc.driver";
+    public static final String USERNAME = PREFIX + "jdbc.username";
+    public static final String PASSWORD = PREFIX + "jdbc.password";
+    public static final String CONN_DATA_SOURCE = PREFIX + "connection.data.source";
+    public static final String CONN_PROPERTIES = PREFIX + "connection.properties";
+    public static final String MAX_ACTIVE_CONN = PREFIX + "pool.max.active.conn";
+    public static final String CREATE_DB_SCHEMA = PREFIX + "create.db.schema";
+    public static final String VALIDATE_DB_CONN = PREFIX + "validate.db.connection";
+    public static final String VALIDATE_DB_CONN_EVICTION_INTERVAL = PREFIX + "validate.db.connection.eviction.interval";
+    public static final String VALIDATE_DB_CONN_EVICTION_NUM = PREFIX + "validate.db.connection.eviction.num";
+
+    private EntityManagerFactory entityManagerFactory;
+    // Persistent Unit which is defined in persistence.xml
+    private String persistenceUnit;
+    private static final FalconJPAService FALCON_JPA_SERVICE = new FalconJPAService();
+
+    private FalconJPAService() {
+    }
+
+    /**
+     * @return the single shared instance of this service
+     */
+    public static FalconJPAService get() {
+        return FALCON_JPA_SERVICE;
+    }
+
+    /**
+     * @return the factory created by {@link #init()}, or {@code null} if {@code init()} has not run
+     */
+    public EntityManagerFactory getEntityManagerFactory() {
+        return entityManagerFactory;
+    }
+
+    /**
+     * Sets the persistence unit name to {@code "falcon-"} followed by the part of
+     * {@code dbType} that precedes the first {@code ':'}.
+     *
+     * @param dbType database vendor, optionally followed by {@code ":..."} (the rest of a JDBC URL)
+     * @throws IllegalArgumentException if {@code dbType} is null or empty
+     */
+    public void setPersistenceUnit(String dbType) {
+        if (StringUtils.isEmpty(dbType)) {
+            throw new IllegalArgumentException(" DB type cannot be null or empty");
+        }
+        dbType = dbType.split(":")[0];
+        this.persistenceUnit = "falcon-" + dbType;
+    }
+
+    @Override
+    public String getName() {
+        return this.getClass().getSimpleName();
+    }
+
+    /**
+     * Creates the entity manager factory and touches both entity types once.
+     *
+     * @throws FalconException if the state store configuration is missing or invalid
+     */
+    @Override
+    public void init() throws FalconException {
+        Properties props = getPropsforStore();
+        entityManagerFactory = Persistence.
+                createEntityManagerFactory(persistenceUnit, props);
+        EntityManager entityManager = getEntityManager();
+        try {
+            entityManager.find(EntityBean.class, 1);
+            entityManager.find(InstanceBean.class, 1);
+            LOG.info("All entities initialized");
+
+            // need to use a pseudo no-op transaction so all entities, datasource
+            // and connection pool are initialized one time only
+            entityManager.getTransaction().begin();
+            OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) entityManagerFactory;
+            // Mask the password with '***'
+            String logMsg = spi.getConfiguration().getConnectionProperties()
+                    .replaceAll("Password=.*?,", "Password=***,");
+            // SLF4J placeholders are '{}'; '{0}' would be logged literally.
+            LOG.info("JPA configuration: {}", logMsg);
+            entityManager.getTransaction().commit();
+        } finally {
+            // Release the entity manager even when initialization fails part-way.
+            try {
+                if (entityManager.getTransaction().isActive()) {
+                    entityManager.getTransaction().rollback();
+                }
+            } finally {
+                entityManager.close();
+            }
+        }
+    }
+
+    /**
+     * Reads a startup property that must be present.
+     *
+     * @param key property name
+     * @return the trimmed property value
+     * @throws FalconException if the property is not defined
+     */
+    private static String getRequiredProperty(String key) throws FalconException {
+        String value = StartupProperties.get().getProperty(key);
+        if (value == null) {
+            throw new FalconException("Missing required startup property: " + key);
+        }
+        return value.trim();
+    }
+
+    /**
+     * Builds the OpenJPA properties (connection properties and data source class) from the
+     * startup properties, and sets the persistence unit from the JDBC URL vendor.
+     *
+     * @return properties to pass to {@code Persistence.createEntityManagerFactory}
+     * @throws FalconException if a required property is missing or the JDBC URL is malformed
+     */
+    private Properties getPropsforStore() throws FalconException {
+        String url = getRequiredProperty(URL);
+        String driver = StartupProperties.get().getProperty(DRIVER);
+        String user = StartupProperties.get().getProperty(USERNAME);
+        // An absent password is treated like an empty one.
+        String password = StartupProperties.get().getProperty(PASSWORD, "").trim();
+        String maxConn = getRequiredProperty(MAX_ACTIVE_CONN);
+        String dataSource = getRequiredProperty(CONN_DATA_SOURCE);
+        String connPropsConfig = StartupProperties.get().getProperty(CONN_PROPERTIES);
+        boolean autoSchemaCreation = Boolean.parseBoolean(StartupProperties.get().getProperty(CREATE_DB_SCHEMA,
+                "false"));
+        boolean validateDbConn = Boolean.parseBoolean(StartupProperties.get().getProperty(VALIDATE_DB_CONN, "true"));
+
+        if (!url.startsWith("jdbc:")) {
+            throw new FalconException("invalid JDBC URL, must start with 'jdbc:'" + url);
+        }
+        String dbType = url.substring("jdbc:".length());
+        if (dbType.indexOf(":") <= 0) {
+            throw new FalconException("invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'" + url);
+        }
+        setPersistenceUnit(dbType);
+
+        // Format exactly once: running the result through MessageFormat again would re-interpret
+        // any quote or brace contained in the substituted URL or password.
+        String connProps = MessageFormat.format(
+                "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}",
+                driver, url, user, password, maxConn);
+        Properties props = new Properties();
+        if (autoSchemaCreation) {
+            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
+            props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
+        } else if (validateDbConn) {
+            // validation can be done only if the schema already exist, else a
+            // connection cannot be obtained to create the schema.
+            // The eviction settings are only needed (and therefore only required) on this path.
+            String interval = "timeBetweenEvictionRunsMillis="
+                    + getRequiredProperty(VALIDATE_DB_CONN_EVICTION_INTERVAL);
+            String num = "numTestsPerEvictionRun=" + getRequiredProperty(VALIDATE_DB_CONN_EVICTION_NUM);
+            connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num;
+            connProps += ",ValidationQuery=select count(*) from ENTITIES";
+        } else {
+            connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
+        }
+        // Skip an empty value so no dangling ',' is appended.
+        if (StringUtils.isNotBlank(connPropsConfig)) {
+            connProps += "," + connPropsConfig;
+        }
+        props.setProperty("openjpa.ConnectionProperties", connProps);
+        props.setProperty("openjpa.ConnectionDriverName", dataSource);
+        return props;
+    }
+
+    /**
+     * Closes the entity manager factory if one was created and is still open.
+     */
+    @Override
+    public void destroy() throws FalconException {
+        if (entityManagerFactory != null && entityManagerFactory.isOpen()) {
+            entityManagerFactory.close();
+        }
+    }
+
+
+    /**
+     * Return an EntityManager. Used by the StoreService.
+     *
+     * @return an entity manager
+     */
+    public EntityManager getEntityManager() {
+        return getEntityManagerFactory().createEntityManager();
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
new file mode 100644
index 0000000..381b0b3
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.tools;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.falcon.cli.CLIParser;
+import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.util.BuildProperties;
+import org.apache.falcon.util.StartupProperties;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Command Line utility for Table Creation, Update.
+ */
+public class FalconStateStoreDBCLI {
+ // Sub-command: display usage.
+ public static final String HELP_CMD = "help";
+ // Sub-command: show Falcon DB version information.
+ public static final String VERSION_CMD = "version";
+ // Sub-command: create the Falcon DB schema.
+ public static final String CREATE_CMD = "create";
+ // Option (takes a file argument): generate a SQL script instead of creating/upgrading the schema.
+ public static final String SQL_FILE_OPT = "sqlfile";
+ // Option (flag): confirmation that the schema creation/upgrade should be carried out.
+ public static final String RUN_OPT = "run";
+ // Sub-command: upgrade the Falcon DB schema.
+ public static final String UPGRADE_CMD = "upgrade";
+
+ // Set to true by the first call to run(); run() rejects any further call on the same
+ // CLI instance with an IllegalStateException.
+ private boolean instanceExists;
+ // Help text handed to the CLIParser.
+ private static final String[] FALCON_HELP = {"Falcon DB initialization tool currently supports Derby DB"};
+
+ public static void main(String[] args) {
+ new FalconStateStoreDBCLI().run(args);
+ }
+
+ /**
+  * Creates a CLI instance that has not been run yet.
+  */
+ public FalconStateStoreDBCLI() {
+     this.instanceExists = false;
+ }
+
+ /**
+  * Builds the options shared by the create and upgrade sub-commands.
+  *
+  * @return a new option set containing {@code -sqlfile <FILE>} and {@code -run}
+  */
+ protected Options getOptions() {
+     Options cliOptions = new Options();
+     cliOptions.addOption(new Option(SQL_FILE_OPT, true,
+             "Generate SQL script instead of creating/upgrading the DB schema"));
+     cliOptions.addOption(new Option(RUN_OPT, false,
+             "Confirmation option regarding DB schema creation/upgrade"));
+     return cliOptions;
+ }
+
+ /**
+  * Parses the arguments and executes the requested sub-command.
+  *
+  * <p>For create/upgrade at least one of {@code -sqlfile <FILE>} or {@code -run} must be given;
+  * without {@code -sqlfile} the SQL is written to a temporary file. Errors are reported on
+  * standard error rather than thrown.
+  *
+  * @param args command line arguments
+  * @return 0 on success, 1 when parsing or the command itself failed
+  * @throws IllegalStateException if this instance has already been run
+  */
+ public synchronized int run(String[] args) {
+     if (instanceExists) {
+         throw new IllegalStateException("CLI instance already used");
+     }
+     instanceExists = true;
+
+     CLIParser cliParser = new CLIParser("falcondb", FALCON_HELP);
+     cliParser.addCommand(HELP_CMD, "", "Display usage for all commands or specified command", new Options(), false);
+     cliParser.addCommand(VERSION_CMD, "", "Show Falcon DB version information", new Options(), false);
+     cliParser.addCommand(CREATE_CMD, "", "Create Falcon DB schema", getOptions(), false);
+     cliParser.addCommand(UPGRADE_CMD, "", "Upgrade Falcon DB schema", getOptions(), false);
+
+     try {
+         CLIParser.Command parsed = cliParser.parse(args);
+         String commandName = parsed.getName();
+
+         // Informational commands need no options.
+         if (commandName.equals(HELP_CMD)) {
+             cliParser.showHelp();
+             return 0;
+         }
+         if (commandName.equals(VERSION_CMD)) {
+             showVersion();
+             return 0;
+         }
+
+         CommandLine commandLine = parsed.getCommandLine();
+         boolean sqlFileGiven = commandLine.hasOption(SQL_FILE_OPT);
+         boolean execute = commandLine.hasOption(RUN_OPT);
+         if (!sqlFileGiven && !execute) {
+             throw new Exception("'-sqlfile <FILE>' or '-run' options must be specified");
+         }
+
+         // The SQL is always written somewhere; fall back to a temp file when none was named.
+         String sqlFile;
+         if (sqlFileGiven) {
+             sqlFile = commandLine.getOptionValue(SQL_FILE_OPT);
+         } else {
+             sqlFile = File.createTempFile("falcondb-", ".sql").getAbsolutePath();
+         }
+
+         if (commandName.equals(CREATE_CMD)) {
+             createDB(sqlFile, execute);
+         } else if (commandName.equals(UPGRADE_CMD)) {
+             upgradeDB(sqlFile, execute);
+         }
+
+         System.out.println("The SQL commands have been written to: " + sqlFile);
+         if (!execute) {
+             System.out.println("WARN: The SQL commands have NOT been executed, you must use the '-run' option");
+         }
+         return 0;
+     } catch (ParseException ex) {
+         System.err.println("Invalid sub-command: " + ex.getMessage());
+         System.err.println();
+         System.err.println(cliParser.shortHelp());
+         return 1;
+     } catch (Exception ex) {
+         System.err.println();
+         System.err.println("Error: " + ex.getMessage());
+         System.err.println();
+         System.err.println("Stack trace for the error was (for debug purposes):");
+         System.err.println("--------------------------------------");
+         ex.printStackTrace(System.err);
+         System.err.println("--------------------------------------");
+         System.err.println();
+         return 1;
+     }
+ }
+
+ /**
+  * Upgrades an existing Falcon DB to the version of this Falcon build.
+  *
+  * <p>Does nothing beyond printing a message when the version recorded in the DB is already
+  * equal to or newer than the build version.
+  *
+  * @param sqlFile file the generated SQL is written to
+  * @param run whether the SQL is also executed against the DB
+  * @throws Exception if the DB does not exist, a version cannot be determined, or the upgrade fails
+  */
+ private void upgradeDB(String sqlFile, boolean run) throws Exception {
+     validateConnection();
+     if (!checkDBExists()) {
+         throw new Exception("Falcon DB doesn't exist");
+     }
+     String falconVersion = BuildProperties.get().getProperty("project.version");
+     String dbVersion = getFalconDBVersion();
+     if (falconVersion == null || dbVersion == null) {
+         throw new Exception("Could not determine versions to compare, Falcon version '" + falconVersion
+                 + "', Falcon DB version '" + dbVersion + "'");
+     }
+     // Compare component-wise: a plain String comparison would rank "0.10" below "0.9".
+     if (compareDottedVersions(dbVersion, falconVersion) >= 0) {
+         System.out.println("Falcon DB already upgraded to Falcon version '" + falconVersion + "'");
+         return;
+     }
+
+     createUpgradeDB(sqlFile, run, false);
+     upgradeFalconDBVersion(sqlFile, run, falconVersion);
+
+     // any post upgrade tasks
+     if (run) {
+         System.out.println("Falcon DB has been upgraded to Falcon version '" + falconVersion + "'");
+     }
+ }
+
+ /**
+  * Compares two version strings component by component, splitting on '.' and '-'.
+  *
+  * <p>Components that are both purely numeric are compared as numbers; any other pair is
+  * compared as strings. A missing component is treated as "0".
+  *
+  * @param left first version
+  * @param right second version
+  * @return a negative value, zero or a positive value if {@code left} is older than, equal to
+  *         or newer than {@code right}
+  */
+ private static int compareDottedVersions(String left, String right) {
+     String[] leftParts = left.split("[.\\-]");
+     String[] rightParts = right.split("[.\\-]");
+     int count = Math.max(leftParts.length, rightParts.length);
+     for (int i = 0; i < count; i++) {
+         String l = i < leftParts.length ? leftParts[i] : "0";
+         String r = i < rightParts.length ? rightParts[i] : "0";
+         int cmp;
+         if (l.matches("\\d+") && r.matches("\\d+")) {
+             // Numeric compare without parsing: drop leading zeros, then the longer number is larger.
+             String ln = l.replaceFirst("^0+(?=\\d)", "");
+             String rn = r.replaceFirst("^0+(?=\\d)", "");
+             cmp = ln.length() != rn.length() ? ln.length() - rn.length() : ln.compareTo(rn);
+         } else {
+             cmp = l.compareTo(r);
+         }
+         if (cmp != 0) {
+             return cmp;
+         }
+     }
+     return 0;
+ }
+
+
+ /**
+  * Appends the statement that records the new DB version to the SQL file and, when
+  * requested, executes it.
+  *
+  * @param sqlFile file the statement is appended to
+  * @param run whether the statement is also executed against the DB
+  * @param version version to store under 'db.version' in FALCON_DB_PROPS
+  * @throws Exception if the file cannot be written or the update fails
+  */
+ private void upgradeFalconDBVersion(String sqlFile, boolean run, String version) throws Exception {
+     String updateDBVersion = "update FALCON_DB_PROPS set data='" + version + "' where name='db.version'";
+     PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
+     try {
+         writer.println();
+         writer.println(updateDBVersion);
+     } finally {
+         // Always release the file handle, even if writing fails.
+         writer.close();
+     }
+     System.out.println("Upgrade db.version in FALCON_DB_PROPS table to " + version);
+     if (run) {
+         Connection conn = createConnection();
+         Statement st = null;
+         try {
+             conn.setAutoCommit(true);
+             st = conn.createStatement();
+             st.executeUpdate(updateDBVersion);
+         } catch (Exception ex) {
+             throw new Exception("Could not upgrade db.version in FALCON_DB_PROPS table: " + ex.toString(), ex);
+         } finally {
+             // Close the connection even if closing the statement fails.
+             try {
+                 closeStatement(st);
+             } finally {
+                 conn.close();
+             }
+         }
+     }
+     System.out.println("DONE");
+ }
+
+ private static final String GET_FALCON_DB_VERSION = "select data from FALCON_DB_PROPS where name = 'db.version'";
+
+ /**
+  * Reads the 'db.version' entry from the FALCON_DB_PROPS table.
+  *
+  * <p>Any failure inside the query, including a missing 'db.version' row, is rethrown
+  * wrapped in a "Could not query FALCON_DB_PROPS table" exception.
+  *
+  * @return the version stored in the DB
+  * @throws Exception if the table cannot be queried or holds no 'db.version' row
+  */
+ private String getFalconDBVersion() throws Exception {
+     System.out.println("Get Falcon DB version");
+     Connection connection = createConnection();
+     Statement statement = null;
+     ResultSet resultSet = null;
+     String dbVersion;
+     try {
+         statement = connection.createStatement();
+         resultSet = statement.executeQuery(GET_FALCON_DB_VERSION);
+         if (!resultSet.next()) {
+             throw new Exception("ERROR: Could not find Falcon DB 'db.version' in FALCON_DB_PROPS table");
+         }
+         dbVersion = resultSet.getString(1);
+     } catch (Exception ex) {
+         throw new Exception("ERROR: Could not query FALCON_DB_PROPS table: " + ex.toString(), ex);
+     } finally {
+         closeResultSet(resultSet);
+         closeStatement(statement);
+         connection.close();
+     }
+     System.out.println("DONE");
+     return dbVersion;
+ }
+
+
+ /**
+  * Collects the state store JDBC settings from the startup properties.
+  *
+  * @return a map with the keys "driver", "url", "user", "password" and "dbtype", where
+  *         "dbtype" is the vendor part of the JDBC URL ({@code jdbc:[VENDOR]:...})
+  * @throws RuntimeException if the JDBC URL is missing, does not start with 'jdbc:' or has no vendor
+  */
+ private Map<String, String> getJdbcConf() throws Exception {
+     String url = StartupProperties.get().getProperty(FalconJPAService.URL);
+     // Validate before slicing: a null or too-short URL would otherwise fail with an
+     // unhelpful NullPointerException / StringIndexOutOfBoundsException.
+     if (url == null || !url.startsWith("jdbc:")) {
+         throw new RuntimeException("Invalid JDBC URL, must start with 'jdbc:': " + url);
+     }
+     String dbType = url.substring("jdbc:".length());
+     int vendorEnd = dbType.indexOf(":");
+     if (vendorEnd <= 0) {
+         throw new RuntimeException("Invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
+     }
+
+     Map<String, String> jdbcConf = new HashMap<String, String>();
+     jdbcConf.put("driver", StartupProperties.get().getProperty(FalconJPAService.DRIVER));
+     jdbcConf.put("url", url);
+     jdbcConf.put("user", StartupProperties.get().getProperty(FalconJPAService.USERNAME));
+     jdbcConf.put("password", StartupProperties.get().getProperty(FalconJPAService.PASSWORD));
+     jdbcConf.put("dbtype", dbType.substring(0, vendorEnd));
+     return jdbcConf;
+ }
+
+ /**
+  * Assembles the argument list for the mapping tool: schema action, persistence unit,
+  * connection settings, optional SQL output file, index generation and the entity classes.
+  *
+  * @param sqlFile file the generated SQL should go to, or {@code null} to omit the -sqlFile argument
+  * @return the arguments in the order the original option/value pairs were listed
+  * @throws Exception if the JDBC configuration cannot be read
+  */
+ private String[] createMappingToolArguments(String sqlFile) throws Exception {
+     Map<String, String> jdbc = getJdbcConf();
+
+     // Option/value pairs that are always present, in emission order.
+     String[][] fixedPairs = {
+         {"-schemaAction", "add"},
+         {"-p", "persistence.xml#falcon-" + jdbc.get("dbtype")},
+         {"-connectionDriverName", jdbc.get("driver")},
+         {"-connectionURL", jdbc.get("url")},
+         {"-connectionUserName", jdbc.get("user")},
+         {"-connectionPassword", jdbc.get("password")},
+     };
+
+     List<String> toolArgs = new ArrayList<String>();
+     for (String[] pair : fixedPairs) {
+         toolArgs.add(pair[0]);
+         toolArgs.add(pair[1]);
+     }
+     if (sqlFile != null) {
+         toolArgs.add("-sqlFile");
+         toolArgs.add(sqlFile);
+     }
+     toolArgs.add("-indexes");
+     toolArgs.add("true");
+     // Entity classes to map come last.
+     toolArgs.add("org.apache.falcon.state.store.jdbc.EntityBean");
+     toolArgs.add("org.apache.falcon.state.store.jdbc.InstanceBean");
+     return toolArgs.toArray(new String[0]);
+ }
+
+ /**
+  * Creates the Falcon schema and the FALCON_DB_PROPS table unless the schema is already there.
+  *
+  * @param sqlFile file that receives the generated SQL
+  * @param run     forwarded to the schema and props-table steps; when true a
+  *                confirmation line is printed at the end
+  */
+ private void createDB(String sqlFile, boolean run) throws Exception {
+     validateConnection();
+     if (!checkDBExists()) {
+         verifyFalconPropsTable(false);
+         createUpgradeDB(sqlFile, run, true);
+         String falconVersion = BuildProperties.get().getProperty("project.version");
+         createFalconPropsTable(sqlFile, run, falconVersion);
+         if (run) {
+             System.out.println("Falcon DB has been created for Falcon version '"
+                     + falconVersion + "'");
+         }
+     }
+ }
+
+ private static final String CREATE_FALCON_DB_PROPS =
+     "create table FALCON_DB_PROPS (name varchar(100), data varchar(100))";
+
+ /**
+  * Appends the FALCON_DB_PROPS DDL and the 'db.version' insert to the SQL file and,
+  * when requested, executes both statements against the database.
+  *
+  * @param sqlFile file the two statements are appended to
+  * @param run     when true the statements are also executed over a new connection
+  * @param version value stored under the 'db.version' name
+  * @throws Exception if the file cannot be opened or the statements fail to execute
+  */
+ private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception {
+     String insertDbVersion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";
+
+     PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
+     try {
+         writer.println();
+         writer.println(CREATE_FALCON_DB_PROPS);
+         writer.println(insertDbVersion);
+     } finally {
+         writer.close();
+     }
+     System.out.println("Create FALCON_DB_PROPS table");
+     if (run) {
+         Connection conn = createConnection();
+         Statement st = null;
+         try {
+             conn.setAutoCommit(true);
+             st = conn.createStatement();
+             st.executeUpdate(CREATE_FALCON_DB_PROPS);
+             st.executeUpdate(insertDbVersion);
+             st.close();
+         } catch (Exception ex) {
+             try {
+                 closeStatement(st);
+             } catch (Exception ignored) {
+                 // closeStatement has already printed the close failure; the original
+                 // error below is the one the caller needs to see.
+             }
+             throw new Exception("Could not create FALCON_DB_PROPS table: " + ex.toString(), ex);
+         } finally {
+             conn.close();
+         }
+     }
+     System.out.println("DONE");
+ }
+
+ private static final String FALCON_DB_PROPS_EXISTS = "select count(*) from FALCON_DB_PROPS";
+
+ /**
+  * Checks that the presence of the FALCON_DB_PROPS table matches the expectation.
+  *
+  * The table counts as present when the count query runs and its first row can be
+  * advanced to; any exception from that probe counts as absent.
+  *
+  * @param exists the expected state of the table
+  * @return whether the table was found (equal to {@code exists} on normal return)
+  * @throws Exception if the observed state differs from {@code exists}
+  */
+ private boolean verifyFalconPropsTable(boolean exists) throws Exception {
+     if (exists) {
+         System.out.println("Check FALCON_DB_PROPS table exists");
+     } else {
+         System.out.println("Checking FALCON_DB_PROPS table does not exist");
+     }
+     Connection conn = createConnection();
+     Statement query = null;
+     ResultSet rows = null;
+     boolean found = false;
+     try {
+         query = conn.createStatement();
+         rows = query.executeQuery(FALCON_DB_PROPS_EXISTS);
+         rows.next();
+         found = true;
+     } catch (Exception ignored) {
+         // A failed probe is treated as "table absent"; found stays false.
+     } finally {
+         closeResultSet(rows);
+         closeStatement(query);
+         conn.close();
+     }
+     if (found != exists) {
+         String problem = exists ? "does not exist" : "exists";
+         throw new Exception("FALCON_DB_PROPS_TABLE table " + problem);
+     }
+     System.out.println("DONE");
+     return found;
+ }
+
+ /**
+  * Closes the result set if there is one; a failure to close is printed and not rethrown.
+  *
+  * @param rs result set to close, may be null
+  */
+ private void closeResultSet(ResultSet rs) {
+     if (rs == null) {
+         return;
+     }
+     try {
+         rs.close();
+     } catch (Exception e) {
+         System.out.println("Unable to close ResultSet " + rs);
+     }
+ }
+
+ /**
+  * Closes the statement if there is one.
+  *
+  * @param st statement to close, may be null
+  * @throws Exception wrapping the original failure when the close itself fails
+  *                   (a message is printed first)
+  */
+ private void closeStatement(Statement st) throws Exception {
+     if (st == null) {
+         return;
+     }
+     try {
+         st.close();
+     } catch (Exception e) {
+         System.out.println("Unable to close SQL Statement " + st);
+         throw new Exception(e);
+     }
+ }
+
+ /**
+  * Instantiates the configured JDBC driver class and opens a connection with the
+  * configured URL, user and password.
+  *
+  * @return a new connection; the caller is responsible for closing it
+  */
+ private Connection createConnection() throws Exception {
+     Map<String, String> jdbc = getJdbcConf();
+     String driverClass = jdbc.get("driver");
+     Class.forName(driverClass).newInstance();
+     String jdbcUrl = jdbc.get("url");
+     String user = jdbc.get("user");
+     String password = jdbc.get("password");
+     return DriverManager.getConnection(jdbcUrl, user, password);
+ }
+
+ /**
+  * Opens and immediately closes a connection to confirm the database is reachable.
+  *
+  * @throws Exception wrapping the underlying failure when the connection cannot be
+  *                   opened or closed
+  */
+ private void validateConnection() throws Exception {
+     System.out.println("Validating DB Connection");
+     try {
+         Connection probe = createConnection();
+         probe.close();
+         System.out.println("DONE");
+     } catch (Exception ex) {
+         String reason = ex.toString();
+         throw new Exception("Could not connect to the database: " + reason, ex);
+     }
+ }
+
+ private static final String ENTITY_STATUS_QUERY =
+     "select count(*) from ENTITIES where current_state IN ('RUNNING', 'SUSPENDED')";
+ private static final String INSTANCE_STATUS_QUERY =
+     "select count(*) from INSTANCES where current_state IN ('RUNNING', 'SUSPENDED')";
+
+ /**
+  * Probes for the schema by running the entity status query.
+  *
+  * The schema counts as present when the query runs and its first row can be
+  * advanced to; any exception from the probe counts as absent.
+  *
+  * @return true when the probe succeeded
+  */
+ private boolean checkDBExists() throws Exception {
+     Connection conn = createConnection();
+     Statement query = null;
+     ResultSet rows = null;
+     boolean found = false;
+     try {
+         query = conn.createStatement();
+         rows = query.executeQuery(ENTITY_STATUS_QUERY);
+         rows.next();
+         found = true;
+     } catch (Exception ignored) {
+         // A failed probe is treated as "schema absent"; found stays false.
+     } finally {
+         closeResultSet(rows);
+         closeStatement(query);
+         conn.close();
+     }
+     String outcome = found ? "exists" : "does not exist";
+     System.out.println("DB schema " + outcome);
+     return found;
+ }
+
+ /**
+  * Invokes the OpenJPA MappingTool with the SQL file argument and, when requested,
+  * a second time without it.
+  *
+  * @param sqlFile file passed to the first MappingTool invocation
+  * @param run     when true the tool is invoked again with no SQL file argument
+  * @param create  only selects which progress message is printed
+  */
+ private void createUpgradeDB(String sqlFile, boolean run, boolean create) throws Exception {
+     if (create) {
+         System.out.println("Create SQL schema");
+     } else {
+         System.out.println("Upgrade SQL schema");
+     }
+     org.apache.openjpa.jdbc.meta.MappingTool.main(createMappingToolArguments(sqlFile));
+     if (run) {
+         org.apache.openjpa.jdbc.meta.MappingTool.main(createMappingToolArguments(null));
+     }
+     System.out.println("DONE");
+ }
+
+ /**
+  * Prints the Falcon server version, then the rows stored in FALCON_DB_PROPS.
+  *
+  * @throws Exception if the connection cannot be validated, the schema probe reports
+  *                   no schema, or the FALCON_DB_PROPS table check fails
+  */
+ private void showVersion() throws Exception {
+     System.out.println("Falcon Server version: "
+             + BuildProperties.get().getProperty("project.version"));
+     validateConnection();
+     if (!checkDBExists()) {
+         throw new Exception("Falcon DB doesn't exist");
+     }
+     try {
+         verifyFalconPropsTable(true);
+     } catch (Exception ex) {
+         // Keep the underlying failure as the cause so it is not lost to the caller.
+         throw new Exception("ERROR: It seems this Falcon DB was never upgraded with the 'falcondb' tool", ex);
+     }
+     showFalconPropsInfo();
+ }
+
+ private static final String GET_FALCON_PROPS_INFO = "select name, data from FALCON_DB_PROPS order by name";
+
+ /**
+  * Prints every name/data row of FALCON_DB_PROPS, ordered by name, between two rule lines.
+  *
+  * @throws Exception wrapping the underlying failure when the query or the row
+  *                   iteration fails
+  */
+ private void showFalconPropsInfo() throws Exception {
+     final String rule = "--------------------------------------";
+     Connection conn = createConnection();
+     Statement query = null;
+     ResultSet rows = null;
+     try {
+         System.out.println("Falcon DB Version Information");
+         System.out.println(rule);
+         query = conn.createStatement();
+         rows = query.executeQuery(GET_FALCON_PROPS_INFO);
+         while (rows.next()) {
+             String name = rows.getString(1);
+             String data = rows.getString(2);
+             System.out.println(name + ": " + data);
+         }
+         System.out.println(rule);
+     } catch (Exception ex) {
+         throw new Exception("ERROR querying FALCON_DB_PROPS table: " + ex.toString(), ex);
+     } finally {
+         closeResultSet(rows);
+         closeStatement(query);
+         conn.close();
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/scheduler/src/main/resources/META-INF/persistence.xml b/scheduler/src/main/resources/META-INF/persistence.xml
new file mode 100644
index 0000000..86558de
--- /dev/null
+++ b/scheduler/src/main/resources/META-INF/persistence.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<persistence xmlns="http://java.sun.com/xml/ns/persistence"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ version="1.0">
+
+    <persistence-unit name="falcon-derby" transaction-type="RESOURCE_LOCAL">
+        <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+
+        <class>org.apache.falcon.state.store.jdbc.EntityBean</class>
+        <class>org.apache.falcon.state.store.jdbc.InstanceBean</class>
+
+        <properties>
+            <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
+
+            <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
+
+            <property name="openjpa.MetaDataFactory"
+                      value="jpa(Types=org.apache.falcon.state.store.jdbc.EntityBean;
+                org.apache.falcon.state.store.jdbc.InstanceBean)"></property>
+
+            <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
+            <property name="openjpa.LockManager" value="pessimistic"/>
+            <property name="openjpa.ReadLockLevel" value="read"/>
+            <property name="openjpa.WriteLockLevel" value="write"/>
+            <property name="openjpa.jdbc.TransactionIsolation" value="read-committed"/> <!--CUSTOM-->
+            <!-- Both dictionary settings share one entry; repeating the property name risks one value replacing the other -->
+            <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50,TimestampTypeName=TIMESTAMP"/>
+            <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
+            <property name="openjpa.Log" value="log4j"/>
+        </properties>
+    </persistence-unit>
+
+</persistence>
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/resources/falcon-buildinfo.properties
----------------------------------------------------------------------
diff --git a/scheduler/src/main/resources/falcon-buildinfo.properties b/scheduler/src/main/resources/falcon-buildinfo.properties
new file mode 100644
index 0000000..5a7cb82
--- /dev/null
+++ b/scheduler/src/main/resources/falcon-buildinfo.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######################
+*.domain=all
+
+*.build.user=${user.name}
+*.build.epoch=${timestamp}
+*.project.version=${pom.version}
+*.build.version=${pom.version}-r${buildNumber}
+*.vc.revision=${buildNumber}
+*.vc.source.url=${scm.connection}
+######################
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
index bff92c9..2a9fbce 100644
--- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
@@ -19,7 +19,6 @@ package org.apache.falcon.execution;
import org.apache.falcon.FalconException;
import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.AbstractTestBase;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
@@ -36,6 +35,7 @@ import org.apache.falcon.notification.service.impl.DataAvailabilityService;
import org.apache.falcon.notification.service.impl.JobCompletionService;
import org.apache.falcon.notification.service.impl.SchedulerService;
import org.apache.falcon.service.Services;
+import org.apache.falcon.state.AbstractSchedulerTestBase;
import org.apache.falcon.state.EntityClusterID;
import org.apache.falcon.state.EntityID;
import org.apache.falcon.state.EntityState;
@@ -43,7 +43,8 @@ import org.apache.falcon.state.ID;
import org.apache.falcon.state.InstanceID;
import org.apache.falcon.state.InstanceState;
import org.apache.falcon.state.store.AbstractStateStore;
-import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.apache.falcon.state.store.service.FalconJPAService;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.engine.DAGEngine;
import org.apache.falcon.workflow.engine.DAGEngineFactory;
@@ -59,6 +60,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
@@ -68,32 +70,38 @@ import java.util.Iterator;
/**
* Tests the API of FalconExecution Service and in turn the FalconExecutionService.get()s.
*/
-public class FalconExecutionServiceTest extends AbstractTestBase {
+public class FalconExecutionServiceTest extends AbstractSchedulerTestBase {
- private InMemoryStateStore stateStore = null;
+ private StateStore stateStore = null;
private AlarmService mockTimeService;
private DataAvailabilityService mockDataService;
private SchedulerService mockSchedulerService;
private JobCompletionService mockCompletionService;
private DAGEngine dagEngine;
private int instanceCount = 0;
+ private static FalconJPAService falconJPAService = FalconJPAService.get();
@BeforeClass
public void init() throws Exception {
this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
this.conf = dfsCluster.getConf();
setupServices();
+ super.setup();
+ createDB(DB_SQL_FILE);
+ falconJPAService.init();
setupConfigStore();
}
@AfterClass
- public void tearDown() {
+ public void tearDown() throws FalconException, IOException {
+ super.cleanup();
this.dfsCluster.shutdown();
+ falconJPAService.destroy();
}
// State store is set up to sync with Config Store. That gets tested too.
public void setupConfigStore() throws Exception {
- stateStore = (InMemoryStateStore) AbstractStateStore.get();
+ stateStore = AbstractStateStore.get();
getStore().registerListener(stateStore);
storeEntity(EntityType.CLUSTER, "testCluster");
storeEntity(EntityType.FEED, "clicksFeed");
@@ -160,6 +168,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
FalconExecutionService.get().onEvent(event);
// Ensure the instance is ready for execution
+ instance = stateStore.getExecutionInstance(new InstanceID(instance.getInstance()));
Assert.assertEquals(instance.getCurrentState(), InstanceState.STATE.READY);
// Simulate a scheduled notification
@@ -211,12 +220,15 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
FalconExecutionService.get().onEvent(event);
// One in ready and one in waiting. Both should be suspended.
+ instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY);
Assert.assertEquals(instance1.getInstance().getAwaitingPredicates().size(), 0);
Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.WAITING);
FalconExecutionService.get().suspend(process);
+ instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+ instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUSPENDED);
Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.SUSPENDED);
Mockito.verify(mockDataService).unregister(FalconExecutionService.get(),
@@ -225,7 +237,11 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
instance2.getInstance().getId());
Mockito.verify(mockTimeService).unregister(FalconExecutionService.get(), executorID);
+ Mockito.verify(mockDataService).unregister(FalconExecutionService.get(), executorID);
+
FalconExecutionService.get().resume(process);
+ instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+ instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.READY);
Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.WAITING);
@@ -237,17 +253,22 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
FalconExecutionService.get().onEvent(event);
// One in running and the other in ready. Both should be suspended
+ instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
Mockito.when(dagEngine.isScheduled(instance1.getInstance())).thenReturn(true);
+ instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
FalconExecutionService.get().suspend(process);
+ instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+ instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUSPENDED);
Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.SUSPENDED);
FalconExecutionService.get().resume(process);
-
+ instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+ instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
@@ -255,6 +276,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instance1.getInstance());
FalconExecutionService.get().onEvent(event);
+ instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.SUCCEEDED);
}
@@ -294,6 +316,9 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
FalconExecutionService.get().onEvent(event);
// One in ready, one in waiting and one running.
+ instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+ instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
+ instance3 = stateStore.getExecutionInstance(new InstanceID(instance3.getInstance()));
Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING);
@@ -329,6 +354,7 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
FalconExecutionService.get().onEvent(dataEvent);
+ instanceState = stateStore.getExecutionInstance(new InstanceID(instanceState.getInstance()));
Assert.assertEquals(instanceState.getCurrentState(), InstanceState.STATE.TIMED_OUT);
}
@@ -390,6 +416,9 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
FalconExecutionService.get().onEvent(event);
// One in ready, one in waiting and one running.
+ instance1 = stateStore.getExecutionInstance(new InstanceID(instance1.getInstance()));
+ instance2 = stateStore.getExecutionInstance(new InstanceID(instance2.getInstance()));
+ instance3 = stateStore.getExecutionInstance(new InstanceID(instance3.getInstance()));
Assert.assertEquals(instance1.getCurrentState(), InstanceState.STATE.RUNNING);
Assert.assertEquals(instance2.getCurrentState(), InstanceState.STATE.READY);
Assert.assertEquals(instance3.getCurrentState(), InstanceState.STATE.WAITING);
@@ -442,7 +471,9 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
EntityID processID = new EntityID(process);
// Store couple of instances in store
- stateStore.getEntity(processID).setCurrentState(EntityState.STATE.SCHEDULED);
+ EntityState entityState = stateStore.getEntity(processID);
+ entityState.setCurrentState(EntityState.STATE.SCHEDULED);
+ stateStore.updateEntity(entityState);
ProcessExecutionInstance instance1 = new ProcessExecutionInstance(process,
new DateTime(System.currentTimeMillis() - 60 * 60 * 1000), clusterName);
InstanceState instanceState1 = new InstanceState(instance1);
@@ -459,11 +490,13 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
// Simulate a scheduled notification. This should cause the reload from state store
Event event = createEvent(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE, instanceState2.getInstance());
FalconExecutionService.get().onEvent(event);
+ instanceState2 = stateStore.getExecutionInstance(new InstanceID(instanceState2.getInstance()));
Assert.assertEquals(instanceState2.getCurrentState(), InstanceState.STATE.RUNNING);
// Simulate a Job completion notification and ensure the instance resumes from where it left
event = createEvent(NotificationServicesRegistry.SERVICE.JOB_COMPLETION, instanceState1.getInstance());
FalconExecutionService.get().onEvent(event);
+ instanceState1 = stateStore.getExecutionInstance(new InstanceID(instanceState1.getInstance()));
Assert.assertEquals(instanceState1.getCurrentState(), InstanceState.STATE.SUCCEEDED);
}
@@ -500,6 +533,8 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
Assert.fail("Exception expected.");
} catch (Exception e) {
// One instance must fail and the other not
+ instanceState1 = stateStore.getExecutionInstance(new InstanceID(instanceState1.getInstance()));
+ instanceState2 = stateStore.getExecutionInstance(new InstanceID(instanceState2.getInstance()));
Assert.assertEquals(instanceState2.getCurrentState(), state);
Assert.assertEquals(instanceState1.getCurrentState(), InstanceState.STATE.RUNNING);
}
@@ -508,6 +543,8 @@ public class FalconExecutionServiceTest extends AbstractTestBase {
((MockDAGEngine)dagEngine).removeFailInstance(instance1);
m.invoke(FalconExecutionService.get(), process);
+ instanceState1 = stateStore.getExecutionInstance(new InstanceID(instanceState1.getInstance()));
+ instanceState2 = stateStore.getExecutionInstance(new InstanceID(instanceState2.getInstance()));
// Both instances must be in expected state.
Assert.assertEquals(instanceState2.getCurrentState(), state);
Assert.assertEquals(instanceState1.getCurrentState(), state);
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
index 001f466..c43ccf0 100644
--- a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
@@ -40,7 +40,7 @@ import org.apache.falcon.state.ID;
import org.apache.falcon.state.InstanceID;
import org.apache.falcon.state.InstanceState;
import org.apache.falcon.state.store.AbstractStateStore;
-import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.state.store.StateStore;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.engine.DAGEngine;
import org.apache.falcon.workflow.engine.DAGEngineFactory;
@@ -63,10 +63,10 @@ import static org.apache.falcon.state.InstanceState.STATE;
*/
public class SchedulerServiceTest extends AbstractTestBase {
- private SchedulerService scheduler = Mockito.spy(new SchedulerService());
+ private SchedulerService scheduler;
private NotificationHandler handler;
private static String cluster = "testCluster";
- private static InMemoryStateStore stateStore = (InMemoryStateStore) AbstractStateStore.get();
+ private static StateStore stateStore;
private static DAGEngine mockDagEngine;
private static Process process;
private volatile boolean failed = false;
@@ -79,6 +79,10 @@ public class SchedulerServiceTest extends AbstractTestBase {
@BeforeClass
public void init() throws Exception {
+ StartupProperties.get().setProperty("falcon.state.store.impl",
+ "org.apache.falcon.state.store.InMemoryStateStore");
+ stateStore = AbstractStateStore.get();
+ scheduler = Mockito.spy(new SchedulerService());
this.dfsCluster = EmbeddedCluster.newCluster(cluster);
this.conf = dfsCluster.getConf();
setupConfigStore();
@@ -97,6 +101,7 @@ public class SchedulerServiceTest extends AbstractTestBase {
scheduler.init();
StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName());
mockDagEngine = DAGEngineFactory.getDAGEngine("testCluster");
+
}
@AfterClass
@@ -199,7 +204,7 @@ public class SchedulerServiceTest extends AbstractTestBase {
WorkflowJob.Status.SUCCEEDED, DateTime.now()));
// Dependency now satisfied. Now, the first instance should get scheduled after retry delay.
Thread.sleep(100);
- Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
+ Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1));
}
@Test
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
new file mode 100644
index 0000000..48c1426
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state;
+
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.tools.FalconStateStoreDBCLI;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * TestBase for tests in scheduler.
+ *
+ * Points the FalconJPAService URL at a Derby database under target/test-data/falcondb
+ * and offers helpers to create the schema through FalconStateStoreDBCLI and to delete
+ * the database directory afterwards.
+ */
+public class AbstractSchedulerTestBase extends AbstractTestBase {
+    private static final String DB_BASE_DIR = "target/test-data/falcondb";
+    protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
+    protected static String url = "jdbc:derby:" + dbLocation + ";create=true";
+    protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
+    protected static final String DB_UPDATE_SQL_FILE = DB_BASE_DIR + File.separator + "update.sql";
+    protected LocalFileSystem fs = new LocalFileSystem();
+
+    /**
+     * Sets the JPA service URL in the startup properties, initializes the local
+     * file system and creates the database base directory.
+     */
+    public void setup() throws Exception {
+        StartupProperties.get().setProperty(FalconJPAService.URL, url);
+        Configuration localConf = new Configuration();
+        fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
+        fs.mkdirs(new Path(DB_BASE_DIR));
+    }
+
+    /** Removes the database base directory created by {@link #setup()}. */
+    public void cleanup() throws IOException {
+        cleanupDB();
+    }
+
+    // Recursively deletes the database base directory.
+    private void cleanupDB() throws IOException {
+        fs.delete(new Path(DB_BASE_DIR), true);
+    }
+
+    /**
+     * Runs the DB CLI "create" command with "-run" and asserts that it returned 0
+     * and that the SQL file exists afterwards.
+     *
+     * @param file path of the SQL file the CLI is asked to write
+     */
+    protected void createDB(String file) {
+        File sqlFile = new File(file);
+        String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
+        int result = execDBCLICommands(argsCreate);
+        // TestNG takes (actual, expected); the reversed order yields a misleading failure message.
+        Assert.assertEquals(result, 0);
+        Assert.assertTrue(sqlFile.exists());
+    }
+
+    /**
+     * Runs FalconStateStoreDBCLI with the given arguments.
+     *
+     * @param args command-line arguments for the CLI
+     * @return the value returned by the CLI's run method
+     */
+    protected int execDBCLICommands(String[] args) {
+        return new FalconStateStoreDBCLI().run(args);
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
index 2f32b43..6676754 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/EntityStateServiceTest.java
@@ -18,55 +18,72 @@
package org.apache.falcon.state;
import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.exception.InvalidStateTransitionException;
+import org.apache.falcon.exception.StateStoreException;
import org.apache.falcon.state.store.AbstractStateStore;
-import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.util.StartupProperties;
import org.mockito.Mockito;
import org.testng.Assert;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
* Tests to ensure entity state changes happen correctly.
*/
-public class EntityStateServiceTest {
+public class EntityStateServiceTest extends AbstractSchedulerTestBase{
private EntityStateChangeHandler listener = Mockito.mock(EntityStateChangeHandler.class);
- @BeforeMethod
- public void setUp() {
- ((InMemoryStateStore) AbstractStateStore.get()).clear();
+ @BeforeClass
+ public void setup() throws Exception {
+ StartupProperties.get().setProperty("falcon.state.store.impl",
+ "org.apache.falcon.state.store.InMemoryStateStore");
+ super.setup();
+ this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+ this.conf = dfsCluster.getConf();
+ }
+
+ @AfterMethod
+ public void setUp() throws StateStoreException {
+ AbstractStateStore.get().clear();
}
// Tests a schedulable entity's lifecycle : Submit -> run -> suspend -> resume
@Test
- public void testLifeCycle() throws FalconException {
+ public void testLifeCycle() throws Exception {
Process mockEntity = new Process();
mockEntity.setName("test");
-
+ storeEntity(EntityType.PROCESS, "test");
StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUBMIT, listener);
EntityState entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next();
Mockito.verify(listener).onSubmit(mockEntity);
Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SUBMITTED));
StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SCHEDULE, listener);
Mockito.verify(listener).onSchedule(mockEntity);
+ entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next();
Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SCHEDULED));
StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUSPEND, listener);
Mockito.verify(listener).onSuspend(mockEntity);
+ entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next();
Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SUSPENDED));
StateService.get().handleStateChange(mockEntity, EntityState.EVENT.RESUME, listener);
Mockito.verify(listener).onResume(mockEntity);
+ entityFromStore = AbstractStateStore.get().getAllEntities().iterator().next();
Assert.assertTrue(entityFromStore.getCurrentState().equals(EntityState.STATE.SCHEDULED));
}
@Test
- public void testInvalidTransitions() throws FalconException {
+ public void testInvalidTransitions() throws Exception {
Feed mockEntity = new Feed();
mockEntity.setName("test");
+ storeEntity(EntityType.FEED, "test");
StateService.get().handleStateChange(mockEntity, EntityState.EVENT.SUBMIT, listener);
// Attempt suspending a submitted entity
try {
@@ -99,10 +116,10 @@ public class EntityStateServiceTest {
@Test(dataProvider = "state_and_events")
public void testIdempotency(EntityState.STATE state, EntityState.EVENT event)
- throws InvalidStateTransitionException {
+ throws Exception {
Process mockEntity = new Process();
mockEntity.setName("test");
-
+ storeEntity(EntityType.PROCESS, "test");
EntityState entityState = new EntityState(mockEntity).setCurrentState(state);
entityState.nextTransition(event);
Assert.assertEquals(entityState.getCurrentState(), state);
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
index 43c3c54..f0ae7b2 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/InstanceStateServiceTest.java
@@ -23,11 +23,12 @@ import org.apache.falcon.exception.InvalidStateTransitionException;
import org.apache.falcon.exception.StateStoreException;
import org.apache.falcon.execution.ProcessExecutionInstance;
import org.apache.falcon.state.store.AbstractStateStore;
-import org.apache.falcon.state.store.InMemoryStateStore;
+import org.apache.falcon.util.StartupProperties;
import org.joda.time.DateTime;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -40,6 +41,12 @@ public class InstanceStateServiceTest {
private InstanceStateChangeHandler listener = Mockito.mock(InstanceStateChangeHandler.class);
private ProcessExecutionInstance mockInstance;
+ /**
+ * Runs once before the tests of this class and sets the {@code falcon.state.store.impl}
+ * startup property to the class name of the in-memory state store.
+ */
+ @BeforeClass
+ public void init() {
+ StartupProperties.get().setProperty("falcon.state.store.impl",
+ "org.apache.falcon.state.store.InMemoryStateStore");
+ }
+
@BeforeMethod
public void setup() {
Process testProcess = new Process();
@@ -47,13 +54,14 @@ public class InstanceStateServiceTest {
// Setup new mocks so we can verify the no. of invocations
mockInstance = Mockito.mock(ProcessExecutionInstance.class);
Mockito.when(mockInstance.getEntity()).thenReturn(testProcess);
+ Mockito.when(mockInstance.getCreationTime()).thenReturn(DateTime.now());
Mockito.when(mockInstance.getInstanceTime()).thenReturn(DateTime.now());
Mockito.when(mockInstance.getCluster()).thenReturn("testCluster");
}
@AfterMethod
- public void tearDown() {
- ((InMemoryStateStore) AbstractStateStore.get()).clear();
+ public void tearDown() throws StateStoreException {
+ AbstractStateStore.get().clear();
}
// Tests an entity instance's lifecycle : Trigger -> waiting -> ready -> running
@@ -67,18 +75,28 @@ public class InstanceStateServiceTest {
Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.WAITING));
StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.CONDITIONS_MET, listener);
Mockito.verify(listener).onConditionsMet(mockInstance);
+ instanceFromStore = AbstractStateStore.get()
+ .getExecutionInstance(new InstanceID(mockInstance));
Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.READY));
StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SCHEDULE, listener);
Mockito.verify(listener).onSchedule(mockInstance);
+ instanceFromStore = AbstractStateStore.get()
+ .getExecutionInstance(new InstanceID(mockInstance));
Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.RUNNING));
StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SUSPEND, listener);
Mockito.verify(listener).onSuspend(mockInstance);
+ instanceFromStore = AbstractStateStore.get()
+ .getExecutionInstance(new InstanceID(mockInstance));
Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.SUSPENDED));
StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.RESUME_RUNNING, listener);
Mockito.verify(listener).onResume(mockInstance);
+ instanceFromStore = AbstractStateStore.get()
+ .getExecutionInstance(new InstanceID(mockInstance));
Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.RUNNING));
StateService.get().handleStateChange(mockInstance, InstanceState.EVENT.SUCCEED, listener);
Mockito.verify(listener).onSuccess(mockInstance);
+ instanceFromStore = AbstractStateStore.get()
+ .getExecutionInstance(new InstanceID(mockInstance));
Assert.assertTrue(instanceFromStore.getCurrentState().equals(InstanceState.STATE.SUCCEEDED));
Assert.assertEquals(AbstractStateStore.get().getAllEntities().size(), 0);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java b/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
new file mode 100644
index 0000000..ecd5293
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.state.AbstractSchedulerTestBase;
+import org.apache.falcon.state.store.service.FalconJPAService;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.persistence.EntityManager;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Test cases for FalconJPAService.
+ */
+public class TestFalconJPAService extends AbstractSchedulerTestBase {
+
+    private static FalconJPAService falconJPAService = FalconJPAService.get();
+
+    /**
+     * Runs the base-class setup and then calls {@code createDB} with {@code DB_SQL_FILE},
+     * once for the whole class.
+     *
+     * @throws Exception if the base setup or the {@code createDB} call fails
+     */
+    @BeforeClass
+    public void setUp() throws Exception {
+        super.setup();
+        createDB(DB_SQL_FILE);
+    }
+
+    /**
+     * Initializes the JPA service and checks that an entity manager obtained from it
+     * returns a non-null property map.
+     *
+     * @throws FalconException if the service cannot be initialized
+     */
+    @Test
+    public void testService() throws FalconException {
+        // initialize it
+        falconJPAService.init();
+        EntityManager entityManager = falconJPAService.getEntityManager();
+        try {
+            Map<String, Object> props = entityManager.getProperties();
+            Assert.assertNotNull(props);
+        } finally {
+            // Close even when the assertion fails so the entity manager is not leaked.
+            entityManager.close();
+        }
+    }
+
+    /**
+     * Destroys the JPA service and runs the base-class cleanup.
+     *
+     * @throws FalconException if destroying the service fails
+     * @throws IOException if the base-class cleanup fails
+     */
+    @AfterClass
+    public void tearDown() throws FalconException, IOException {
+        try {
+            falconJPAService.destroy();
+        } finally {
+            // Run the base-class cleanup even if the service could not be destroyed.
+            super.cleanup();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
new file mode 100644
index 0000000..6d5bd49
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.service.store;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.FalconExecutionService;
+import org.apache.falcon.execution.MockDAGEngine;
+import org.apache.falcon.execution.NotificationHandler;
+import org.apache.falcon.notification.service.impl.AlarmService;
+import org.apache.falcon.notification.service.impl.DataAvailabilityService;
+import org.apache.falcon.notification.service.impl.JobCompletionService;
+import org.apache.falcon.notification.service.impl.SchedulerService;
+import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.state.AbstractSchedulerTestBase;
+import org.apache.falcon.state.EntityClusterID;
+import org.apache.falcon.state.EntityID;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.jdbc.BeanMapperUtil;
+import org.apache.falcon.state.store.jdbc.JDBCStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.engine.DAGEngine;
+import org.apache.falcon.workflow.engine.DAGEngineFactory;
+import org.apache.falcon.workflow.engine.OozieDAGEngine;
+import org.joda.time.DateTime;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Test cases for JDBCStateStore.
+ */
+public class TestJDBCStateStore extends AbstractSchedulerTestBase {
+ private static StateStore stateStore = JDBCStateStore.get();
+ private static Random randomValGenerator = new Random();
+ private static FalconJPAService falconJPAService = FalconJPAService.get();
+ private AlarmService mockTimeService;
+ private DataAvailabilityService mockDataService;
+ private SchedulerService mockSchedulerService;
+ private JobCompletionService mockCompletionService;
+ private DAGEngine dagEngine;
+
+ /**
+ * One-time fixture for this class. In order: runs the base-class setup, calls
+ * {@code createDB} with {@code DB_SQL_FILE}, initializes the JPA service, obtains an
+ * {@code EmbeddedCluster} named "testCluster" and keeps its configuration, and finally
+ * registers the mocked services.
+ *
+ * @throws Exception if any of these steps fails
+ */
+ @BeforeClass
+ public void setup() throws Exception {
+ super.setup();
+ createDB(DB_SQL_FILE);
+ falconJPAService.init();
+ this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+ this.conf = dfsCluster.getConf();
+ registerServices();
+ }
+
+ /**
+ * Creates Mockito mocks for the alarm, data-availability, scheduler and job-completion
+ * services, stubs each mock's name, lets {@code createRequestBuilder} call the real
+ * method, and registers the four mocks with {@code Services}. Also sets the
+ * {@code dag.engine.impl} and {@code execution.service.impl} startup properties and
+ * stores a spy of the DAG engine for "testCluster" in {@code dagEngine}.
+ *
+ * @throws FalconException declared for the DAG engine and service registration calls
+ */
+ private void registerServices() throws FalconException {
+ mockTimeService = Mockito.mock(AlarmService.class);
+ Mockito.when(mockTimeService.getName()).thenReturn("AlarmService");
+ Mockito.when(mockTimeService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+ Mockito.any(ID.class))).thenCallRealMethod();
+ mockDataService = Mockito.mock(DataAvailabilityService.class);
+ Mockito.when(mockDataService.getName()).thenReturn("DataAvailabilityService");
+ Mockito.when(mockDataService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+ Mockito.any(ID.class))).thenCallRealMethod();
+ // NOTE(review): this OozieDAGEngine mock and its resume() stub are replaced a few lines
+ // below, where dagEngine is reassigned to a spy; they appear to have no effect - confirm.
+ dagEngine = Mockito.mock(OozieDAGEngine.class);
+ Mockito.doNothing().when(dagEngine).resume(Mockito.any(ExecutionInstance.class));
+ mockSchedulerService = Mockito.mock(SchedulerService.class);
+ Mockito.when(mockSchedulerService.getName()).thenReturn("JobSchedulerService");
+ // Set before the factory call below; presumably DAGEngineFactory reads dag.engine.impl - confirm.
+ StartupProperties.get().setProperty("dag.engine.impl", MockDAGEngine.class.getName());
+ StartupProperties.get().setProperty("execution.service.impl", FalconExecutionService.class.getName());
+ dagEngine = Mockito.spy(DAGEngineFactory.getDAGEngine("testCluster"));
+ Mockito.when(mockSchedulerService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+ Mockito.any(ID.class))).thenCallRealMethod();
+ mockCompletionService = Mockito.mock(JobCompletionService.class);
+ Mockito.when(mockCompletionService.getName()).thenReturn("JobCompletionService");
+ Mockito.when(mockCompletionService.createRequestBuilder(Mockito.any(NotificationHandler.class),
+ Mockito.any(ID.class))).thenCallRealMethod();
+ Services.get().register(mockTimeService);
+ Services.get().register(mockDataService);
+ Services.get().register(mockSchedulerService);
+ Services.get().register(mockCompletionService);
+ }
+
+
+    /**
+     * Walks one entity state through the store: insert, read back, reject a duplicate
+     * insert, update, delete, and finally checks that get, update and delete all fail
+     * with a {@code StateStoreException} once the entity is gone.
+     *
+     * @throws Exception if a store operation fails unexpectedly
+     */
+    @Test
+    public void testInsertRetrieveAndUpdate() throws Exception {
+        EntityState entityState = getEntityState(EntityType.PROCESS, "process");
+        stateStore.putEntity(entityState);
+        EntityID entityID = new EntityID(entityState.getEntity());
+        EntityState actualEntityState = stateStore.getEntity(entityID);
+        Assert.assertEquals(actualEntityState.getEntity(), entityState.getEntity());
+        Assert.assertEquals(actualEntityState.getCurrentState(), entityState.getCurrentState());
+        try {
+            // The entity is already stored, so a second insert must be rejected.
+            stateStore.putEntity(entityState);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException ignored) {
+            // expected: duplicate insert
+        }
+
+        entityState.setCurrentState(EntityState.STATE.SCHEDULED);
+        stateStore.updateEntity(entityState);
+        actualEntityState = stateStore.getEntity(entityID);
+        Assert.assertEquals(actualEntityState.getEntity(), entityState.getEntity());
+        Assert.assertEquals(actualEntityState.getCurrentState(), entityState.getCurrentState());
+
+        stateStore.deleteEntity(entityID);
+        Assert.assertFalse(stateStore.entityExists(entityID));
+
+        // After the delete, every operation addressing the entity must fail.
+        try {
+            stateStore.getEntity(entityID);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException ignored) {
+            // expected: entity no longer exists
+        }
+
+        try {
+            stateStore.updateEntity(entityState);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException ignored) {
+            // expected: entity no longer exists
+        }
+
+        try {
+            stateStore.deleteEntity(entityID);
+            Assert.fail("Exception must have been thrown");
+        } catch (StateStoreException ignored) {
+            // expected: entity no longer exists
+        }
+    }
+
+
+    /**
+     * Checks that the store starts out empty, that three inserted entity states are all
+     * returned by {@code getAllEntities}, and that all three are reported for the
+     * SUBMITTED state.
+     *
+     * @throws Exception if a store operation fails
+     */
+    @Test
+    public void testGetEntities() throws Exception {
+        EntityState[] entityStates = {
+            getEntityState(EntityType.PROCESS, "process1"),
+            getEntityState(EntityType.PROCESS, "process2"),
+            getEntityState(EntityType.FEED, "feed1"),
+        };
+
+        Assert.assertEquals(stateStore.getAllEntities().size(), 0);
+
+        for (EntityState entityState : entityStates) {
+            stateStore.putEntity(entityState);
+        }
+
+        Assert.assertEquals(stateStore.getAllEntities().size(), 3);
+        Assert.assertEquals(stateStore.getEntities(EntityState.STATE.SUBMITTED).size(), 3);
+    }
+
+
+ /**
+ * Walks one execution instance through the store: insert, read back, update, reject a
+ * duplicate insert, delete, and finally checks that get, delete and update all fail with
+ * a {@code StateStoreException} once the instance is gone.
+ *
+ * @throws Exception if a store operation fails unexpectedly
+ */
+ @Test
+ public void testInstanceInsertionAndUpdate() throws Exception {
+ storeEntity(EntityType.CLUSTER, "testCluster");
+ storeEntity(EntityType.FEED, "clicksFeed");
+ storeEntity(EntityType.FEED, "clicksSummary");
+ EntityState entityState = getEntityState(EntityType.PROCESS, "process");
+ ExecutionInstance executionInstance = BeanMapperUtil.getExecutionInstance(
+ entityState.getEntity().getEntityType(), entityState.getEntity(),
+ System.currentTimeMillis(), "cluster", System.currentTimeMillis());
+ InstanceState instanceState = new InstanceState(executionInstance);
+ initInstanceState(instanceState);
+ stateStore.putExecutionInstance(instanceState);
+ InstanceID instanceID = new InstanceID(instanceState.getInstance());
+ InstanceState actualInstanceState = stateStore.getExecutionInstance(instanceID);
+ Assert.assertEquals(actualInstanceState, instanceState);
+
+ // Change both the state and the awaiting predicates, then verify the update round-trips.
+ instanceState.setCurrentState(InstanceState.STATE.RUNNING);
+ Predicate predicate = new Predicate(Predicate.TYPE.DATA);
+ instanceState.getInstance().getAwaitingPredicates().add(predicate);
+
+ stateStore.updateExecutionInstance(instanceState);
+ actualInstanceState = stateStore.getExecutionInstance(instanceID);
+ Assert.assertEquals(actualInstanceState, instanceState);
+
+ // The instance is already stored, so a second insert must be rejected.
+ try {
+ stateStore.putExecutionInstance(instanceState);
+ Assert.fail("Exception must have been thrown");
+ } catch (StateStoreException e) {
+ // no op
+ }
+
+ stateStore.deleteExecutionInstance(instanceID);
+
+ // After the delete, every operation addressing the instance must fail.
+ try {
+ stateStore.getExecutionInstance(instanceID);
+ Assert.fail("Exception must have been thrown");
+ } catch (StateStoreException e) {
+ // no op
+ }
+
+ try {
+ stateStore.deleteExecutionInstance(instanceID);
+ Assert.fail("Exception must have been thrown");
+ } catch (StateStoreException e) {
+ // no op
+ }
+
+ try {
+ stateStore.updateExecutionInstance(instanceState);
+ Assert.fail("Exception must have been thrown");
+ } catch (StateStoreException e) {
+ // no op
+ }
+ }
+
+
+ /**
+ * Stores three instances of one process (READY and RUNNING on "cluster1", READY on
+ * "cluster2") and exercises the bulk queries: all instances per cluster, instances
+ * filtered by state, the last instance of a cluster, and deletion of a single instance
+ * and of all instances of the entity.
+ *
+ * @throws Exception if a store operation fails
+ */
+ @Test
+ public void testBulkInstanceOperations() throws Exception {
+ storeEntity(EntityType.CLUSTER, "testCluster");
+ storeEntity(EntityType.FEED, "clicksFeed");
+ storeEntity(EntityType.FEED, "clicksSummary");
+ EntityState entityState = getEntityState(EntityType.PROCESS, "process1");
+ // Instance 1 gets timestamps one minute earlier than instances 2 and 3.
+ ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance(
+ entityState.getEntity().getEntityType(), entityState.getEntity(),
+ System.currentTimeMillis() - 60000, "cluster1", System.currentTimeMillis() - 60000);
+ InstanceState instanceState1 = new InstanceState(processExecutionInstance1);
+ instanceState1.setCurrentState(InstanceState.STATE.READY);
+
+ ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance(
+ entityState.getEntity().getEntityType(), entityState.getEntity(),
+ System.currentTimeMillis(), "cluster1", System.currentTimeMillis());
+ InstanceState instanceState2 = new InstanceState(processExecutionInstance2);
+ instanceState2.setCurrentState(InstanceState.STATE.RUNNING);
+
+ ExecutionInstance processExecutionInstance3 = BeanMapperUtil.getExecutionInstance(
+ entityState.getEntity().getEntityType(), entityState.getEntity(),
+ System.currentTimeMillis(), "cluster2", System.currentTimeMillis());
+ InstanceState instanceState3 = new InstanceState(processExecutionInstance3);
+ instanceState3.setCurrentState(InstanceState.STATE.READY);
+
+ stateStore.putExecutionInstance(instanceState1);
+ stateStore.putExecutionInstance(instanceState2);
+ stateStore.putExecutionInstance(instanceState3);
+
+ // The index-based assertions below rely on the order in which the store returns instances.
+ Collection<InstanceState> actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(),
+ "cluster1");
+ Assert.assertEquals(actualInstances.size(), 2);
+ Assert.assertEquals(actualInstances.toArray()[0], instanceState1);
+ Assert.assertEquals(actualInstances.toArray()[1], instanceState2);
+
+ actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(),
+ "cluster2");
+ Assert.assertEquals(actualInstances.size(), 1);
+ Assert.assertEquals(actualInstances.toArray()[0], instanceState3);
+
+ List<InstanceState.STATE> states = new ArrayList<>();
+ states.add(InstanceState.STATE.READY);
+
+ actualInstances = stateStore.getExecutionInstances(entityState.getEntity(), "cluster1", states);
+ Assert.assertEquals(actualInstances.size(), 1);
+ Assert.assertEquals(actualInstances.toArray()[0], instanceState1);
+
+ // NOTE(review): the ID is built with "testCluster", yet the expected matches are the READY
+ // instances stored under "cluster1" and "cluster2"; presumably this overload filters by
+ // entity only - confirm against the store implementation.
+ EntityClusterID entityClusterID = new EntityClusterID(entityState.getEntity(), "testCluster");
+ actualInstances = stateStore.getExecutionInstances(entityClusterID, states);
+ Assert.assertEquals(actualInstances.size(), 2);
+ Assert.assertEquals(actualInstances.toArray()[0], instanceState1);
+ Assert.assertEquals(actualInstances.toArray()[1], instanceState3);
+
+ // Widen the filter to READY and RUNNING: both "cluster1" instances must now match.
+ states.add(InstanceState.STATE.RUNNING);
+ actualInstances = stateStore.getExecutionInstances(entityState.getEntity(), "cluster1", states);
+ Assert.assertEquals(actualInstances.size(), 2);
+ Assert.assertEquals(actualInstances.toArray()[0], instanceState1);
+ Assert.assertEquals(actualInstances.toArray()[1], instanceState2);
+
+ InstanceState lastInstanceState = stateStore.getLastExecutionInstance(entityState.getEntity(), "cluster1");
+ Assert.assertEquals(lastInstanceState, instanceState2);
+
+
+ // Deleting the single "cluster2" instance must leave the "cluster1" instances untouched.
+ InstanceID instanceKey = new InstanceID(instanceState3.getInstance());
+ stateStore.deleteExecutionInstance(instanceKey);
+
+ actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster2");
+ Assert.assertEquals(actualInstances.size(), 0);
+
+ actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster1");
+ Assert.assertEquals(actualInstances.size(), 2);
+
+ // Re-insert instance 3 so the entity-wide delete below has instances on both clusters to remove.
+ stateStore.putExecutionInstance(instanceState3);
+
+ actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster2");
+ Assert.assertEquals(actualInstances.size(), 1);
+
+ stateStore.deleteExecutionInstances(entityClusterID.getEntityID());
+ actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster1");
+ Assert.assertEquals(actualInstances.size(), 0);
+
+ actualInstances = stateStore.getAllExecutionInstances(entityState.getEntity(), "cluster2");
+ Assert.assertEquals(actualInstances.size(), 0);
+
+ }
+
+
+    /**
+     * Stores two RUNNING instances of one process on "cluster1", roughly three minutes
+     * apart, and checks that a range query over a one-minute window starting at each
+     * instance's timestamp returns exactly that instance.
+     *
+     * @throws Exception if a store operation fails
+     */
+    @Test
+    public void testGetExecutionInstancesWithRange() throws Exception {
+        storeEntity(EntityType.CLUSTER, "testCluster");
+        storeEntity(EntityType.FEED, "clicksFeed");
+        storeEntity(EntityType.FEED, "clicksSummary");
+
+        long instance1Time = System.currentTimeMillis() - 180000;
+        long instance2Time = System.currentTimeMillis();
+        EntityState entityState = getEntityState(EntityType.PROCESS, "process1");
+        // Each instance is built with the same timestamp for both of its time arguments.
+        ExecutionInstance processExecutionInstance1 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                instance1Time, "cluster1", instance1Time);
+        InstanceState instanceState1 = new InstanceState(processExecutionInstance1);
+        instanceState1.setCurrentState(InstanceState.STATE.RUNNING);
+
+        ExecutionInstance processExecutionInstance2 = BeanMapperUtil.getExecutionInstance(
+                entityState.getEntity().getEntityType(), entityState.getEntity(),
+                instance2Time, "cluster1", instance2Time);
+        InstanceState instanceState2 = new InstanceState(processExecutionInstance2);
+        instanceState2.setCurrentState(InstanceState.STATE.RUNNING);
+
+        stateStore.putExecutionInstance(instanceState1);
+        stateStore.putExecutionInstance(instanceState2);
+
+        List<InstanceState.STATE> states = new ArrayList<>();
+        states.add(InstanceState.STATE.RUNNING);
+
+        // TestNG's assertEquals takes (actual, expected); keeping that order makes failure
+        // messages report the two values the right way round.
+        Collection<InstanceState> actualInstances = stateStore.getExecutionInstances(entityState.getEntity(),
+                "cluster1", states, new DateTime(instance1Time), new DateTime(instance1Time + 60000));
+        Assert.assertEquals(actualInstances.size(), 1);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState1);
+
+        actualInstances = stateStore.getExecutionInstances(entityState.getEntity(),
+                "cluster1", states, new DateTime(instance2Time), new DateTime(instance2Time + 60000));
+        Assert.assertEquals(actualInstances.size(), 1);
+        Assert.assertEquals(actualInstances.toArray()[0], instanceState2);
+    }
+
+
+    /**
+     * Fills the given instance state with test data: state READY, a random six-digit
+     * numeric external ID, a random instance sequence, actual start and end set to the
+     * current time, and a single JOB_COMPLETION awaiting predicate.
+     *
+     * @param instanceState the instance state to populate
+     */
+    private void initInstanceState(InstanceState instanceState) {
+        instanceState.setCurrentState(InstanceState.STATE.READY);
+
+        ExecutionInstance instance = instanceState.getInstance();
+        instance.setExternalID(RandomStringUtils.randomNumeric(6));
+        instance.setInstanceSequence(randomValGenerator.nextInt());
+        instance.setActualStart(new DateTime(System.currentTimeMillis()));
+        instance.setActualEnd(new DateTime(System.currentTimeMillis()));
+
+        // Kept as a mutable list: callers add further predicates to it afterwards.
+        List<Predicate> awaiting = new ArrayList<>();
+        awaiting.add(new Predicate(Predicate.TYPE.JOB_COMPLETION));
+        instanceState.getInstance().setAwaitingPredicates(awaiting);
+    }
+
+    /**
+     * Stores an entity of the given type and name, reads it back from the store returned
+     * by {@code getStore()}, and wraps it in a new {@code EntityState}.
+     *
+     * @param entityType type of the entity to create
+     * @param name name of the entity to create
+     * @return a new entity state for the stored entity
+     * @throws Exception if storing or looking up the entity fails
+     */
+    private EntityState getEntityState(EntityType entityType, String name) throws Exception {
+        storeEntity(entityType, name);
+
+        Entity storedEntity = getStore().get(entityType, name);
+        Assert.assertNotNull(storedEntity);
+
+        EntityState stateForEntity = new EntityState(storedEntity);
+        return stateForEntity;
+    }
+
+ /**
+ * Calls the no-argument {@code deleteEntities()} and {@code deleteExecutionInstances()}
+ * on the state store (presumably removing every stored entity and instance - confirm).
+ *
+ * NOTE(review): TestNG runs {@code @AfterTest} methods once after all classes of the
+ * enclosing {@code <test>} tag have run, not after each test method. If per-method
+ * cleanup is intended here, {@code @AfterMethod} is the annotation to use.
+ *
+ * @throws StateStoreException if either delete call fails
+ */
+ @AfterTest
+ public void cleanUpTables() throws StateStoreException {
+ stateStore.deleteEntities();
+ stateStore.deleteExecutionInstances();
+ }
+
+ /**
+ * Runs once after the tests of this class and delegates to the base-class cleanup.
+ *
+ * @throws IOException if the base-class cleanup fails
+ */
+ @AfterClass
+ public void cleanup() throws IOException {
+ super.cleanup();
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/test/java/org/apache/falcon/tools/TestFalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/tools/TestFalconStateStoreDBCLI.java b/scheduler/src/test/java/org/apache/falcon/tools/TestFalconStateStoreDBCLI.java
new file mode 100644
index 0000000..8a42830
--- /dev/null
+++ b/scheduler/src/test/java/org/apache/falcon/tools/TestFalconStateStoreDBCLI.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.tools;
+
+
+import org.apache.falcon.state.AbstractSchedulerTestBase;
+import org.apache.falcon.util.BuildProperties;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+
+/**
+ * Tests for DB operations tool.
+ */
+public class TestFalconStateStoreDBCLI extends AbstractSchedulerTestBase {
+
+    /**
+     * Runs the base-class setup once before the tests of this class.
+     *
+     * @throws Exception if the base-class setup fails
+     */
+    @BeforeClass
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    /**
+     * Runs the base-class cleanup once after the tests of this class.
+     *
+     * @throws IOException if the base-class cleanup fails
+     */
+    @AfterClass
+    public void cleanup() throws IOException {
+        super.cleanup();
+    }
+
+    /**
+     * Drives the DB command-line tool through {@code create}, {@code version},
+     * {@code help}, an invalid command and {@code upgrade}, checking exit codes, the
+     * captured console output and the generated SQL files.
+     *
+     * @throws Exception if running one of the commands fails unexpectedly
+     */
+    @Test
+    public void testFalconDBCLI() throws Exception {
+        File sqlFile = new File(DB_SQL_FILE);
+        String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
+        int result = execDBCLICommands(argsCreate);
+        // TestNG's assertEquals takes (actual, expected).
+        Assert.assertEquals(result, 0);
+        Assert.assertTrue(sqlFile.exists());
+
+        ByteArrayOutputStream data = new ByteArrayOutputStream();
+        PrintStream oldOut = System.out;
+        try {
+            // show versions
+            System.setOut(new PrintStream(data));
+            String[] argsVersion = { "version" };
+            Assert.assertEquals(execDBCLICommands(argsVersion), 0);
+            Assert.assertTrue(data.toString().contains("db.version: "
+                    + BuildProperties.get().getProperty("project.version")));
+            // show help information
+            data.reset();
+            String[] argsHelp = { "help" };
+            Assert.assertEquals(execDBCLICommands(argsHelp), 0);
+            Assert.assertTrue(data.toString().contains("falcondb create <OPTIONS> : Create Falcon DB schema"));
+            Assert.assertTrue(data.toString().contains("falcondb upgrade <OPTIONS> : Upgrade Falcon DB schema"));
+            // try to run an invalid command
+            data.reset();
+            String[] argsInvalidCommand = { "invalidCommand" };
+            Assert.assertEquals(execDBCLICommands(argsInvalidCommand), 1);
+        } finally {
+            // Always restore the real console, even when an assertion above fails.
+            System.setOut(oldOut);
+        }
+
+        // generate an upgrade script
+        File update = new File(DB_UPDATE_SQL_FILE);
+        String[] argsUpgrade = { "upgrade", "-sqlfile", update.getAbsolutePath(), "-run" };
+
+        // The build version is global state; bump it only for the upgrade run and put the
+        // original value back afterwards so later tests in the same JVM are not affected.
+        String originalVersion = BuildProperties.get().getProperty("project.version");
+        BuildProperties.get().setProperty("project.version", "99999-SNAPSHOT");
+        try {
+            Assert.assertEquals(execDBCLICommands(argsUpgrade), 0);
+            Assert.assertTrue(update.exists());
+        } finally {
+            if (originalVersion != null) {
+                BuildProperties.get().setProperty("project.version", originalVersion);
+            }
+        }
+    }
+
+}
[3/4] falcon git commit: FALCON-1234 State Store for instances
scheduled by Falcon (Pavan Kolamuri)
Posted by pa...@apache.org.
FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri)
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/6d313855
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/6d313855
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/6d313855
Branch: refs/heads/master
Commit: 6d3138559e8395ac1140f10197fcd10badf64883
Parents: 6676032
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Thu Nov 26 15:56:14 2015 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Thu Nov 26 15:56:14 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../src/main/resources/falcon/checkstyle.xml | 6 +-
.../main/resources/falcon/findbugs-exclude.xml | 15 +
common/src/main/resources/startup.properties | 29 ++
pom.xml | 4 +
scheduler/pom.xml | 84 ++++
.../falcon/execution/ExecutionInstance.java | 41 +-
.../execution/FalconExecutionService.java | 24 +-
.../execution/ProcessExecutionInstance.java | 60 ++-
.../org/apache/falcon/predicate/Predicate.java | 50 ++-
.../org/apache/falcon/state/EntityState.java | 30 ++
.../org/apache/falcon/state/InstanceID.java | 19 +
.../org/apache/falcon/state/InstanceState.java | 32 ++
.../org/apache/falcon/state/StateService.java | 1 +
.../falcon/state/store/AbstractStateStore.java | 2 +-
.../falcon/state/store/EntityStateStore.java | 14 +-
.../falcon/state/store/InMemoryStateStore.java | 22 +-
.../falcon/state/store/InstanceStateStore.java | 18 +-
.../apache/falcon/state/store/StateStore.java | 6 +-
.../falcon/state/store/jdbc/BeanMapperUtil.java | 271 ++++++++++++
.../falcon/state/store/jdbc/EntityBean.java | 104 +++++
.../falcon/state/store/jdbc/InstanceBean.java | 199 +++++++++
.../falcon/state/store/jdbc/JDBCStateStore.java | 416 ++++++++++++++++++
.../state/store/service/FalconJPAService.java | 171 ++++++++
.../falcon/tools/FalconStateStoreDBCLI.java | 435 +++++++++++++++++++
.../src/main/resources/META-INF/persistence.xml | 50 +++
.../main/resources/falcon-buildinfo.properties | 28 ++
.../execution/FalconExecutionServiceTest.java | 53 ++-
.../service/SchedulerServiceTest.java | 13 +-
.../falcon/state/AbstractSchedulerTestBase.java | 71 +++
.../falcon/state/EntityStateServiceTest.java | 39 +-
.../falcon/state/InstanceStateServiceTest.java | 24 +-
.../state/service/TestFalconJPAService.java | 64 +++
.../state/service/store/TestJDBCStateStore.java | 397 +++++++++++++++++
.../falcon/tools/TestFalconStateStoreDBCLI.java | 89 ++++
scheduler/src/test/resources/startup.properties | 154 +++++++
src/bin/falcon-db.sh | 49 +++
src/conf/startup.properties | 22 +-
src/main/assemblies/distributed-package.xml | 5 +
src/main/assemblies/standalone-package.xml | 5 +
unit/src/main/resources/startup.properties | 18 +
41 files changed, 3071 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 13d3439..31a2566 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (Unreleased)
INCOMPATIBLE CHANGES
NEW FEATURES
+ FALCON-1234 State Store for instances scheduled by Falcon (Pavan Kolamuri via Pallavi Rao)
+
FALCON-1588 Add ability to provide the path for recipe files in command line(Peeyush Bishnoi via Ajay Yadava)
FALCON-1573 Supply user-defined properties to Oozie workflows during schedule(Daniel Del Castillo via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/checkstyle/src/main/resources/falcon/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/src/main/resources/falcon/checkstyle.xml b/checkstyle/src/main/resources/falcon/checkstyle.xml
index 2130e73..292a0a3 100644
--- a/checkstyle/src/main/resources/falcon/checkstyle.xml
+++ b/checkstyle/src/main/resources/falcon/checkstyle.xml
@@ -230,9 +230,9 @@
<!-- allow warnings to be suppressed -->
<module name="SuppressionCommentFilter">
- <property name="offCommentFormat" value="SUSPEND CHECKSTYLE CHECK ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName"/>
- <property name="onCommentFormat" value="RESUME CHECKSTYLE CHECK ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName"/>
- <property name="checkFormat" value="ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName"/>
+ <property name="offCommentFormat" value="SUSPEND CHECKSTYLE CHECK ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName|LineLengthCheck"/>
+ <property name="onCommentFormat" value="RESUME CHECKSTYLE CHECK ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName|LineLengthCheck"/>
+ <property name="checkFormat" value="ParameterNumberCheck|VisibilityModifierCheck|HiddenFieldCheck|MethodName|LineLengthCheck"/>
</module>
</module>
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/checkstyle/src/main/resources/falcon/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/checkstyle/src/main/resources/falcon/findbugs-exclude.xml b/checkstyle/src/main/resources/falcon/findbugs-exclude.xml
index 0a7580d..e1a5a2e 100644
--- a/checkstyle/src/main/resources/falcon/findbugs-exclude.xml
+++ b/checkstyle/src/main/resources/falcon/findbugs-exclude.xml
@@ -31,4 +31,19 @@
<Match>
<Bug pattern="DM_DEFAULT_ENCODING" />
</Match>
+
+ <Match>
+ <Class name="org.apache.falcon.tools.FalconStateStoreDBCLI" />
+ <Bug pattern="SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE" />
+ </Match>
+
+ <Match>
+ <Class name="org.apache.falcon.state.store.jdbc.EntityBean" />
+ <Bug pattern="NP_BOOLEAN_RETURN_NULL" />
+ </Match>
+
+ <Match>
+ <Class name="org.apache.falcon.state.store.jdbc.InstanceBean" />
+ <Bug pattern="NP_BOOLEAN_RETURN_NULL" />
+ </Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index cc5212a..3f1ed03 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -43,6 +43,14 @@
org.apache.falcon.service.LogCleanupService,\
org.apache.falcon.service.GroupsService,\
org.apache.falcon.service.ProxyUserService
+## If you wish to use Falcon native scheduler add the commented out services below to application.services ##
+# org.apache.falcon.notification.service.impl.JobCompletionService,\
+# org.apache.falcon.notification.service.impl.SchedulerService,\
+# org.apache.falcon.notification.service.impl.AlarmService,\
+# org.apache.falcon.notification.service.impl.DataAvailabilityService,\
+# org.apache.falcon.execution.FalconExecutionService,\
+# org.apache.falcon.state.store.service.FalconJPAService
+
# List of Lifecycle policies configured.
*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
@@ -55,6 +63,8 @@
org.apache.falcon.entity.store.FeedLocationStore,\
org.apache.falcon.service.FeedSLAMonitoringService,\
org.apache.falcon.service.SharedLibraryHostingService
+## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ##
+# org.apache.falcon.state.store.jdbc.JDBCStateStore
##### JMS MQ Broker Implementation class #####
*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
@@ -247,3 +257,22 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
# Setting monitoring plugin, if SMTP parameters is defined
#*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\
# org.apache.falcon.plugin.EmailNotificationPlugin
+
+######### StateStore Properties #####
+#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+#*.falcon.statestore.jdbc.url=jdbc:derby:data/statestore.db;create=true
+#*.falcon.statestore.jdbc.username=sa
+#*.falcon.statestore.jdbc.password=
+#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+## Maximum number of active connections that can be allocated from this pool at the same time.
+#*.falcon.statestore.pool.max.active.conn=10
+#*.falcon.statestore.connection.properties=
+## Indicates the interval (in milliseconds) between eviction runs.
+#*.falcon.statestore.validate.db.connection.eviction.interval=300000
+## The number of objects to examine during each run of the idle object evictor thread.
+#*.falcon.statestore.validate.db.connection.eviction.num=10
+## Creates Falcon DB.
+## If set to true, it creates the DB schema if it does not exist. If the DB schema already exists, this is a NOP.
+## If set to false, it does not create the DB schema. If the DB schema does not exist, startup fails.
+#*.falcon.statestore.create.db.schema=true
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fad8902..678c87c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,10 @@
<quartz.version>2.2.1</quartz.version>
<joda.version>2.8.2</joda.version>
<mockito.version>1.9.5</mockito.version>
+ <openjpa.version>2.4.0</openjpa.version>
+ <javax-validation.version>1.0.0.GA</javax-validation.version>
+ <derby.version>10.10.1.1</derby.version>
+ <commons-dbcp.version>1.4</commons-dbcp.version>
<internal.maven.repo>file:///tmp/falcontemprepo</internal.maven.repo>
<excluded.test.groups>exhaustive</excluded.test.groups>
</properties>
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/pom.xml
----------------------------------------------------------------------
diff --git a/scheduler/pom.xml b/scheduler/pom.xml
index 20a91d2..336997d 100644
--- a/scheduler/pom.xml
+++ b/scheduler/pom.xml
@@ -88,6 +88,33 @@
</dependency>
<dependency>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa-persistence</artifactId>
+ <version>${openjpa.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa-jdbc</artifactId>
+ <version>${openjpa.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa-persistence-jdbc</artifactId>
+ <version>${openjpa.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ <version>${javax-validation.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
@@ -98,11 +125,18 @@
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>10.10.1.1</version>
+ </dependency>
</dependencies>
<build>
@@ -115,6 +149,56 @@
<target>1.7</target>
</configuration>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <phase>process-classes</phase>
+ <configuration>
+ <tasks>
+ <taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask" classpathref="maven.compile.classpath"/>
+ <openjpac>
+ <classpath refid="maven.compile.classpath"/>
+ </openjpac>
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.8</version>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ </artifactItem>
+ <artifactItem>
+ <groupId>commons-dbcp</groupId>
+ <artifactId>commons-dbcp</artifactId>
+ <version>${commons-dbcp.version}</version>
+ </artifactItem>
+ </artifactItems>
+ <outputDirectory>${project.build.directory}/dependency</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
index 2d6b67d..5f96d3f 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ExecutionInstance.java
@@ -26,7 +26,6 @@ import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.util.List;
-import java.util.TimeZone;
/**
* Represents an execution instance of an entity.
@@ -38,20 +37,31 @@ public abstract class ExecutionInstance implements NotificationHandler {
// External ID is the ID used to identify the Job submitted to the DAG Engine, as returned by the DAG Engine.
// For example, for Oozie this would be the workflow Id.
private String externalID;
+ // Time at which instance has to be run.
private final DateTime instanceTime;
+ // Time at which instance is created.
private final DateTime creationTime;
private DateTime actualStart;
private DateTime actualEnd;
- private static final DateTimeZone UTC = DateTimeZone.forTimeZone(TimeZone.getTimeZone("UTC"));
+ protected static final DateTimeZone UTC = DateTimeZone.UTC;
/**
- * @param instanceTime
+ * @param instanceTime Time at which instance has to be run.
* @param cluster
+ * @param creationTime Time at which instance is created to run.
*/
- public ExecutionInstance(DateTime instanceTime, String cluster) {
+ public ExecutionInstance(DateTime instanceTime, String cluster, DateTime creationTime) {
this.instanceTime = new DateTime(instanceTime, UTC);
this.cluster = cluster;
- this.creationTime = DateTime.now(UTC);
+ this.creationTime = new DateTime(creationTime, UTC);
+ }
+
+ /**
+ * @param instanceTime
+ * @param cluster
+ */
+ public ExecutionInstance(DateTime instanceTime, String cluster) {
+ this(instanceTime, cluster, DateTime.now());
}
/**
@@ -92,7 +102,7 @@ public abstract class ExecutionInstance implements NotificationHandler {
public abstract Entity getEntity();
/**
- * @return - The nominal time of the instance.
+ * @return - The instance time of the instance.
*/
public DateTime getInstanceTime() {
return instanceTime;
@@ -138,16 +148,31 @@ public abstract class ExecutionInstance implements NotificationHandler {
this.actualEnd = actualEnd;
}
-
+ /**
+ * Creation time of an instance.
+ * @return the time at which this instance was created
+ */
public DateTime getCreationTime() {
return creationTime;
}
/**
+ * Set the gating conditions on which this instance is waiting before it is scheduled for execution.
+ * @param predicates
+ */
+ public abstract void setAwaitingPredicates(List<Predicate> predicates);
+
+ /**
* @return - The gating conditions on which this instance is waiting before it is scheduled for execution.
* @throws FalconException
*/
- public abstract List<Predicate> getAwaitingPredicates() throws FalconException;
+ public abstract List<Predicate> getAwaitingPredicates();
+
+ /**
+ * set the sequential numerical id of the instance.
+ */
+ public abstract void setInstanceSequence(int sequence);
+
/**
* Suspends the instance if it is in one of the active states, waiting, ready or running.
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
index b48a65b..b6741a4 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
@@ -22,6 +22,7 @@ import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.exception.StateStoreException;
import org.apache.falcon.notification.service.event.Event;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.state.EntityClusterID;
@@ -58,17 +59,22 @@ public final class FalconExecutionService implements FalconService, EntityStateC
public void init() {
LOG.debug("State store instance being used : {}", AbstractStateStore.get());
// Initialize all executors from store
- for (Entity entity : AbstractStateStore.get().getEntities(EntityState.STATE.SCHEDULED)) {
- try {
- for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
- EntityExecutor executor = createEntityExecutor(entity, cluster);
- executors.put(new EntityClusterID(entity, cluster), executor);
- executor.schedule();
+ try {
+ for (Entity entity : AbstractStateStore.get().getEntities(EntityState.STATE.SCHEDULED)) {
+ try {
+ for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
+ EntityExecutor executor = createEntityExecutor(entity, cluster);
+ executors.put(new EntityClusterID(entity, cluster), executor);
+ executor.schedule();
+ }
+ } catch (FalconException e) {
+ LOG.error("Unable to load entity : " + entity.getName(), e);
+ throw new RuntimeException(e);
}
- } catch (FalconException e) {
- LOG.error("Unable to load entity : " + entity.getName(), e);
- throw new RuntimeException(e);
}
+ } catch (StateStoreException e) {
+ LOG.error("Unable to get Entities from State Store ", e);
+ throw new RuntimeException(e);
}
// TODO : During migration, the state store itself may not have been completely bootstrapped.
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
index 434f168..cff4a73 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
@@ -49,6 +49,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+
/**
* Represents an execution instance of a process.
* Responsible for user actions such as suspend, resume, kill on individual instances.
@@ -57,7 +58,7 @@ import java.util.List;
public class ProcessExecutionInstance extends ExecutionInstance {
private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutionInstance.class);
private final Process process;
- private List<Predicate> awaitedPredicates = new ArrayList<Predicate>();
+ private List<Predicate> awaitedPredicates = new ArrayList<>();
private DAGEngine dagEngine = null;
private boolean hasTimedOut = false;
private InstanceID id;
@@ -72,8 +73,9 @@ public class ProcessExecutionInstance extends ExecutionInstance {
* @param cluster
* @throws FalconException
*/
- public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster) throws FalconException {
- super(instanceTime, cluster);
+ public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster,
+ DateTime creationTime) throws FalconException {
+ super(instanceTime, cluster, creationTime);
this.process = process;
this.id = new InstanceID(process, cluster, getInstanceTime());
computeInstanceSequence();
@@ -81,7 +83,18 @@ public class ProcessExecutionInstance extends ExecutionInstance {
registerForNotifications(false);
}
- // Computes the instance number based on the nominal time.
+ /**
+ *
+ * @param process
+ * @param instanceTime
+ * @param cluster
+ * @throws FalconException
+ */
+ public ProcessExecutionInstance(Process process, DateTime instanceTime, String cluster) throws FalconException {
+ this(process, instanceTime, cluster, DateTime.now(UTC));
+ }
+
+ // Computes the instance number based on the instance Time.
// Method can be extended to assign instance numbers for non-time based instances.
private void computeInstanceSequence() {
for (Cluster processCluster : process.getClusters().getClusters()) {
@@ -225,11 +238,21 @@ public class ProcessExecutionInstance extends ExecutionInstance {
}
@Override
- public List<Predicate> getAwaitingPredicates() throws FalconException {
+ public void setAwaitingPredicates(List<Predicate> predicates) {
+ this.awaitedPredicates = predicates;
+ }
+
+ @Override
+ public List<Predicate> getAwaitingPredicates() {
return awaitedPredicates;
}
@Override
+ public void setInstanceSequence(int sequence) {
+ this.instanceSequence = sequence;
+ }
+
+ @Override
public void suspend() throws FalconException {
if (getExternalID() != null) {
dagEngine.suspend(this);
@@ -242,7 +265,7 @@ public class ProcessExecutionInstance extends ExecutionInstance {
// Was already scheduled on the DAGEngine, so resume on DAGEngine if suspended
if (getExternalID() != null) {
dagEngine.resume(this);
- } else if (awaitedPredicates.size() != 0) {
+ } else if (awaitedPredicates != null && !awaitedPredicates.isEmpty()) {
// Evaluate any remaining predicates
registerForNotifications(true);
}
@@ -271,6 +294,31 @@ public class ProcessExecutionInstance extends ExecutionInstance {
}
@Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || !o.getClass().equals(this.getClass())) {
+ return false;
+ }
+
+ ProcessExecutionInstance processExecutionInstance = (ProcessExecutionInstance) o;
+
+ return this.getId().equals(processExecutionInstance.getId())
+ && Predicate.isEqualAwaitingPredicates(this.getAwaitingPredicates(),
+ processExecutionInstance.getAwaitingPredicates())
+ && this.getInstanceSequence() == (processExecutionInstance.getInstanceSequence());
+ }
+
+ @Override
+ public int hashCode() {
+ int result = id != null ? id.hashCode() : 0;
+ result = 31 * result + (awaitedPredicates != null ? awaitedPredicates.hashCode() : 0);
+ result = 31 * result + instanceSequence;
+ return result;
+ }
+
+ @Override
public void destroy() throws FalconException {
NotificationServicesRegistry.unregister(executionService, getId());
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
index fb4c8c9..164fb0e 100644
--- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
+++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
@@ -27,14 +27,19 @@ import org.apache.falcon.notification.service.event.TimeElapsedEvent;
import org.apache.falcon.state.ID;
import java.io.Serializable;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
/**
* Represents the gating condition for which an instance is waiting before it is scheduled.
* This will be serialized and stored in state store.
*/
public class Predicate implements Serializable {
+
/**
* Type of predicate, currently data and time are supported.
*/
@@ -47,7 +52,10 @@ public class Predicate implements Serializable {
private final TYPE type;
// A key-value pair of clauses that need make this predicate.
- private Map<String, Comparable> clauses = new HashMap<String, Comparable>();
+ private Map<String, Comparable> clauses = new TreeMap<>();
+
+ // Id for a predicate used for comparison.
+ private String id;
// A generic "any" object that can be used when a particular key is allowed to have any value.
public static final Comparable<? extends Serializable> ANY = new Any();
@@ -59,6 +67,10 @@ public class Predicate implements Serializable {
return type;
}
+ public String getId() {
+ return id;
+ }
+
/**
* @param key
* @return the value corresponding to the key
@@ -106,6 +118,7 @@ public class Predicate implements Serializable {
*/
public Predicate(TYPE type) {
this.type = type;
+ this.id = this.type + String.valueOf(System.currentTimeMillis());
}
/**
@@ -120,7 +133,7 @@ public class Predicate implements Serializable {
* @param rhs - The value in the key-value pair of a clause
* @return This instance
*/
- public Predicate addClause(String lhs, Comparable<? extends Serializable> rhs) {
+ Predicate addClause(String lhs, Comparable<? extends Serializable> rhs) {
clauses.put(lhs, rhs);
return this;
}
@@ -217,4 +230,35 @@ public class Predicate implements Serializable {
return super.hashCode();
}
}
+
+ public static boolean isEqualAwaitingPredicates(List<Predicate> thisAwaitingPredicates,
+ List<Predicate> otherAwaitingPredicates) {
+ if (thisAwaitingPredicates == null && otherAwaitingPredicates == null) {
+ return true;
+ } else if (thisAwaitingPredicates != null && otherAwaitingPredicates != null) {
+ if (thisAwaitingPredicates.size() != otherAwaitingPredicates.size()) {
+ return false;
+ }
+ Collections.sort(thisAwaitingPredicates, new PredicateComparator());
+ Collections.sort(otherAwaitingPredicates, new PredicateComparator());
+
+ Iterator<Predicate> thisIterator = thisAwaitingPredicates.iterator();
+ Iterator<Predicate> otherIterator = otherAwaitingPredicates.iterator();
+
+ while (thisIterator.hasNext()) {
+ if (!thisIterator.next().evaluate(otherIterator.next())) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ static class PredicateComparator implements Serializable, Comparator<Predicate> {
+ @Override
+ public int compare(Predicate o1, Predicate o2) {
+ return o1.getId().compareTo(o2.getId());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
index 15aea9a..f44f174 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
@@ -130,4 +130,34 @@ public class EntityState implements StateMachine<EntityState.STATE, EntityState.
public STATE nextTransition(EVENT event) throws InvalidStateTransitionException {
return currentState.nextTransition(event);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ EntityState other = (EntityState) o;
+
+ if (this.getCurrentState() != null ? !this.getCurrentState().equals(other.getCurrentState())
+ : other.getCurrentState() != null) {
+ return false;
+ }
+
+ if (this.getEntity() != null ? !this.getEntity().equals(other.getEntity())
+ : other.getEntity() != null) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = currentState != null ? currentState.hashCode() : 0;
+ result = 31 * result + (entity != null ? entity.hashCode() : 0);
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java
index a722be9..72cfc33 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceID.java
@@ -80,4 +80,23 @@ public class InstanceID extends ID {
public EntityClusterID getEntityClusterID() {
return new EntityClusterID(entityType, entityName, clusterName);
}
+
+ public static EntityType getEntityType(String id) {
+ if (id == null) {
+ return null;
+ }
+ String[] values = id.split(KEY_SEPARATOR);
+ String entityType = values[0];
+ return EntityType.valueOf(entityType);
+ }
+
+ public static String getEntityName(String id) {
+ if (id == null) {
+ return null;
+ }
+ String[] values = id.split(KEY_SEPARATOR);
+ String entityName = values[1];
+ return entityName;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
index ada9d2b..7f2bda9 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
@@ -252,4 +252,36 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance
public String toString() {
return instance.getId().toString() + "STATE: " + currentState.toString();
}
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ InstanceState other = (InstanceState) o;
+
+ if (this.getCurrentState() != null ? !this.getCurrentState().equals(other.getCurrentState())
+ : other.getCurrentState() != null) {
+ return false;
+ }
+
+ if (this.getInstance() != null ? !this.getInstance().equals(other.getInstance())
+ : other.getInstance() != null) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = currentState != null ? currentState.hashCode() : 0;
+ result = 31 * result + (instance != null ? instance.hashCode() : 0);
+ return result;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/StateService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
index c1671ac..c702cc3 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/StateService.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
@@ -136,6 +136,7 @@ public final class StateService {
InstanceState instanceState = stateStore.getExecutionInstance(id);
InstanceState.STATE newState = instanceState.nextTransition(event);
callbackHandler(instance, event, handler);
+ instanceState = new InstanceState(instance);
instanceState.setCurrentState(newState);
stateStore.updateExecutionInstance(instanceState);
LOG.debug("State of instance: {} changed to: {} as a result of event: {}.", id,
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
index e36f85c..2d576e5 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/AbstractStateStore.java
@@ -79,7 +79,7 @@ public abstract class AbstractStateStore implements StateStore, ConfigurationCha
*/
public static synchronized StateStore get() {
if (stateStore == null) {
- String storeImpl = StartupProperties.get().getProperty("state.store.impl",
+ String storeImpl = StartupProperties.get().getProperty("falcon.state.store.impl",
"org.apache.falcon.state.store.InMemoryStateStore");
try {
stateStore = ReflectionUtils.getInstanceByClassName(storeImpl);
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
index 113f4c5..75a315f 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
@@ -45,18 +45,18 @@ public interface EntityStateStore {
* @param entityId
* @return true, if entity exists in store.
*/
- boolean entityExists(EntityID entityId);
+ boolean entityExists(EntityID entityId) throws StateStoreException;;
/**
* @param state
* @return Entities in a given state.
*/
- Collection<Entity> getEntities(EntityState.STATE state);
+ Collection<Entity> getEntities(EntityState.STATE state) throws StateStoreException;
/**
* @return All Entities in the store.
*/
- Collection<EntityState> getAllEntities();
+ Collection<EntityState> getAllEntities() throws StateStoreException;
/**
* Update an existing entity with the new values.
@@ -73,4 +73,12 @@ public interface EntityStateStore {
* @throws StateStoreException
*/
void deleteEntity(EntityID entityId) throws StateStoreException;
+
+
+ /**
+ * Removes all entities and its instances from the store.
+ *
+ * @throws StateStoreException
+ */
+ void deleteEntities() throws StateStoreException;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
index 52b3bb8..7ab996a 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InMemoryStateStore.java
@@ -50,8 +50,7 @@ public final class InMemoryStateStore extends AbstractStateStore {
private static final StateStore STORE = new InMemoryStateStore();
- private InMemoryStateStore() {
- }
+ private InMemoryStateStore() {}
public static StateStore get() {
return STORE;
@@ -114,6 +113,11 @@ public final class InMemoryStateStore extends AbstractStateStore {
}
@Override
+ public void deleteEntities() throws StateStoreException {
+ entityStates.clear();
+ }
+
+ @Override
public void putExecutionInstance(InstanceState instanceState) throws StateStoreException {
String key = new InstanceID(instanceState.getInstance()).getKey();
if (instanceStates.containsKey(key)) {
@@ -223,6 +227,20 @@ public final class InMemoryStateStore extends AbstractStateStore {
}
}
+ @Override
+ public void deleteExecutionInstances() {
+ // Drops every stored instance state, regardless of the entity it belongs to.
+ instanceStates.clear();
+ }
+
+ @Override
+ public void deleteExecutionInstance(InstanceID instanceID) throws StateStoreException {
+ if (!instanceStates.containsKey(instanceID.toString())) {
+ throw new StateStoreException("Instance with key, " + instanceID.toString() + " does not exist.");
+ }
+ instanceStates.remove(instanceID.toString());
+ }
+
+ @Override
public void clear() {
entityStates.clear();
instanceStates.clear();
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
index 483d9e6..f1d1931 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/InstanceStateStore.java
@@ -104,12 +104,26 @@ public interface InstanceStateStore {
* @param instanceId
* @return true, if instance exists.
*/
- boolean executionInstanceExists(InstanceID instanceId);
+ boolean executionInstanceExists(InstanceID instanceId) throws StateStoreException;
/**
* Delete instances of a given entity.
*
* @param entityId
*/
- void deleteExecutionInstances(EntityID entityId);
+ void deleteExecutionInstances(EntityID entityId) throws StateStoreException;
+
+
+ /**
+ * Delete a single instance, identified by its ID.
+ *
+ * @param instanceID ID of the instance to delete.
+ * @throws StateStoreException if the instance cannot be deleted.
+ */
+ void deleteExecutionInstance(InstanceID instanceID) throws StateStoreException;
+
+ /**
+ * Delete all instances.
+ * Unlike the other delete methods of this interface, this one declares no StateStoreException.
+ */
+ void deleteExecutionInstances();
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
index f595c26..592e1fb 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/StateStore.java
@@ -17,11 +17,15 @@
*/
package org.apache.falcon.state.store;
+import org.apache.falcon.exception.StateStoreException;
import org.apache.falcon.service.ConfigurationChangeListener;
/**
* Interface that combines the entity and instance store APIs with the config change listener.
*/
public interface StateStore extends ConfigurationChangeListener, EntityStateStore, InstanceStateStore {
-
+ /**
+ * Deletes all entities and instances.
+ *
+ * @throws StateStoreException if the store cannot be cleared.
+ */
+ void clear() throws StateStoreException;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
new file mode 100644
index 0000000..4bee269
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store.jdbc;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.ProcessExecutionInstance;
+import org.apache.falcon.predicate.Predicate;
+import org.apache.falcon.state.EntityID;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.InstanceID;
+import org.apache.falcon.state.InstanceState;
+import org.joda.time.DateTime;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Mapping util for Persistent Store.
+ */
+public final class BeanMapperUtil {
+ // Private constructor: the class only exposes static mapping helpers.
+ private BeanMapperUtil() {
+ }
+
+    /**
+     * Builds the {@link EntityBean} that is persisted for the given entity state.
+     *
+     * @param entityState wrapper holding the entity and its current state
+     * @return bean carrying the entity's key, name, current state and type
+     */
+    public static EntityBean convertToEntityBean(EntityState entityState) {
+        Entity source = entityState.getEntity();
+        EntityBean bean = new EntityBean();
+        bean.setId(new EntityID(source).getKey());
+        bean.setName(source.getName());
+        bean.setState(entityState.getCurrentState().toString());
+        bean.setType(source.getEntityType().toString());
+        return bean;
+    }
+
+    /**
+     * Rebuilds an {@link EntityState} from its database representation.
+     * The entity is looked up through {@link EntityUtil} using the bean's type and name.
+     *
+     * @param entityBean row read from the entity table
+     * @return entity state whose current state is the one recorded in the bean
+     * @throws StateStoreException wrapping any FalconException raised during the conversion
+     */
+    public static EntityState convertToEntityState(EntityBean entityBean) throws StateStoreException {
+        try {
+            Entity resolved = EntityUtil.getEntity(entityBean.getType(), entityBean.getName());
+            EntityState state = new EntityState(resolved);
+            EntityState.STATE recorded = EntityState.STATE.valueOf(entityBean.getState());
+            state.setCurrentState(recorded);
+            return state;
+        } catch (FalconException cause) {
+            throw new StateStoreException(cause);
+        }
+    }
+
+    /**
+     * Converts a collection of database entity beans to entity states.
+     *
+     * @param entityBeans beans to convert; may be null or empty
+     * @return the converted states, empty when there is nothing to convert
+     * @throws StateStoreException if any bean cannot be converted
+     */
+    public static Collection<EntityState> convertToEntityState(Collection<EntityBean> entityBeans)
+        throws StateStoreException {
+        List<EntityState> converted = new ArrayList<>();
+        if (entityBeans == null) {
+            return converted;
+        }
+        // Iterating an empty collection adds nothing, so no separate emptiness check is needed.
+        for (EntityBean bean : entityBeans) {
+            converted.add(convertToEntityState(bean));
+        }
+        return converted;
+    }
+
+    /**
+     * Resolves a collection of database entity beans to the entities they refer to.
+     *
+     * @param entityBeans beans to resolve; may be null or empty
+     * @return the resolved entities, empty when there is nothing to resolve
+     * @throws StateStoreException wrapping the FalconException raised by the entity lookup
+     */
+    public static Collection<Entity> convertToEntities(Collection<EntityBean> entityBeans) throws StateStoreException {
+        List<Entity> resolved = new ArrayList<>();
+        if (entityBeans == null || entityBeans.isEmpty()) {
+            return resolved;
+        }
+        try {
+            for (EntityBean bean : entityBeans) {
+                Entity entity = EntityUtil.getEntity(bean.getType(), bean.getName());
+                resolved.add(entity);
+            }
+        } catch (FalconException cause) {
+            throw new StateStoreException(cause);
+        }
+        return resolved;
+    }
+
+    /**
+     * Converts an instance state to the {@link InstanceBean} stored in the DB.
+     * Actual start/end time, current state and external ID are copied only when set.
+     * Awaiting predicates, when present, are serialized as an int count followed by the
+     * predicate objects.
+     *
+     * @param instanceState state wrapper holding the execution instance
+     * @return bean populated from the instance and its current state
+     * @throws StateStoreException declared for callers; not raised directly by this method
+     * @throws IOException if the awaiting predicates cannot be serialized
+     */
+    public static InstanceBean convertToInstanceBean(InstanceState instanceState) throws StateStoreException,
+        IOException {
+        InstanceBean instanceBean = new InstanceBean();
+        ExecutionInstance instance = instanceState.getInstance();
+        if (instance.getActualEnd() != null) {
+            instanceBean.setActualEndTime(new Timestamp(instance.getActualEnd().getMillis()));
+        }
+        if (instance.getActualStart() != null) {
+            instanceBean.setActualStartTime(new Timestamp(instance.getActualStart().getMillis()));
+        }
+        if (instanceState.getCurrentState() != null) {
+            instanceBean.setCurrentState(instanceState.getCurrentState().toString());
+        }
+        if (instance.getExternalID() != null) {
+            instanceBean.setExternalID(instance.getExternalID());
+        }
+
+        instanceBean.setCluster(instance.getCluster());
+        instanceBean.setCreationTime(new Timestamp(instance.getCreationTime().getMillis()));
+        instanceBean.setId(instance.getId().toString());
+        instanceBean.setInstanceTime(new Timestamp(instance.getInstanceTime().getMillis()));
+        instanceBean.setEntityId(new InstanceID(instance).getEntityID().toString());
+
+        instanceBean.setInstanceSequence(instance.getInstanceSequence());
+        if (instance.getAwaitingPredicates() != null && !instance.getAwaitingPredicates().isEmpty()) {
+            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+            try (ObjectOutputStream out = new ObjectOutputStream(byteArrayOutputStream)) {
+                out.writeInt(instance.getAwaitingPredicates().size());
+                for (Predicate predicate : instance.getAwaitingPredicates()) {
+                    out.writeObject(predicate);
+                }
+                // ObjectOutputStream buffers internally; flush so the byte array is complete.
+                out.flush();
+                instanceBean.setAwaitedPredicates(byteArrayOutputStream.toByteArray());
+            }
+        }
+        return instanceBean;
+    }
+
+    /**
+     * Rebuilds an {@link InstanceState} from an instance row of the DB.
+     * The awaited predicates blob is read back as an int count followed by that many
+     * serialized {@link Predicate} objects.
+     *
+     * @param instanceBean row read from the instance table
+     * @return instance state whose current state is the one recorded in the bean
+     * @throws StateStoreException if the execution instance cannot be reconstructed
+     * @throws IOException if the awaited predicates cannot be deserialized
+     */
+    public static InstanceState convertToInstanceState(InstanceBean instanceBean) throws StateStoreException,
+        IOException {
+        EntityType type = InstanceID.getEntityType(instanceBean.getId());
+        ExecutionInstance restored = getExecutionInstance(type, instanceBean);
+
+        Timestamp actualEnd = instanceBean.getActualEndTime();
+        if (actualEnd != null) {
+            restored.setActualEnd(new DateTime(actualEnd.getTime()));
+        }
+        Timestamp actualStart = instanceBean.getActualStartTime();
+        if (actualStart != null) {
+            restored.setActualStart(new DateTime(actualStart.getTime()));
+        }
+        restored.setExternalID(instanceBean.getExternalID());
+        restored.setInstanceSequence(instanceBean.getInstanceSequence());
+
+        byte[] serialized = instanceBean.getAwaitedPredicates();
+        List<Predicate> awaited = new ArrayList<>();
+        if (serialized != null && serialized.length != 0) {
+            ObjectInputStream in = null;
+            try {
+                in = new ObjectInputStream(new ByteArrayInputStream(serialized));
+                for (int remaining = in.readInt(); remaining > 0; remaining--) {
+                    awaited.add((Predicate) in.readObject());
+                }
+            } catch (ClassNotFoundException e) {
+                throw new IOException(e);
+            } finally {
+                IOUtils.closeQuietly(in);
+            }
+        }
+        restored.setAwaitingPredicates(awaited);
+
+        InstanceState state = new InstanceState(restored);
+        state.setCurrentState(InstanceState.STATE.valueOf(instanceBean.getCurrentState()));
+        return state;
+    }
+
+    /**
+     * Converts a list of instance rows of the DB to instance states.
+     * A null list is treated like an empty one, matching the entity converters of this class.
+     *
+     * @param instanceBeanList rows to convert; may be null or empty
+     * @return the converted instance states, empty when there is nothing to convert
+     * @throws StateStoreException if an execution instance cannot be reconstructed
+     * @throws IOException if awaited predicates of a row cannot be deserialized
+     */
+    public static Collection<InstanceState> convertToInstanceState(List<InstanceBean> instanceBeanList)
+        throws StateStoreException, IOException {
+        List<InstanceState> instanceStates = new ArrayList<>();
+        if (instanceBeanList == null) {
+            return instanceStates;
+        }
+        for (InstanceBean instanceBean : instanceBeanList) {
+            instanceStates.add(convertToInstanceState(instanceBean));
+        }
+        return instanceStates;
+    }
+
+    /**
+     * Looks up the entity an instance row belongs to and builds the execution instance
+     * from the row's instance time, cluster and creation time.
+     */
+    private static ExecutionInstance getExecutionInstance(EntityType entityType,
+                                                          InstanceBean instanceBean) throws StateStoreException {
+        try {
+            Entity owner = EntityUtil.getEntity(entityType, InstanceID.getEntityName(instanceBean.getId()));
+            long instanceMillis = instanceBean.getInstanceTime().getTime();
+            String cluster = instanceBean.getCluster();
+            long creationMillis = instanceBean.getCreationTime().getTime();
+            return getExecutionInstance(entityType, owner, instanceMillis, cluster, creationMillis);
+        } catch (FalconException cause) {
+            throw new StateStoreException(cause);
+        }
+    }
+
+    /**
+     * Creates the execution instance for an entity. Only process entities are supported.
+     *
+     * @param entityType type of the entity; anything other than PROCESS is rejected
+     * @param entity the entity, cast to a process when entityType is PROCESS
+     * @param instanceTime instance time in epoch milliseconds
+     * @param cluster cluster the instance runs on
+     * @param creationTime creation time in epoch milliseconds
+     * @return a new {@link ProcessExecutionInstance}
+     * @throws StateStoreException wrapping the FalconException raised while creating the instance
+     * @throws UnsupportedOperationException if entityType is not PROCESS (including null)
+     */
+    public static ExecutionInstance getExecutionInstance(EntityType entityType, Entity entity, long instanceTime,
+                                                         String cluster, long creationTime) throws StateStoreException {
+        if (entityType != EntityType.PROCESS) {
+            // Concatenation (rather than toString()) keeps a null type from raising an NPE here.
+            throw new UnsupportedOperationException("Not supported for entity type " + entityType);
+        }
+        try {
+            return new ProcessExecutionInstance((org.apache.falcon.entity.v0.process.Process) entity,
+                    new DateTime(instanceTime), cluster, new DateTime(creationTime));
+        } catch (FalconException e) {
+            // Keep the original failure as the cause instead of replacing it with a fixed message.
+            throw new StateStoreException(e);
+        }
+    }
+
+
+    /**
+     * Serializes the awaiting predicates of an instance as an int count followed by the
+     * predicate objects, the layout read back by {@code convertToInstanceState}.
+     *
+     * @param instanceState state wrapper holding the execution instance
+     * @return the serialized predicates
+     * @throws IOException if a predicate cannot be serialized
+     */
+    public static byte[] getAwaitedPredicates(InstanceState instanceState) throws IOException {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        try (ObjectOutputStream out = new ObjectOutputStream(byteArrayOutputStream)) {
+            out.writeInt(instanceState.getInstance().getAwaitingPredicates().size());
+            for (Predicate predicate : instanceState.getInstance().getAwaitingPredicates()) {
+                out.writeObject(predicate);
+            }
+            // ObjectOutputStream buffers primitive data; without a flush an empty predicate
+            // list would yield a payload that lacks the count and cannot be read back.
+            out.flush();
+            return byteArrayOutputStream.toByteArray();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
new file mode 100644
index 0000000..03ada39
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store.jdbc;
+
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * JPA entity holding the persisted state of a Falcon entity; mapped to the ENTITIES table.
+ * The named queries below cover lookup by id, state and type, update, and deletion.
+ */
+@Entity
+@NamedQueries({
+ @NamedQuery(name = "GET_ENTITY", query = "select OBJECT(a) from EntityBean a where a.id = :id"),
+ @NamedQuery(name = "GET_ENTITY_FOR_STATE", query = "select OBJECT(a) from EntityBean a where a.state = :state"),
+ @NamedQuery(name = "UPDATE_ENTITY", query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type where a.id = :id"),
+ @NamedQuery(name = "GET_ENTITIES_FOR_TYPE", query = "select OBJECT(a) from EntityBean a where a.type = :type"),
+ @NamedQuery(name = "GET_ENTITIES", query = "select OBJECT(a) from EntityBean a"),
+ @NamedQuery(name = "DELETE_ENTITY", query = "delete from EntityBean a where a.id = :id"),
+ @NamedQuery(name = "DELETE_ENTITIES", query = "delete from EntityBean")})
+//RESUME CHECKSTYLE CHECK LineLengthCheck
+@Table(name = "ENTITIES")
+public class EntityBean {
+ // Primary key of the row.
+ @NotNull
+ @Id
+ private String id;
+
+ // Entity name, stored in column "name".
+ @Basic
+ @NotNull
+ @Column(name = "name")
+ private String name;
+
+
+ // Entity type as a string, stored in the indexed column "type".
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "type")
+ private String type;
+
+ // Current state as a string, stored in the indexed column "current_state".
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "current_state")
+ private String state;
+
+ // Public no-argument constructor.
+ public EntityBean() {
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getState() {
+ return state;
+ }
+
+ public void setState(String state) {
+ this.state = state;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
new file mode 100644
index 0000000..0e3dfa9
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store.jdbc;
+
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Lob;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+import java.sql.Timestamp;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * JPA entity holding the persisted state of an execution instance; mapped to the INSTANCES table.
+ * The named queries below cover lookup by id, by entity/cluster/state/time range, update, and deletion.
+ */
+@Entity
+@NamedQueries({
+ @NamedQuery(name = "GET_INSTANCE", query = "select OBJECT(a) from InstanceBean a where a.id = :id"),
+ @NamedQuery(name = "DELETE_INSTANCE", query = "delete from InstanceBean a where a.id = :id"),
+ @NamedQuery(name = "DELETE_INSTANCE_FOR_ENTITY", query = "delete from InstanceBean a where a.entityId = :entityId"),
+ @NamedQuery(name = "UPDATE_INSTANCE", query = "update InstanceBean a set a.cluster = :cluster, a.externalID = :externalID, a.instanceTime = :instanceTime, a.creationTime = :creationTime, a.actualEndTime = :actualEndTime, a.currentState = :currentState, a.actualStartTime = :actualStartTime, a.instanceSequence = :instanceSequence, a.awaitedPredicates = :awaitedPredicates where a.id = :id"),
+ @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster"),
+ @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState)"),
+ @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"),
+ @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"),
+ @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"),
+ @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a")
+})
+//RESUME CHECKSTYLE CHECK LineLengthCheck
+@Table(name = "INSTANCES")
+public class InstanceBean {
+
+ // Primary key of the row.
+ @Id
+ @NotNull
+ private String id;
+
+ // Id of the entity this instance belongs to; indexed column "entity_id".
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "entity_id")
+ private String entityId;
+
+ // Cluster of the instance; indexed column "cluster".
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "cluster")
+ private String cluster;
+
+ // External ID of the instance; indexed, nullable column "external_id".
+ @Basic
+ @Index
+ @Column(name = "external_id")
+ private String externalID;
+
+ // Instance time; indexed column "instance_time".
+ @Basic
+ @Index
+ @Column(name = "instance_time")
+ private Timestamp instanceTime;
+
+ // Creation time; indexed column "creation_time".
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "creation_time")
+ private Timestamp creationTime;
+
+ // Actual start time; nullable column "actual_start_time".
+ @Basic
+ @Column(name = "actual_start_time")
+ private Timestamp actualStartTime;
+
+ // Actual end time; nullable column "actual_end_time".
+ @Basic
+ @Column(name = "actual_end_time")
+ private Timestamp actualEndTime;
+
+ // Current state as a string; indexed column "current_state".
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "current_state")
+ private String currentState;
+
+ // Sequence number of the instance; indexed column "instance_sequence".
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "instance_sequence")
+ private Integer instanceSequence;
+
+
+ // Serialized awaited predicates, stored as a BLOB in column "awaited_predicates".
+ @Column(name = "awaited_predicates", columnDefinition = "BLOB")
+ @Lob
+ private byte[] awaitedPredicates;
+
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getCluster() {
+ return cluster;
+ }
+
+ public void setCluster(String cluster) {
+ this.cluster = cluster;
+ }
+
+ public String getExternalID() {
+ return externalID;
+ }
+
+ public void setExternalID(String externalID) {
+ this.externalID = externalID;
+ }
+
+ public Timestamp getInstanceTime() {
+ return instanceTime;
+ }
+
+ public void setInstanceTime(Timestamp instanceTime) {
+ this.instanceTime = instanceTime;
+ }
+
+ public Timestamp getCreationTime() {
+ return creationTime;
+ }
+
+ public void setCreationTime(Timestamp creationTime) {
+ this.creationTime = creationTime;
+ }
+
+ public Timestamp getActualStartTime() {
+ return actualStartTime;
+ }
+
+ public void setActualStartTime(Timestamp actualStartTime) {
+ this.actualStartTime = actualStartTime;
+ }
+
+ public Timestamp getActualEndTime() {
+ return actualEndTime;
+ }
+
+ public void setActualEndTime(Timestamp actualEndTime) {
+ this.actualEndTime = actualEndTime;
+ }
+
+ public String getCurrentState() {
+ return currentState;
+ }
+
+ public void setCurrentState(String currentState) {
+ this.currentState = currentState;
+ }
+
+ // Returns the stored array itself, not a copy.
+ public byte[] getAwaitedPredicates() {
+ return awaitedPredicates;
+ }
+
+ // Stores the given array itself, not a copy.
+ public void setAwaitedPredicates(byte[] awaitedPredicates) {
+ this.awaitedPredicates = awaitedPredicates;
+ }
+
+ public Integer getInstanceSequence() {
+ return instanceSequence;
+ }
+
+ public void setInstanceSequence(Integer instanceSequence) {
+ this.instanceSequence = instanceSequence;
+ }
+
+ public String getEntityId() {
+ return entityId;
+ }
+
+ public void setEntityId(String entityId) {
+ this.entityId = entityId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6d313855/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
new file mode 100644
index 0000000..ca65b94
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.state.store.jdbc;
+
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.exception.StateStoreException;
+import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.state.EntityClusterID;
+import org.apache.falcon.state.EntityID;
+import org.apache.falcon.state.EntityState;
+import org.apache.falcon.state.ID;
+import org.apache.falcon.state.InstanceID;
+import org.apache.falcon.state.InstanceState;
+import org.apache.falcon.state.store.AbstractStateStore;
+import org.apache.falcon.state.store.StateStore;
+import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.util.StartupProperties;
+import org.joda.time.DateTime;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Persistent Data Store for Entities and Instances.
+ */
+public final class JDBCStateStore extends AbstractStateStore {
+
+ private static final StateStore STORE = new JDBCStateStore();
+ private static final String DEBUG = "debug";
+
+ private JDBCStateStore() {}
+
+ /**
+ * @return the single JDBCStateStore instance held in STORE.
+ */
+ public static StateStore get() {
+ return STORE;
+ }
+
+    /**
+     * Deletes all instances and then all entities. Only permitted in debug mode.
+     *
+     * @throws StateStoreException if either deletion fails
+     * @throws UnsupportedOperationException when the store is not in debug mode
+     */
+    @Override
+    public void clear() throws StateStoreException {
+        if (isModeDebug()) {
+            deleteExecutionInstances();
+            deleteEntities();
+            return;
+        }
+        throw new UnsupportedOperationException("Clear Method not supported");
+    }
+
+    /**
+     * Persists a new entity state.
+     *
+     * @param entityState state of the entity to store
+     * @throws StateStoreException if an entity with the same key is already stored
+     */
+    @Override
+    public void putEntity(EntityState entityState) throws StateStoreException {
+        EntityID entityID = new EntityID(entityState.getEntity());
+        if (entityExists(entityID)) {
+            throw new StateStoreException("Entity with key, " + entityID.getKey() + " already exists.");
+        }
+        EntityBean bean = BeanMapperUtil.convertToEntityBean(entityState);
+        EntityManager em = getEntityManager();
+        beginTransaction(em);
+        em.persist(bean);
+        commitAndCloseTransaction(em);
+    }
+
+
+    /**
+     * Fetches the stored state of an entity.
+     *
+     * @param entityID ID of the entity to look up
+     * @return the stored entity state
+     * @throws StateStoreException if no entity is stored under the given ID
+     */
+    @Override
+    public EntityState getEntity(EntityID entityID) throws StateStoreException {
+        EntityState found = getEntityByKey(entityID);
+        if (found != null) {
+            return found;
+        }
+        throw new StateStoreException("Entity with key, " + entityID + " does not exist.");
+    }
+
+    /**
+     * Runs the GET_ENTITY query for the given ID.
+     *
+     * @param id ID whose key is used for the lookup
+     * @return the stored entity state, or null when no row matches
+     * @throws StateStoreException if the row cannot be converted to an entity state
+     */
+    private EntityState getEntityByKey(EntityID id) throws StateStoreException {
+        EntityManager entityManager = getEntityManager();
+        List result;
+        try {
+            Query q = entityManager.createNamedQuery("GET_ENTITY");
+            q.setParameter("id", id.getKey());
+            result = q.getResultList();
+        } finally {
+            // Always release the entity manager, including on an empty result or a failed query.
+            entityManager.close();
+        }
+        if (result.isEmpty()) {
+            return null;
+        }
+        return BeanMapperUtil.convertToEntityState((EntityBean) result.get(0));
+    }
+
+    /**
+     * @param entityID ID of the entity to look up
+     * @return true, if an entity is stored under the given ID
+     * @throws StateStoreException if the lookup fails
+     */
+    @Override
+    public boolean entityExists(EntityID entityID) throws StateStoreException {
+        return getEntityByKey(entityID) != null;
+    }
+
+    /**
+     * Runs the GET_ENTITY_FOR_STATE query and resolves the matching rows to entities.
+     *
+     * @param state state to filter on
+     * @return entities currently in the given state
+     * @throws StateStoreException if a row cannot be resolved to an entity
+     */
+    @Override
+    public Collection<Entity> getEntities(EntityState.STATE state) throws StateStoreException {
+        EntityManager entityManager = getEntityManager();
+        List result;
+        try {
+            Query q = entityManager.createNamedQuery("GET_ENTITY_FOR_STATE");
+            q.setParameter("state", state.toString());
+            result = q.getResultList();
+        } finally {
+            // Release the entity manager even when the query fails.
+            entityManager.close();
+        }
+        return BeanMapperUtil.convertToEntities(result);
+    }
+
+    /**
+     * Runs the GET_ENTITIES query and converts every row to an entity state.
+     *
+     * @return states of all entities in the store
+     * @throws StateStoreException if a row cannot be converted
+     */
+    @Override
+    public Collection<EntityState> getAllEntities() throws StateStoreException {
+        EntityManager entityManager = getEntityManager();
+        List result;
+        try {
+            Query q = entityManager.createNamedQuery("GET_ENTITIES");
+            result = q.getResultList();
+        } finally {
+            // Release the entity manager even when the query fails.
+            entityManager.close();
+        }
+        return BeanMapperUtil.convertToEntityState(result);
+    }
+
+    /**
+     * Updates state, type and name of an already stored entity via the UPDATE_ENTITY query.
+     *
+     * @param entityState new state of the entity
+     * @throws StateStoreException if the entity is not in the store
+     */
+    @Override
+    public void updateEntity(EntityState entityState) throws StateStoreException {
+        Entity entity = entityState.getEntity();
+        EntityID entityID = new EntityID(entity);
+        if (!entityExists(entityID)) {
+            throw new StateStoreException("Entity with key, " + entityID + " doesn't exists.");
+        }
+        EntityManager em = getEntityManager();
+        beginTransaction(em);
+        Query update = em.createNamedQuery("UPDATE_ENTITY");
+        update.setParameter("id", entityID.getKey());
+        // NOTE(review): when the current state is null the :state parameter stays unbound;
+        // confirm how the JPA provider treats an unbound named parameter.
+        if (entityState.getCurrentState() != null) {
+            update.setParameter("state", entityState.getCurrentState().toString());
+        }
+        update.setParameter("type", entity.getEntityType().toString());
+        update.setParameter("name", entity.getName());
+        update.executeUpdate();
+        commitAndCloseTransaction(em);
+    }
+
+ @Override
+ public void deleteEntity(EntityID entityID) throws StateStoreException {
+ if (!entityExists(entityID)) {
+ throw new StateStoreException("Entity with key, " + entityID.getKey() + " does not exist.");
+ }
+ EntityManager entityManager = getEntityManager();
+ beginTransaction(entityManager);
+ Query q = entityManager.createNamedQuery("DELETE_ENTITY");
+ q.setParameter("id", entityID.getKey());
+ q.executeUpdate();
+ commitAndCloseTransaction(entityManager);
+ }
+
+ @Override
+ public void deleteEntities() throws StateStoreException {
+ if (!isModeDebug()) {
+ throw new UnsupportedOperationException("Delete Entities Table not supported");
+ }
+ EntityManager entityManager = getEntityManager();
+ beginTransaction(entityManager);
+ Query q = entityManager.createNamedQuery("DELETE_ENTITIES");
+ q.executeUpdate();
+ commitAndCloseTransaction(entityManager);
+ }
+
+    /**
+     * Persists a new instance state.
+     *
+     * @param instanceState state of the instance to store
+     * @throws StateStoreException if the instance already exists or cannot be converted to a bean
+     */
+    @Override
+    public void putExecutionInstance(InstanceState instanceState) throws StateStoreException {
+        InstanceID instanceID = new InstanceID(instanceState.getInstance());
+        if (executionInstanceExists(instanceID)) {
+            throw new StateStoreException("Instance with key, " + instanceID + " already exists.");
+        }
+        final InstanceBean bean;
+        try {
+            bean = BeanMapperUtil.convertToInstanceBean(instanceState);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+        EntityManager em = getEntityManager();
+        beginTransaction(em);
+        em.persist(bean);
+        commitAndCloseTransaction(em);
+    }
+
+    /**
+     * Fetches the stored state of an instance.
+     *
+     * @param instanceId ID of the instance to look up
+     * @return the stored instance state
+     * @throws StateStoreException if no instance is stored under the given ID
+     */
+    @Override
+    public InstanceState getExecutionInstance(InstanceID instanceId) throws StateStoreException {
+        InstanceState found = getExecutionInstanceByKey(instanceId);
+        if (found != null) {
+            return found;
+        }
+        throw new StateStoreException("Instance with key, " + instanceId.toString() + " does not exist.");
+    }
+
+    /**
+     * Runs the GET_INSTANCE query for the given ID.
+     *
+     * @param instanceKey ID whose string form is used for the lookup
+     * @return the stored instance state, or null when no row matches
+     * @throws StateStoreException if the row cannot be converted to an instance state
+     */
+    private InstanceState getExecutionInstanceByKey(ID instanceKey) throws StateStoreException {
+        EntityManager entityManager = getEntityManager();
+        List result;
+        try {
+            Query q = entityManager.createNamedQuery("GET_INSTANCE");
+            q.setParameter("id", instanceKey.toString());
+            result = q.getResultList();
+        } finally {
+            // Release the entity manager even when the query fails.
+            entityManager.close();
+        }
+        if (result.isEmpty()) {
+            return null;
+        }
+        try {
+            InstanceBean instanceBean = (InstanceBean) (result.get(0));
+            return BeanMapperUtil.convertToInstanceState(instanceBean);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    /**
+     * Updates the stored row of an existing instance through the UPDATE_INSTANCE named query.
+     *
+     * The actual start/end times and the awaited predicates are bound only when the instance
+     * carries them, exactly as before.
+     *
+     * @param instanceState state whose instance identifies the row and supplies the new values
+     * @throws StateStoreException if the instance does not exist, or if the awaited predicates
+     *                             cannot be serialized
+     */
+    @Override
+    public void updateExecutionInstance(InstanceState instanceState) throws StateStoreException {
+        ExecutionInstance instance = instanceState.getInstance();
+        InstanceID id = new InstanceID(instance);
+        String key = id.toString();
+        if (!executionInstanceExists(id)) {
+            throw new StateStoreException("Instance with key, " + key + " does not exist.");
+        }
+        EntityManager entityManager = getEntityManager();
+        try {
+            beginTransaction(entityManager);
+            Query q = entityManager.createNamedQuery("UPDATE_INSTANCE");
+            q.setParameter("id", key);
+            q.setParameter("cluster", instance.getCluster());
+            q.setParameter("externalID", instance.getExternalID());
+            q.setParameter("instanceTime", new Timestamp(instance.getInstanceTime().getMillis()));
+            q.setParameter("creationTime", new Timestamp(instance.getCreationTime().getMillis()));
+            if (instance.getActualEnd() != null) {
+                q.setParameter("actualEndTime", new Timestamp(instance.getActualEnd().getMillis()));
+            }
+            q.setParameter("currentState", instanceState.getCurrentState().toString());
+            if (instance.getActualStart() != null) {
+                q.setParameter("actualStartTime", new Timestamp(instance.getActualStart().getMillis()));
+            }
+            q.setParameter("instanceSequence", instance.getInstanceSequence());
+            if (instance.getAwaitingPredicates() != null
+                    && !instance.getAwaitingPredicates().isEmpty()) {
+                try {
+                    q.setParameter("awaitedPredicates", BeanMapperUtil.getAwaitedPredicates(instanceState));
+                } catch (IOException e) {
+                    // Thrown with the transaction already begun; the finally block cleans up.
+                    throw new StateStoreException(e);
+                }
+            }
+            q.executeUpdate();
+            commitAndCloseTransaction(entityManager);
+        } finally {
+            // Still open here only when something above failed: undo and release.
+            if (entityManager.isOpen()) {
+                if (entityManager.getTransaction().isActive()) {
+                    entityManager.getTransaction().rollback();
+                }
+                entityManager.close();
+            }
+        }
+    }
+
+    /**
+     * Returns every stored instance of an entity on a cluster, using the
+     * GET_INSTANCES_FOR_ENTITY_CLUSTER named query.
+     *
+     * @param entity  entity whose instances are requested
+     * @param cluster cluster name bound to the query's "cluster" parameter
+     * @return the converted instance states
+     * @throws StateStoreException if a stored bean cannot be converted to a state
+     */
+    @Override
+    public Collection<InstanceState> getAllExecutionInstances(Entity entity, String cluster)
+        throws StateStoreException {
+        EntityClusterID id = new EntityClusterID(entity, cluster);
+        List result;
+        EntityManager entityManager = getEntityManager();
+        try {
+            Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_CLUSTER");
+            q.setParameter("entityId", id.getEntityID().getKey());
+            q.setParameter("cluster", cluster);
+            result = q.getResultList();
+        } finally {
+            // Release the manager even when the query itself fails.
+            entityManager.close();
+        }
+        try {
+            return BeanMapperUtil.convertToInstanceState(result);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    /**
+     * Returns the instances of an entity on a cluster that are in one of the given states,
+     * using the GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES named query.
+     *
+     * @param entity  entity whose instances are requested
+     * @param cluster cluster name bound to the query's "cluster" parameter
+     * @param states  states to match; their string forms are bound to "currentState"
+     * @return the converted instance states
+     * @throws StateStoreException if a stored bean cannot be converted to a state
+     */
+    @Override
+    public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+                                                           Collection<InstanceState.STATE> states)
+        throws StateStoreException {
+        EntityClusterID entityClusterID = new EntityClusterID(entity, cluster);
+        String entityKey = entityClusterID.getEntityID().getKey();
+        List<String> instanceStates = new ArrayList<>();
+        for (InstanceState.STATE state : states) {
+            instanceStates.add(state.toString());
+        }
+        List result;
+        EntityManager entityManager = getEntityManager();
+        try {
+            Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES");
+            q.setParameter("entityId", entityKey);
+            q.setParameter("cluster", cluster);
+            q.setParameter("currentState", instanceStates);
+            result = q.getResultList();
+        } finally {
+            // Release the manager even when the query itself fails.
+            entityManager.close();
+        }
+        try {
+            return BeanMapperUtil.convertToInstanceState(result);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    /**
+     * Returns the instances of the identified entity that are in one of the given states,
+     * using the GET_INSTANCES_FOR_ENTITY_FOR_STATES named query.
+     *
+     * @param id     id whose entity key is bound to the query's "entityId" parameter
+     * @param states states to match; their string forms are bound to "currentState"
+     * @return the converted instance states
+     * @throws StateStoreException if a stored bean cannot be converted to a state
+     */
+    @Override
+    public Collection<InstanceState> getExecutionInstances(EntityClusterID id,
+                                                           Collection<InstanceState.STATE> states)
+        throws StateStoreException {
+        String entityKey = id.getEntityID().getKey();
+        List<String> instanceStates = new ArrayList<>();
+        for (InstanceState.STATE state : states) {
+            instanceStates.add(state.toString());
+        }
+        List result;
+        EntityManager entityManager = getEntityManager();
+        try {
+            Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_FOR_STATES");
+            q.setParameter("entityId", entityKey);
+            q.setParameter("currentState", instanceStates);
+            result = q.getResultList();
+        } finally {
+            // Release the manager even when the query itself fails.
+            entityManager.close();
+        }
+        try {
+            return BeanMapperUtil.convertToInstanceState(result);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    /**
+     * Returns the instances of an entity that are in one of the given states within a time
+     * range, using the GET_INSTANCES_FOR_ENTITY_FOR_STATES_WITH_RANGE named query.
+     *
+     * NOTE(review): cluster is used only to build the entity key here; it is not bound as a
+     * query parameter. Confirm against the named query that this is intended.
+     *
+     * @param entity  entity whose instances are requested
+     * @param cluster cluster used when building the entity key
+     * @param states  states to match; their string forms are bound to "currentState"
+     * @param start   bound to the query's "startTime" parameter
+     * @param end     bound to the query's "endTime" parameter
+     * @return the converted instance states
+     * @throws StateStoreException if a stored bean cannot be converted to a state
+     */
+    @Override
+    public Collection<InstanceState> getExecutionInstances(Entity entity, String cluster,
+                                                           Collection<InstanceState.STATE> states, DateTime start,
+                                                           DateTime end) throws StateStoreException {
+        String entityKey = new EntityClusterID(entity, cluster).getEntityID().getKey();
+        List<String> instanceStates = new ArrayList<>();
+        for (InstanceState.STATE state : states) {
+            instanceStates.add(state.toString());
+        }
+        List result;
+        EntityManager entityManager = getEntityManager();
+        try {
+            Query q = entityManager.createNamedQuery("GET_INSTANCES_FOR_ENTITY_FOR_STATES_WITH_RANGE");
+            q.setParameter("entityId", entityKey);
+            q.setParameter("currentState", instanceStates);
+            q.setParameter("startTime", new Timestamp(start.getMillis()));
+            q.setParameter("endTime", new Timestamp(end.getMillis()));
+            result = q.getResultList();
+        } finally {
+            // Release the manager even when the query itself fails.
+            entityManager.close();
+        }
+        try {
+            return BeanMapperUtil.convertToInstanceState(result);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+    /**
+     * Returns the first row of the GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER named query, limited
+     * to a single result.
+     *
+     * @param entity  entity whose instance is requested
+     * @param cluster cluster name bound to the query's "cluster" parameter
+     * @return the converted state, or null when the query returns no rows
+     * @throws StateStoreException if the stored bean cannot be converted to a state
+     */
+    @Override
+    public InstanceState getLastExecutionInstance(Entity entity, String cluster) throws StateStoreException {
+        String key = new EntityClusterID(entity, cluster).getEntityID().getKey();
+        List result;
+        EntityManager entityManager = getEntityManager();
+        try {
+            Query q = entityManager.createNamedQuery("GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER");
+            q.setParameter("entityId", key);
+            q.setParameter("cluster", cluster);
+            q.setMaxResults(1);
+            result = q.getResultList();
+        } finally {
+            // Release the manager even when the query itself fails.
+            entityManager.close();
+        }
+        if (result.isEmpty()) {
+            return null;
+        }
+        try {
+            return BeanMapperUtil.convertToInstanceState((InstanceBean) result.get(0));
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
+    }
+
+ @Override
+ public boolean executionInstanceExists(InstanceID instanceKey) throws StateStoreException {
+ return getExecutionInstanceByKey(instanceKey) == null ? false : true;
+ }
+
+    /**
+     * Deletes the stored row of a single instance through the DELETE_INSTANCE named query.
+     *
+     * @param instanceID id of the instance to delete
+     * @throws StateStoreException if no instance is stored under that id
+     */
+    @Override
+    public void deleteExecutionInstance(InstanceID instanceID) throws StateStoreException {
+        String instanceKey = instanceID.toString();
+        if (!executionInstanceExists(instanceID)) {
+            throw new StateStoreException("Instance with key, " + instanceKey + " does not exist.");
+        }
+        EntityManager entityManager = getEntityManager();
+        try {
+            beginTransaction(entityManager);
+            Query q = entityManager.createNamedQuery("DELETE_INSTANCE");
+            q.setParameter("id", instanceKey);
+            q.executeUpdate();
+            commitAndCloseTransaction(entityManager);
+        } finally {
+            // Still open here only when something above failed: undo and release.
+            if (entityManager.isOpen()) {
+                if (entityManager.getTransaction().isActive()) {
+                    entityManager.getTransaction().rollback();
+                }
+                entityManager.close();
+            }
+        }
+    }
+
+    /**
+     * Deletes the instances of an entity through the DELETE_INSTANCE_FOR_ENTITY named query.
+     *
+     * @param entityID id whose key is bound to the query's "entityId" parameter
+     */
+    @Override
+    public void deleteExecutionInstances(EntityID entityID) {
+        String entityKey = entityID.getKey();
+        EntityManager entityManager = getEntityManager();
+        try {
+            beginTransaction(entityManager);
+            Query q = entityManager.createNamedQuery("DELETE_INSTANCE_FOR_ENTITY");
+            q.setParameter("entityId", entityKey);
+            q.executeUpdate();
+            commitAndCloseTransaction(entityManager);
+        } finally {
+            // Still open here only when something above failed: undo and release.
+            if (entityManager.isOpen()) {
+                if (entityManager.getTransaction().isActive()) {
+                    entityManager.getTransaction().rollback();
+                }
+                entityManager.close();
+            }
+        }
+    }
+
+    /**
+     * Runs the DELETE_INSTANCES_TABLE named query. Only permitted when the "domain" startup
+     * property equals DEBUG.
+     *
+     * @throws UnsupportedOperationException when not running in debug mode
+     */
+    @Override
+    public void deleteExecutionInstances() {
+        if (!isModeDebug()) {
+            throw new UnsupportedOperationException("Delete Instances Table not supported");
+        }
+        EntityManager entityManager = getEntityManager();
+        try {
+            beginTransaction(entityManager);
+            Query q = entityManager.createNamedQuery("DELETE_INSTANCES_TABLE");
+            q.executeUpdate();
+            commitAndCloseTransaction(entityManager);
+        } finally {
+            // Still open here only when something above failed: undo and release.
+            if (entityManager.isOpen()) {
+                if (entityManager.getTransaction().isActive()) {
+                    entityManager.getTransaction().rollback();
+                }
+                entityManager.close();
+            }
+        }
+    }
+
+    /**
+     * Debug mode is what test cases run under.
+     *
+     * @return true when the "domain" startup property equals DEBUG
+     */
+    private boolean isModeDebug() {
+        return DEBUG.equals(StartupProperties.get().getProperty("domain"));
+    }
+
+    /**
+     * Commits the entity manager's transaction and closes the manager.
+     *
+     * @param entityManager manager whose transaction is committed; closed on return, including
+     *                      when the commit throws
+     */
+    private void commitAndCloseTransaction(EntityManager entityManager) {
+        try {
+            entityManager.getTransaction().commit();
+        } finally {
+            // A failed commit must not leave the manager open.
+            entityManager.close();
+        }
+    }
+
+ /**
+ * Begins the transaction of the given entity manager. The write methods of this class call
+ * this first and finish with commitAndCloseTransaction.
+ *
+ * @param entityManager manager whose transaction is begun
+ */
+ private void beginTransaction(EntityManager entityManager) {
+ entityManager.getTransaction().begin();
+ }
+
+    /**
+     * Obtains an entity manager from the FalconJPAService instance.
+     *
+     * @return the entity manager handed out by the service; callers in this class close it
+     */
+    private EntityManager getEntityManager() {
+        FalconJPAService jpaService = FalconJPAService.get();
+        return jpaService.getEntityManager();
+    }
+
+}
[4/4] falcon git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/falcon
Posted by pa...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/falcon
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2bf90130
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2bf90130
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2bf90130
Branch: refs/heads/master
Commit: 2bf90130df482e5501ba40b36a58dade8df08e64
Parents: 6d31385 f982f86
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Thu Nov 26 15:56:49 2015 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Thu Nov 26 15:56:49 2015 +0530
----------------------------------------------------------------------
.reviewboardrc | 24 ++++++++++++++++++++++++
CHANGES.txt | 2 ++
2 files changed, 26 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/2bf90130/CHANGES.txt
----------------------------------------------------------------------