You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2016/03/28 15:39:26 UTC
[3/3] falcon git commit: FALCON-1865 Persist Feed sla data to database
FALCON-1865 Persist Feed sla data to database
Author: Praveen Adlakha <ad...@gmail.com>
Reviewers: Ajay Yadava <aj...@apache.org>
Closes #77 from PraveenAdlakha/feed_alert
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/de2f5c0a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/de2f5c0a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/de2f5c0a
Branch: refs/heads/master
Commit: de2f5c0ab1c26b8d198d067e066c579a86bce737
Parents: 10f3843
Author: Praveen Adlakha <ad...@gmail.com>
Authored: Mon Mar 28 19:08:56 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Mar 28 19:08:56 2016 +0530
----------------------------------------------------------------------
common/pom.xml | 41 ++
.../apache/falcon/persistence/EntityBean.java | 117 +++++
.../apache/falcon/persistence/InstanceBean.java | 229 ++++++++++
.../falcon/persistence/MonitoredFeedsBean.java | 73 ++++
.../falcon/persistence/PendingInstanceBean.java | 98 +++++
.../persistence/PersistenceConstants.java | 35 ++
.../persistence/ResultNotFoundException.java | 31 ++
.../apache/falcon/service/FalconJPAService.java | 170 +++++++
.../falcon/tools/FalconStateStoreDBCLI.java | 438 +++++++++++++++++++
.../src/main/resources/META-INF/persistence.xml | 113 +++++
common/src/main/resources/startup.properties | 7 +-
.../src/main/resources/statestore.credentials | 4 +-
common/src/main/resources/statestore.properties | 20 +-
docs/src/site/twiki/FalconNativeScheduler.twiki | 2 +-
.../falcon/jdbc/MonitoringJdbcStateStore.java | 175 ++++++++
.../service/FeedSLAMonitoringService.java | 191 +++-----
.../jdbc/MonitoringJdbcStateStoreTest.java | 97 ++++
.../falcon/service/FeedSLAMonitoringTest.java | 34 --
scheduler/pom.xml | 42 +-
.../falcon/state/store/jdbc/BeanMapperUtil.java | 2 +
.../falcon/state/store/jdbc/EntityBean.java | 117 -----
.../falcon/state/store/jdbc/InstanceBean.java | 229 ----------
.../falcon/state/store/jdbc/JDBCStateStore.java | 4 +-
.../state/store/service/FalconJPAService.java | 171 --------
.../falcon/tools/FalconStateStoreDBCLI.java | 436 ------------------
.../src/main/resources/META-INF/persistence.xml | 104 -----
.../execution/FalconExecutionServiceTest.java | 2 +-
.../falcon/state/AbstractSchedulerTestBase.java | 2 +-
.../state/service/TestFalconJPAService.java | 2 +-
.../state/service/store/TestJDBCStateStore.java | 2 +-
scheduler/src/test/resources/startup.properties | 5 +-
.../src/test/resources/statestore.properties | 2 +-
src/build/findbugs-exclude.xml | 25 +-
src/conf/startup.properties | 2 +-
unit/pom.xml | 10 +
unit/src/main/resources/startup.properties | 1 +
.../org/apache/falcon/unit/TestFalconUnit.java | 1 +
.../AbstractSchedulerManagerJerseyIT.java | 2 +-
webapp/src/test/resources/startup.properties | 2 +-
39 files changed, 1744 insertions(+), 1294 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index df28f9b..c54f9d8 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -187,6 +187,26 @@
<groupId>com.thinkaurelius.titan</groupId>
<artifactId>titan-berkeleyje</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa-jdbc</artifactId>
+ <version>${openjpa.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa-persistence-jdbc</artifactId>
+ <version>${openjpa.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ <version>${javax-validation.version}</version>
+ </dependency>
</dependencies>
<build>
@@ -216,6 +236,27 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <phase>process-classes</phase>
+ <configuration>
+ <tasks>
+ <taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask" classpathref="maven.compile.classpath"/>
+ <openjpac>
+ <classpath refid="maven.compile.classpath"/>
+ </openjpac>
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/EntityBean.java b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
new file mode 100644
index 0000000..5c94fa4
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.OneToMany;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+import java.util.List;
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * JPA entity mapped to the ENTITIES table.
+ *
+ * Each row carries an id (primary key), a name, a type and a current state, and is the
+ * inverse side of a one-to-many association with InstanceBean (see instanceBeans).
+ * The named queries below select, update and delete rows by id, state or type.
+ */
+@Entity
+@NamedQueries({
+ @NamedQuery(name = "GET_ENTITY", query = "select OBJECT(a) from EntityBean a where a.id = :id"),
+ @NamedQuery(name = "GET_ENTITY_FOR_STATE", query = "select OBJECT(a) from EntityBean a where a.state = :state"),
+ @NamedQuery(name = "UPDATE_ENTITY", query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type where a.id = :id"),
+ @NamedQuery(name = "GET_ENTITIES_FOR_TYPE", query = "select OBJECT(a) from EntityBean a where a.type = :type"),
+ @NamedQuery(name = "GET_ENTITIES", query = "select OBJECT(a) from EntityBean a"),
+ @NamedQuery(name = "DELETE_ENTITY", query = "delete from EntityBean a where a.id = :id"),
+ @NamedQuery(name = "DELETE_ENTITIES", query = "delete from EntityBean")})
+//RESUME CHECKSTYLE CHECK LineLengthCheck
+@Table(name = "ENTITIES")
+public class EntityBean {
+ // Primary key; assigned by the caller (no @GeneratedValue).
+ @NotNull
+ @Id
+ private String id;
+
+ // Stored in column "name".
+ @Basic
+ @NotNull
+ @Column(name = "name")
+ private String name;
+
+
+ // Stored in indexed column "type"; filtered on by GET_ENTITIES_FOR_TYPE.
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "type")
+ private String type;
+
+ // Stored in indexed column "current_state"; filtered on by GET_ENTITY_FOR_STATE.
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "current_state")
+ private String state;
+
+ // Inverse side of InstanceBean.entityBean; removing this entity through the
+ // entity manager cascades the removal to its instances.
+ @OneToMany(cascade= CascadeType.REMOVE, mappedBy="entityBean")
+ private List<InstanceBean> instanceBeans;
+
+ /** No-argument constructor; all fields are populated through the setters. */
+ public EntityBean() {
+ }
+
+ /** @return the primary key of this row */
+ public String getId() {
+ return id;
+ }
+
+ /** @param id the primary key to assign */
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ /** @return the value of the "name" column */
+ public String getName() {
+ return name;
+ }
+
+ /** @param name the value for the "name" column */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /** @return the value of the "type" column */
+ public String getType() {
+ return type;
+ }
+
+ /** @param type the value for the "type" column */
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ /** @return the value of the "current_state" column */
+ public String getState() {
+ return state;
+ }
+
+ /** @param state the value for the "current_state" column */
+ public void setState(String state) {
+ this.state = state;
+ }
+
+ /** @return the instances associated with this entity (may be null if never set or loaded) */
+ public List<InstanceBean> getInstanceBeans() {
+ return instanceBeans;
+ }
+
+ /** @param instanceBeans the instances to associate with this entity */
+ public void setInstanceBeans(List<InstanceBean> instanceBeans) {
+ this.instanceBeans = instanceBeans;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java
new file mode 100644
index 0000000..b7e10f1
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import org.apache.openjpa.persistence.jdbc.ForeignKey;
+import org.apache.openjpa.persistence.jdbc.ForeignKeyAction;
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Lob;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+import java.sql.Timestamp;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * JPA entity mapped to the INSTANCES table; holds the persisted state of one instance.
+ *
+ * An instance is tied to its entity twice: through the plain entity_id column (which is what
+ * the named queries filter on) and through the entityBean many-to-one association, whose
+ * foreign key is declared with delete action CASCADE.
+ */
+@Entity
+@NamedQueries({
+ @NamedQuery(name = "GET_INSTANCE", query = "select OBJECT(a) from InstanceBean a where a.id = :id"),
+ @NamedQuery(name = "GET_INSTANCE_FOR_EXTERNAL_ID", query = "select OBJECT(a) from InstanceBean a where a.externalID = :externalID"),
+ @NamedQuery(name = "DELETE_INSTANCE", query = "delete from InstanceBean a where a.id = :id"),
+ @NamedQuery(name = "DELETE_INSTANCE_FOR_ENTITY", query = "delete from InstanceBean a where a.entityId = :entityId"),
+ @NamedQuery(name = "UPDATE_INSTANCE", query = "update InstanceBean a set a.cluster = :cluster, a.externalID = :externalID, a.instanceTime = :instanceTime, a.creationTime = :creationTime, a.actualEndTime = :actualEndTime, a.currentState = :currentState, a.actualStartTime = :actualStartTime, a.instanceSequence = :instanceSequence, a.awaitedPredicates = :awaitedPredicates, a.properties = :properties where a.id = :id"),
+ @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster"),
+ @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState)"),
+ @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"),
+ @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"),
+ @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"),
+ @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a"),
+ @NamedQuery(name = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE", query = "select a.currentState, COUNT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.instanceTime >= :startTime AND a.instanceTime < :endTime GROUP BY a.currentState")
+})
+//RESUME CHECKSTYLE CHECK LineLengthCheck
+@Table(name = "INSTANCES")
+public class InstanceBean {
+
+ // Primary key; assigned by the caller (no @GeneratedValue).
+ @Id
+ @NotNull
+ private String id;
+
+ // Indexed column "entity_id"; the named queries use it to select the instances of an entity.
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "entity_id")
+ private String entityId;
+
+ // Indexed column "cluster".
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "cluster")
+ private String cluster;
+
+ // Indexed, nullable column "external_id"; looked up by GET_INSTANCE_FOR_EXTERNAL_ID.
+ @Basic
+ @Index
+ @Column(name = "external_id")
+ private String externalID;
+
+ // Indexed, nullable column "instance_time"; used for the range filters
+ // (startTime inclusive, endTime exclusive) and for ordering in the named queries.
+ @Basic
+ @Index
+ @Column(name = "instance_time")
+ private Timestamp instanceTime;
+
+ // Indexed column "creation_time".
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "creation_time")
+ private Timestamp creationTime;
+
+ // Nullable column "actual_start_time".
+ @Basic
+ @Column(name = "actual_start_time")
+ private Timestamp actualStartTime;
+
+ // Nullable column "actual_end_time".
+ @Basic
+ @Column(name = "actual_end_time")
+ private Timestamp actualEndTime;
+
+ // Indexed column "current_state"; grouped on by GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE.
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "current_state")
+ private String currentState;
+
+ // Indexed column "instance_sequence".
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "instance_sequence")
+ private Integer instanceSequence;
+
+ // Owning side of EntityBean.instanceBeans; the foreign key deletes instances when the
+ // referenced entity row is deleted.
+ // NOTE(review): CascadeType.REMOVE on the many-to-one side means EntityManager.remove()
+ // on an instance would also remove the referenced EntityBean - confirm this is intended.
+ @ForeignKey(deleteAction= ForeignKeyAction.CASCADE)
+ @ManyToOne(cascade= CascadeType.REMOVE)
+ private EntityBean entityBean;
+
+
+ // Stored as a LOB in column "awaited_predicates".
+ @Column(name = "awaited_predicates")
+ @Lob
+ private byte[] awaitedPredicates;
+
+ // Stored as a LOB in column "properties".
+ @Column(name = "properties")
+ @Lob
+ private byte[] properties;
+
+
+ /** @return the primary key of this row */
+ public String getId() {
+ return id;
+ }
+
+ /** @param id the primary key to assign */
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ /** @return the value of the "cluster" column */
+ public String getCluster() {
+ return cluster;
+ }
+
+ /** @param cluster the value for the "cluster" column */
+ public void setCluster(String cluster) {
+ this.cluster = cluster;
+ }
+
+ /** @return the value of the "external_id" column, possibly null */
+ public String getExternalID() {
+ return externalID;
+ }
+
+ /** @param externalID the value for the "external_id" column */
+ public void setExternalID(String externalID) {
+ this.externalID = externalID;
+ }
+
+ /** @return the value of the "instance_time" column, possibly null */
+ public Timestamp getInstanceTime() {
+ return instanceTime;
+ }
+
+ /** @param instanceTime the value for the "instance_time" column */
+ public void setInstanceTime(Timestamp instanceTime) {
+ this.instanceTime = instanceTime;
+ }
+
+ /** @return the value of the "creation_time" column */
+ public Timestamp getCreationTime() {
+ return creationTime;
+ }
+
+ /** @param creationTime the value for the "creation_time" column */
+ public void setCreationTime(Timestamp creationTime) {
+ this.creationTime = creationTime;
+ }
+
+ /** @return the value of the "actual_start_time" column, possibly null */
+ public Timestamp getActualStartTime() {
+ return actualStartTime;
+ }
+
+ /** @param actualStartTime the value for the "actual_start_time" column */
+ public void setActualStartTime(Timestamp actualStartTime) {
+ this.actualStartTime = actualStartTime;
+ }
+
+ /** @return the value of the "actual_end_time" column, possibly null */
+ public Timestamp getActualEndTime() {
+ return actualEndTime;
+ }
+
+ /** @param actualEndTime the value for the "actual_end_time" column */
+ public void setActualEndTime(Timestamp actualEndTime) {
+ this.actualEndTime = actualEndTime;
+ }
+
+ /** @return the value of the "current_state" column */
+ public String getCurrentState() {
+ return currentState;
+ }
+
+ /** @param currentState the value for the "current_state" column */
+ public void setCurrentState(String currentState) {
+ this.currentState = currentState;
+ }
+
+ /** @return the stored array itself (no defensive copy), possibly null */
+ public byte[] getAwaitedPredicates() {
+ return awaitedPredicates;
+ }
+
+ /** @param awaitedPredicates the array to store; kept by reference (no defensive copy) */
+ public void setAwaitedPredicates(byte[] awaitedPredicates) {
+ this.awaitedPredicates = awaitedPredicates;
+ }
+
+ /** @return the value of the "instance_sequence" column */
+ public Integer getInstanceSequence() {
+ return instanceSequence;
+ }
+
+ /** @param instanceSequence the value for the "instance_sequence" column */
+ public void setInstanceSequence(Integer instanceSequence) {
+ this.instanceSequence = instanceSequence;
+ }
+
+ /** @return the value of the "entity_id" column */
+ public String getEntityId() {
+ return entityId;
+ }
+
+ /** @param entityId the value for the "entity_id" column */
+ public void setEntityId(String entityId) {
+ this.entityId = entityId;
+ }
+
+ /** @return the stored array itself (no defensive copy), possibly null */
+ public byte[] getProperties() {
+ return properties;
+ }
+
+ /** @param properties the array to store; kept by reference (no defensive copy) */
+ public void setProperties(byte[] properties) {
+ this.properties = properties;
+ }
+
+ /** @return the associated entity, possibly null */
+ public EntityBean getEntityBean() {
+ return entityBean;
+ }
+
+ /** @param entityBean the entity this instance is associated with */
+ public void setEntityBean(EntityBean entityBean) {
+ this.entityBean = entityBean;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
new file mode 100644
index 0000000..2b48569
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import javax.persistence.Entity;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Column;
+import javax.persistence.Basic;
+import javax.validation.constraints.NotNull;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+* JPA entity mapped to the MONITORED_FEEDS table: one row per feed that is to be monitored.
+*
+* The named queries (whose names live in PersistenceConstants) look up or delete rows by
+* feed name, or list all rows.
+* */
+
+@Entity
+@NamedQueries({
+ @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from "
+ + "MonitoredFeedsBean a where a.feedName = :feedName"),
+ @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredFeedsBean "
+ + "a where a.feedName = :feedName"),
+ @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) "
+ + "from MonitoredFeedsBean a")
+})
+@Table(name="MONITORED_FEEDS")
+//RESUME CHECKSTYLE CHECK LineLengthCheck
+public class MonitoredFeedsBean {
+ // Primary key; generation is delegated to the persistence provider (GenerationType.AUTO).
+ @NotNull
+ @GeneratedValue(strategy = GenerationType.AUTO)
+ @Id
+ private String id;
+
+ // Stored in column "feed_name"; the key every by-feed named query filters on.
+ @Basic
+ @NotNull
+ @Column(name = "feed_name")
+ private String feedName;
+
+ /** @return the value of the "feed_name" column */
+ public String getFeedName() {
+ return feedName;
+ }
+
+ /** @param feedName the value for the "feed_name" column */
+ public void setFeedName(String feedName) {
+ this.feedName = feedName;
+ }
+
+ /** @return the primary key of this row */
+ public String getId() {
+ return id;
+ }
+
+ /** @param id the primary key to assign */
+ public void setId(String id) {
+ this.id = id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
new file mode 100644
index 0000000..038244a
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import javax.persistence.Entity;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.GenerationType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.Id;
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.validation.constraints.NotNull;
+import java.util.Date;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+* JPA entity mapped to the PENDING_INSTANCES table: one row per (feed, cluster, nominal time)
+* instance of a feed that is to be monitored.
+*
+* The named queries (whose names live in PersistenceConstants) select rows by feed, select the
+* nominal times for a feed/cluster pair, delete a single nominal instance, delete all rows for
+* a feed/cluster pair, or list every row.
+* */
+@Entity
+@NamedQueries({
+ @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"),
+ @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"),
+ @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
+ @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
+ @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a ")
+})
+@Table(name = "PENDING_INSTANCES")
+//RESUME CHECKSTYLE CHECK LineLengthCheck
+public class PendingInstanceBean {
+ // Primary key; generation is delegated to the persistence provider (GenerationType.AUTO).
+ @NotNull
+ @GeneratedValue(strategy = GenerationType.AUTO)
+ @Id
+ private String id;
+
+ // Stored in column "feed_name".
+ @Basic
+ @NotNull
+ @Column(name = "feed_name")
+ private String feedName;
+
+ // Stored in column "cluster_name".
+ @Basic
+ @NotNull
+ @Column(name = "cluster_name")
+ private String clusterName;
+
+ // Stored in column "nominal_time".
+ // NOTE(review): java.util.Date attribute without @Temporal - the JPA spec expects
+ // @Temporal on Date fields; confirm the provider's default mapping is the intended one.
+ @Basic
+ @NotNull
+ @Column(name = "nominal_time")
+ private Date nominalTime;
+
+ /** @return the stored Date itself (no defensive copy) */
+ public Date getNominalTime() {
+ return nominalTime;
+ }
+
+ /** @param nominalTime the value for the "nominal_time" column; kept by reference */
+ public void setNominalTime(Date nominalTime) {
+ this.nominalTime = nominalTime;
+ }
+
+ /** @return the primary key of this row */
+ public String getId() {
+ return id;
+ }
+
+ /** @param id the primary key to assign */
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ /** @return the value of the "cluster_name" column */
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ /** @param clusterName the value for the "cluster_name" column */
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ /** @return the value of the "feed_name" column */
+ public String getFeedName() {
+ return feedName;
+ }
+
+ /** @param feedName the value for the "feed_name" column */
+ public void setFeedName(String feedName) {
+ this.feedName = feedName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
new file mode 100644
index 0000000..511270e
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+/**
+ * Names of the JPA named queries, exposed as constants so that they can be shared across
+ * packages. Each constant's value is identical to its identifier.
+ */
+
+public final class PersistenceConstants {
+ // Constants holder; not meant to be instantiated.
+ private PersistenceConstants(){
+
+ }
+ // Query names used by the @NamedQuery declarations on MonitoredFeedsBean.
+ // "MONITERED" is misspelled, but both the identifier and the value are part of the public
+ // contract (referenced by the @NamedQuery on MonitoredFeedsBean), so the spelling is kept.
+ public static final String GET_MONITERED_INSTANCE = "GET_MONITERED_INSTANCE";
+ public static final String DELETE_MONITORED_INSTANCES = "DELETE_MONITORED_INSTANCES";
+ public static final String GET_ALL_MONITORING_FEEDS = "GET_ALL_MONITORING_FEEDS";
+ // Query names used by the @NamedQuery declarations on PendingInstanceBean.
+ public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES";
+ public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES";
+ public static final String DELETE_ALL_INSTANCES_FOR_FEED = "DELETE_ALL_INSTANCES_FOR_FEED";
+ public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES";
+ public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES";
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java b/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java
new file mode 100644
index 0000000..c368d2c
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.persistence;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Exception to be thrown by the bean classes.
+ *
+ * A FalconException subtype; presumably raised when a persistence lookup yields no
+ * result (inferred from the name only - confirm against the throwing code).
+ */
+public class ResultNotFoundException extends FalconException {
+
+ /**
+ * @param message detail message, passed through to FalconException
+ */
+ public ResultNotFoundException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/service/FalconJPAService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/FalconJPAService.java b/common/src/main/java/org/apache/falcon/service/FalconJPAService.java
new file mode 100644
index 0000000..73fde33
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/service/FalconJPAService.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.service;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.persistence.EntityBean;
+import org.apache.falcon.persistence.InstanceBean;
+import org.apache.falcon.util.StateStoreProperties;
+import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import java.text.MessageFormat;
+import java.util.Properties;
+
+/**
+ * Service that manages JPA.
+ *
+ * <p>Singleton {@link FalconService} that builds the OpenJPA {@link EntityManagerFactory} for the
+ * state store from the {@code falcon.statestore.*} properties and hands out entity managers.
+ * {@link #init()} must have completed before {@link #getEntityManager()} is used.
+ */
+public final class FalconJPAService implements FalconService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FalconJPAService.class);
+    public static final String PREFIX = "falcon.statestore.";
+
+    public static final String DB_SCHEMA = PREFIX + "schema.name";
+    public static final String URL = PREFIX + "jdbc.url";
+    public static final String DRIVER = PREFIX + "jdbc.driver";
+    public static final String USERNAME = PREFIX + "jdbc.username";
+    public static final String PASSWORD = PREFIX + "jdbc.password";
+    public static final String CONN_DATA_SOURCE = PREFIX + "connection.data.source";
+    public static final String CONN_PROPERTIES = PREFIX + "connection.properties";
+    public static final String MAX_ACTIVE_CONN = PREFIX + "pool.max.active.conn";
+    public static final String CREATE_DB_SCHEMA = PREFIX + "create.db.schema";
+    public static final String VALIDATE_DB_CONN = PREFIX + "validate.db.connection";
+    public static final String VALIDATE_DB_CONN_EVICTION_INTERVAL = PREFIX + "validate.db.connection.eviction.interval";
+    public static final String VALIDATE_DB_CONN_EVICTION_NUM = PREFIX + "validate.db.connection.eviction.num";
+
+    private static final String JDBC_PREFIX = "jdbc:";
+    private static final String NO_CONN_VALIDATION = ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
+
+    private EntityManagerFactory entityManagerFactory;
+    // Persistent Unit which is defined in persistence.xml
+    private String persistenceUnit;
+    private static final FalconJPAService FALCON_JPA_SERVICE = new FalconJPAService();
+
+    private FalconJPAService() {
+    }
+
+    /**
+     * @return the single instance of this service
+     */
+    public static FalconJPAService get() {
+        return FALCON_JPA_SERVICE;
+    }
+
+    /**
+     * @return the factory created by {@link #init()}, or null if init() has not run yet
+     */
+    public EntityManagerFactory getEntityManagerFactory() {
+        return entityManagerFactory;
+    }
+
+    /**
+     * Selects the persistence unit, named {@code falcon-<vendor>}, where the vendor is the part
+     * of {@code dbType} before the first ':'.
+     *
+     * @param dbType JDBC URL without the leading "jdbc:" (e.g. "mysql://host/db") or a bare vendor name
+     * @throws IllegalArgumentException if dbType is null or empty
+     */
+    public void setPersistenceUnit(String dbType) {
+        if (StringUtils.isEmpty(dbType)) {
+            throw new IllegalArgumentException(" DB type cannot be null or empty");
+        }
+        dbType = dbType.split(":")[0];
+        this.persistenceUnit = "falcon-" + dbType;
+    }
+
+    @Override
+    public String getName() {
+        return this.getClass().getSimpleName();
+    }
+
+    /**
+     * Creates the entity manager factory and forces a one-time initialization of the entity
+     * metadata, the data source and the connection pool.
+     *
+     * @throws FalconException if the state store properties are missing or invalid
+     */
+    @Override
+    public void init() throws FalconException {
+        Properties props = getPropsforStore();
+        entityManagerFactory = Persistence.createEntityManagerFactory(persistenceUnit, props);
+        EntityManager entityManager = getEntityManager();
+        try {
+            // The lookups are only issued to make the provider load both entity classes;
+            // their results are ignored.
+            entityManager.find(EntityBean.class, 1);
+            entityManager.find(InstanceBean.class, 1);
+            LOG.info("All entities initialized");
+
+            // need to use a pseudo no-op transaction so all entities, datasource
+            // and connection pool are initialized one time only
+            entityManager.getTransaction().begin();
+            OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) entityManagerFactory;
+            // Mask the password with '***'
+            String logMsg = spi.getConfiguration().getConnectionProperties()
+                    .replaceAll("Password=.*?,", "Password=***,");
+            // SLF4J placeholders are "{}"; the MessageFormat-style "{0}" was logged literally.
+            LOG.info("JPA configuration: {}", logMsg);
+            entityManager.getTransaction().commit();
+        } finally {
+            // Do not leak the entity manager (or a dangling transaction) if initialization fails.
+            if (entityManager.getTransaction().isActive()) {
+                entityManager.getTransaction().rollback();
+            }
+            entityManager.close();
+        }
+    }
+
+    /**
+     * Translates the {@code falcon.statestore.*} properties into OpenJPA properties and, as a
+     * side effect, selects the persistence unit matching the JDBC vendor of the URL.
+     *
+     * @return the properties to create the entity manager factory with
+     * @throws FalconException if a required property is missing or the JDBC URL is malformed
+     */
+    private Properties getPropsforStore() throws FalconException {
+        String url = getRequiredProperty(URL);
+        String driver = StateStoreProperties.get().getProperty(DRIVER);
+        String user = StateStoreProperties.get().getProperty(USERNAME);
+        String password = getRequiredProperty(PASSWORD).trim();
+        String maxConn = getRequiredProperty(MAX_ACTIVE_CONN).trim();
+        String dataSource = getRequiredProperty(CONN_DATA_SOURCE);
+        String connPropsConfig = StateStoreProperties.get().getProperty(CONN_PROPERTIES);
+        boolean autoSchemaCreation = Boolean.parseBoolean(
+                StateStoreProperties.get().getProperty(CREATE_DB_SCHEMA, "false"));
+        boolean validateDbConn = Boolean.parseBoolean(
+                StateStoreProperties.get().getProperty(VALIDATE_DB_CONN, "true"));
+
+        if (!url.startsWith(JDBC_PREFIX)) {
+            throw new FalconException("invalid JDBC URL, must start with 'jdbc:': " + url);
+        }
+        String dbType = url.substring(JDBC_PREFIX.length());
+        if (dbType.indexOf(":") <= 0) {
+            throw new FalconException("invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...': " + url);
+        }
+        setPersistenceUnit(dbType);
+
+        // The values are passed as format arguments, so they are inserted verbatim. The string is
+        // deliberately NOT run through MessageFormat a second time: a second pass would strip
+        // single quotes from, and choke on braces in, the URL or the password.
+        String connProps = MessageFormat.format(
+                "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}",
+                driver, url, user, password, maxConn);
+        Properties props = new Properties();
+        if (autoSchemaCreation) {
+            connProps += NO_CONN_VALIDATION;
+            props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
+        } else if (validateDbConn) {
+            // validation can be done only if the schema already exist, else a
+            // connection cannot be obtained to create the schema.
+            // The eviction settings are only needed, hence only required, on this branch.
+            String interval = "timeBetweenEvictionRunsMillis="
+                    + getRequiredProperty(VALIDATE_DB_CONN_EVICTION_INTERVAL).trim();
+            String num = "numTestsPerEvictionRun=" + getRequiredProperty(VALIDATE_DB_CONN_EVICTION_NUM).trim();
+            connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num;
+            connProps += ",ValidationQuery=select 1";
+        } else {
+            connProps += NO_CONN_VALIDATION;
+        }
+        if (connPropsConfig != null) {
+            connProps += "," + connPropsConfig;
+        }
+        props.setProperty("openjpa.ConnectionProperties", connProps);
+        props.setProperty("openjpa.ConnectionDriverName", dataSource);
+        return props;
+    }
+
+    /**
+     * Reads a state store property that must be present.
+     *
+     * @param key the property name
+     * @return the raw (untrimmed) property value, never null
+     * @throws FalconException if the property is not set, instead of a bare NullPointerException
+     */
+    private static String getRequiredProperty(String key) throws FalconException {
+        String value = StateStoreProperties.get().getProperty(key);
+        if (value == null) {
+            throw new FalconException("Missing required state store property: " + key);
+        }
+        return value;
+    }
+
+    /**
+     * Closes the entity manager factory if it was created and is still open; a no-op otherwise,
+     * so it is safe to call even when {@link #init()} never ran or failed.
+     */
+    @Override
+    public void destroy() throws FalconException {
+        if (entityManagerFactory != null && entityManagerFactory.isOpen()) {
+            entityManagerFactory.close();
+        }
+    }
+
+
+    /**
+     * Return an EntityManager. Used by the StoreService.
+     * The caller owns the returned entity manager and is responsible for closing it.
+     *
+     * @return an entity manager
+     */
+    public EntityManager getEntityManager() {
+        return getEntityManagerFactory().createEntityManager();
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
new file mode 100644
index 0000000..df8194c
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.tools;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.falcon.cli.CLIParser;
+import org.apache.falcon.service.FalconJPAService;
+import org.apache.falcon.util.BuildProperties;
+import org.apache.falcon.util.StateStoreProperties;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Command Line utility for Table Creation, Update.
+ */
+public class FalconStateStoreDBCLI {
+ public static final String HELP_CMD = "help";
+ public static final String VERSION_CMD = "version";
+ public static final String CREATE_CMD = "create";
+ public static final String SQL_FILE_OPT = "sqlfile";
+ public static final String RUN_OPT = "run";
+ public static final String UPGRADE_CMD = "upgrade";
+
+ // Represents whether DB instance exists or not.
+ private boolean instanceExists;
+ private static final String[] FALCON_HELP =
+ {"Falcon DB initialization tool currently supports Derby DB/ Mysql/ PostgreSQL"};
+
+ public static void main(String[] args) {
+ new FalconStateStoreDBCLI().run(args);
+ }
+
+ public FalconStateStoreDBCLI() {
+ instanceExists = false;
+ }
+
+ protected Options getOptions() {
+ Option sqlfile = new Option(SQL_FILE_OPT, true,
+ "Generate SQL script instead of creating/upgrading the DB schema");
+ Option run = new Option(RUN_OPT, false, "Confirmation option regarding DB schema creation/upgrade");
+ Options options = new Options();
+ options.addOption(sqlfile);
+ options.addOption(run);
+ return options;
+ }
+
+ public synchronized int run(String[] args) {
+ if (instanceExists) {
+ throw new IllegalStateException("CLI instance already used");
+ }
+ instanceExists = true;
+
+ CLIParser parser = new CLIParser("falcondb", FALCON_HELP);
+ parser.addCommand(HELP_CMD, "", "Display usage for all commands or specified command", new Options(), false);
+ parser.addCommand(VERSION_CMD, "", "Show Falcon DB version information", new Options(), false);
+ parser.addCommand(CREATE_CMD, "", "Create Falcon DB schema", getOptions(), false);
+ parser.addCommand(UPGRADE_CMD, "", "Upgrade Falcon DB schema", getOptions(), false);
+
+ try {
+ CLIParser.Command command = parser.parse(args);
+ if (command.getName().equals(HELP_CMD)) {
+ parser.showHelp();
+ } else if (command.getName().equals(VERSION_CMD)) {
+ showVersion();
+ } else {
+ if (!command.getCommandLine().hasOption(SQL_FILE_OPT)
+ && !command.getCommandLine().hasOption(RUN_OPT)) {
+ throw new Exception("'-sqlfile <FILE>' or '-run' options must be specified");
+ }
+ CommandLine commandLine = command.getCommandLine();
+ String sqlFile = (commandLine.hasOption(SQL_FILE_OPT))
+ ? commandLine.getOptionValue(SQL_FILE_OPT)
+ : File.createTempFile("falcondb-", ".sql").getAbsolutePath();
+ boolean run = commandLine.hasOption(RUN_OPT);
+ if (command.getName().equals(CREATE_CMD)) {
+ createDB(sqlFile, run);
+ } else if (command.getName().equals(UPGRADE_CMD)) {
+ upgradeDB(sqlFile, run);
+ }
+ System.out.println("The SQL commands have been written to: " + sqlFile);
+ if (!run) {
+ System.out.println("WARN: The SQL commands have NOT been executed, you must use the '-run' option");
+ }
+ }
+ return 0;
+ } catch (ParseException ex) {
+ System.err.println("Invalid sub-command: " + ex.getMessage());
+ System.err.println();
+ System.err.println(parser.shortHelp());
+ return 1;
+ } catch (Exception ex) {
+ System.err.println();
+ System.err.println("Error: " + ex.getMessage());
+ System.err.println();
+ System.err.println("Stack trace for the error was (for debug purposes):");
+ System.err.println("--------------------------------------");
+ ex.printStackTrace(System.err);
+ System.err.println("--------------------------------------");
+ System.err.println();
+ return 1;
+ }
+ }
+
+ private void upgradeDB(String sqlFile, boolean run) throws Exception {
+ validateConnection();
+ if (!checkDBExists()) {
+ throw new Exception("Falcon DB doesn't exist");
+ }
+ String falconVersion = BuildProperties.get().getProperty("project.version");
+ String dbVersion = getFalconDBVersion();
+ if (dbVersion.compareTo(falconVersion) >= 0) {
+ System.out.println("Falcon DB already upgraded to Falcon version '" + falconVersion + "'");
+ return;
+ }
+
+ createUpgradeDB(sqlFile, run, false);
+ upgradeFalconDBVersion(sqlFile, run, falconVersion);
+
+ // any post upgrade tasks
+ if (run) {
+ System.out.println("Falcon DB has been upgraded to Falcon version '" + falconVersion + "'");
+ }
+ }
+
+
+ private void upgradeFalconDBVersion(String sqlFile, boolean run, String version) throws Exception {
+ String updateDBVersion = "update FALCON_DB_PROPS set data='" + version + "' where name='db.version'";
+ PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
+ writer.println();
+ writer.println(updateDBVersion);
+ writer.close();
+ System.out.println("Upgrade db.version in FALCON_DB_PROPS table to " + version);
+ if (run) {
+ Connection conn = createConnection();
+ Statement st = null;
+ try {
+ conn.setAutoCommit(true);
+ st = conn.createStatement();
+ st.executeUpdate(updateDBVersion);
+ st.close();
+ } catch (Exception ex) {
+ throw new Exception("Could not upgrade db.version in FALCON_DB_PROPS table: " + ex.toString(), ex);
+ } finally {
+ closeStatement(st);
+ conn.close();
+ }
+ }
+ System.out.println("DONE");
+ }
+
+ private static final String GET_FALCON_DB_VERSION = "select data from FALCON_DB_PROPS where name = 'db.version'";
+
+ private String getFalconDBVersion() throws Exception {
+ String version;
+ System.out.println("Get Falcon DB version");
+ Connection conn = createConnection();
+ Statement st = null;
+ ResultSet rs = null;
+ try {
+ st = conn.createStatement();
+ rs = st.executeQuery(GET_FALCON_DB_VERSION);
+ if (rs.next()) {
+ version = rs.getString(1);
+ } else {
+ throw new Exception("ERROR: Could not find Falcon DB 'db.version' in FALCON_DB_PROPS table");
+ }
+ } catch (Exception ex) {
+ throw new Exception("ERROR: Could not query FALCON_DB_PROPS table: " + ex.toString(), ex);
+ } finally {
+ closeResultSet(rs);
+ closeStatement(st);
+ conn.close();
+ }
+ System.out.println("DONE");
+ return version;
+ }
+
+
+ private Map<String, String> getJdbcConf() throws Exception {
+ Map<String, String> jdbcConf = new HashMap<String, String>();
+ jdbcConf.put("driver", StateStoreProperties.get().getProperty(FalconJPAService.DRIVER));
+ String url = StateStoreProperties.get().getProperty(FalconJPAService.URL);
+ jdbcConf.put("url", url);
+ jdbcConf.put("user", StateStoreProperties.get().getProperty(FalconJPAService.USERNAME));
+ jdbcConf.put("password", StateStoreProperties.get().getProperty(FalconJPAService.PASSWORD));
+ String dbType = url.substring("jdbc:".length());
+ if (dbType.indexOf(":") <= 0) {
+ throw new RuntimeException("Invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
+ }
+ dbType = dbType.substring(0, dbType.indexOf(":"));
+ jdbcConf.put("dbtype", dbType);
+ return jdbcConf;
+ }
+
+ private String[] createMappingToolArguments(String sqlFile) throws Exception {
+ Map<String, String> conf = getJdbcConf();
+ List<String> args = new ArrayList<String>();
+ args.add("-schemaAction");
+ args.add("add");
+ args.add("-p");
+ args.add("persistence.xml#falcon-" + conf.get("dbtype"));
+ args.add("-connectionDriverName");
+ args.add(conf.get("driver"));
+ args.add("-connectionURL");
+ args.add(conf.get("url"));
+ args.add("-connectionUserName");
+ args.add(conf.get("user"));
+ args.add("-connectionPassword");
+ args.add(conf.get("password"));
+ if (sqlFile != null) {
+ args.add("-sqlFile");
+ args.add(sqlFile);
+ }
+ args.add("-indexes");
+ args.add("true");
+ args.add("org.apache.falcon.persistence.EntityBean");
+ args.add("org.apache.falcon.persistence.InstanceBean");
+ args.add("org.apache.falcon.persistence.PendingInstanceBean");
+ args.add("org.apache.falcon.persistence.MonitoredFeedsBean");
+ return args.toArray(new String[args.size()]);
+ }
+
+ private void createDB(String sqlFile, boolean run) throws Exception {
+ validateConnection();
+ if (checkDBExists()) {
+ return;
+ }
+
+ verifyFalconPropsTable(false);
+ createUpgradeDB(sqlFile, run, true);
+ createFalconPropsTable(sqlFile, run, BuildProperties.get().getProperty("project.version"));
+ if (run) {
+ System.out.println("Falcon DB has been created for Falcon version '"
+ + BuildProperties.get().getProperty("project.version") + "'");
+ }
+ }
+
+ private static final String CREATE_FALCON_DB_PROPS =
+ "create table FALCON_DB_PROPS (name varchar(100), data varchar(100))";
+
+ private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception {
+ String insertDbVerion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";
+
+ PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
+ writer.println();
+ writer.println(CREATE_FALCON_DB_PROPS);
+ writer.println(insertDbVerion);
+ writer.close();
+ System.out.println("Create FALCON_DB_PROPS table");
+ if (run) {
+ Connection conn = createConnection();
+ Statement st = null;
+ try {
+ conn.setAutoCommit(true);
+ st = conn.createStatement();
+ st.executeUpdate(CREATE_FALCON_DB_PROPS);
+ st.executeUpdate(insertDbVerion);
+ st.close();
+ } catch (Exception ex) {
+ closeStatement(st);
+ throw new Exception("Could not create FALCON_DB_PROPS table: " + ex.toString(), ex);
+ } finally {
+ conn.close();
+ }
+ }
+ System.out.println("DONE");
+ }
+
+ private static final String FALCON_DB_PROPS_EXISTS = "select count(*) from FALCON_DB_PROPS";
+
+ private boolean verifyFalconPropsTable(boolean exists) throws Exception {
+ System.out.println((exists) ? "Check FALCON_DB_PROPS table exists"
+ : "Checking FALCON_DB_PROPS table does not exist");
+ boolean tableExists;
+ Connection conn = createConnection();
+ Statement st = null;
+ ResultSet rs = null;
+ try {
+ st = conn.createStatement();
+ rs = st.executeQuery(FALCON_DB_PROPS_EXISTS);
+ rs.next();
+ tableExists = true;
+ } catch (Exception ex) {
+ tableExists = false;
+ } finally {
+ closeResultSet(rs);
+ closeStatement(st);
+ conn.close();
+ }
+ if (tableExists != exists) {
+ throw new Exception("FALCON_DB_PROPS_TABLE table " + ((exists) ? "does not exist" : "exists"));
+ }
+ System.out.println("DONE");
+ return tableExists;
+ }
+
+ private void closeResultSet(ResultSet rs) {
+ try {
+ if (rs != null) {
+ rs.close();
+ }
+ } catch (Exception e) {
+ System.out.println("Unable to close ResultSet " + rs);
+ }
+ }
+
+ private void closeStatement(Statement st) throws Exception {
+ try {
+ if (st != null) {
+ st.close();
+ }
+ } catch (Exception e) {
+ System.out.println("Unable to close SQL Statement " + st);
+ throw new Exception(e);
+ }
+ }
+
+ private Connection createConnection() throws Exception {
+ Map<String, String> conf = getJdbcConf();
+ Class.forName(conf.get("driver")).newInstance();
+ return DriverManager.getConnection(conf.get("url"), conf.get("user"), conf.get("password"));
+ }
+
+ private void validateConnection() throws Exception {
+ System.out.println("Validating DB Connection");
+ try {
+ createConnection().close();
+ System.out.println("DONE");
+ } catch (Exception ex) {
+ throw new Exception("Could not connect to the database: " + ex.toString(), ex);
+ }
+ }
+
+ private static final String ENTITY_STATUS_QUERY =
+ "select count(*) from ENTITIES where current_state IN ('RUNNING', 'SUSPENDED')";
+ private static final String INSTANCE_STATUS_QUERY =
+ "select count(*) from INSTANCES where current_state IN ('RUNNING', 'SUSPENDED')";
+
+ private boolean checkDBExists() throws Exception {
+ boolean schemaExists;
+ Connection conn = createConnection();
+ ResultSet rs = null;
+ Statement st = null;
+ try {
+ st = conn.createStatement();
+ rs = st.executeQuery(ENTITY_STATUS_QUERY);
+ rs.next();
+ schemaExists = true;
+ } catch (Exception ex) {
+ schemaExists = false;
+ } finally {
+ closeResultSet(rs);
+ closeStatement(st);
+ conn.close();
+ }
+ System.out.println("DB schema " + ((schemaExists) ? "exists" : "does not exist"));
+ return schemaExists;
+ }
+
+ private void createUpgradeDB(String sqlFile, boolean run, boolean create) throws Exception {
+ System.out.println((create) ? "Create SQL schema" : "Upgrade SQL schema");
+ String[] args = createMappingToolArguments(sqlFile);
+ org.apache.openjpa.jdbc.meta.MappingTool.main(args);
+ if (run) {
+ args = createMappingToolArguments(null);
+ org.apache.openjpa.jdbc.meta.MappingTool.main(args);
+ }
+ System.out.println("DONE");
+ }
+
+ private void showVersion() throws Exception {
+ System.out.println("Falcon Server version: "
+ + BuildProperties.get().getProperty("project.version"));
+ validateConnection();
+ if (!checkDBExists()) {
+ throw new Exception("Falcon DB doesn't exist");
+ }
+ try {
+ verifyFalconPropsTable(true);
+ } catch (Exception ex) {
+ throw new Exception("ERROR: It seems this Falcon DB was never upgraded with the 'falcondb' tool");
+ }
+ showFalconPropsInfo();
+ }
+
+ private static final String GET_FALCON_PROPS_INFO = "select name, data from FALCON_DB_PROPS order by name";
+
+ private void showFalconPropsInfo() throws Exception {
+ Connection conn = createConnection();
+ Statement st = null;
+ ResultSet rs = null;
+ try {
+ System.out.println("Falcon DB Version Information");
+ System.out.println("--------------------------------------");
+ st = conn.createStatement();
+ rs = st.executeQuery(GET_FALCON_PROPS_INFO);
+ while (rs.next()) {
+ System.out.println(rs.getString(1) + ": " + rs.getString(2));
+ }
+ System.out.println("--------------------------------------");
+ } catch (Exception ex) {
+ throw new Exception("ERROR querying FALCON_DB_PROPS table: " + ex.toString(), ex);
+ } finally {
+ closeResultSet(rs);
+ closeStatement(st);
+ conn.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml
new file mode 100644
index 0000000..4c9388c
--- /dev/null
+++ b/common/src/main/resources/META-INF/persistence.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<persistence xmlns="http://java.sun.com/xml/ns/persistence"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ version="1.0">
+
+ <persistence-unit name="falcon-derby" transaction-type="RESOURCE_LOCAL">
+ <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+
+ <class>org.apache.falcon.persistence.EntityBean</class>
+ <class>org.apache.falcon.persistence.InstanceBean</class>
+ <class>org.apache.falcon.persistence.PendingInstanceBean</class>
+ <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
+
+ <properties>
+ <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
+
+ <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
+
+ <property name="openjpa.MetaDataFactory"
+ value="jpa(Types=org.apache.falcon.persistence.EntityBean;
+ org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
+ org.apache.falcon.persistence.MonitoredFeedsBean)"></property>
+
+ <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
+ <property name="openjpa.LockManager" value="pessimistic"/>
+ <property name="openjpa.ReadLockLevel" value="read"/>
+ <property name="openjpa.WriteLockLevel" value="write"/>
+ <property name="openjpa.jdbc.TransactionIsolation" value="read-committed"/> <!--CUSTOM-->
+ <!-- One DBDictionary entry carrying both settings: a repeated property name would override the earlier value. -->
+ <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50,TimestampTypeName=TIMESTAMP"/>
+ <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
+ <property name="openjpa.Log" value="log4j"/>
+ </properties>
+ </persistence-unit>
+
+ <persistence-unit name="falcon-mysql" transaction-type="RESOURCE_LOCAL">
+ <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+
+ <class>org.apache.falcon.persistence.EntityBean</class>
+ <class>org.apache.falcon.persistence.InstanceBean</class>
+ <class>org.apache.falcon.persistence.PendingInstanceBean</class>
+ <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
+
+ <properties>
+ <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
+
+ <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
+
+ <property name="openjpa.MetaDataFactory"
+ value="jpa(Types=org.apache.falcon.persistence.EntityBean;
+ org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
+ org.apache.falcon.persistence.MonitoredFeedsBean)"></property>
+
+ <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
+ <property name="openjpa.LockManager" value="pessimistic"/>
+ <property name="openjpa.ReadLockLevel" value="read"/>
+ <property name="openjpa.WriteLockLevel" value="write"/>
+ <property name="openjpa.jdbc.TransactionIsolation" value="repeatable-read"/> <!--CUSTOM-->
+ <!-- One DBDictionary entry carrying both settings: a repeated property name would override the earlier value. -->
+ <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50,TimestampTypeName=TIMESTAMP"/>
+ <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
+ <property name="openjpa.Log" value="log4j"/>
+ </properties>
+ </persistence-unit>
+
+ <persistence-unit name="falcon-postgresql" transaction-type="RESOURCE_LOCAL">
+ <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+
+ <class>org.apache.falcon.persistence.EntityBean</class>
+ <class>org.apache.falcon.persistence.InstanceBean</class>
+ <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
+ <class>org.apache.falcon.persistence.PendingInstanceBean</class>
+
+ <properties>
+ <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
+
+ <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
+
+ <property name="openjpa.MetaDataFactory"
+ value="jpa(Types=org.apache.falcon.persistence.EntityBean;
+ org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
+ org.apache.falcon.persistence.MonitoredFeedsBean)"></property>
+
+ <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
+ <property name="openjpa.LockManager" value="pessimistic"/>
+ <property name="openjpa.ReadLockLevel" value="read"/>
+ <property name="openjpa.WriteLockLevel" value="write"/>
+ <property name="openjpa.jdbc.TransactionIsolation" value="repeatable-read"/> <!--CUSTOM-->
+ <!-- One DBDictionary entry carrying both settings: a repeated property name would override the earlier value. -->
+ <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50,TimestampTypeName=TIMESTAMP"/>
+ <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
+ <property name="openjpa.Log" value="log4j"/>
+ </properties>
+ </persistence-unit>
+
+</persistence>
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 81d3da1..87a74bf 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -42,7 +42,8 @@
org.apache.falcon.metadata.MetadataMappingService,\
org.apache.falcon.service.LogCleanupService,\
org.apache.falcon.service.GroupsService,\
- org.apache.falcon.service.ProxyUserService
+ org.apache.falcon.service.ProxyUserService,\
+ org.apache.falcon.service.FalconJPAService
## Add if you want to use Falcon Azure integration ##
# org.apache.falcon.adfservice.ADFProviderService
## If you wish to use Falcon native scheduler add the commented out services below to application.services ##
@@ -51,7 +52,7 @@
# org.apache.falcon.notification.service.impl.AlarmService,\
# org.apache.falcon.notification.service.impl.DataAvailabilityService,\
# org.apache.falcon.execution.FalconExecutionService,\
-# org.apache.falcon.state.store.service.FalconJPAService
+
# List of Lifecycle policies configured.
@@ -305,4 +306,4 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
## Creates Falcon DB.
## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP.
## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
-#*.falcon.statestore.create.db.schema=true
\ No newline at end of file
+#*.falcon.statestore.create.db.schema=true
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/statestore.credentials
----------------------------------------------------------------------
diff --git a/common/src/main/resources/statestore.credentials b/common/src/main/resources/statestore.credentials
index 86c32a1..b0e4196 100644
--- a/common/src/main/resources/statestore.credentials
+++ b/common/src/main/resources/statestore.credentials
@@ -18,5 +18,5 @@
######### StateStore Credentials #####
-#*.falcon.statestore.jdbc.username=sa
-#*.falcon.statestore.jdbc.password=
\ No newline at end of file
+*.falcon.statestore.jdbc.username=sa
+*.falcon.statestore.jdbc.password=
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/statestore.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/statestore.properties b/common/src/main/resources/statestore.properties
index 44e79b3..7686426 100644
--- a/common/src/main/resources/statestore.properties
+++ b/common/src/main/resources/statestore.properties
@@ -42,4 +42,22 @@
## Creates Falcon DB.
## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP.
## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
-#*.falcon.statestore.create.db.schema=true
\ No newline at end of file
+#*.falcon.statestore.create.db.schema=true
+
+
+######## StateStore Properties #####
+*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true
+*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+# Maximum number of active connections that can be allocated from this pool at the same time.
+*.falcon.statestore.pool.max.active.conn=10
+*.falcon.statestore.connection.properties=
+# Indicates the interval (in milliseconds) between eviction runs.
+*.falcon.statestore.validate.db.connection.eviction.interval=300000
+# The number of objects to examine during each run of the idle object evictor thread.
+*.falcon.statestore.validate.db.connection.eviction.num=10
+# Creates Falcon DB.
+# If set to true, it creates the DB schema if it does not exist. If the DB schema already exists, this is a NOP.
+# If set to false, it does not create the DB schema. If the DB schema does not exist, start up fails.
+*.falcon.statestore.create.db.schema=true
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/docs/src/site/twiki/FalconNativeScheduler.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconNativeScheduler.twiki b/docs/src/site/twiki/FalconNativeScheduler.twiki
index 9ffc5e9..1f51739 100644
--- a/docs/src/site/twiki/FalconNativeScheduler.twiki
+++ b/docs/src/site/twiki/FalconNativeScheduler.twiki
@@ -29,7 +29,7 @@ You can enable native scheduler by making changes to __$FALCON_HOME/conf/startup
org.apache.falcon.service.ProcessSubscriberService,\
org.apache.falcon.service.FeedSLAMonitoringService,\
org.apache.falcon.service.LifecyclePolicyMap,\
- org.apache.falcon.state.store.service.FalconJPAService,\
+ org.apache.falcon.service.FalconJPAService,\
org.apache.falcon.entity.store.ConfigurationStore,\
org.apache.falcon.rerun.service.RetryService,\
org.apache.falcon.rerun.service.LateRunService,\
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
new file mode 100644
index 0000000..39e2562
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.jdbc;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.falcon.persistence.MonitoredFeedsBean;
+import org.apache.falcon.persistence.PendingInstanceBean;
+import org.apache.falcon.persistence.PersistenceConstants;
+import org.apache.falcon.persistence.ResultNotFoundException;
+import org.apache.falcon.service.FalconJPAService;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.util.Date;
+import java.util.List;
+
+/**
+* StateStore for MonitoringFeeds and PendingFeedInstances.
+*/
+
+public class MonitoringJdbcStateStore {
+
+ private EntityManager getEntityManager() {
+ return FalconJPAService.get().getEntityManager();
+ }
+
+
+ public void putMonitoredFeed(String feedName){
+
+ MonitoredFeedsBean monitoredFeedsBean = new MonitoredFeedsBean();
+ monitoredFeedsBean.setFeedName(feedName);
+ EntityManager entityManager = getEntityManager();
+ try {
+ beginTransaction(entityManager);
+ entityManager.persist(monitoredFeedsBean);
+ } finally {
+ commitAndCloseTransaction(entityManager);
+ }
+ }
+
+ public MonitoredFeedsBean getMonitoredFeed(String feedName){
+ EntityManager entityManager = getEntityManager();
+ Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITERED_INSTANCE);
+ q.setParameter("feedName", feedName);
+ List result = q.getResultList();
+ try {
+ if (result.isEmpty()) {
+ return null;
+ }
+ } finally {
+ entityManager.close();
+ }
+ return ((MonitoredFeedsBean)result.get(0));
+ }
+
+ public void deleteMonitoringFeed(String feedName) {
+ EntityManager entityManager = getEntityManager();
+ beginTransaction(entityManager);
+ Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_INSTANCES);
+ q.setParameter("feedName", feedName);
+ try{
+ q.executeUpdate();
+ } finally {
+ commitAndCloseTransaction(entityManager);
+ }
+ }
+
+ public List<MonitoredFeedsBean> getAllMonitoredFeed() throws ResultNotFoundException{
+ EntityManager entityManager = getEntityManager();
+ Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_FEEDS);
+ List result = q.getResultList();
+ try{
+ if (result.isEmpty()) {
+ throw new ResultNotFoundException("No Feed has been scheduled for monitoring.");
+ }
+ } finally {
+ entityManager.close();
+ }
+ return result;
+ }
+
+ public void deletePendingInstance(String feedName, String clusterName , Date nominalTime){
+ EntityManager entityManager = getEntityManager();
+ beginTransaction(entityManager);
+ Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES);
+ q.setParameter("feedName", feedName);
+ q.setParameter("clusterName", clusterName);
+ q.setParameter("nominalTime", nominalTime);
+ try{
+ q.executeUpdate();
+ } finally {
+ commitAndCloseTransaction(entityManager);
+ }
+ }
+
+ public void deletePendingInstances(String feedName, String clusterName){
+ EntityManager entityManager = getEntityManager();
+ beginTransaction(entityManager);
+ Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED);
+ q.setParameter("feedName", feedName);
+ q.setParameter("clusterName", clusterName);
+ try{
+ q.executeUpdate();
+ } finally {
+ commitAndCloseTransaction(entityManager);
+ }
+ }
+
+ public void putPendingInstances(String feed, String clusterName, Date nominalTime){
+ EntityManager entityManager = getEntityManager();
+ PendingInstanceBean pendingInstanceBean = new PendingInstanceBean();
+ pendingInstanceBean.setFeedName(feed);
+ pendingInstanceBean.setClusterName(clusterName);
+ pendingInstanceBean.setNominalTime(nominalTime);
+
+ beginTransaction(entityManager);
+ entityManager.persist(pendingInstanceBean);
+ commitAndCloseTransaction(entityManager);
+ }
+
+ public List<Date> getNominalInstances(String feedName, String clusterName) throws ResultNotFoundException{
+ EntityManager entityManager = getEntityManager();
+ Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES);
+ q.setParameter("feedName", feedName);
+ q.setParameter("clusterName", clusterName);
+ List result = q.getResultList();
+ try{
+ if (CollectionUtils.isEmpty(result)) {
+ throw new ResultNotFoundException(feedName + " with " + clusterName + "Not Found");
+ }
+ } finally {
+ entityManager.close();
+ }
+ return result;
+ }
+ public List<PendingInstanceBean> getAllInstances(){
+ EntityManager entityManager = getEntityManager();
+ Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES);
+ List result = q.getResultList();
+
+ try {
+ if (CollectionUtils.isEmpty(result)) {
+ return null;
+ }
+ } finally{
+ entityManager.close();
+ }
+ return result;
+ }
+
+ private void commitAndCloseTransaction(EntityManager entityManager) {
+ entityManager.getTransaction().commit();
+ entityManager.close();
+ }
+
+ private void beginTransaction(EntityManager entityManager) {
+ entityManager.getTransaction().begin();
+ }
+
+}