You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2016/03/28 15:39:24 UTC
[1/3] falcon git commit: FALCON-1865 Persist Feed sla data to database
Repository: falcon
Updated Branches:
refs/heads/master 10f3843ad -> de2f5c0ab
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/webapp/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/startup.properties b/webapp/src/test/resources/startup.properties
index bc88534..3544f0a 100644
--- a/webapp/src/test/resources/startup.properties
+++ b/webapp/src/test/resources/startup.properties
@@ -32,7 +32,7 @@
*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
org.apache.falcon.service.ProcessSubscriberService,\
- org.apache.falcon.state.store.service.FalconJPAService,\
+ org.apache.falcon.service.FalconJPAService,\
org.apache.falcon.entity.store.ConfigurationStore,\
org.apache.falcon.rerun.service.RetryService,\
org.apache.falcon.rerun.service.LateRunService,\
[2/3] falcon git commit: FALCON-1865 Persist Feed sla data to database
Posted by aj...@apache.org.
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
index 29bd7ba..b5a2569 100644
--- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
@@ -17,7 +17,7 @@
*/
package org.apache.falcon.service;
-import org.apache.commons.io.IOUtils;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Pair;
import org.apache.falcon.entity.EntityUtil;
@@ -31,6 +31,9 @@ import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Sla;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
+import org.apache.falcon.persistence.MonitoredFeedsBean;
+import org.apache.falcon.persistence.PendingInstanceBean;
import org.apache.falcon.resource.SchedulableEntityInstance;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.StartupProperties;
@@ -38,24 +41,15 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
import java.util.Date;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
+import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -66,6 +60,8 @@ import java.util.concurrent.TimeUnit;
public final class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService {
private static final Logger LOG = LoggerFactory.getLogger("FeedSLA");
+ private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore();
+
private static final String ONE_HOUR = String.valueOf(60 * 60 * 1000);
private static final int ONE_MS = 1;
@@ -88,29 +84,10 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
/**
- * Feeds to be monitored.
- */
- protected Set<String> monitoredFeeds;
-
-
- /**
- * Map<Pair<feedName, clusterName>, Set<instanceTime> to store
- * each missing instance of a feed.
- */
- protected Map<Pair<String, String>, BlockingQueue<Date>> pendingInstances;
-
-
- /**
* Used to store the last time when pending instances were checked for SLA.
*/
private Date lastCheckedAt;
-
- /**
- * Used to store last time when the state was serialized to the store.
- */
- private Date lastSerializedAt;
-
/**
* Frequency in seconds of "status check" for pending feed instances.
*/
@@ -156,7 +133,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
if (currentClusters.contains(cluster.getName())) {
if (FeedHelper.getSLA(cluster, feed) != null) {
LOG.debug("Adding feed:{} for monitoring", feed.getName());
- monitoredFeeds.add(feed.getName());
+ MONITORING_JDBC_STATE_STORE.putMonitoredFeed(feed.getName());
}
}
}
@@ -173,8 +150,8 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
for (Cluster cluster : feed.getClusters().getClusters()) {
if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
- monitoredFeeds.remove(feed.getName());
- pendingInstances.remove(new Pair<>(feed.getName(), cluster.getName()));
+ MONITORING_JDBC_STATE_STORE.deleteMonitoringFeed(feed.getName());
+ MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName());
}
}
}
@@ -212,7 +189,7 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
}
for (String clusterName : slaRemovedClusters) {
- pendingInstances.remove(new Pair<>(newFeed.getName(), clusterName));
+ MONITORING_JDBC_STATE_STORE.deletePendingInstances(newFeed.getName(), clusterName);
}
}
}
@@ -247,32 +224,21 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
String size = StartupProperties.get().getProperty("feed.sla.queue.size", "288");
queueSize = Integer.parseInt(size);
- try {
- if (fileSystem.exists(filePath)) {
- deserialize(filePath);
- } else {
- LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString());
- initializeService();
- }
- } catch (IOException e) {
- throw new FalconException("Couldn't check the existence of " + filePath, e);
- }
+ LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString());
+ initializeService();
+
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS);
}
- @Override
- public void destroy() throws FalconException {
- serializeState(); // store the state of monitoring service to the disk.
- }
-
- public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime) {
+ public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime)
+ throws FalconException {
LOG.info("Removing {} feed's instance {} in cluster {} from pendingSLA", feedName,
clusterName, nominalTime);
- Pair<String, String> feedCluster = new Pair<>(feedName, clusterName);
+ List<Date> instances = (MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName));
// Slas for feeds not having sla tag are not stored.
- if (pendingInstances.get(feedCluster) != null) {
- pendingInstances.get(feedCluster).remove(nominalTime);
+ if (CollectionUtils.isEmpty(instances)){
+ MONITORING_JDBC_STATE_STORE.deletePendingInstance(feedName, clusterName, nominalTime);
}
}
@@ -290,13 +256,17 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
}
}
+ @Override
+ public void destroy() throws FalconException {
+ }
+
//Periodically update status of pending instances, add new instances and take backup.
private class Monitor implements Runnable {
@Override
public void run() {
try {
- if (!monitoredFeeds.isEmpty()) {
+ if (MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed().size() > 0) {
checkPendingInstanceAvailability();
// add Instances from last checked time to 10 minutes from now(some buffer for status check)
@@ -304,12 +274,6 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis);
addNewPendingFeedInstances(lastCheckedAt, newCheckPoint);
lastCheckedAt = newCheckPoint;
-
- //take backup
- if (now.getTime() - lastSerializedAt.getTime() > serializationFrequencyMillis) {
- serializeState();
- lastSerializedAt = new Date();
- }
}
} catch (Throwable e) {
LOG.error("Feed SLA monitoring failed: ", e);
@@ -320,14 +284,18 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
void addNewPendingFeedInstances(Date from, Date to) throws FalconException {
Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
- for (String feedName : monitoredFeeds) {
+ List<MonitoredFeedsBean> feedsBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed();
+ for(MonitoredFeedsBean monitoredFeedsBean : feedsBeanList) {
+ String feedName = monitoredFeedsBean.getFeedName();
Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
for (Cluster feedCluster : feed.getClusters().getClusters()) {
if (currentClusters.contains(feedCluster.getName())) {
Date nextInstanceTime = from;
Pair<String, String> key = new Pair<>(feed.getName(), feedCluster.getName());
- BlockingQueue<Date> instances = pendingInstances.get(key);
- if (instances == null) {
+ BlockingQueue<Date> instances = new LinkedBlockingQueue<>(
+ MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, feedCluster.getName()));
+ if (CollectionUtils.isEmpty(MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName,
+ feedCluster.getName()))) {
instances = new LinkedBlockingQueue<>(queueSize);
Date feedStartTime = feedCluster.getValidity().getStart();
Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed, feedCluster);
@@ -357,7 +325,9 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime);
}
- pendingInstances.put(key, instances);
+ for(Date date:instances){
+ MONITORING_JDBC_STATE_STORE.putPendingInstances(feed.getName(), feedCluster.getName(), date);
+ }
}
}
}
@@ -368,11 +338,14 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
* Checks the availability of all the pendingInstances and removes the ones which have become available.
*/
private void checkPendingInstanceAvailability() throws FalconException {
- for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> entry: pendingInstances.entrySet()) {
- for (Date date : entry.getValue()) {
- boolean status = checkFeedInstanceAvailability(entry.getKey().first, entry.getKey().second, date);
+ for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){
+ for (Date date : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getFeedName(),
+ pendingInstanceBean.getClusterName())) {
+ boolean status = checkFeedInstanceAvailability(pendingInstanceBean.getFeedName(),
+ pendingInstanceBean.getClusterName(), date);
if (status) {
- pendingInstances.get(entry.getKey()).remove(date);
+ MONITORING_JDBC_STATE_STORE.deletePendingInstance(pendingInstanceBean.getFeedName(),
+ pendingInstanceBean.getClusterName(), date);
}
}
}
@@ -402,79 +375,9 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
return false;
}
- private void serializeState() throws FalconException{
- LOG.info("Saving context to: [{}]", storePath);
-
- //create a temporary file and rename it.
- Path tmp = new Path(storePath , "tmp");
- ObjectOutputStream oos = null;
- try {
- OutputStream out = fileSystem.create(tmp);
- oos = new ObjectOutputStream(out);
- Map<String, Object> state = new HashMap<>();
- state.put("lastSerializedAt", lastSerializedAt.getTime());
- state.put("lastCheckedAt", lastCheckedAt.getTime());
- state.put("pendingInstances", pendingInstances);
- oos.writeObject(state);
- fileSystem.rename(tmp, filePath);
- } catch (IOException e) {
- throw new FalconException("Error serializing context to : " + storePath.toUri(), e);
- } finally {
- IOUtils.closeQuietly(oos);
- }
- }
-
- @SuppressWarnings("unchecked")
- private void deserialize(Path path) throws FalconException {
- try {
- Map<String, Object> state = deserializeInternal(path);
- pendingInstances = new ConcurrentHashMap<>();
- Map<Pair<String, String>, BlockingQueue<Date>> pendingInstancesCopy =
- (Map<Pair<String, String>, BlockingQueue<Date>>) state.get("pendingInstances");
- // queue size can change during restarts, hence copy
- for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> entry : pendingInstancesCopy.entrySet()) {
- BlockingQueue<Date> value = new LinkedBlockingQueue<>(queueSize);
- BlockingQueue<Date> oldValue = entry.getValue();
- LOG.debug("Number of old instances:{}, new queue size:{}", oldValue.size(), queueSize);
- while (!oldValue.isEmpty()) {
- Date instance = oldValue.remove();
- if (value.size() == queueSize) { // if full
- LOG.debug("Deserialization: Removing value={} for <feed,cluster>={}", value.peek(),
- entry.getKey());
- value.remove();
- }
- LOG.debug("Deserialization Adding: key={} to <feed,cluster>={}", entry.getKey(), instance);
- value.add(instance);
- }
- pendingInstances.put(entry.getKey(), value);
- }
- lastCheckedAt = new Date((Long) state.get("lastCheckedAt"));
- lastSerializedAt = new Date((Long) state.get("lastSerializedAt"));
- monitoredFeeds = new ConcurrentHashSet<>(); // will be populated on the onLoad of entities.
- LOG.debug("Restored the service from old state.");
- } catch (IOException | ClassNotFoundException e) {
- throw new FalconException("Couldn't deserialize the old state", e);
- }
- }
protected void initializeService() {
- pendingInstances = new ConcurrentHashMap<>();
lastCheckedAt = new Date();
- lastSerializedAt = new Date();
- monitoredFeeds = new ConcurrentHashSet<>();
- }
-
- @SuppressWarnings("unchecked")
- private Map<String, Object> deserializeInternal(Path path) throws IOException, ClassNotFoundException {
- Map<String, Object> state;
- InputStream in = fileSystem.open(path);
- ObjectInputStream ois = new ObjectInputStream(in);
- try {
- state = (Map<String, Object>) ois.readObject();
- } finally {
- IOUtils.closeQuietly(ois);
- }
- return state;
}
/**
@@ -492,13 +395,16 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(Date start, Date end)
throws FalconException {
Set<SchedulableEntityInstance> result = new HashSet<>();
- for (Map.Entry<Pair<String, String>, BlockingQueue<Date>> feedInstances : pendingInstances.entrySet()) {
- Pair<String, String> feedClusterPair = feedInstances.getKey();
+ for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){
+ Pair<String, String> feedClusterPair = new Pair<>(pendingInstanceBean.getFeedName(),
+ pendingInstanceBean.getClusterName());
Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first);
Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
Sla sla = FeedHelper.getSLA(cluster, feed);
if (sla != null) {
- Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end, feedInstances.getValue());
+ Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end,
+ new LinkedBlockingQueue<Date>(MONITORING_JDBC_STATE_STORE.getNominalInstances(
+ pendingInstanceBean.getFeedName(), pendingInstanceBean.getClusterName())));
for (Pair<Date, String> status : slaStatus){
SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first,
feedClusterPair.second, status.first, EntityType.FEED);
@@ -525,7 +431,8 @@ public final class FeedSLAMonitoringService implements ConfigurationChangeListen
Set<SchedulableEntityInstance> result = new HashSet<>();
Pair<String, String> feedClusterPair = new Pair<>(feedName, clusterName);
- BlockingQueue<Date> missingInstances = pendingInstances.get(feedClusterPair);
+ BlockingQueue<Date> missingInstances = new LinkedBlockingQueue<>(MONITORING_JDBC_STATE_STORE.
+ getNominalInstances(feedName, clusterName));
Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
Sla sla = FeedHelper.getSLA(cluster, feed);
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
new file mode 100644
index 0000000..aa32167
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.jdbc;
+
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.service.FalconJPAService;
+import org.apache.falcon.tools.FalconStateStoreDBCLI;
+import org.apache.falcon.util.StateStoreProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.util.Date;
+import java.util.Random;
+
+/**
+*Unit test for MonitoringJdbcStateStore.
+ * */
+
+public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
+ private static final String DB_BASE_DIR = "target/test-data/persistancedb";
+ protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
+ protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
+ protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
+ protected LocalFileSystem fs = new LocalFileSystem();
+
+ private static Random randomValGenerator = new Random();
+ private static FalconJPAService falconJPAService = FalconJPAService.get();
+
+ protected int execDBCLICommands(String[] args) {
+ return new FalconStateStoreDBCLI().run(args);
+ }
+
+ public void createDB(String file) {
+ File sqlFile = new File(file);
+ String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
+ int result = execDBCLICommands(argsCreate);
+ Assert.assertEquals(0, result);
+ Assert.assertTrue(sqlFile.exists());
+
+ }
+
+ @BeforeClass
+ public void setup() throws Exception{
+ StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
+ Configuration localConf = new Configuration();
+ fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
+ fs.mkdirs(new Path(DB_BASE_DIR));
+ createDB(DB_SQL_FILE);
+ falconJPAService.init();
+ this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+ this.conf = dfsCluster.getConf();
+ }
+
+ @Test
+ public void testInsertRetrieveAndUpdate() throws Exception {
+
+ MonitoringJdbcStateStore monitoringJdbcStateStore = new MonitoringJdbcStateStore();
+ monitoringJdbcStateStore.putMonitoredFeed("test_feed1");
+ monitoringJdbcStateStore.putMonitoredFeed("test_feed2");
+ Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredFeed("test_feed1").getFeedName());
+ Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredFeed().size(), 2);
+
+ monitoringJdbcStateStore.deleteMonitoringFeed("test_feed1");
+ monitoringJdbcStateStore.deleteMonitoringFeed("test_feed2");
+ Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
+ Date dateTwo = SchemaHelper.parseDateUTC("2015-11-20T01:00Z");
+ monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateOne);
+ monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateTwo);
+
+ Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 2);
+ monitoringJdbcStateStore.deletePendingInstance("test_feed1", "test_cluster", dateOne);
+ Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 1);
+ monitoringJdbcStateStore.deletePendingInstances("test_feed1", "test_cluster");
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
index 90eec4d..b739037 100644
--- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
@@ -121,40 +121,6 @@ public class FeedSLAMonitoringTest extends AbstractTestBase {
AbstractSchedulableEntityManager.validateSlaParams("feed", null, "2015-05-05T00:00Z", null, "*");
}
- @Test
- public void testMakeFeedInstanceAvailable() {
- Date instanceDate = SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
- Date nextInstanceDate = SchemaHelper.parseDateUTC("2015-11-20T01:00Z");
- Pair<String, String> feedCluster = new Pair<>("testFeed", "testCluster");
-
- BlockingQueue<Date> missingInstances = new LinkedBlockingQueue<>();
- missingInstances.add(instanceDate);
- missingInstances.add(nextInstanceDate);
-
- FeedSLAMonitoringService.get().initializeService();
- FeedSLAMonitoringService.get().pendingInstances.put(feedCluster, missingInstances);
- FeedSLAMonitoringService.get().makeFeedInstanceAvailable("testFeed", "testCluster", instanceDate);
-
- Assert.assertEquals(FeedSLAMonitoringService.get().pendingInstances.get(feedCluster).size(), 1);
- }
-
- @Test
- public void testEndDateCheck() throws Exception {
- Cluster cluster = publishCluster();
- publishFeed(cluster, "hours(1)", "2015-11-20 00:00 UTC", "2015-11-20 05:00 UTC");
- Pair<String, String> feedCluster = new Pair<>(FEED_NAME, CLUSTER_NAME);
-
- FeedSLAMonitoringService service = FeedSLAMonitoringService.get();
- service.initializeService();
- service.queueSize = 100;
- service.monitoredFeeds.add(FEED_NAME);
- Date from = SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
- Date to = SchemaHelper.parseDateUTC("2015-11-25T00:00Z");
- service.addNewPendingFeedInstances(from, to);
- // check that instances after feed's end date are not added.
- Assert.assertEquals(service.pendingInstances.get(feedCluster).size(), 5);
- }
-
private Cluster publishCluster() throws FalconException {
Cluster cluster = new Cluster();
cluster.setName(CLUSTER_NAME);
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/pom.xml
----------------------------------------------------------------------
diff --git a/scheduler/pom.xml b/scheduler/pom.xml
index dc006a1..6cb1c0d 100644
--- a/scheduler/pom.xml
+++ b/scheduler/pom.xml
@@ -95,26 +95,6 @@
</dependency>
<dependency>
- <groupId>org.apache.openjpa</groupId>
- <artifactId>openjpa-jdbc</artifactId>
- <version>${openjpa.version}</version>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.openjpa</groupId>
- <artifactId>openjpa-persistence-jdbc</artifactId>
- <version>${openjpa.version}</version>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <groupId>javax.validation</groupId>
- <artifactId>validation-api</artifactId>
- <version>${javax-validation.version}</version>
- </dependency>
-
- <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
@@ -169,27 +149,7 @@
</configuration>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-antrun-plugin</artifactId>
- <version>1.8</version>
- <executions>
- <execution>
- <phase>process-classes</phase>
- <configuration>
- <tasks>
- <taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask" classpathref="maven.compile.classpath"/>
- <openjpac>
- <classpath refid="maven.compile.classpath"/>
- </openjpac>
- </tasks>
- </configuration>
- <goals>
- <goal>run</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
index 194819e..3384186 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
@@ -27,6 +27,8 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.exception.StateStoreException;
import org.apache.falcon.execution.ExecutionInstance;
import org.apache.falcon.execution.ProcessExecutionInstance;
+import org.apache.falcon.persistence.EntityBean;
+import org.apache.falcon.persistence.InstanceBean;
import org.apache.falcon.predicate.Predicate;
import org.apache.falcon.state.EntityID;
import org.apache.falcon.state.EntityState;
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
deleted file mode 100644
index 37fb0cb..0000000
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/EntityBean.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.state.store.jdbc;
-
-import org.apache.openjpa.persistence.jdbc.Index;
-
-import javax.persistence.Basic;
-import javax.persistence.CascadeType;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.Id;
-import javax.persistence.NamedQueries;
-import javax.persistence.NamedQuery;
-import javax.persistence.OneToMany;
-import javax.persistence.Table;
-import javax.validation.constraints.NotNull;
-import java.util.List;
-//SUSPEND CHECKSTYLE CHECK LineLengthCheck
-/**
- * Entity object which will be stored in Data Base.
- */
-@Entity
-@NamedQueries({
- @NamedQuery(name = "GET_ENTITY", query = "select OBJECT(a) from EntityBean a where a.id = :id"),
- @NamedQuery(name = "GET_ENTITY_FOR_STATE", query = "select OBJECT(a) from EntityBean a where a.state = :state"),
- @NamedQuery(name = "UPDATE_ENTITY", query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type where a.id = :id"),
- @NamedQuery(name = "GET_ENTITIES_FOR_TYPE", query = "select OBJECT(a) from EntityBean a where a.type = :type"),
- @NamedQuery(name = "GET_ENTITIES", query = "select OBJECT(a) from EntityBean a"),
- @NamedQuery(name = "DELETE_ENTITY", query = "delete from EntityBean a where a.id = :id"),
- @NamedQuery(name = "DELETE_ENTITIES", query = "delete from EntityBean")})
-//RESUME CHECKSTYLE CHECK LineLengthCheck
-@Table(name = "ENTITIES")
-public class EntityBean {
- @NotNull
- @Id
- private String id;
-
- @Basic
- @NotNull
- @Column(name = "name")
- private String name;
-
-
- @Basic
- @Index
- @NotNull
- @Column(name = "type")
- private String type;
-
- @Basic
- @Index
- @NotNull
- @Column(name = "current_state")
- private String state;
-
- @OneToMany(cascade= CascadeType.REMOVE, mappedBy="entityBean")
- private List<InstanceBean> instanceBeans;
-
- public EntityBean() {
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public String getState() {
- return state;
- }
-
- public void setState(String state) {
- this.state = state;
- }
-
- public List<InstanceBean> getInstanceBeans() {
- return instanceBeans;
- }
-
- public void setInstanceBeans(List<InstanceBean> instanceBeans) {
- this.instanceBeans = instanceBeans;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
deleted file mode 100644
index dffb116..0000000
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/InstanceBean.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.state.store.jdbc;
-
-import org.apache.openjpa.persistence.jdbc.ForeignKey;
-import org.apache.openjpa.persistence.jdbc.ForeignKeyAction;
-import org.apache.openjpa.persistence.jdbc.Index;
-
-import javax.persistence.Basic;
-import javax.persistence.CascadeType;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.Id;
-import javax.persistence.Lob;
-import javax.persistence.ManyToOne;
-import javax.persistence.NamedQueries;
-import javax.persistence.NamedQuery;
-import javax.persistence.Table;
-import javax.validation.constraints.NotNull;
-import java.sql.Timestamp;
-
-//SUSPEND CHECKSTYLE CHECK LineLengthCheck
-/**
- * Instance State which will be stored in DB.
- */
-@Entity
-@NamedQueries({
- @NamedQuery(name = "GET_INSTANCE", query = "select OBJECT(a) from InstanceBean a where a.id = :id"),
- @NamedQuery(name = "GET_INSTANCE_FOR_EXTERNAL_ID", query = "select OBJECT(a) from InstanceBean a where a.externalID = :externalID"),
- @NamedQuery(name = "DELETE_INSTANCE", query = "delete from InstanceBean a where a.id = :id"),
- @NamedQuery(name = "DELETE_INSTANCE_FOR_ENTITY", query = "delete from InstanceBean a where a.entityId = :entityId"),
- @NamedQuery(name = "UPDATE_INSTANCE", query = "update InstanceBean a set a.cluster = :cluster, a.externalID = :externalID, a.instanceTime = :instanceTime, a.creationTime = :creationTime, a.actualEndTime = :actualEndTime, a.currentState = :currentState, a.actualStartTime = :actualStartTime, a.instanceSequence = :instanceSequence, a.awaitedPredicates = :awaitedPredicates, a.properties = :properties where a.id = :id"),
- @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster"),
- @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState)"),
- @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"),
- @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"),
- @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"),
- @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a"),
- @NamedQuery(name = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE", query = "select a.currentState, COUNT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.instanceTime >= :startTime AND a.instanceTime < :endTime GROUP BY a.currentState")
-})
-//RESUME CHECKSTYLE CHECK LineLengthCheck
-@Table(name = "INSTANCES")
-public class InstanceBean {
-
- @Id
- @NotNull
- private String id;
-
- @Basic
- @Index
- @NotNull
- @Column(name = "entity_id")
- private String entityId;
-
- @Basic
- @Index
- @NotNull
- @Column(name = "cluster")
- private String cluster;
-
- @Basic
- @Index
- @Column(name = "external_id")
- private String externalID;
-
- @Basic
- @Index
- @Column(name = "instance_time")
- private Timestamp instanceTime;
-
- @Basic
- @Index
- @NotNull
- @Column(name = "creation_time")
- private Timestamp creationTime;
-
- @Basic
- @Column(name = "actual_start_time")
- private Timestamp actualStartTime;
-
- @Basic
- @Column(name = "actual_end_time")
- private Timestamp actualEndTime;
-
- @Basic
- @Index
- @NotNull
- @Column(name = "current_state")
- private String currentState;
-
- @Basic
- @Index
- @NotNull
- @Column(name = "instance_sequence")
- private Integer instanceSequence;
-
- @ForeignKey(deleteAction= ForeignKeyAction.CASCADE)
- @ManyToOne(cascade= CascadeType.REMOVE)
- private EntityBean entityBean;
-
-
- @Column(name = "awaited_predicates")
- @Lob
- private byte[] awaitedPredicates;
-
- @Column(name = "properties")
- @Lob
- private byte[] properties;
-
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getCluster() {
- return cluster;
- }
-
- public void setCluster(String cluster) {
- this.cluster = cluster;
- }
-
- public String getExternalID() {
- return externalID;
- }
-
- public void setExternalID(String externalID) {
- this.externalID = externalID;
- }
-
- public Timestamp getInstanceTime() {
- return instanceTime;
- }
-
- public void setInstanceTime(Timestamp instanceTime) {
- this.instanceTime = instanceTime;
- }
-
- public Timestamp getCreationTime() {
- return creationTime;
- }
-
- public void setCreationTime(Timestamp creationTime) {
- this.creationTime = creationTime;
- }
-
- public Timestamp getActualStartTime() {
- return actualStartTime;
- }
-
- public void setActualStartTime(Timestamp actualStartTime) {
- this.actualStartTime = actualStartTime;
- }
-
- public Timestamp getActualEndTime() {
- return actualEndTime;
- }
-
- public void setActualEndTime(Timestamp actualEndTime) {
- this.actualEndTime = actualEndTime;
- }
-
- public String getCurrentState() {
- return currentState;
- }
-
- public void setCurrentState(String currentState) {
- this.currentState = currentState;
- }
-
- public byte[] getAwaitedPredicates() {
- return awaitedPredicates;
- }
-
- public void setAwaitedPredicates(byte[] awaitedPredicates) {
- this.awaitedPredicates = awaitedPredicates;
- }
-
- public Integer getInstanceSequence() {
- return instanceSequence;
- }
-
- public void setInstanceSequence(Integer instanceSequence) {
- this.instanceSequence = instanceSequence;
- }
-
- public String getEntityId() {
- return entityId;
- }
-
- public void setEntityId(String entityId) {
- this.entityId = entityId;
- }
-
- public byte[] getProperties() {
- return properties;
- }
-
- public void setProperties(byte[] properties) {
- this.properties = properties;
- }
-
- public EntityBean getEntityBean() {
- return entityBean;
- }
-
- public void setEntityBean(EntityBean entityBean) {
- this.entityBean = entityBean;
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
index 1c07286..d2bb8c8 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
@@ -22,6 +22,8 @@ import org.apache.commons.lang.StringUtils;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.exception.StateStoreException;
import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.persistence.EntityBean;
+import org.apache.falcon.persistence.InstanceBean;
import org.apache.falcon.state.EntityClusterID;
import org.apache.falcon.state.EntityID;
import org.apache.falcon.state.EntityState;
@@ -30,7 +32,7 @@ import org.apache.falcon.state.InstanceID;
import org.apache.falcon.state.InstanceState;
import org.apache.falcon.state.store.AbstractStateStore;
import org.apache.falcon.state.store.StateStore;
-import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.service.FalconJPAService;
import org.apache.falcon.util.StateStoreProperties;
import org.joda.time.DateTime;
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java b/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
deleted file mode 100644
index 027a8ef..0000000
--- a/scheduler/src/main/java/org/apache/falcon/state/store/service/FalconJPAService.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.state.store.service;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.service.FalconService;
-import org.apache.falcon.state.store.jdbc.EntityBean;
-import org.apache.falcon.state.store.jdbc.InstanceBean;
-import org.apache.falcon.util.StateStoreProperties;
-import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.persistence.EntityManager;
-import javax.persistence.EntityManagerFactory;
-import javax.persistence.Persistence;
-import java.text.MessageFormat;
-import java.util.Properties;
-
-/**
- * Service that manages JPA.
- */
-public final class FalconJPAService implements FalconService {
-
- private static final Logger LOG = LoggerFactory.getLogger(FalconJPAService.class);
- public static final String PREFIX = "falcon.statestore.";
-
- public static final String DB_SCHEMA = PREFIX + "schema.name";
- public static final String URL = PREFIX + "jdbc.url";
- public static final String DRIVER = PREFIX + "jdbc.driver";
- public static final String USERNAME = PREFIX + "jdbc.username";
- public static final String PASSWORD = PREFIX + "jdbc.password";
- public static final String CONN_DATA_SOURCE = PREFIX + "connection.data.source";
- public static final String CONN_PROPERTIES = PREFIX + "connection.properties";
- public static final String MAX_ACTIVE_CONN = PREFIX + "pool.max.active.conn";
- public static final String CREATE_DB_SCHEMA = PREFIX + "create.db.schema";
- public static final String VALIDATE_DB_CONN = PREFIX + "validate.db.connection";
- public static final String VALIDATE_DB_CONN_EVICTION_INTERVAL = PREFIX + "validate.db.connection.eviction.interval";
- public static final String VALIDATE_DB_CONN_EVICTION_NUM = PREFIX + "validate.db.connection.eviction.num";
-
- private EntityManagerFactory entityManagerFactory;
- // Persistent Unit which is defined in persistence.xml
- private String persistenceUnit;
- private static final FalconJPAService FALCON_JPA_SERVICE = new FalconJPAService();
-
- private FalconJPAService() {
- }
-
- public static FalconJPAService get() {
- return FALCON_JPA_SERVICE;
- }
-
- public EntityManagerFactory getEntityManagerFactory() {
- return entityManagerFactory;
- }
-
- public void setPersistenceUnit(String dbType) {
- if (StringUtils.isEmpty(dbType)) {
- throw new IllegalArgumentException(" DB type cannot be null or empty");
- }
- dbType = dbType.split(":")[0];
- this.persistenceUnit = "falcon-" + dbType;
- }
-
- @Override
- public String getName() {
- return this.getClass().getSimpleName();
- }
-
- @Override
- public void init() throws FalconException {
- Properties props = getPropsforStore();
- entityManagerFactory = Persistence.
- createEntityManagerFactory(persistenceUnit, props);
- EntityManager entityManager = getEntityManager();
- entityManager.find(EntityBean.class, 1);
- entityManager.find(InstanceBean.class, 1);
- LOG.info("All entities initialized");
-
- // need to use a pseudo no-op transaction so all entities, datasource
- // and connection pool are initialized one time only
- entityManager.getTransaction().begin();
- OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) entityManagerFactory;
- // Mask the password with '***'
- String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,");
- LOG.info("JPA configuration: {0}", logMsg);
- entityManager.getTransaction().commit();
- entityManager.close();
- }
-
- private Properties getPropsforStore() throws FalconException {
- String dbSchema = StateStoreProperties.get().getProperty(DB_SCHEMA);
- String url = StateStoreProperties.get().getProperty(URL);
- String driver = StateStoreProperties.get().getProperty(DRIVER);
- String user = StateStoreProperties.get().getProperty(USERNAME);
- String password = StateStoreProperties.get().getProperty(PASSWORD).trim();
- String maxConn = StateStoreProperties.get().getProperty(MAX_ACTIVE_CONN).trim();
- String dataSource = StateStoreProperties.get().getProperty(CONN_DATA_SOURCE);
- String connPropsConfig = StateStoreProperties.get().getProperty(CONN_PROPERTIES);
- boolean autoSchemaCreation = Boolean.parseBoolean(StateStoreProperties.get().getProperty(CREATE_DB_SCHEMA,
- "false"));
- boolean validateDbConn = Boolean.parseBoolean(StateStoreProperties.get().getProperty(VALIDATE_DB_CONN, "true"));
- String evictionInterval = StateStoreProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_INTERVAL).trim();
- String evictionNum = StateStoreProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_NUM).trim();
-
- if (!url.startsWith("jdbc:")) {
- throw new FalconException("invalid JDBC URL, must start with 'jdbc:'" + url);
- }
- String dbType = url.substring("jdbc:".length());
- if (dbType.indexOf(":") <= 0) {
- throw new FalconException("invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'" + url);
- }
- setPersistenceUnit(dbType);
- String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}";
- connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn);
- Properties props = new Properties();
- if (autoSchemaCreation) {
- connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
- props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
- } else if (validateDbConn) {
- // validation can be done only if the schema already exist, else a
- // connection cannot be obtained to create the schema.
- String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval;
- String num = "numTestsPerEvictionRun=" + evictionNum;
- connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num;
- connProps += ",ValidationQuery=select 1";
- connProps = MessageFormat.format(connProps, dbSchema);
- } else {
- connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
- }
- if (connPropsConfig != null) {
- connProps += "," + connPropsConfig;
- }
- props.setProperty("openjpa.ConnectionProperties", connProps);
- props.setProperty("openjpa.ConnectionDriverName", dataSource);
- return props;
- }
-
- @Override
- public void destroy() throws FalconException {
- if (entityManagerFactory.isOpen()) {
- entityManagerFactory.close();
- }
- }
-
-
- /**
- * Return an EntityManager. Used by the StoreService.
- *
- * @return an entity manager
- */
- public EntityManager getEntityManager() {
- return getEntityManagerFactory().createEntityManager();
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
deleted file mode 100644
index 6de9f7d..0000000
--- a/scheduler/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
+++ /dev/null
@@ -1,436 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.tools;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.falcon.cli.CLIParser;
-import org.apache.falcon.state.store.service.FalconJPAService;
-import org.apache.falcon.util.BuildProperties;
-import org.apache.falcon.util.StateStoreProperties;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.PrintWriter;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Command Line utility for Table Creation, Update.
- */
-public class FalconStateStoreDBCLI {
- public static final String HELP_CMD = "help";
- public static final String VERSION_CMD = "version";
- public static final String CREATE_CMD = "create";
- public static final String SQL_FILE_OPT = "sqlfile";
- public static final String RUN_OPT = "run";
- public static final String UPGRADE_CMD = "upgrade";
-
- // Represents whether DB instance exists or not.
- private boolean instanceExists;
- private static final String[] FALCON_HELP =
- {"Falcon DB initialization tool currently supports Derby DB/ Mysql/ PostgreSQL"};
-
- public static void main(String[] args) {
- new FalconStateStoreDBCLI().run(args);
- }
-
- public FalconStateStoreDBCLI() {
- instanceExists = false;
- }
-
- protected Options getOptions() {
- Option sqlfile = new Option(SQL_FILE_OPT, true,
- "Generate SQL script instead of creating/upgrading the DB schema");
- Option run = new Option(RUN_OPT, false, "Confirmation option regarding DB schema creation/upgrade");
- Options options = new Options();
- options.addOption(sqlfile);
- options.addOption(run);
- return options;
- }
-
- public synchronized int run(String[] args) {
- if (instanceExists) {
- throw new IllegalStateException("CLI instance already used");
- }
- instanceExists = true;
-
- CLIParser parser = new CLIParser("falcondb", FALCON_HELP);
- parser.addCommand(HELP_CMD, "", "Display usage for all commands or specified command", new Options(), false);
- parser.addCommand(VERSION_CMD, "", "Show Falcon DB version information", new Options(), false);
- parser.addCommand(CREATE_CMD, "", "Create Falcon DB schema", getOptions(), false);
- parser.addCommand(UPGRADE_CMD, "", "Upgrade Falcon DB schema", getOptions(), false);
-
- try {
- CLIParser.Command command = parser.parse(args);
- if (command.getName().equals(HELP_CMD)) {
- parser.showHelp();
- } else if (command.getName().equals(VERSION_CMD)) {
- showVersion();
- } else {
- if (!command.getCommandLine().hasOption(SQL_FILE_OPT)
- && !command.getCommandLine().hasOption(RUN_OPT)) {
- throw new Exception("'-sqlfile <FILE>' or '-run' options must be specified");
- }
- CommandLine commandLine = command.getCommandLine();
- String sqlFile = (commandLine.hasOption(SQL_FILE_OPT))
- ? commandLine.getOptionValue(SQL_FILE_OPT)
- : File.createTempFile("falcondb-", ".sql").getAbsolutePath();
- boolean run = commandLine.hasOption(RUN_OPT);
- if (command.getName().equals(CREATE_CMD)) {
- createDB(sqlFile, run);
- } else if (command.getName().equals(UPGRADE_CMD)) {
- upgradeDB(sqlFile, run);
- }
- System.out.println("The SQL commands have been written to: " + sqlFile);
- if (!run) {
- System.out.println("WARN: The SQL commands have NOT been executed, you must use the '-run' option");
- }
- }
- return 0;
- } catch (ParseException ex) {
- System.err.println("Invalid sub-command: " + ex.getMessage());
- System.err.println();
- System.err.println(parser.shortHelp());
- return 1;
- } catch (Exception ex) {
- System.err.println();
- System.err.println("Error: " + ex.getMessage());
- System.err.println();
- System.err.println("Stack trace for the error was (for debug purposes):");
- System.err.println("--------------------------------------");
- ex.printStackTrace(System.err);
- System.err.println("--------------------------------------");
- System.err.println();
- return 1;
- }
- }
-
- private void upgradeDB(String sqlFile, boolean run) throws Exception {
- validateConnection();
- if (!checkDBExists()) {
- throw new Exception("Falcon DB doesn't exist");
- }
- String falconVersion = BuildProperties.get().getProperty("project.version");
- String dbVersion = getFalconDBVersion();
- if (dbVersion.compareTo(falconVersion) >= 0) {
- System.out.println("Falcon DB already upgraded to Falcon version '" + falconVersion + "'");
- return;
- }
-
- createUpgradeDB(sqlFile, run, false);
- upgradeFalconDBVersion(sqlFile, run, falconVersion);
-
- // any post upgrade tasks
- if (run) {
- System.out.println("Falcon DB has been upgraded to Falcon version '" + falconVersion + "'");
- }
- }
-
-
- private void upgradeFalconDBVersion(String sqlFile, boolean run, String version) throws Exception {
- String updateDBVersion = "update FALCON_DB_PROPS set data='" + version + "' where name='db.version'";
- PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
- writer.println();
- writer.println(updateDBVersion);
- writer.close();
- System.out.println("Upgrade db.version in FALCON_DB_PROPS table to " + version);
- if (run) {
- Connection conn = createConnection();
- Statement st = null;
- try {
- conn.setAutoCommit(true);
- st = conn.createStatement();
- st.executeUpdate(updateDBVersion);
- st.close();
- } catch (Exception ex) {
- throw new Exception("Could not upgrade db.version in FALCON_DB_PROPS table: " + ex.toString(), ex);
- } finally {
- closeStatement(st);
- conn.close();
- }
- }
- System.out.println("DONE");
- }
-
- private static final String GET_FALCON_DB_VERSION = "select data from FALCON_DB_PROPS where name = 'db.version'";
-
- private String getFalconDBVersion() throws Exception {
- String version;
- System.out.println("Get Falcon DB version");
- Connection conn = createConnection();
- Statement st = null;
- ResultSet rs = null;
- try {
- st = conn.createStatement();
- rs = st.executeQuery(GET_FALCON_DB_VERSION);
- if (rs.next()) {
- version = rs.getString(1);
- } else {
- throw new Exception("ERROR: Could not find Falcon DB 'db.version' in FALCON_DB_PROPS table");
- }
- } catch (Exception ex) {
- throw new Exception("ERROR: Could not query FALCON_DB_PROPS table: " + ex.toString(), ex);
- } finally {
- closeResultSet(rs);
- closeStatement(st);
- conn.close();
- }
- System.out.println("DONE");
- return version;
- }
-
-
- private Map<String, String> getJdbcConf() throws Exception {
- Map<String, String> jdbcConf = new HashMap<String, String>();
- jdbcConf.put("driver", StateStoreProperties.get().getProperty(FalconJPAService.DRIVER));
- String url = StateStoreProperties.get().getProperty(FalconJPAService.URL);
- jdbcConf.put("url", url);
- jdbcConf.put("user", StateStoreProperties.get().getProperty(FalconJPAService.USERNAME));
- jdbcConf.put("password", StateStoreProperties.get().getProperty(FalconJPAService.PASSWORD));
- String dbType = url.substring("jdbc:".length());
- if (dbType.indexOf(":") <= 0) {
- throw new RuntimeException("Invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
- }
- dbType = dbType.substring(0, dbType.indexOf(":"));
- jdbcConf.put("dbtype", dbType);
- return jdbcConf;
- }
-
- private String[] createMappingToolArguments(String sqlFile) throws Exception {
- Map<String, String> conf = getJdbcConf();
- List<String> args = new ArrayList<String>();
- args.add("-schemaAction");
- args.add("add");
- args.add("-p");
- args.add("persistence.xml#falcon-" + conf.get("dbtype"));
- args.add("-connectionDriverName");
- args.add(conf.get("driver"));
- args.add("-connectionURL");
- args.add(conf.get("url"));
- args.add("-connectionUserName");
- args.add(conf.get("user"));
- args.add("-connectionPassword");
- args.add(conf.get("password"));
- if (sqlFile != null) {
- args.add("-sqlFile");
- args.add(sqlFile);
- }
- args.add("-indexes");
- args.add("true");
- args.add("org.apache.falcon.state.store.jdbc.EntityBean");
- args.add("org.apache.falcon.state.store.jdbc.InstanceBean");
- return args.toArray(new String[args.size()]);
- }
-
- private void createDB(String sqlFile, boolean run) throws Exception {
- validateConnection();
- if (checkDBExists()) {
- return;
- }
-
- verifyFalconPropsTable(false);
- createUpgradeDB(sqlFile, run, true);
- createFalconPropsTable(sqlFile, run, BuildProperties.get().getProperty("project.version"));
- if (run) {
- System.out.println("Falcon DB has been created for Falcon version '"
- + BuildProperties.get().getProperty("project.version") + "'");
- }
- }
-
- private static final String CREATE_FALCON_DB_PROPS =
- "create table FALCON_DB_PROPS (name varchar(100), data varchar(100))";
-
- private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception {
- String insertDbVerion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";
-
- PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
- writer.println();
- writer.println(CREATE_FALCON_DB_PROPS);
- writer.println(insertDbVerion);
- writer.close();
- System.out.println("Create FALCON_DB_PROPS table");
- if (run) {
- Connection conn = createConnection();
- Statement st = null;
- try {
- conn.setAutoCommit(true);
- st = conn.createStatement();
- st.executeUpdate(CREATE_FALCON_DB_PROPS);
- st.executeUpdate(insertDbVerion);
- st.close();
- } catch (Exception ex) {
- closeStatement(st);
- throw new Exception("Could not create FALCON_DB_PROPS table: " + ex.toString(), ex);
- } finally {
- conn.close();
- }
- }
- System.out.println("DONE");
- }
-
- private static final String FALCON_DB_PROPS_EXISTS = "select count(*) from FALCON_DB_PROPS";
-
- private boolean verifyFalconPropsTable(boolean exists) throws Exception {
- System.out.println((exists) ? "Check FALCON_DB_PROPS table exists"
- : "Checking FALCON_DB_PROPS table does not exist");
- boolean tableExists;
- Connection conn = createConnection();
- Statement st = null;
- ResultSet rs = null;
- try {
- st = conn.createStatement();
- rs = st.executeQuery(FALCON_DB_PROPS_EXISTS);
- rs.next();
- tableExists = true;
- } catch (Exception ex) {
- tableExists = false;
- } finally {
- closeResultSet(rs);
- closeStatement(st);
- conn.close();
- }
- if (tableExists != exists) {
- throw new Exception("FALCON_DB_PROPS_TABLE table " + ((exists) ? "does not exist" : "exists"));
- }
- System.out.println("DONE");
- return tableExists;
- }
-
- private void closeResultSet(ResultSet rs) {
- try {
- if (rs != null) {
- rs.close();
- }
- } catch (Exception e) {
- System.out.println("Unable to close ResultSet " + rs);
- }
- }
-
- private void closeStatement(Statement st) throws Exception {
- try {
- if (st != null) {
- st.close();
- }
- } catch (Exception e) {
- System.out.println("Unable to close SQL Statement " + st);
- throw new Exception(e);
- }
- }
-
- private Connection createConnection() throws Exception {
- Map<String, String> conf = getJdbcConf();
- Class.forName(conf.get("driver")).newInstance();
- return DriverManager.getConnection(conf.get("url"), conf.get("user"), conf.get("password"));
- }
-
- private void validateConnection() throws Exception {
- System.out.println("Validating DB Connection");
- try {
- createConnection().close();
- System.out.println("DONE");
- } catch (Exception ex) {
- throw new Exception("Could not connect to the database: " + ex.toString(), ex);
- }
- }
-
- private static final String ENTITY_STATUS_QUERY =
- "select count(*) from ENTITIES where current_state IN ('RUNNING', 'SUSPENDED')";
- private static final String INSTANCE_STATUS_QUERY =
- "select count(*) from INSTANCES where current_state IN ('RUNNING', 'SUSPENDED')";
-
- private boolean checkDBExists() throws Exception {
- boolean schemaExists;
- Connection conn = createConnection();
- ResultSet rs = null;
- Statement st = null;
- try {
- st = conn.createStatement();
- rs = st.executeQuery(ENTITY_STATUS_QUERY);
- rs.next();
- schemaExists = true;
- } catch (Exception ex) {
- schemaExists = false;
- } finally {
- closeResultSet(rs);
- closeStatement(st);
- conn.close();
- }
- System.out.println("DB schema " + ((schemaExists) ? "exists" : "does not exist"));
- return schemaExists;
- }
-
- private void createUpgradeDB(String sqlFile, boolean run, boolean create) throws Exception {
- System.out.println((create) ? "Create SQL schema" : "Upgrade SQL schema");
- String[] args = createMappingToolArguments(sqlFile);
- org.apache.openjpa.jdbc.meta.MappingTool.main(args);
- if (run) {
- args = createMappingToolArguments(null);
- org.apache.openjpa.jdbc.meta.MappingTool.main(args);
- }
- System.out.println("DONE");
- }
-
- private void showVersion() throws Exception {
- System.out.println("Falcon Server version: "
- + BuildProperties.get().getProperty("project.version"));
- validateConnection();
- if (!checkDBExists()) {
- throw new Exception("Falcon DB doesn't exist");
- }
- try {
- verifyFalconPropsTable(true);
- } catch (Exception ex) {
- throw new Exception("ERROR: It seems this Falcon DB was never upgraded with the 'falcondb' tool");
- }
- showFalconPropsInfo();
- }
-
- private static final String GET_FALCON_PROPS_INFO = "select name, data from FALCON_DB_PROPS order by name";
-
- private void showFalconPropsInfo() throws Exception {
- Connection conn = createConnection();
- Statement st = null;
- ResultSet rs = null;
- try {
- System.out.println("Falcon DB Version Information");
- System.out.println("--------------------------------------");
- st = conn.createStatement();
- rs = st.executeQuery(GET_FALCON_PROPS_INFO);
- while (rs.next()) {
- System.out.println(rs.getString(1) + ": " + rs.getString(2));
- }
- System.out.println("--------------------------------------");
- } catch (Exception ex) {
- throw new Exception("ERROR querying FALCON_DB_PROPS table: " + ex.toString(), ex);
- } finally {
- closeResultSet(rs);
- closeStatement(st);
- conn.close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/scheduler/src/main/resources/META-INF/persistence.xml b/scheduler/src/main/resources/META-INF/persistence.xml
deleted file mode 100644
index c2ef794..0000000
--- a/scheduler/src/main/resources/META-INF/persistence.xml
+++ /dev/null
@@ -1,104 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<persistence xmlns="http://java.sun.com/xml/ns/persistence"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- version="1.0">
-
- <persistence-unit name="falcon-derby" transaction-type="RESOURCE_LOCAL">
- <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
-
- <class>org.apache.falcon.state.store.jdbc.EntityBean</class>
- <class>org.apache.falcon.state.store.jdbc.InstanceBean</class>
-
- <properties>
- <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
-
- <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
-
- <property name="openjpa.MetaDataFactory"
- value="jpa(Types=org.apache.falcon.state.store.EntityBean;
- org.apache.falcon.state.store.InstanceBean)"></property>
-
- <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
- <property name="openjpa.LockManager" value="pessimistic"/>
- <property name="openjpa.ReadLockLevel" value="read"/>
- <property name="openjpa.WriteLockLevel" value="write"/>
- <property name="openjpa.jdbc.TransactionIsolation" value="read-committed"/> <!--CUSTOM-->
- <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/>
- <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/>
- <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
- <property name="openjpa.Log" value="log4j"/>
- </properties>
- </persistence-unit>
-
- <persistence-unit name="falcon-mysql" transaction-type="RESOURCE_LOCAL">
- <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
-
- <class>org.apache.falcon.state.store.jdbc.EntityBean</class>
- <class>org.apache.falcon.state.store.jdbc.InstanceBean</class>
-
- <properties>
- <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
-
- <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
-
- <property name="openjpa.MetaDataFactory"
- value="jpa(Types=org.apache.falcon.state.store.EntityBean;
- org.apache.falcon.state.store.InstanceBean)"></property>
-
- <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
- <property name="openjpa.LockManager" value="pessimistic"/>
- <property name="openjpa.ReadLockLevel" value="read"/>
- <property name="openjpa.WriteLockLevel" value="write"/>
- <property name="openjpa.jdbc.TransactionIsolation" value="repeatable-read"/> <!--CUSTOM-->
- <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/>
- <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/>
- <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
- <property name="openjpa.Log" value="log4j"/>
- </properties>
- </persistence-unit>
-
- <persistence-unit name="falcon-postgresql" transaction-type="RESOURCE_LOCAL">
- <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
-
- <class>org.apache.falcon.state.store.jdbc.EntityBean</class>
- <class>org.apache.falcon.state.store.jdbc.InstanceBean</class>
-
- <properties>
- <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
-
- <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
-
- <property name="openjpa.MetaDataFactory"
- value="jpa(Types=org.apache.falcon.state.store.EntityBean;
- org.apache.falcon.state.store.InstanceBean)"></property>
-
- <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
- <property name="openjpa.LockManager" value="pessimistic"/>
- <property name="openjpa.ReadLockLevel" value="read"/>
- <property name="openjpa.WriteLockLevel" value="write"/>
- <property name="openjpa.jdbc.TransactionIsolation" value="repeatable-read"/> <!--CUSTOM-->
- <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/>
- <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/>
- <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
- <property name="openjpa.Log" value="log4j"/>
- </properties>
- </persistence-unit>
-
-</persistence>
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
index 417ec3e..437c5f5 100644
--- a/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/execution/FalconExecutionServiceTest.java
@@ -45,7 +45,7 @@ import org.apache.falcon.state.InstanceID;
import org.apache.falcon.state.InstanceState;
import org.apache.falcon.state.store.AbstractStateStore;
import org.apache.falcon.state.store.StateStore;
-import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.service.FalconJPAService;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.engine.DAGEngine;
import org.apache.falcon.workflow.engine.DAGEngineFactory;
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
index 155be69..cd99049 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/AbstractSchedulerTestBase.java
@@ -18,7 +18,7 @@
package org.apache.falcon.state;
import org.apache.falcon.entity.AbstractTestBase;
-import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.service.FalconJPAService;
import org.apache.falcon.tools.FalconStateStoreDBCLI;
import org.apache.falcon.util.StateStoreProperties;
import org.apache.hadoop.conf.Configuration;
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java b/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
index ecd5293..3e186dd 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/TestFalconJPAService.java
@@ -19,7 +19,7 @@ package org.apache.falcon.state.service;
import org.apache.falcon.FalconException;
import org.apache.falcon.state.AbstractSchedulerTestBase;
-import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.service.FalconJPAService;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
index d597e27..bf5c142 100644
--- a/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
+++ b/scheduler/src/test/java/org/apache/falcon/state/service/store/TestJDBCStateStore.java
@@ -44,7 +44,7 @@ import org.apache.falcon.state.InstanceState;
import org.apache.falcon.state.store.jdbc.BeanMapperUtil;
import org.apache.falcon.state.store.jdbc.JDBCStateStore;
import org.apache.falcon.state.store.StateStore;
-import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.service.FalconJPAService;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.engine.DAGEngine;
import org.apache.falcon.workflow.engine.DAGEngineFactory;
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/startup.properties b/scheduler/src/test/resources/startup.properties
index 7160bb2..6216b70 100644
--- a/scheduler/src/test/resources/startup.properties
+++ b/scheduler/src/test/resources/startup.properties
@@ -41,7 +41,7 @@
org.apache.falcon.notification.service.impl.AlarmService,\
org.apache.falcon.notification.service.impl.DataAvailabilityService,\
org.apache.falcon.execution.FalconExecutionService,\
- org.apache.falcon.state.store.service.FalconJPAService
+ org.apache.falcon.service.FalconJPAService
##### Falcon Configuration Store Change listeners #####
*.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
@@ -121,7 +121,8 @@ debug.libext.process.paths=${falcon.libext}
*.falcon.http.authentication.simple.anonymous.allowed=false
# Indicates the Kerberos principal to be used for HTTP endpoint.
-# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
+# The principal MUST
+#rt with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
*.falcon.http.authentication.kerberos.principal=
# Location of the keytab file with the credentials for the HTTP principal.
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/scheduler/src/test/resources/statestore.properties
----------------------------------------------------------------------
diff --git a/scheduler/src/test/resources/statestore.properties b/scheduler/src/test/resources/statestore.properties
index 2ae642f..e7a08fc 100644
--- a/scheduler/src/test/resources/statestore.properties
+++ b/scheduler/src/test/resources/statestore.properties
@@ -18,7 +18,7 @@
*.domain=debug
-######## StateStore Properties #####
+######### StateStore Properties #####
*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/src/build/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml
index 78f2fd0..a6766df 100644
--- a/src/build/findbugs-exclude.xml
+++ b/src/build/findbugs-exclude.xml
@@ -38,16 +38,37 @@
</Match>
<Match>
- <Class name="org.apache.falcon.state.store.jdbc.EntityBean" />
+ <Class name="org.apache.falcon.persistence.EntityBean" />
<Bug pattern="NP_BOOLEAN_RETURN_NULL" />
</Match>
<Match>
- <Class name="org.apache.falcon.state.store.jdbc.InstanceBean" />
+ <Class name="org.apache.falcon.persistence.InstanceBean" />
<Bug pattern="NP_BOOLEAN_RETURN_NULL" />
</Match>
<Match>
+ <Class name="org.apache.falcon.persistence.PendingInstanceBean" />
+ <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" />
+ </Match>
+
+ <!--<Match>-->
+ <!--<Class name="org.apache.falcon.persistence.PendingInstanceBean" />-->
+ <!--<Bug pattern="UWF_UNWRITTEN_FIELD" />-->
+ <!--</Match>-->
+
+
+ <Match>
+ <Class name="org.apache.falcon.persistence.MonitoredFeedsBean" />
+ <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" />
+ </Match>
+
+ <!--<Match>-->
+ <!--<Class name="org.apache.falcon.persistence.MonitoredFeedsBean" />-->
+ <!--<Bug pattern="UWF_UNWRITTEN_FIELD" />-->
+ <!--</Match>-->
+
+ <Match>
<Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT" />
</Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index f23337b..3601e22 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -59,7 +59,7 @@
# org.apache.falcon.service.ProcessSubscriberService,\
# org.apache.falcon.service.FeedSLAMonitoringService,\
# org.apache.falcon.service.LifecyclePolicyMap,\
-# org.apache.falcon.state.store.service.FalconJPAService,\
+# org.apache.falcon.service.FalconJPAService,\
# org.apache.falcon.entity.store.ConfigurationStore,\
# org.apache.falcon.rerun.service.RetryService,\
# org.apache.falcon.rerun.service.LateRunService,\
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/unit/pom.xml
----------------------------------------------------------------------
diff --git a/unit/pom.xml b/unit/pom.xml
index 7e5b073..f1ef463 100644
--- a/unit/pom.xml
+++ b/unit/pom.xml
@@ -44,6 +44,16 @@
<dependency>
<groupId>org.apache.oozie</groupId>
<artifactId>oozie-core</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa-jdbc</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa-persistence</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/unit/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/unit/src/main/resources/startup.properties b/unit/src/main/resources/startup.properties
index 4576e0b..4dfea31 100644
--- a/unit/src/main/resources/startup.properties
+++ b/unit/src/main/resources/startup.properties
@@ -33,6 +33,7 @@
*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
org.apache.falcon.service.ProcessSubscriberService,\
+ org.apache.falcon.service.FalconJPAService,\
org.apache.falcon.entity.store.ConfigurationStore,\
org.apache.falcon.rerun.service.RetryService,\
org.apache.falcon.rerun.service.LateRunService,\
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index aaf2b37..9b1ff2a 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -133,6 +133,7 @@ public class TestFalconUnit extends FalconUnitTestBase {
ParseException, InterruptedException {
// submit cluster and feeds
submitClusterAndFeeds();
+
APIResult result = submitProcess(getAbsolutePath(PROCESS), PROCESS_APP_PATH);
assertStatus(result);
createData(INPUT_FEED_NAME, CLUSTER_NAME, SCHEDULE_TIME, INPUT_FILE_NAME);
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
index 175833a..1bd4f45 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/AbstractSchedulerManagerJerseyIT.java
@@ -24,7 +24,7 @@ import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.state.AbstractSchedulerTestBase;
-import org.apache.falcon.state.store.service.FalconJPAService;
+import org.apache.falcon.service.FalconJPAService;
import org.apache.falcon.unit.FalconUnitTestBase;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.util.StateStoreProperties;
[3/3] falcon git commit: FALCON-1865 Persist Feed sla data to database
Posted by aj...@apache.org.
FALCON-1865 Persist Feed sla data to database
Author: Praveen Adlakha <ad...@gmail.com>
Reviewers: Ajay Yadava <aj...@apache.org>
Closes #77 from PraveenAdlakha/feed_alert
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/de2f5c0a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/de2f5c0a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/de2f5c0a
Branch: refs/heads/master
Commit: de2f5c0ab1c26b8d198d067e066c579a86bce737
Parents: 10f3843
Author: Praveen Adlakha <ad...@gmail.com>
Authored: Mon Mar 28 19:08:56 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Mon Mar 28 19:08:56 2016 +0530
----------------------------------------------------------------------
common/pom.xml | 41 ++
.../apache/falcon/persistence/EntityBean.java | 117 +++++
.../apache/falcon/persistence/InstanceBean.java | 229 ++++++++++
.../falcon/persistence/MonitoredFeedsBean.java | 73 ++++
.../falcon/persistence/PendingInstanceBean.java | 98 +++++
.../persistence/PersistenceConstants.java | 35 ++
.../persistence/ResultNotFoundException.java | 31 ++
.../apache/falcon/service/FalconJPAService.java | 170 +++++++
.../falcon/tools/FalconStateStoreDBCLI.java | 438 +++++++++++++++++++
.../src/main/resources/META-INF/persistence.xml | 113 +++++
common/src/main/resources/startup.properties | 7 +-
.../src/main/resources/statestore.credentials | 4 +-
common/src/main/resources/statestore.properties | 20 +-
docs/src/site/twiki/FalconNativeScheduler.twiki | 2 +-
.../falcon/jdbc/MonitoringJdbcStateStore.java | 175 ++++++++
.../service/FeedSLAMonitoringService.java | 191 +++-----
.../jdbc/MonitoringJdbcStateStoreTest.java | 97 ++++
.../falcon/service/FeedSLAMonitoringTest.java | 34 --
scheduler/pom.xml | 42 +-
.../falcon/state/store/jdbc/BeanMapperUtil.java | 2 +
.../falcon/state/store/jdbc/EntityBean.java | 117 -----
.../falcon/state/store/jdbc/InstanceBean.java | 229 ----------
.../falcon/state/store/jdbc/JDBCStateStore.java | 4 +-
.../state/store/service/FalconJPAService.java | 171 --------
.../falcon/tools/FalconStateStoreDBCLI.java | 436 ------------------
.../src/main/resources/META-INF/persistence.xml | 104 -----
.../execution/FalconExecutionServiceTest.java | 2 +-
.../falcon/state/AbstractSchedulerTestBase.java | 2 +-
.../state/service/TestFalconJPAService.java | 2 +-
.../state/service/store/TestJDBCStateStore.java | 2 +-
scheduler/src/test/resources/startup.properties | 5 +-
.../src/test/resources/statestore.properties | 2 +-
src/build/findbugs-exclude.xml | 25 +-
src/conf/startup.properties | 2 +-
unit/pom.xml | 10 +
unit/src/main/resources/startup.properties | 1 +
.../org/apache/falcon/unit/TestFalconUnit.java | 1 +
.../AbstractSchedulerManagerJerseyIT.java | 2 +-
webapp/src/test/resources/startup.properties | 2 +-
39 files changed, 1744 insertions(+), 1294 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index df28f9b..c54f9d8 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -187,6 +187,26 @@
<groupId>com.thinkaurelius.titan</groupId>
<artifactId>titan-berkeleyje</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa-jdbc</artifactId>
+ <version>${openjpa.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.openjpa</groupId>
+ <artifactId>openjpa-persistence-jdbc</artifactId>
+ <version>${openjpa.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ <version>${javax-validation.version}</version>
+ </dependency>
</dependencies>
<build>
@@ -216,6 +236,27 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <phase>process-classes</phase>
+ <configuration>
+ <tasks>
+ <taskdef name="openjpac" classname="org.apache.openjpa.ant.PCEnhancerTask" classpathref="maven.compile.classpath"/>
+ <openjpac>
+ <classpath refid="maven.compile.classpath"/>
+ </openjpac>
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/EntityBean.java b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
new file mode 100644
index 0000000..5c94fa4
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.OneToMany;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+import java.util.List;
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * Entity object which will be stored in Data Base.
+ */
+@Entity
+@NamedQueries({
+ @NamedQuery(name = "GET_ENTITY", query = "select OBJECT(a) from EntityBean a where a.id = :id"),
+ @NamedQuery(name = "GET_ENTITY_FOR_STATE", query = "select OBJECT(a) from EntityBean a where a.state = :state"),
+ @NamedQuery(name = "UPDATE_ENTITY", query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type where a.id = :id"),
+ @NamedQuery(name = "GET_ENTITIES_FOR_TYPE", query = "select OBJECT(a) from EntityBean a where a.type = :type"),
+ @NamedQuery(name = "GET_ENTITIES", query = "select OBJECT(a) from EntityBean a"),
+ @NamedQuery(name = "DELETE_ENTITY", query = "delete from EntityBean a where a.id = :id"),
+ @NamedQuery(name = "DELETE_ENTITIES", query = "delete from EntityBean")})
+//RESUME CHECKSTYLE CHECK LineLengthCheck
+@Table(name = "ENTITIES")
+public class EntityBean {
+ @NotNull
+ @Id
+ private String id;
+
+ @Basic
+ @NotNull
+ @Column(name = "name")
+ private String name;
+
+
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "type")
+ private String type;
+
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "current_state")
+ private String state;
+
+ @OneToMany(cascade= CascadeType.REMOVE, mappedBy="entityBean")
+ private List<InstanceBean> instanceBeans;
+
+ public EntityBean() {
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getState() {
+ return state;
+ }
+
+ public void setState(String state) {
+ this.state = state;
+ }
+
+ public List<InstanceBean> getInstanceBeans() {
+ return instanceBeans;
+ }
+
+ public void setInstanceBeans(List<InstanceBean> instanceBeans) {
+ this.instanceBeans = instanceBeans;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java
new file mode 100644
index 0000000..b7e10f1
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/InstanceBean.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import org.apache.openjpa.persistence.jdbc.ForeignKey;
+import org.apache.openjpa.persistence.jdbc.ForeignKeyAction;
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Lob;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+import java.sql.Timestamp;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * Instance State which will be stored in DB.
+ */
+@Entity
+@NamedQueries({
+ @NamedQuery(name = "GET_INSTANCE", query = "select OBJECT(a) from InstanceBean a where a.id = :id"),
+ @NamedQuery(name = "GET_INSTANCE_FOR_EXTERNAL_ID", query = "select OBJECT(a) from InstanceBean a where a.externalID = :externalID"),
+ @NamedQuery(name = "DELETE_INSTANCE", query = "delete from InstanceBean a where a.id = :id"),
+ @NamedQuery(name = "DELETE_INSTANCE_FOR_ENTITY", query = "delete from InstanceBean a where a.entityId = :entityId"),
+ @NamedQuery(name = "UPDATE_INSTANCE", query = "update InstanceBean a set a.cluster = :cluster, a.externalID = :externalID, a.instanceTime = :instanceTime, a.creationTime = :creationTime, a.actualEndTime = :actualEndTime, a.currentState = :currentState, a.actualStartTime = :actualStartTime, a.instanceSequence = :instanceSequence, a.awaitedPredicates = :awaitedPredicates, a.properties = :properties where a.id = :id"),
+ @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster"),
+ @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState)"),
+ @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_FOR_STATES", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.currentState IN (:currentState)"),
+ @NamedQuery(name = "GET_INSTANCES_FOR_ENTITY_CLUSTER_FOR_STATES_WITH_RANGE", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.currentState IN (:currentState) AND a.instanceTime >= :startTime AND a.instanceTime < :endTime"),
+ @NamedQuery(name = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER", query = "select OBJECT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster order by a.instanceTime desc"),
+ @NamedQuery(name = "DELETE_INSTANCES_TABLE", query = "delete from InstanceBean a"),
+ @NamedQuery(name = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE", query = "select a.currentState, COUNT(a) from InstanceBean a where a.entityId = :entityId AND a.cluster = :cluster AND a.instanceTime >= :startTime AND a.instanceTime < :endTime GROUP BY a.currentState")
+})
+//RESUME CHECKSTYLE CHECK LineLengthCheck
+@Table(name = "INSTANCES")
+public class InstanceBean {
+
+ @Id
+ @NotNull
+ private String id;
+
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "entity_id")
+ private String entityId;
+
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "cluster")
+ private String cluster;
+
+ @Basic
+ @Index
+ @Column(name = "external_id")
+ private String externalID;
+
+ @Basic
+ @Index
+ @Column(name = "instance_time")
+ private Timestamp instanceTime;
+
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "creation_time")
+ private Timestamp creationTime;
+
+ @Basic
+ @Column(name = "actual_start_time")
+ private Timestamp actualStartTime;
+
+ @Basic
+ @Column(name = "actual_end_time")
+ private Timestamp actualEndTime;
+
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "current_state")
+ private String currentState;
+
+ @Basic
+ @Index
+ @NotNull
+ @Column(name = "instance_sequence")
+ private Integer instanceSequence;
+
+ @ForeignKey(deleteAction= ForeignKeyAction.CASCADE)
+ @ManyToOne(cascade= CascadeType.REMOVE)
+ private EntityBean entityBean;
+
+
+ @Column(name = "awaited_predicates")
+ @Lob
+ private byte[] awaitedPredicates;
+
+ @Column(name = "properties")
+ @Lob
+ private byte[] properties;
+
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getCluster() {
+ return cluster;
+ }
+
+ public void setCluster(String cluster) {
+ this.cluster = cluster;
+ }
+
+ public String getExternalID() {
+ return externalID;
+ }
+
+ public void setExternalID(String externalID) {
+ this.externalID = externalID;
+ }
+
+ public Timestamp getInstanceTime() {
+ return instanceTime;
+ }
+
+ public void setInstanceTime(Timestamp instanceTime) {
+ this.instanceTime = instanceTime;
+ }
+
+ public Timestamp getCreationTime() {
+ return creationTime;
+ }
+
+ public void setCreationTime(Timestamp creationTime) {
+ this.creationTime = creationTime;
+ }
+
+ public Timestamp getActualStartTime() {
+ return actualStartTime;
+ }
+
+ public void setActualStartTime(Timestamp actualStartTime) {
+ this.actualStartTime = actualStartTime;
+ }
+
+ public Timestamp getActualEndTime() {
+ return actualEndTime;
+ }
+
+ public void setActualEndTime(Timestamp actualEndTime) {
+ this.actualEndTime = actualEndTime;
+ }
+
+ public String getCurrentState() {
+ return currentState;
+ }
+
+ public void setCurrentState(String currentState) {
+ this.currentState = currentState;
+ }
+
+ public byte[] getAwaitedPredicates() {
+ return awaitedPredicates;
+ }
+
+ public void setAwaitedPredicates(byte[] awaitedPredicates) {
+ this.awaitedPredicates = awaitedPredicates;
+ }
+
+ public Integer getInstanceSequence() {
+ return instanceSequence;
+ }
+
+ public void setInstanceSequence(Integer instanceSequence) {
+ this.instanceSequence = instanceSequence;
+ }
+
+ public String getEntityId() {
+ return entityId;
+ }
+
+ public void setEntityId(String entityId) {
+ this.entityId = entityId;
+ }
+
+ public byte[] getProperties() {
+ return properties;
+ }
+
+ public void setProperties(byte[] properties) {
+ this.properties = properties;
+ }
+
+ public EntityBean getEntityBean() {
+ return entityBean;
+ }
+
+ public void setEntityBean(EntityBean entityBean) {
+ this.entityBean = entityBean;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
new file mode 100644
index 0000000..2b48569
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import javax.persistence.Entity;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Column;
+import javax.persistence.Basic;
+import javax.validation.constraints.NotNull;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+* The Feeds that are to be monitered will be stored in the db.
+* */
+
+@Entity
+@NamedQueries({
+ @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from "
+ + "MonitoredFeedsBean a where a.feedName = :feedName"),
+ @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredFeedsBean "
+ + "a where a.feedName = :feedName"),
+ @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) "
+ + "from MonitoredFeedsBean a")
+})
+@Table(name="MONITORED_FEEDS")
+//RESUME CHECKSTYLE CHECK LineLengthCheck
+public class MonitoredFeedsBean {
+ @NotNull
+ @GeneratedValue(strategy = GenerationType.AUTO)
+ @Id
+ private String id;
+
+ @Basic
+ @NotNull
+ @Column(name = "feed_name")
+ private String feedName;
+
+ public String getFeedName() {
+ return feedName;
+ }
+
+ public void setFeedName(String feedName) {
+ this.feedName = feedName;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
new file mode 100644
index 0000000..038244a
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import javax.persistence.Entity;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.GenerationType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.Id;
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.validation.constraints.NotNull;
+import java.util.Date;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+* The instances of feed to be monitored will be stored in db.
+* */
+@Entity
+@NamedQueries({
+ @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"),
+ @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"),
+ @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
+ @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
+ @NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a ")
+})
+@Table(name = "PENDING_INSTANCES")
+//RESUME CHECKSTYLE CHECK LineLengthCheck
+public class PendingInstanceBean {
+ @NotNull
+ @GeneratedValue(strategy = GenerationType.AUTO)
+ @Id
+ private String id;
+
+ @Basic
+ @NotNull
+ @Column(name = "feed_name")
+ private String feedName;
+
+ @Basic
+ @NotNull
+ @Column(name = "cluster_name")
+ private String clusterName;
+
+ @Basic
+ @NotNull
+ @Column(name = "nominal_time")
+ private Date nominalTime;
+
+ public Date getNominalTime() {
+ return nominalTime;
+ }
+
+ public void setNominalTime(Date nominalTime) {
+ this.nominalTime = nominalTime;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public String getFeedName() {
+ return feedName;
+ }
+
+ public void setFeedName(String feedName) {
+ this.feedName = feedName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
new file mode 100644
index 0000000..511270e
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+/**
+ * The name of queries to be used as constants accross the packages.
+ */
+
+public final class PersistenceConstants {
+ private PersistenceConstants(){
+
+ }
+ public static final String GET_MONITERED_INSTANCE = "GET_MONITERED_INSTANCE";
+ public static final String DELETE_MONITORED_INSTANCES = "DELETE_MONITORED_INSTANCES";
+ public static final String GET_ALL_MONITORING_FEEDS = "GET_ALL_MONITORING_FEEDS";
+ public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES";
+ public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES";
+ public static final String DELETE_ALL_INSTANCES_FOR_FEED = "DELETE_ALL_INSTANCES_FOR_FEED";
+ public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES";
+ public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES";
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java b/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java
new file mode 100644
index 0000000..c368d2c
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/ResultNotFoundException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.persistence;
+
+import org.apache.falcon.FalconException;
+
+/**
+ * Exception to be through by the bean classes.
+ */
+public class ResultNotFoundException extends FalconException {
+
+ public ResultNotFoundException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/service/FalconJPAService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/FalconJPAService.java b/common/src/main/java/org/apache/falcon/service/FalconJPAService.java
new file mode 100644
index 0000000..73fde33
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/service/FalconJPAService.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.service;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.persistence.EntityBean;
+import org.apache.falcon.persistence.InstanceBean;
+import org.apache.falcon.util.StateStoreProperties;
+import org.apache.openjpa.persistence.OpenJPAEntityManagerFactorySPI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import java.text.MessageFormat;
+import java.util.Properties;
+
+/**
+ * Service that manages JPA.
+ */
+public final class FalconJPAService implements FalconService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FalconJPAService.class);
+ public static final String PREFIX = "falcon.statestore.";
+
+ public static final String DB_SCHEMA = PREFIX + "schema.name";
+ public static final String URL = PREFIX + "jdbc.url";
+ public static final String DRIVER = PREFIX + "jdbc.driver";
+ public static final String USERNAME = PREFIX + "jdbc.username";
+ public static final String PASSWORD = PREFIX + "jdbc.password";
+ public static final String CONN_DATA_SOURCE = PREFIX + "connection.data.source";
+ public static final String CONN_PROPERTIES = PREFIX + "connection.properties";
+ public static final String MAX_ACTIVE_CONN = PREFIX + "pool.max.active.conn";
+ public static final String CREATE_DB_SCHEMA = PREFIX + "create.db.schema";
+ public static final String VALIDATE_DB_CONN = PREFIX + "validate.db.connection";
+ public static final String VALIDATE_DB_CONN_EVICTION_INTERVAL = PREFIX + "validate.db.connection.eviction.interval";
+ public static final String VALIDATE_DB_CONN_EVICTION_NUM = PREFIX + "validate.db.connection.eviction.num";
+
+ private EntityManagerFactory entityManagerFactory;
+ // Persistent Unit which is defined in persistence.xml
+ private String persistenceUnit;
+ private static final FalconJPAService FALCON_JPA_SERVICE = new FalconJPAService();
+
+ private FalconJPAService() {
+ }
+
+ public static FalconJPAService get() {
+ return FALCON_JPA_SERVICE;
+ }
+
+ public EntityManagerFactory getEntityManagerFactory() {
+ return entityManagerFactory;
+ }
+
+ public void setPersistenceUnit(String dbType) {
+ if (StringUtils.isEmpty(dbType)) {
+ throw new IllegalArgumentException(" DB type cannot be null or empty");
+ }
+ dbType = dbType.split(":")[0];
+ this.persistenceUnit = "falcon-" + dbType;
+ }
+
+ @Override
+ public String getName() {
+ return this.getClass().getSimpleName();
+ }
+
+ @Override
+ public void init() throws FalconException {
+ Properties props = getPropsforStore();
+ entityManagerFactory = Persistence.
+ createEntityManagerFactory(persistenceUnit, props);
+ EntityManager entityManager = getEntityManager();
+ entityManager.find(EntityBean.class, 1);
+ entityManager.find(InstanceBean.class, 1);
+ LOG.info("All entities initialized");
+
+ // need to use a pseudo no-op transaction so all entities, datasource
+ // and connection pool are initialized one time only
+ entityManager.getTransaction().begin();
+ OpenJPAEntityManagerFactorySPI spi = (OpenJPAEntityManagerFactorySPI) entityManagerFactory;
+ // Mask the password with '***'
+ String logMsg = spi.getConfiguration().getConnectionProperties().replaceAll("Password=.*?,", "Password=***,");
+ LOG.info("JPA configuration: {0}", logMsg);
+ entityManager.getTransaction().commit();
+ entityManager.close();
+ }
+
+ private Properties getPropsforStore() throws FalconException {
+ String dbSchema = StateStoreProperties.get().getProperty(DB_SCHEMA);
+ String url = StateStoreProperties.get().getProperty(URL);
+ String driver = StateStoreProperties.get().getProperty(DRIVER);
+ String user = StateStoreProperties.get().getProperty(USERNAME);
+ String password = StateStoreProperties.get().getProperty(PASSWORD).trim();
+ String maxConn = StateStoreProperties.get().getProperty(MAX_ACTIVE_CONN).trim();
+ String dataSource = StateStoreProperties.get().getProperty(CONN_DATA_SOURCE);
+ String connPropsConfig = StateStoreProperties.get().getProperty(CONN_PROPERTIES);
+ boolean autoSchemaCreation = Boolean.parseBoolean(StateStoreProperties.get().getProperty(CREATE_DB_SCHEMA,
+ "false"));
+ boolean validateDbConn = Boolean.parseBoolean(StateStoreProperties.get().getProperty(VALIDATE_DB_CONN, "true"));
+ String evictionInterval = StateStoreProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_INTERVAL).trim();
+ String evictionNum = StateStoreProperties.get().getProperty(VALIDATE_DB_CONN_EVICTION_NUM).trim();
+
+ if (!url.startsWith("jdbc:")) {
+ throw new FalconException("invalid JDBC URL, must start with 'jdbc:'" + url);
+ }
+ String dbType = url.substring("jdbc:".length());
+ if (dbType.indexOf(":") <= 0) {
+ throw new FalconException("invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'" + url);
+ }
+ setPersistenceUnit(dbType);
+ String connProps = "DriverClassName={0},Url={1},Username={2},Password={3},MaxActive={4}";
+ connProps = MessageFormat.format(connProps, driver, url, user, password, maxConn);
+ Properties props = new Properties();
+ if (autoSchemaCreation) {
+ connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
+ props.setProperty("openjpa.jdbc.SynchronizeMappings", "buildSchema(ForeignKeys=true)");
+ } else if (validateDbConn) {
+ // validation can be done only if the schema already exist, else a
+ // connection cannot be obtained to create the schema.
+ String interval = "timeBetweenEvictionRunsMillis=" + evictionInterval;
+ String num = "numTestsPerEvictionRun=" + evictionNum;
+ connProps += ",TestOnBorrow=true,TestOnReturn=true,TestWhileIdle=true," + interval + "," + num;
+ connProps += ",ValidationQuery=select 1";
+ connProps = MessageFormat.format(connProps, dbSchema);
+ } else {
+ connProps += ",TestOnBorrow=false,TestOnReturn=false,TestWhileIdle=false";
+ }
+ if (connPropsConfig != null) {
+ connProps += "," + connPropsConfig;
+ }
+ props.setProperty("openjpa.ConnectionProperties", connProps);
+ props.setProperty("openjpa.ConnectionDriverName", dataSource);
+ return props;
+ }
+
+ @Override
+ public void destroy() throws FalconException {
+ if (entityManagerFactory.isOpen()) {
+ entityManagerFactory.close();
+ }
+ }
+
+
+ /**
+ * Return an EntityManager. Used by the StoreService.
+ *
+ * @return an entity manager
+ */
+ public EntityManager getEntityManager() {
+ return getEntityManagerFactory().createEntityManager();
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
new file mode 100644
index 0000000..df8194c
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.tools;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.falcon.cli.CLIParser;
+import org.apache.falcon.service.FalconJPAService;
+import org.apache.falcon.util.BuildProperties;
+import org.apache.falcon.util.StateStoreProperties;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Command Line utility for Table Creation, Update.
+ */
+public class FalconStateStoreDBCLI {
+ public static final String HELP_CMD = "help";
+ public static final String VERSION_CMD = "version";
+ public static final String CREATE_CMD = "create";
+ public static final String SQL_FILE_OPT = "sqlfile";
+ public static final String RUN_OPT = "run";
+ public static final String UPGRADE_CMD = "upgrade";
+
+ // Represents whether DB instance exists or not.
+ private boolean instanceExists;
+ private static final String[] FALCON_HELP =
+ {"Falcon DB initialization tool currently supports Derby DB/ Mysql/ PostgreSQL"};
+
+ public static void main(String[] args) {
+ new FalconStateStoreDBCLI().run(args);
+ }
+
+ public FalconStateStoreDBCLI() {
+ instanceExists = false;
+ }
+
+ protected Options getOptions() {
+ Option sqlfile = new Option(SQL_FILE_OPT, true,
+ "Generate SQL script instead of creating/upgrading the DB schema");
+ Option run = new Option(RUN_OPT, false, "Confirmation option regarding DB schema creation/upgrade");
+ Options options = new Options();
+ options.addOption(sqlfile);
+ options.addOption(run);
+ return options;
+ }
+
+ public synchronized int run(String[] args) {
+ if (instanceExists) {
+ throw new IllegalStateException("CLI instance already used");
+ }
+ instanceExists = true;
+
+ CLIParser parser = new CLIParser("falcondb", FALCON_HELP);
+ parser.addCommand(HELP_CMD, "", "Display usage for all commands or specified command", new Options(), false);
+ parser.addCommand(VERSION_CMD, "", "Show Falcon DB version information", new Options(), false);
+ parser.addCommand(CREATE_CMD, "", "Create Falcon DB schema", getOptions(), false);
+ parser.addCommand(UPGRADE_CMD, "", "Upgrade Falcon DB schema", getOptions(), false);
+
+ try {
+ CLIParser.Command command = parser.parse(args);
+ if (command.getName().equals(HELP_CMD)) {
+ parser.showHelp();
+ } else if (command.getName().equals(VERSION_CMD)) {
+ showVersion();
+ } else {
+ if (!command.getCommandLine().hasOption(SQL_FILE_OPT)
+ && !command.getCommandLine().hasOption(RUN_OPT)) {
+ throw new Exception("'-sqlfile <FILE>' or '-run' options must be specified");
+ }
+ CommandLine commandLine = command.getCommandLine();
+ String sqlFile = (commandLine.hasOption(SQL_FILE_OPT))
+ ? commandLine.getOptionValue(SQL_FILE_OPT)
+ : File.createTempFile("falcondb-", ".sql").getAbsolutePath();
+ boolean run = commandLine.hasOption(RUN_OPT);
+ if (command.getName().equals(CREATE_CMD)) {
+ createDB(sqlFile, run);
+ } else if (command.getName().equals(UPGRADE_CMD)) {
+ upgradeDB(sqlFile, run);
+ }
+ System.out.println("The SQL commands have been written to: " + sqlFile);
+ if (!run) {
+ System.out.println("WARN: The SQL commands have NOT been executed, you must use the '-run' option");
+ }
+ }
+ return 0;
+ } catch (ParseException ex) {
+ System.err.println("Invalid sub-command: " + ex.getMessage());
+ System.err.println();
+ System.err.println(parser.shortHelp());
+ return 1;
+ } catch (Exception ex) {
+ System.err.println();
+ System.err.println("Error: " + ex.getMessage());
+ System.err.println();
+ System.err.println("Stack trace for the error was (for debug purposes):");
+ System.err.println("--------------------------------------");
+ ex.printStackTrace(System.err);
+ System.err.println("--------------------------------------");
+ System.err.println();
+ return 1;
+ }
+ }
+
+ private void upgradeDB(String sqlFile, boolean run) throws Exception {
+ validateConnection();
+ if (!checkDBExists()) {
+ throw new Exception("Falcon DB doesn't exist");
+ }
+ String falconVersion = BuildProperties.get().getProperty("project.version");
+ String dbVersion = getFalconDBVersion();
+ if (dbVersion.compareTo(falconVersion) >= 0) {
+ System.out.println("Falcon DB already upgraded to Falcon version '" + falconVersion + "'");
+ return;
+ }
+
+ createUpgradeDB(sqlFile, run, false);
+ upgradeFalconDBVersion(sqlFile, run, falconVersion);
+
+ // any post upgrade tasks
+ if (run) {
+ System.out.println("Falcon DB has been upgraded to Falcon version '" + falconVersion + "'");
+ }
+ }
+
+
+ private void upgradeFalconDBVersion(String sqlFile, boolean run, String version) throws Exception {
+ String updateDBVersion = "update FALCON_DB_PROPS set data='" + version + "' where name='db.version'";
+ PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
+ writer.println();
+ writer.println(updateDBVersion);
+ writer.close();
+ System.out.println("Upgrade db.version in FALCON_DB_PROPS table to " + version);
+ if (run) {
+ Connection conn = createConnection();
+ Statement st = null;
+ try {
+ conn.setAutoCommit(true);
+ st = conn.createStatement();
+ st.executeUpdate(updateDBVersion);
+ st.close();
+ } catch (Exception ex) {
+ throw new Exception("Could not upgrade db.version in FALCON_DB_PROPS table: " + ex.toString(), ex);
+ } finally {
+ closeStatement(st);
+ conn.close();
+ }
+ }
+ System.out.println("DONE");
+ }
+
+ private static final String GET_FALCON_DB_VERSION = "select data from FALCON_DB_PROPS where name = 'db.version'";
+
+ private String getFalconDBVersion() throws Exception {
+ String version;
+ System.out.println("Get Falcon DB version");
+ Connection conn = createConnection();
+ Statement st = null;
+ ResultSet rs = null;
+ try {
+ st = conn.createStatement();
+ rs = st.executeQuery(GET_FALCON_DB_VERSION);
+ if (rs.next()) {
+ version = rs.getString(1);
+ } else {
+ throw new Exception("ERROR: Could not find Falcon DB 'db.version' in FALCON_DB_PROPS table");
+ }
+ } catch (Exception ex) {
+ throw new Exception("ERROR: Could not query FALCON_DB_PROPS table: " + ex.toString(), ex);
+ } finally {
+ closeResultSet(rs);
+ closeStatement(st);
+ conn.close();
+ }
+ System.out.println("DONE");
+ return version;
+ }
+
+
+ private Map<String, String> getJdbcConf() throws Exception {
+ Map<String, String> jdbcConf = new HashMap<String, String>();
+ jdbcConf.put("driver", StateStoreProperties.get().getProperty(FalconJPAService.DRIVER));
+ String url = StateStoreProperties.get().getProperty(FalconJPAService.URL);
+ jdbcConf.put("url", url);
+ jdbcConf.put("user", StateStoreProperties.get().getProperty(FalconJPAService.USERNAME));
+ jdbcConf.put("password", StateStoreProperties.get().getProperty(FalconJPAService.PASSWORD));
+ String dbType = url.substring("jdbc:".length());
+ if (dbType.indexOf(":") <= 0) {
+ throw new RuntimeException("Invalid JDBC URL, missing vendor 'jdbc:[VENDOR]:...'");
+ }
+ dbType = dbType.substring(0, dbType.indexOf(":"));
+ jdbcConf.put("dbtype", dbType);
+ return jdbcConf;
+ }
+
+ private String[] createMappingToolArguments(String sqlFile) throws Exception {
+ Map<String, String> conf = getJdbcConf();
+ List<String> args = new ArrayList<String>();
+ args.add("-schemaAction");
+ args.add("add");
+ args.add("-p");
+ args.add("persistence.xml#falcon-" + conf.get("dbtype"));
+ args.add("-connectionDriverName");
+ args.add(conf.get("driver"));
+ args.add("-connectionURL");
+ args.add(conf.get("url"));
+ args.add("-connectionUserName");
+ args.add(conf.get("user"));
+ args.add("-connectionPassword");
+ args.add(conf.get("password"));
+ if (sqlFile != null) {
+ args.add("-sqlFile");
+ args.add(sqlFile);
+ }
+ args.add("-indexes");
+ args.add("true");
+ args.add("org.apache.falcon.persistence.EntityBean");
+ args.add("org.apache.falcon.persistence.InstanceBean");
+ args.add("org.apache.falcon.persistence.PendingInstanceBean");
+ args.add("org.apache.falcon.persistence.MonitoredFeedsBean");
+ return args.toArray(new String[args.size()]);
+ }
+
+ private void createDB(String sqlFile, boolean run) throws Exception {
+ validateConnection();
+ if (checkDBExists()) {
+ return;
+ }
+
+ verifyFalconPropsTable(false);
+ createUpgradeDB(sqlFile, run, true);
+ createFalconPropsTable(sqlFile, run, BuildProperties.get().getProperty("project.version"));
+ if (run) {
+ System.out.println("Falcon DB has been created for Falcon version '"
+ + BuildProperties.get().getProperty("project.version") + "'");
+ }
+ }
+
+ private static final String CREATE_FALCON_DB_PROPS =
+ "create table FALCON_DB_PROPS (name varchar(100), data varchar(100))";
+
+ private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception {
+ String insertDbVerion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";
+
+ PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
+ writer.println();
+ writer.println(CREATE_FALCON_DB_PROPS);
+ writer.println(insertDbVerion);
+ writer.close();
+ System.out.println("Create FALCON_DB_PROPS table");
+ if (run) {
+ Connection conn = createConnection();
+ Statement st = null;
+ try {
+ conn.setAutoCommit(true);
+ st = conn.createStatement();
+ st.executeUpdate(CREATE_FALCON_DB_PROPS);
+ st.executeUpdate(insertDbVerion);
+ st.close();
+ } catch (Exception ex) {
+ closeStatement(st);
+ throw new Exception("Could not create FALCON_DB_PROPS table: " + ex.toString(), ex);
+ } finally {
+ conn.close();
+ }
+ }
+ System.out.println("DONE");
+ }
+
+ private static final String FALCON_DB_PROPS_EXISTS = "select count(*) from FALCON_DB_PROPS";
+
+ private boolean verifyFalconPropsTable(boolean exists) throws Exception {
+ System.out.println((exists) ? "Check FALCON_DB_PROPS table exists"
+ : "Checking FALCON_DB_PROPS table does not exist");
+ boolean tableExists;
+ Connection conn = createConnection();
+ Statement st = null;
+ ResultSet rs = null;
+ try {
+ st = conn.createStatement();
+ rs = st.executeQuery(FALCON_DB_PROPS_EXISTS);
+ rs.next();
+ tableExists = true;
+ } catch (Exception ex) {
+ tableExists = false;
+ } finally {
+ closeResultSet(rs);
+ closeStatement(st);
+ conn.close();
+ }
+ if (tableExists != exists) {
+ throw new Exception("FALCON_DB_PROPS_TABLE table " + ((exists) ? "does not exist" : "exists"));
+ }
+ System.out.println("DONE");
+ return tableExists;
+ }
+
+ private void closeResultSet(ResultSet rs) {
+ try {
+ if (rs != null) {
+ rs.close();
+ }
+ } catch (Exception e) {
+ System.out.println("Unable to close ResultSet " + rs);
+ }
+ }
+
+ private void closeStatement(Statement st) throws Exception {
+ try {
+ if (st != null) {
+ st.close();
+ }
+ } catch (Exception e) {
+ System.out.println("Unable to close SQL Statement " + st);
+ throw new Exception(e);
+ }
+ }
+
+ private Connection createConnection() throws Exception {
+ Map<String, String> conf = getJdbcConf();
+ Class.forName(conf.get("driver")).newInstance();
+ return DriverManager.getConnection(conf.get("url"), conf.get("user"), conf.get("password"));
+ }
+
+ private void validateConnection() throws Exception {
+ System.out.println("Validating DB Connection");
+ try {
+ createConnection().close();
+ System.out.println("DONE");
+ } catch (Exception ex) {
+ throw new Exception("Could not connect to the database: " + ex.toString(), ex);
+ }
+ }
+
+ private static final String ENTITY_STATUS_QUERY =
+ "select count(*) from ENTITIES where current_state IN ('RUNNING', 'SUSPENDED')";
+ private static final String INSTANCE_STATUS_QUERY =
+ "select count(*) from INSTANCES where current_state IN ('RUNNING', 'SUSPENDED')";
+
+ private boolean checkDBExists() throws Exception {
+ boolean schemaExists;
+ Connection conn = createConnection();
+ ResultSet rs = null;
+ Statement st = null;
+ try {
+ st = conn.createStatement();
+ rs = st.executeQuery(ENTITY_STATUS_QUERY);
+ rs.next();
+ schemaExists = true;
+ } catch (Exception ex) {
+ schemaExists = false;
+ } finally {
+ closeResultSet(rs);
+ closeStatement(st);
+ conn.close();
+ }
+ System.out.println("DB schema " + ((schemaExists) ? "exists" : "does not exist"));
+ return schemaExists;
+ }
+
+ private void createUpgradeDB(String sqlFile, boolean run, boolean create) throws Exception {
+ System.out.println((create) ? "Create SQL schema" : "Upgrade SQL schema");
+ String[] args = createMappingToolArguments(sqlFile);
+ org.apache.openjpa.jdbc.meta.MappingTool.main(args);
+ if (run) {
+ args = createMappingToolArguments(null);
+ org.apache.openjpa.jdbc.meta.MappingTool.main(args);
+ }
+ System.out.println("DONE");
+ }
+
+ private void showVersion() throws Exception {
+ System.out.println("Falcon Server version: "
+ + BuildProperties.get().getProperty("project.version"));
+ validateConnection();
+ if (!checkDBExists()) {
+ throw new Exception("Falcon DB doesn't exist");
+ }
+ try {
+ verifyFalconPropsTable(true);
+ } catch (Exception ex) {
+ throw new Exception("ERROR: It seems this Falcon DB was never upgraded with the 'falcondb' tool");
+ }
+ showFalconPropsInfo();
+ }
+
+ private static final String GET_FALCON_PROPS_INFO = "select name, data from FALCON_DB_PROPS order by name";
+
+ private void showFalconPropsInfo() throws Exception {
+ Connection conn = createConnection();
+ Statement st = null;
+ ResultSet rs = null;
+ try {
+ System.out.println("Falcon DB Version Information");
+ System.out.println("--------------------------------------");
+ st = conn.createStatement();
+ rs = st.executeQuery(GET_FALCON_PROPS_INFO);
+ while (rs.next()) {
+ System.out.println(rs.getString(1) + ": " + rs.getString(2));
+ }
+ System.out.println("--------------------------------------");
+ } catch (Exception ex) {
+ throw new Exception("ERROR querying FALCON_DB_PROPS table: " + ex.toString(), ex);
+ } finally {
+ closeResultSet(rs);
+ closeStatement(st);
+ conn.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml
new file mode 100644
index 0000000..4c9388c
--- /dev/null
+++ b/common/src/main/resources/META-INF/persistence.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<persistence xmlns="http://java.sun.com/xml/ns/persistence"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ version="1.0">
+
+ <persistence-unit name="falcon-derby" transaction-type="RESOURCE_LOCAL">
+ <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+
+ <class>org.apache.falcon.persistence.EntityBean</class>
+ <class>org.apache.falcon.persistence.InstanceBean</class>
+ <class>org.apache.falcon.persistence.PendingInstanceBean</class>
+ <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
+
+ <properties>
+ <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
+
+ <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
+
+ <property name="openjpa.MetaDataFactory"
+ value="jpa(Types=org.apache.falcon.persistence.EntityBean;
+ org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
+ org.apache.falcon.persistence.MonitoredFeedsBean)"></property>
+
+ <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
+ <property name="openjpa.LockManager" value="pessimistic"/>
+ <property name="openjpa.ReadLockLevel" value="read"/>
+ <property name="openjpa.WriteLockLevel" value="write"/>
+ <property name="openjpa.jdbc.TransactionIsolation" value="read-committed"/> <!--CUSTOM-->
+ <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/>
+ <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/>
+ <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
+ <property name="openjpa.Log" value="log4j"/>
+ </properties>
+ </persistence-unit>
+
+ <persistence-unit name="falcon-mysql" transaction-type="RESOURCE_LOCAL">
+ <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+
+ <class>org.apache.falcon.persistence.EntityBean</class>
+ <class>org.apache.falcon.persistence.InstanceBean</class>
+ <class>org.apache.falcon.persistence.PendingInstanceBean</class>
+ <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
+
+ <properties>
+ <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
+
+ <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
+
+ <property name="openjpa.MetaDataFactory"
+ value="jpa(Types=org.apache.falcon.persistence.EntityBean;
+ org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
+ org.apache.falcon.persistence.MonitoredFeedsBean)"></property>
+
+ <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
+ <property name="openjpa.LockManager" value="pessimistic"/>
+ <property name="openjpa.ReadLockLevel" value="read"/>
+ <property name="openjpa.WriteLockLevel" value="write"/>
+ <property name="openjpa.jdbc.TransactionIsolation" value="repeatable-read"/> <!--CUSTOM-->
+ <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/>
+ <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/>
+ <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
+ <property name="openjpa.Log" value="log4j"/>
+ </properties>
+ </persistence-unit>
+
+ <persistence-unit name="falcon-postgresql" transaction-type="RESOURCE_LOCAL">
+ <provider>org.apache.openjpa.persistence.PersistenceProviderImpl</provider>
+
+ <class>org.apache.falcon.persistence.EntityBean</class>
+ <class>org.apache.falcon.persistence.InstanceBean</class>
+ <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
+ <class>org.apache.falcon.persistence.PendingInstanceBean</class>
+
+ <properties>
+ <property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
+
+ <property name="openjpa.ConnectionProperties" value="**INVALID**"/> <!--Set by StoreService at init time -->
+
+ <property name="openjpa.MetaDataFactory"
+ value="jpa(Types=org.apache.falcon.persistence.EntityBean;
+ org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
+ org.apache.falcon.persistence.MonitoredFeedsBean)"></property>
+
+ <property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
+ <property name="openjpa.LockManager" value="pessimistic"/>
+ <property name="openjpa.ReadLockLevel" value="read"/>
+ <property name="openjpa.WriteLockLevel" value="write"/>
+ <property name="openjpa.jdbc.TransactionIsolation" value="repeatable-read"/> <!--CUSTOM-->
+ <property name="openjpa.jdbc.DBDictionary" value="batchLimit=50"/>
+ <property name="openjpa.jdbc.DBDictionary" value="TimestampTypeName=TIMESTAMP"/>
+ <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
+ <property name="openjpa.Log" value="log4j"/>
+ </properties>
+ </persistence-unit>
+
+</persistence>
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 81d3da1..87a74bf 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -42,7 +42,8 @@
org.apache.falcon.metadata.MetadataMappingService,\
org.apache.falcon.service.LogCleanupService,\
org.apache.falcon.service.GroupsService,\
- org.apache.falcon.service.ProxyUserService
+ org.apache.falcon.service.ProxyUserService,\
+ org.apache.falcon.service.FalconJPAService
## Add if you want to use Falcon Azure integration ##
# org.apache.falcon.adfservice.ADFProviderService
## If you wish to use Falcon native scheduler add the commented out services below to application.services ##
@@ -51,7 +52,7 @@
# org.apache.falcon.notification.service.impl.AlarmService,\
# org.apache.falcon.notification.service.impl.DataAvailabilityService,\
# org.apache.falcon.execution.FalconExecutionService,\
-# org.apache.falcon.state.store.service.FalconJPAService
+
# List of Lifecycle policies configured.
@@ -305,4 +306,4 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
## Creates Falcon DB.
## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP.
## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
-#*.falcon.statestore.create.db.schema=true
\ No newline at end of file
+#*.falcon.statestore.create.db.schema=true
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/statestore.credentials
----------------------------------------------------------------------
diff --git a/common/src/main/resources/statestore.credentials b/common/src/main/resources/statestore.credentials
index 86c32a1..b0e4196 100644
--- a/common/src/main/resources/statestore.credentials
+++ b/common/src/main/resources/statestore.credentials
@@ -18,5 +18,5 @@
######### StateStore Credentials #####
-#*.falcon.statestore.jdbc.username=sa
-#*.falcon.statestore.jdbc.password=
\ No newline at end of file
+*.falcon.statestore.jdbc.username=sa
+*.falcon.statestore.jdbc.password=
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/common/src/main/resources/statestore.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/statestore.properties b/common/src/main/resources/statestore.properties
index 44e79b3..7686426 100644
--- a/common/src/main/resources/statestore.properties
+++ b/common/src/main/resources/statestore.properties
@@ -42,4 +42,22 @@
## Creates Falcon DB.
## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP.
## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
-#*.falcon.statestore.create.db.schema=true
\ No newline at end of file
+#*.falcon.statestore.create.db.schema=true
+
+
+######## StateStore Properties #####
+*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+*.falcon.statestore.jdbc.url=jdbc:derby:target/test-data/data.db;create=true
+*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+# Maximum number of active connections that can be allocated from this pool at the same time.
+*.falcon.statestore.pool.max.active.conn=10
+*.falcon.statestore.connection.properties=
+# Indicates the interval (in milliseconds) between eviction runs.
+*.falcon.statestore.validate.db.connection.eviction.interval=300000
+# The number of objects to examine during each run of the idle object evictor thread.
+*.falcon.statestore.validate.db.connection.eviction.num=10
+# Creates Falcon DB.
+# If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP.
+# If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
+*.falcon.statestore.create.db.schema=true
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/docs/src/site/twiki/FalconNativeScheduler.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconNativeScheduler.twiki b/docs/src/site/twiki/FalconNativeScheduler.twiki
index 9ffc5e9..1f51739 100644
--- a/docs/src/site/twiki/FalconNativeScheduler.twiki
+++ b/docs/src/site/twiki/FalconNativeScheduler.twiki
@@ -29,7 +29,7 @@ You can enable native scheduler by making changes to __$FALCON_HOME/conf/startup
org.apache.falcon.service.ProcessSubscriberService,\
org.apache.falcon.service.FeedSLAMonitoringService,\
org.apache.falcon.service.LifecyclePolicyMap,\
- org.apache.falcon.state.store.service.FalconJPAService,\
+ org.apache.falcon.service.FalconJPAService,\
org.apache.falcon.entity.store.ConfigurationStore,\
org.apache.falcon.rerun.service.RetryService,\
org.apache.falcon.rerun.service.LateRunService,\
http://git-wip-us.apache.org/repos/asf/falcon/blob/de2f5c0a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
new file mode 100644
index 0000000..39e2562
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.jdbc;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.falcon.persistence.MonitoredFeedsBean;
+import org.apache.falcon.persistence.PendingInstanceBean;
+import org.apache.falcon.persistence.PersistenceConstants;
+import org.apache.falcon.persistence.ResultNotFoundException;
+import org.apache.falcon.service.FalconJPAService;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.util.Date;
+import java.util.List;
+
+/**
+* StateStore for MonitoringFeeds and PendingFeedInstances.
+*/
+
+public class MonitoringJdbcStateStore {
+
+ private EntityManager getEntityManager() {
+ return FalconJPAService.get().getEntityManager();
+ }
+
+
+ public void putMonitoredFeed(String feedName){
+
+ MonitoredFeedsBean monitoredFeedsBean = new MonitoredFeedsBean();
+ monitoredFeedsBean.setFeedName(feedName);
+ EntityManager entityManager = getEntityManager();
+ try {
+ beginTransaction(entityManager);
+ entityManager.persist(monitoredFeedsBean);
+ } finally {
+ commitAndCloseTransaction(entityManager);
+ }
+ }
+
+ public MonitoredFeedsBean getMonitoredFeed(String feedName){
+ EntityManager entityManager = getEntityManager();
+ Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITERED_INSTANCE);
+ q.setParameter("feedName", feedName);
+ List result = q.getResultList();
+ try {
+ if (result.isEmpty()) {
+ return null;
+ }
+ } finally {
+ entityManager.close();
+ }
+ return ((MonitoredFeedsBean)result.get(0));
+ }
+
+ public void deleteMonitoringFeed(String feedName) {
+ EntityManager entityManager = getEntityManager();
+ beginTransaction(entityManager);
+ Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_INSTANCES);
+ q.setParameter("feedName", feedName);
+ try{
+ q.executeUpdate();
+ } finally {
+ commitAndCloseTransaction(entityManager);
+ }
+ }
+
+ public List<MonitoredFeedsBean> getAllMonitoredFeed() throws ResultNotFoundException{
+ EntityManager entityManager = getEntityManager();
+ Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_FEEDS);
+ List result = q.getResultList();
+ try{
+ if (result.isEmpty()) {
+ throw new ResultNotFoundException("No Feed has been scheduled for monitoring.");
+ }
+ } finally {
+ entityManager.close();
+ }
+ return result;
+ }
+
+ public void deletePendingInstance(String feedName, String clusterName , Date nominalTime){
+ EntityManager entityManager = getEntityManager();
+ beginTransaction(entityManager);
+ Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES);
+ q.setParameter("feedName", feedName);
+ q.setParameter("clusterName", clusterName);
+ q.setParameter("nominalTime", nominalTime);
+ try{
+ q.executeUpdate();
+ } finally {
+ commitAndCloseTransaction(entityManager);
+ }
+ }
+
+ public void deletePendingInstances(String feedName, String clusterName){
+ EntityManager entityManager = getEntityManager();
+ beginTransaction(entityManager);
+ Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED);
+ q.setParameter("feedName", feedName);
+ q.setParameter("clusterName", clusterName);
+ try{
+ q.executeUpdate();
+ } finally {
+ commitAndCloseTransaction(entityManager);
+ }
+ }
+
+ public void putPendingInstances(String feed, String clusterName, Date nominalTime){
+ EntityManager entityManager = getEntityManager();
+ PendingInstanceBean pendingInstanceBean = new PendingInstanceBean();
+ pendingInstanceBean.setFeedName(feed);
+ pendingInstanceBean.setClusterName(clusterName);
+ pendingInstanceBean.setNominalTime(nominalTime);
+
+ beginTransaction(entityManager);
+ entityManager.persist(pendingInstanceBean);
+ commitAndCloseTransaction(entityManager);
+ }
+
+ public List<Date> getNominalInstances(String feedName, String clusterName) throws ResultNotFoundException{
+ EntityManager entityManager = getEntityManager();
+ Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES);
+ q.setParameter("feedName", feedName);
+ q.setParameter("clusterName", clusterName);
+ List result = q.getResultList();
+ try{
+ if (CollectionUtils.isEmpty(result)) {
+ throw new ResultNotFoundException(feedName + " with " + clusterName + "Not Found");
+ }
+ } finally {
+ entityManager.close();
+ }
+ return result;
+ }
+ public List<PendingInstanceBean> getAllInstances(){
+ EntityManager entityManager = getEntityManager();
+ Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PENDING_INSTANCES);
+ List result = q.getResultList();
+
+ try {
+ if (CollectionUtils.isEmpty(result)) {
+ return null;
+ }
+ } finally{
+ entityManager.close();
+ }
+ return result;
+ }
+
+ private void commitAndCloseTransaction(EntityManager entityManager) {
+ entityManager.getTransaction().commit();
+ entityManager.close();
+ }
+
+ private void beginTransaction(EntityManager entityManager) {
+ entityManager.getTransaction().begin();
+ }
+
+}