You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2016/07/06 03:21:22 UTC
[1/3] falcon git commit: FALCON-2052 Process SLA monitoring
Repository: falcon
Updated Branches:
refs/heads/master bd32b610e -> 60e2f68b8
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/src/build/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml
index 62720e3..5c35b8c 100644
--- a/src/build/findbugs-exclude.xml
+++ b/src/build/findbugs-exclude.xml
@@ -53,13 +53,13 @@
</Match>
<Match>
- <Class name="org.apache.falcon.persistence.FeedSLAAlertBean" />
+ <Class name="org.apache.falcon.persistence.EntitySLAAlertBean" />
<Bug pattern="UWF_UNWRITTEN_FIELD,NP_BOOLEAN_RETURN_NULL" />
</Match>
<Match>
- <Class name="org.apache.falcon.persistence.MonitoredFeedsBean" />
+ <Class name="org.apache.falcon.persistence.MonitoredEntityBean" />
<Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" />
</Match>
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 5ac3d5c..78c7e1e 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -54,7 +54,7 @@
##For feed SLA monitoring enable these two
# org.apache.falcon.service.FalconJPAService,\
-# org.apache.falcon.service.FeedSLAMonitoringService,\
+# org.apache.falcon.service.EntitySLAMonitoringService,\
##Add if you want to send data to graphite
# org.apache.falcon.metrics.MetricNotificationService\
@@ -64,7 +64,7 @@
#*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
# org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
# org.apache.falcon.service.ProcessSubscriberService,\
-# org.apache.falcon.service.FeedSLAMonitoringService,\
+# org.apache.falcon.service.EntitySLAMonitoringService,\
# org.apache.falcon.service.LifecyclePolicyMap,\
# org.apache.falcon.service.FalconJPAService,\
# org.apache.falcon.entity.store.ConfigurationStore,\
@@ -96,7 +96,7 @@ prism.application.services=org.apache.falcon.service.LifecyclePolicyMap,\
org.apache.falcon.entity.ColoClusterRelation,\
org.apache.falcon.group.FeedGroupMap,\
org.apache.falcon.entity.store.FeedLocationStore,\
- org.apache.falcon.service.FeedSLAMonitoringService,\
+ org.apache.falcon.service.EntitySLAMonitoringService,\
org.apache.falcon.service.SharedLibraryHostingService
## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ##
# org.apache.falcon.state.store.jdbc.JDBCStateStore
[2/3] falcon git commit: FALCON-2052 Process SLA monitoring
Posted by aj...@apache.org.
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
new file mode 100644
index 0000000..f931625
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAMonitoringService.java
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.service;
+
+import java.text.ParseException;
+import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Pair;
+import org.apache.falcon.entity.FeedInstanceStatus;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.entity.v0.process.Clusters;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
+import org.apache.falcon.persistence.MonitoredEntityBean;
+import org.apache.falcon.persistence.PendingInstanceBean;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.resource.SchedulableEntityInstance;
+import org.apache.falcon.util.DateUtil;
+import org.apache.falcon.util.DeploymentUtil;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowEngineFactory;
+import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.falcon.entity.v0.process.Process;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Service to monitor SLAs of feed and process entities.
+ */
+public final class EntitySLAMonitoringService implements ConfigurationChangeListener, FalconService {
+ private static final Logger LOG = LoggerFactory.getLogger("FeedSLA");
+
+ private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore();
+
+ private static final int ONE_MS = 1;
+
+ private static final EntitySLAMonitoringService SERVICE = new EntitySLAMonitoringService();
+
+ public static final String TAG_CRITICAL = "Missed-SLA-High";
+ public static final String TAG_WARN = "Missed-SLA-Low";
+
+ // Private constructor: the only instance created in this class is the SERVICE constant, returned by get().
+ private EntitySLAMonitoringService() {
+
+ }
+
+ /**
+ * Returns the shared service instance.
+ *
+ * @return the EntitySLAMonitoringService held in the SERVICE constant
+ */
+ public static EntitySLAMonitoringService get() {
+ return SERVICE;
+ }
+
+ /**
+ * Permissions for storePath.
+ */
+ private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
+
+
+ /**
+ * Frequency in seconds of "status check" for pending feed instances.
+ */
+ private int statusCheckFrequencySeconds; // 10 minutes
+
+
+ /**
+ * Time Duration (in milliseconds) in future for generating pending feed instances.
+ *
+ * In every cycle pending feed instances are added for monitoring, till this time in future.
+ */
+ private int lookAheadWindowMillis; // 15 MINUTES
+
+
+ /**
+ * Filesystem used for serializing and deserializing.
+ */
+ private FileSystem fileSystem;
+
+ /**
+ * Working directory for the feed sla monitoring service.
+ */
+ private Path storePath;
+
+ /**
+ * Path to store the state of the monitoring service.
+ */
+ private Path filePath;
+
+ @Override
+ public void onAdd(Entity entity) throws FalconException {
+ Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+ if (entity.getEntityType() == EntityType.FEED) {
+ Feed feed = (Feed) entity;
+ // currently sla service is enabled only for fileSystemStorage
+ if (feed.getLocations() != null || feed.getSla() != null || checkFeedClusterSLA(feed)) {
+ for (Cluster cluster : feed.getClusters().getClusters()) {
+ if (currentClusters.contains(cluster.getName())) {
+ if (FeedHelper.getSLA(cluster, feed) != null) {
+ LOG.debug("Adding feed:{} for monitoring", feed.getName());
+ MONITORING_JDBC_STATE_STORE.putMonitoredEntity(feed.getName(), EntityType.FEED.toString());
+ break;
+ }
+ }
+ }
+ }
+ }
+ if (entity.getEntityType() == EntityType.PROCESS){
+ Process process = (Process) entity;
+ if (process.getSla() != null || checkProcessClusterSLA(process)){
+ for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
+ if (currentClusters.contains(cluster.getName())) {
+ LOG.debug("Adding process:{} for monitoring", process.getName());
+ MONITORING_JDBC_STATE_STORE.putMonitoredEntity(process.getName(),
+ EntityType.PROCESS.toString());
+ break;
+ }
+ }
+ }
+ }
+ }
+
+    /**
+     * Tells whether FeedHelper.getSLA returns a non-null SLA for at least one cluster of the feed.
+     *
+     * @param feed feed whose clusters are inspected
+     * @return true as soon as one cluster resolves to an SLA, false otherwise
+     */
+    public Boolean checkFeedClusterSLA(Feed feed){
+        for (Cluster feedCluster : feed.getClusters().getClusters()) {
+            if (FeedHelper.getSLA(feedCluster, feed) != null) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+
+    /**
+     * Tells whether ProcessHelper.getSLA returns a non-null SLA for at least one cluster of the process.
+     *
+     * @param process process whose clusters are inspected
+     * @return true as soon as one cluster resolves to an SLA, false otherwise
+     */
+    public Boolean checkProcessClusterSLA(Process process){
+        for (org.apache.falcon.entity.v0.process.Cluster processCluster
+                : process.getClusters().getClusters()) {
+            if (ProcessHelper.getSLA(processCluster, process) != null) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Deregisters a removed feed or process from the monitoring store and drops its pending instances.
+     *
+     * For a feed with locations, every cluster that belongs to the current colo and resolves to an SLA
+     * has its monitored-entity entry and pending instances deleted. For a process, the same deletions
+     * are done for every cluster in the current colo, provided the process has a process-level SLA or
+     * an SLA on any of its clusters (the same criterion onAdd uses to register it).
+     *
+     * @param entity the entity that was removed
+     * @throws FalconException declared by the listener contract
+     */
+    @Override
+    public void onRemove(Entity entity) throws FalconException {
+        Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+        if (entity.getEntityType() == EntityType.FEED) {
+            Feed feed = (Feed) entity;
+            // currently sla service is enabled only for fileSystemStorage
+            // NOTE(review): onAdd also accepts feeds without locations; confirm whether this guard should match.
+            if (feed.getLocations() != null) {
+                for (Cluster cluster : feed.getClusters().getClusters()) {
+                    if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
+                        MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(feed.getName(),
+                                EntityType.FEED.toString());
+                        MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName(),
+                                EntityType.FEED.toString());
+                    }
+                }
+            }
+        }
+        if (entity.getEntityType() == EntityType.PROCESS) {
+            Process process = (Process) entity;
+            // Use the same criterion as onAdd so that a process registered only because of a
+            // cluster-level SLA is also deregistered.
+            if (process.getSla() != null || checkProcessClusterSLA(process)) {
+                for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
+                    if (currentClusters.contains(cluster.getName())) {
+                        MONITORING_JDBC_STATE_STORE.deleteMonitoringEntity(process.getName(),
+                                EntityType.PROCESS.toString());
+                        MONITORING_JDBC_STATE_STORE.deletePendingInstances(process.getName(), cluster.getName(),
+                                EntityType.PROCESS.toString());
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Tells whether the feed has locations and at least one of its clusters both belongs to the
+     * current colo and resolves to an SLA.
+     */
+    private boolean isSLAMonitoringEnabledInCurrentColo(Feed feed) {
+        if (feed.getLocations() == null) {
+            return false;
+        }
+        Set<String> coloClusters = DeploymentUtil.getCurrentClusters();
+        for (Cluster feedCluster : feed.getClusters().getClusters()) {
+            boolean inCurrentColo = coloClusters.contains(feedCluster.getName());
+            if (inCurrentColo && FeedHelper.getSLA(feedCluster, feed) != null) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Tells whether at least one cluster of the process both belongs to the current colo and
+     * resolves to an SLA.
+     */
+    private boolean isSLAMonitoringEnabledInCurrentColo(Process process) {
+        Set<String> coloClusters = DeploymentUtil.getCurrentClusters();
+        for (org.apache.falcon.entity.v0.process.Cluster processCluster
+                : process.getClusters().getClusters()) {
+            boolean inCurrentColo = coloClusters.contains(processCluster.getName());
+            if (inCurrentColo && ProcessHelper.getSLA(processCluster, process) != null) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Reconciles the monitoring state when a feed or process definition is updated.
+     *
+     * For both entity types the same three cases apply:
+     * the new definition is not monitored in the current colo: the old definition is removed;
+     * the old definition was not monitored but the new one is: the new definition is added;
+     * both are monitored: pending instances are deleted for every cluster whose SLA was dropped.
+     *
+     * @param oldEntity definition before the update
+     * @param newEntity definition after the update
+     * @throws FalconException propagated from onAdd / onRemove
+     */
+    @Override
+    public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
+        if (newEntity.getEntityType() == EntityType.FEED) {
+            Feed oldFeed = (Feed) oldEntity;
+            Feed newFeed = (Feed) newEntity;
+            if (!isSLAMonitoringEnabledInCurrentColo(newFeed)) {
+                onRemove(oldFeed);
+            } else if (!isSLAMonitoringEnabledInCurrentColo(oldFeed)) {
+                onAdd(newFeed);
+            } else {
+                List<String> slaRemovedClusters = new ArrayList<>();
+                for (String oldCluster : EntityUtil.getClustersDefinedInColos(oldFeed)) {
+                    if (FeedHelper.getSLA(oldCluster, oldFeed) != null
+                            && FeedHelper.getSLA(oldCluster, newFeed) == null) {
+                        slaRemovedClusters.add(oldCluster);
+                    }
+                }
+                updatePendingInstances(newFeed.getName(), slaRemovedClusters, EntityType.FEED.toString());
+            }
+        }
+        if (newEntity.getEntityType() == EntityType.PROCESS) {
+            Process oldProcess = (Process) oldEntity;
+            Process newProcess = (Process) newEntity;
+            // Same decision order as the feed branch: the NEW definition decides removal,
+            // the OLD definition decides whether this is a first-time registration.
+            if (!isSLAMonitoringEnabledInCurrentColo(newProcess)) {
+                onRemove(oldProcess);
+            } else if (!isSLAMonitoringEnabledInCurrentColo(oldProcess)) {
+                onAdd(newProcess);
+            } else {
+                List<String> slaRemovedClusters = new ArrayList<>();
+                for (String oldCluster : EntityUtil.getClustersDefined(oldProcess)) {
+                    if (ProcessHelper.getSLA(oldCluster, oldProcess) != null
+                            && ProcessHelper.getSLA(oldCluster, newProcess) == null) {
+                        slaRemovedClusters.add(oldCluster);
+                    }
+                }
+                updatePendingInstances(newProcess.getName(), slaRemovedClusters, EntityType.PROCESS.toString());
+            }
+        }
+    }
+
+    /**
+     * Deletes the pending instances of the entity on each of the given clusters.
+     *
+     * @param entityName name of the entity
+     * @param slaRemovedClusters clusters whose pending instances are to be deleted
+     * @param entityType entity type string passed through to the state store
+     */
+    void updatePendingInstances(String entityName, List<String> slaRemovedClusters , String entityType){
+        for (String removedCluster : slaRemovedClusters) {
+            MONITORING_JDBC_STATE_STORE.deletePendingInstances(entityName, removedCluster, entityType);
+        }
+    }
+
+ /**
+ * Handles an entity reload by delegating to onAdd.
+ *
+ * @param entity the reloaded entity
+ * @throws FalconException propagated from onAdd
+ */
+ @Override
+ public void onReload(Entity entity) throws FalconException {
+ onAdd(entity);
+ }
+
+ /**
+ * Returns the service name.
+ *
+ * @return the simple class name of EntitySLAMonitoringService
+ */
+ @Override
+ public String getName() {
+ return EntitySLAMonitoringService.class.getSimpleName();
+ }
+
+ /**
+ * Reads the service configuration from StartupProperties, prepares the store directory and
+ * schedules the Monitor task.
+ *
+ * Properties read: "feed.sla.service.store.uri" (no default),
+ * "feed.sla.statusCheck.frequency.seconds" (default "600") and
+ * "feed.sla.lookAheadWindow.millis" (default "900000").
+ *
+ * @throws FalconException declared by the service contract
+ */
+ @Override
+ public void init() throws FalconException {
+ // NOTE(review): no default is supplied for this property; confirm how new Path(...) behaves when it is unset.
+ String uri = StartupProperties.get().getProperty("feed.sla.service.store.uri");
+ storePath = new Path(uri);
+ filePath = new Path(storePath, "feedSLAMonitoringService");
+ fileSystem = initializeFileSystem();
+
+ String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
+ statusCheckFrequencySeconds = Integer.parseInt(freq);
+
+ freq = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000");
+ lookAheadWindowMillis = Integer.parseInt(freq);
+ // NOTE(review): this message is logged unconditionally; nothing here checks whether old state exists.
+ LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString());
+ // Single-threaded scheduler: first run immediately, then statusCheckFrequencySeconds after each run ends.
+ // NOTE(review): the executor is a local variable, so destroy() has no reference with which to shut it down.
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS);
+ }
+
+    /**
+     * Marks a feed instance as available by deleting it from the pending instances of the feed.
+     *
+     * The delete is issued only when the store holds pending instances for the feed on the given
+     * cluster, mirroring makeProcessInstanceAvailable.
+     *
+     * @param feedName name of the feed
+     * @param clusterName cluster on which the instance became available
+     * @param nominalTime nominal time of the instance
+     * @throws FalconException declared for callers; not thrown directly by this method
+     */
+    public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime)
+        throws FalconException {
+        // Argument order follows the placeholders: feed, instance, cluster.
+        LOG.info("Removing {} feed's instance {} in cluster {} from pendingSLA", feedName,
+                nominalTime, clusterName);
+        List<Date> instances = MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName,
+                EntityType.FEED.toString());
+        // Slas for feeds not having sla tag are not stored, so there is nothing to delete for them.
+        if (!CollectionUtils.isEmpty(instances)) {
+            MONITORING_JDBC_STATE_STORE.deletePendingInstance(feedName, clusterName, nominalTime,
+                    EntityType.FEED.toString());
+        }
+    }
+
+ /**
+ * Creates the file system for storePath and makes sure the store directory exists.
+ *
+ * The created file system is assigned to the fileSystem field and also returned. When the
+ * directory is missing it is created with STORE_PERMISSION.
+ *
+ * @return the file system obtained from HadoopClientFactory for storePath
+ * @throws RuntimeException wrapping any exception raised while creating the file system or directory
+ */
+ private FileSystem initializeFileSystem() {
+ try {
+ fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
+ if (!fileSystem.exists(storePath)) {
+ LOG.info("Creating directory for pending feed instances: {}", storePath);
+ // set permissions so config store dir is owned by falcon alone
+ HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION);
+ }
+ return fileSystem;
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to bring up feed sla store for path: " + storePath, e);
+ }
+ }
+
+ /**
+ * Service shutdown hook; the body is empty, so nothing is released here.
+ *
+ * @throws FalconException declared by the service contract
+ */
+ @Override
+ public void destroy() throws FalconException {
+ }
+
+ // Periodic task: re-checks the availability of pending instances and adds new pending instances
+ // for feeds and processes. Scheduled from init().
+ private class Monitor implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ // Nothing to do when the store reports no monitored entities.
+ if (MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed().size() > 0) {
+ checkPendingInstanceAvailability(EntityType.FEED.toString());
+ checkPendingInstanceAvailability(EntityType.PROCESS.toString());
+
+ // add instances from the last checked time up to lookAheadWindowMillis from now (some buffer for status check)
+ Date now = new Date();
+ Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis);
+ addNewPendingFeedInstances(newCheckPoint, EntityType.FEED.toString());
+ addNewPendingFeedInstances(newCheckPoint, EntityType.PROCESS.toString());
+ }
+ } catch (Throwable e) {
+ // Catch everything so that a failing cycle is only logged and does not escape run().
+ LOG.error("Feed SLA monitoring failed: ", e);
+ }
+ }
+ }
+
+
+    /**
+     * Adds pending instances for every monitored entity, from the last recorded instance time
+     * (or the initial start time when none is recorded) up to the given check point.
+     *
+     * Only clusters that belong to the current colo are processed, and instances are generated
+     * while they are before both {@code to} and the end of the entity's validity on the cluster.
+     *
+     * @param to exclusive upper bound for the nominal time of generated instances
+     * @param entityType entity type string (feed or process) used for entity lookup and persistence
+     * @throws FalconException if an entity or cluster lookup fails
+     */
+    void addNewPendingFeedInstances(Date to, String entityType) throws FalconException {
+        Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
+        // NOTE(review): the monitored entities are not filtered by entityType before the lookup below;
+        // confirm that EntityUtil.getEntity tolerates a name registered under the other type.
+        List<MonitoredEntityBean> feedsBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed();
+        for (MonitoredEntityBean monitoredEntityBean : feedsBeanList) {
+            String entityName = monitoredEntityBean.getFeedName();
+            Entity entity = EntityUtil.getEntity(entityType, entityName);
+            Set<String> clusters = EntityUtil.getClustersDefined(entity);
+            List<org.apache.falcon.entity.v0.cluster.Cluster> cluster = new ArrayList<>();
+            for (String string : clusters) {
+                cluster.add(ClusterHelper.getCluster(string));
+            }
+            for (org.apache.falcon.entity.v0.cluster.Cluster feedCluster : cluster) {
+                if (currentClusters.contains(feedCluster.getName())) {
+                    // get start of instances from the database, for the entity type being processed
+                    Date nextInstanceTime = MONITORING_JDBC_STATE_STORE.getLastInstanceTime(entityName,
+                            entityType);
+                    Pair<String, String> key = new Pair<>(entity.getName(), feedCluster.getName());
+                    if (nextInstanceTime == null) {
+                        nextInstanceTime = getInitialStartTime(entity, feedCluster.getName(), entityType);
+                    } else {
+                        // step past the last recorded instance so it is not generated again
+                        nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
+                    }
+
+                    Set<Date> instances = new HashSet<>();
+                    org.apache.falcon.entity.v0.cluster.Cluster currentCluster =
+                            EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName());
+                    nextInstanceTime = EntityUtil.getNextStartTime(entity, currentCluster, nextInstanceTime);
+                    Date endDate;
+                    if (entityType.equals(EntityType.FEED.toString())) {
+                        endDate = FeedHelper.getClusterValidity((Feed) entity, currentCluster.getName()).getEnd();
+                    } else {
+                        endDate = ProcessHelper.getClusterValidity((Process) entity,
+                                currentCluster.getName()).getEnd();
+                    }
+                    while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate)) {
+                        LOG.debug("Adding instance={} for <feed,cluster>={}", nextInstanceTime, key);
+                        instances.add(nextInstanceTime);
+                        nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
+                        nextInstanceTime = EntityUtil.getNextStartTime(entity, currentCluster, nextInstanceTime);
+                    }
+
+                    for (Date date : instances) {
+                        MONITORING_JDBC_STATE_STORE.putPendingInstances(entity.getName(), feedCluster.getName(),
+                                date, entityType);
+                    }
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Checks the availability of all the pendingInstances of the given entity type and removes the
+     * ones which have become available.
+     *
+     * @param entityType entity type string used to look up, check and delete the pending instances
+     * @throws FalconException if the availability check fails to resolve the entity
+     */
+    private void checkPendingInstanceAvailability(String entityType) throws FalconException {
+        for (PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()) {
+            String entityName = pendingInstanceBean.getEntityName();
+            String clusterName = pendingInstanceBean.getClusterName();
+            for (Date date : MONITORING_JDBC_STATE_STORE.getNominalInstances(entityName, clusterName,
+                    entityType)) {
+                boolean status = checkEntityInstanceAvailability(entityName, clusterName, date, entityType);
+                if (status) {
+                    // delete under the same entity type that was looked up, not a hard-coded FEED
+                    MONITORING_JDBC_STATE_STORE.deletePendingInstance(entityName, clusterName, date,
+                            entityType);
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks whether a given feed or process instance is available.
+     *
+     * For a process, the workflow engine status for the single nominal time is queried and the
+     * instance counts as available when the result status is SUCCEEDED. For a feed, the instance
+     * counts as available when its availability status is AVAILABLE or EMPTY. Any failure during
+     * the check is logged and reported as "not available".
+     *
+     * @param entityName name of the entity
+     * @param clusterName cluster on which the instance is checked
+     * @param nominalTime nominal time of the instance
+     * @param entityType entity type string (feed or process)
+     * @return true when the instance is available, false otherwise
+     * @throws FalconException if the entity cannot be resolved
+     */
+    private boolean checkEntityInstanceAvailability(String entityName, String clusterName, Date nominalTime,
+                                                    String entityType) throws
+        FalconException {
+        Entity entity = EntityUtil.getEntity(entityType, entityName);
+
+        try {
+            if (entityType.equals(EntityType.PROCESS.toString())) {
+                LOG.debug("Checking instance availability status for entity:{}, cluster:{}, "
+                        + "instanceTime:{}, entityType:{}", entity.getName(), clusterName, nominalTime, entityType);
+                AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
+                InstancesResult instancesResult = wfEngine.getStatus(entity, nominalTime, nominalTime, null, null);
+                // NOTE(review): this compares the APIResult status of the call; confirm it reflects the
+                // status of the instance itself.
+                if (instancesResult.getStatus().equals(APIResult.Status.SUCCEEDED)) {
+                    LOG.debug("Entity instance(feed:{}, cluster:{}, instanceTime:{}) is available.",
+                            entity.getName(), clusterName, nominalTime);
+                    return true;
+                }
+                return false;
+            }
+            if (entityType.equals(EntityType.FEED.toString())) {
+                LOG.debug("Checking instance availability status for feed:{}, cluster:{}, instanceTime:{}",
+                        entity.getName(), clusterName, nominalTime);
+
+                FeedInstanceStatus.AvailabilityStatus status = FeedHelper.getFeedInstanceStatus((Feed) entity,
+                        clusterName, nominalTime);
+                if (status.equals(FeedInstanceStatus.AvailabilityStatus.AVAILABLE)
+                        || status.equals(FeedInstanceStatus.AvailabilityStatus.EMPTY)) {
+                    LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is available.",
+                            entity.getName(), clusterName, nominalTime);
+                    return true;
+                }
+            }
+        } catch (Throwable e) {
+            // Best effort: a failed status lookup is logged (with stack trace) and treated as unavailable.
+            LOG.error("Couldn't find status for Entity:{}, cluster:{}, entityType:{}", entityName, clusterName,
+                    entityType, e);
+        }
+        LOG.debug("Entity instance(entity:{}, cluster:{}, instanceTime:{}) is not available.", entity.getName(),
+                clusterName, nominalTime);
+        return false;
+    }
+
+
+ /**
+ * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances between given time range which have missed
+ * slaLow or slaHigh.
+ *
+ * Only feeds which have defined sla in their definition are considered.
+ * Only the feed instances between the given time range are considered.
+ * Start time and end time are both inclusive.
+ *
+ * NOTE(review): every pending instance returned by the store is looked up as a FEED here; confirm
+ * what happens when the store also holds pending instances that belong to processes.
+ *
+ * @param start start time, inclusive
+ * @param end end time, inclusive
+ * @return Set of pending feed instances belonging to the given range which have missed SLA
+ * @throws FalconException
+ */
+ public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(Date start, Date end)
+ throws FalconException {
+ Set<SchedulableEntityInstance> result = new HashSet<>();
+ for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){
+ Pair<String, String> feedClusterPair = new Pair<>(pendingInstanceBean.getEntityName(),
+ pendingInstanceBean.getClusterName());
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first);
+ Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
+ Sla sla = FeedHelper.getSLA(cluster, feed);
+ if (sla != null) {
+ // The SLA status is computed over all nominal instances stored for this feed and cluster.
+ Set<Pair<Date, String>> slaStatus = getFeedSLAStatus(sla, start, end,
+ MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getEntityName(),
+ pendingInstanceBean.getClusterName(), EntityType.FEED.toString()));
+ for (Pair<Date, String> status : slaStatus) {
+ SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first,
+ feedClusterPair.second, status.first, EntityType.FEED);
+ // The tag is TAG_CRITICAL or TAG_WARN, as produced by getFeedSLAStatus.
+ instance.setTags(status.second);
+ result.add(instance);
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances of a given feed between the given time range
+ * which missed sla.Only those instances are included which have missed either slaLow or slaHigh.
+ * @param entityName name of the feed
+ * @param clusterName cluster name
+ * @param start start time, inclusive
+ * @param end end time, inclusive
+ * @return Pending feed instances of the given feed which belong to the given time range and have missed SLA.
+ * @throws FalconException
+ */
+ public Set<SchedulableEntityInstance> getEntitySLAMissPendingAlerts(String entityName, String clusterName,
+ Date start, Date end, String entityType) throws FalconException {
+
+ Set<SchedulableEntityInstance> result = new HashSet<>();
+ List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstances(entityName, clusterName,
+ entityType);
+ if (missingInstances == null || !Arrays.asList(EntityType.FEED.toString(),
+ EntityType.PROCESS.toString()).contains(entityType)){
+ return result;
+ }
+ Entity entity = EntityUtil.getEntity(entityType, entityName);
+
+ if (entityType.equals(EntityType.FEED.toString())) {
+ Sla sla = FeedHelper.getSLA(clusterName, (Feed) entity);
+
+ if (sla != null) {
+ Set<Pair<Date, String>> slaStatus = getFeedSLAStatus(sla, start, end, missingInstances);
+ for (Pair<Date, String> status : slaStatus){
+ SchedulableEntityInstance instance = new SchedulableEntityInstance(entityName, clusterName,
+ status.first, EntityType.FEED);
+ instance.setTags(status.second);
+ result.add(instance);
+ }
+ }
+ return result;
+ } else {
+ org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(clusterName, (Process) entity);
+ if (sla != null){
+ Set<Pair<Date, String>> slaStatus = getProcessSLAStatus(sla, start, end, missingInstances);
+ for (Pair<Date, String> status : slaStatus){
+ SchedulableEntityInstance instance = new SchedulableEntityInstance(entityName, clusterName,
+ status.first, EntityType.PROCESS);
+ instance.setTags(status.second);
+ result.add(instance);
+ }
+ }
+ }
+ return result;
+ }
+
+    /**
+     * Classifies the missing feed instances that lie in [start, end] by the SLA they have missed.
+     *
+     * An instance is tagged TAG_CRITICAL when nominal time + slaHigh is before now, otherwise
+     * TAG_WARN when nominal time + slaLow is before now; instances that missed neither are left out.
+     *
+     * @param sla feed SLA providing slaLow and slaHigh
+     * @param start start time, inclusive
+     * @param end end time, inclusive
+     * @param missingInstances nominal times of the pending instances
+     * @return pairs of nominal time and tag for the instances which have missed an SLA
+     * @throws FalconException if an SLA expression cannot be evaluated
+     */
+    Set<Pair<Date, String>> getFeedSLAStatus(Sla sla, Date start, Date end, List<Date> missingInstances)
+        throws FalconException {
+        Date now = new Date();
+        Frequency slaLow = sla.getSlaLow();
+        Frequency slaHigh = sla.getSlaHigh();
+        Set<Pair<Date, String>> missed = new HashSet<>();
+        for (Date nominalTime : missingInstances) {
+            boolean outsideRange = nominalTime.before(start) || nominalTime.after(end);
+            if (outsideRange) {
+                continue;
+            }
+            ExpressionHelper.setReferenceDate(nominalTime);
+            ExpressionHelper evaluator = ExpressionHelper.get();
+            Long slaHighDuration = evaluator.evaluate(slaHigh.toString(), Long.class);
+            Long slaLowDuration = evaluator.evaluate(slaLow.toString(), Long.class);
+            Date slaCriticalTime = new Date(nominalTime.getTime() + slaHighDuration);
+            Date slaWarnTime = new Date(nominalTime.getTime() + slaLowDuration);
+            if (slaCriticalTime.before(now)) {
+                missed.add(new Pair<>(nominalTime, TAG_CRITICAL));
+            } else if (slaWarnTime.before(now)) {
+                missed.add(new Pair<>(nominalTime, TAG_WARN));
+            }
+        }
+        return missed;
+    }
+
+    /**
+     * Finds the missing process instances that lie in [start, end] and have missed their SLA.
+     *
+     * An instance is tagged TAG_CRITICAL when nominal time + shouldEndIn is before now.
+     *
+     * @param sla process SLA providing shouldEndIn
+     * @param start start time, inclusive
+     * @param end end time, inclusive
+     * @param missingInstances nominal times of the pending instances
+     * @return pairs of nominal time and tag for the instances which have missed the SLA
+     * @throws FalconException if the SLA expression cannot be evaluated
+     */
+    Set<Pair<Date, String>> getProcessSLAStatus(org.apache.falcon.entity.v0.process.Sla sla, Date start,
+                                                Date end, List<Date> missingInstances) throws FalconException {
+        Date now = new Date();
+        Frequency slaHigh = sla.getShouldEndIn();
+        Set<Pair<Date, String>> missed = new HashSet<>();
+        for (Date nominalTime : missingInstances) {
+            boolean outsideRange = nominalTime.before(start) || nominalTime.after(end);
+            if (outsideRange) {
+                continue;
+            }
+            ExpressionHelper.setReferenceDate(nominalTime);
+            ExpressionHelper evaluator = ExpressionHelper.get();
+            Long slaHighDuration = evaluator.evaluate(slaHigh.toString(), Long.class);
+            Date slaCriticalTime = new Date(nominalTime.getTime() + slaHighDuration);
+            if (slaCriticalTime.before(now)) {
+                missed.add(new Pair<>(nominalTime, TAG_CRITICAL));
+            }
+        }
+        return missed;
+    }
+
+    /**
+     * Determines the time from which pending instances are first generated for an entity.
+     *
+     * The result is the earlier of the entity's validity start on the cluster and
+     * (now - SLA duration), where the SLA duration is slaLow for a feed and shouldEndIn for a process.
+     *
+     * @param entity feed or process; must match entityType
+     * @param clusterName cluster for which the start time is computed
+     * @param entityType entity type string; the feed type selects the feed path, anything else the process path
+     * @return the initial start time for instance generation
+     * @throws FalconException declared for callers
+     * @throws IllegalStateException if the entity has no SLA on the cluster
+     */
+    @VisibleForTesting
+    Date getInitialStartTime(Entity entity, String clusterName, String entityType) throws FalconException {
+        // The feed path must be selected for FEED, since it casts the entity to Feed.
+        if (entityType.equals(EntityType.FEED.toString())) {
+            Sla sla = FeedHelper.getSLA(clusterName, (Feed) entity);
+            if (sla == null) {
+                throw new IllegalStateException("InitialStartTime can not be determined as the feed: "
+                        + entity.getName() + " and cluster: " + clusterName + " does not have any sla");
+            }
+            Date startTime = FeedHelper.getFeedValidityStart((Feed) entity, clusterName);
+            Frequency slaLow = sla.getSlaLow();
+            Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(slaLow));
+            return startTime.before(slaTime) ? startTime : slaTime;
+        } else {
+            org.apache.falcon.entity.v0.process.Sla sla = ProcessHelper.getSLA(clusterName, (Process) entity);
+            if (sla == null) {
+                throw new IllegalStateException("InitialStartTime can not be determined as the process: "
+                        + entity.getName() + " and cluster: " + clusterName + " does not have any sla");
+            }
+            Date startTime = ProcessHelper.getProcessValidityStart((Process) entity, clusterName);
+            Frequency shouldEndIn = sla.getShouldEndIn();
+            Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(shouldEndIn));
+            return startTime.before(slaTime) ? startTime : slaTime;
+        }
+    }
+
+    /**
+     * Marks an entity instance as available by deleting it from the pending instances.
+     *
+     * Nothing is deleted when the date cannot be parsed (the failure is logged) or when the store
+     * holds no pending instances for the entity on the cluster.
+     *
+     * @param clusterName cluster on which the instance became available
+     * @param entityName name of the entity
+     * @param date nominal time of the instance, parsed with DateUtil.parseDateFalconTZ
+     * @param entityType entity type string passed through to the state store
+     */
+    public void makeProcessInstanceAvailable(String clusterName, String entityName, String date, String entityType) {
+        Date nominalTime;
+        try {
+            nominalTime = DateUtil.parseDateFalconTZ(date);
+        } catch (ParseException e) {
+            LOG.error("Exception while translating the date:", e);
+            return;
+        }
+        if (nominalTime == null) {
+            return;
+        }
+        List<Date> pendingTimes = MONITORING_JDBC_STATE_STORE.getNominalInstances(entityName, clusterName,
+                entityType);
+        if (CollectionUtils.isEmpty(pendingTimes)) {
+            return;
+        }
+        MONITORING_JDBC_STATE_STORE.deletePendingInstance(entityName, clusterName, nominalTime, entityType);
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java
deleted file mode 100644
index c09c7ae..0000000
--- a/prism/src/main/java/org/apache/falcon/service/FeedSLAAlertService.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.service;
-
-
-import java.util.Date;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.ArrayList;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
-import org.apache.falcon.persistence.PendingInstanceBean;
-import org.apache.falcon.resource.SchedulableEntityInstance;
-import org.apache.falcon.util.ReflectionUtils;
-import org.apache.falcon.util.StartupProperties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Service to know which all feeds have missed SLA.
- */
-public final class FeedSLAAlertService implements FalconService, EntitySLAListener {
-
- private static final String NAME = "FeedSLAAlertService";
-
- private static final Logger LOG = LoggerFactory.getLogger(FeedSLAAlertService.class);
-
- private MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
-
- private Set<EntitySLAListener> listeners = new LinkedHashSet<EntitySLAListener>();
-
- private static final FeedSLAAlertService SERVICE = new FeedSLAAlertService();
-
- public static FeedSLAAlertService get() {
- return SERVICE;
- }
-
- private FeedSLAAlertService(){}
-
-
- @Override
- public String getName() {
- return NAME;
- }
-
- @Override
- public void init() throws FalconException {
- String listenerClassNames = StartupProperties.get().
- getProperty("feedAlert.listeners");
- for (String listenerClassName : listenerClassNames.split(",")) {
- listenerClassName = listenerClassName.trim();
- if (listenerClassName.isEmpty()) {
- continue;
- }
- EntitySLAListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName);
- registerListener(listener);
- }
-
- String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
- int statusCheckFrequencySeconds = Integer.parseInt(freq);
-
- ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
- executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds + 10, TimeUnit.SECONDS);
- }
-
- public void registerListener(EntitySLAListener listener) {
- listeners.add(listener);
- }
-
- @Override
- public void destroy() throws FalconException {
-
- }
-
-
- private class Monitor implements Runnable {
-
- @Override
- public void run() {
- processSLACandidates();
- }
- }
-
- void processSLACandidates(){
- //Get all feeds instances to be monitored
- List<PendingInstanceBean> pendingInstanceBeanList = store.getAllInstances();
- if (pendingInstanceBeanList.isEmpty()){
- return;
- }
-
- LOG.debug("In processSLACandidates :" + pendingInstanceBeanList.size());
- try{
- for (PendingInstanceBean pendingInstanceBean : pendingInstanceBeanList) {
-
- String feedName = pendingInstanceBean.getFeedName();
- String clusterName = pendingInstanceBean.getClusterName();
- Date nominalTime = pendingInstanceBean.getNominalTime();
- Feed feed = ConfigurationStore.get().get(EntityType.FEED, feedName);
-
- Cluster cluster = FeedHelper.getCluster(feed, clusterName);
-
- Set<SchedulableEntityInstance> schedulableEntityInstances= FeedSLAMonitoringService.get().
- getFeedSLAMissPendingAlerts(feed.getName(), cluster.getName(), nominalTime, nominalTime);
- if (schedulableEntityInstances.isEmpty()){
- store.deleteFeedAlertInstance(feed.getName(), cluster.getName(), nominalTime);
- return;
- }
- List<SchedulableEntityInstance> schedulableEntityList = new ArrayList<>(schedulableEntityInstances);
- SchedulableEntityInstance schedulableEntityInstance = schedulableEntityList.get(0);
-
-
- if (schedulableEntityInstance.getTags().contains(FeedSLAMonitoringService.get().TAG_WARN)) {
- store.putSLAAlertInstance(feedName, clusterName, nominalTime, true, false);
- //Mark in DB as SLA missed
- LOG.info("Feed :"+ feedName
- + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "missed SLALow");
- } else if (schedulableEntityInstance.getTags().contains(FeedSLAMonitoringService.get().TAG_CRITICAL)){
- store.updateSLAAlertInstance(feedName, clusterName, nominalTime);
- LOG.info("Feed :"+ feedName
- + "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "missed SLAHigh");
- highSLAMissed(feedName, EntityType.FEED, clusterName, nominalTime);
- }
- }
- } catch (FalconException e){
- LOG.error("Exception in FeedSLAALertService:", e);
- }
-
- }
-
- @Override
- public void highSLAMissed(String feedName, EntityType entityType, String clusterName, Date nominalTime)
- throws FalconException{
- for (EntitySLAListener listener : listeners) {
- listener.highSLAMissed(feedName, entityType, clusterName, nominalTime);
- }
- store.deleteFeedAlertInstance(feedName, clusterName, nominalTime);
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
deleted file mode 100644
index ed7bb08..0000000
--- a/prism/src/main/java/org/apache/falcon/service/FeedSLAMonitoringService.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.service;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Pair;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.FeedInstanceStatus;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Sla;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
-import org.apache.falcon.persistence.MonitoredFeedsBean;
-import org.apache.falcon.persistence.PendingInstanceBean;
-import org.apache.falcon.resource.SchedulableEntityInstance;
-import org.apache.falcon.util.DateUtil;
-import org.apache.falcon.util.DeploymentUtil;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Service to monitor Feed SLAs.
- */
-public final class FeedSLAMonitoringService implements ConfigurationChangeListener, FalconService {
- private static final Logger LOG = LoggerFactory.getLogger("FeedSLA");
-
- private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore();
-
- private static final int ONE_MS = 1;
-
- private static final FeedSLAMonitoringService SERVICE = new FeedSLAMonitoringService();
-
- public static final String TAG_CRITICAL = "Missed-SLA-High";
- public static final String TAG_WARN = "Missed-SLA-Low";
-
- private FeedSLAMonitoringService() {
-
- }
-
- public static FeedSLAMonitoringService get() {
- return SERVICE;
- }
-
- /**
- * Permissions for storePath.
- */
- private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-
-
- /**
- * Frequency in seconds of "status check" for pending feed instances.
- */
- private int statusCheckFrequencySeconds; // 10 minutes
-
-
- /**
- * Time Duration (in milliseconds) in future for generating pending feed instances.
- *
- * In every cycle pending feed instances are added for monitoring, till this time in future.
- */
- private int lookAheadWindowMillis; // 15 MINUTES
-
-
- /**
- * Filesystem used for serializing and deserializing.
- */
- private FileSystem fileSystem;
-
- /**
- * Working directory for the feed sla monitoring service.
- */
- private Path storePath;
-
- /**
- * Path to store the state of the monitoring service.
- */
- private Path filePath;
-
- @Override
- public void onAdd(Entity entity) throws FalconException {
- if (entity.getEntityType() == EntityType.FEED) {
- Feed feed = (Feed) entity;
- // currently sla service is enabled only for fileSystemStorage
- if (feed.getLocations() != null) {
- Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
- for (Cluster cluster : feed.getClusters().getClusters()) {
- if (currentClusters.contains(cluster.getName())) {
- if (FeedHelper.getSLA(cluster, feed) != null) {
- LOG.debug("Adding feed:{} for monitoring", feed.getName());
- MONITORING_JDBC_STATE_STORE.putMonitoredFeed(feed.getName());
- }
- }
- }
- }
- }
- }
-
- @Override
- public void onRemove(Entity entity) throws FalconException {
- if (entity.getEntityType() == EntityType.FEED) {
- Feed feed = (Feed) entity;
- // currently sla service is enabled only for fileSystemStorage
- if (feed.getLocations() != null) {
- Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
- for (Cluster cluster : feed.getClusters().getClusters()) {
- if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
- MONITORING_JDBC_STATE_STORE.deleteMonitoringFeed(feed.getName());
- MONITORING_JDBC_STATE_STORE.deletePendingInstances(feed.getName(), cluster.getName());
- }
- }
- }
- }
- }
-
- private boolean isSLAMonitoringEnabledInCurrentColo(Feed feed) {
- if (feed.getLocations() != null) {
- Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
- for (Cluster cluster : feed.getClusters().getClusters()) {
- if (currentClusters.contains(cluster.getName()) && FeedHelper.getSLA(cluster, feed) != null) {
- return true;
- }
- }
- }
- return false;
- }
-
- @Override
- public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
- if (newEntity.getEntityType() == EntityType.FEED) {
- Feed oldFeed = (Feed) oldEntity;
- Feed newFeed = (Feed) newEntity;
- if (!isSLAMonitoringEnabledInCurrentColo(newFeed)) {
- onRemove(oldFeed);
- } else if (!isSLAMonitoringEnabledInCurrentColo(oldFeed)) {
- onAdd(newFeed);
- } else {
- List<String> slaRemovedClusters = new ArrayList<>();
- for (String oldCluster : EntityUtil.getClustersDefinedInColos(oldFeed)) {
- if (FeedHelper.getSLA(oldCluster, oldFeed) != null
- && FeedHelper.getSLA(oldCluster, newFeed) == null) {
- slaRemovedClusters.add(oldCluster);
- }
- }
-
- for (String clusterName : slaRemovedClusters) {
- MONITORING_JDBC_STATE_STORE.deletePendingInstances(newFeed.getName(), clusterName);
- }
- }
- }
- }
-
- @Override
- public void onReload(Entity entity) throws FalconException {
- onAdd(entity);
- }
-
- @Override
- public String getName() {
- return FeedSLAMonitoringService.class.getSimpleName();
- }
-
- @Override
- public void init() throws FalconException {
- String uri = StartupProperties.get().getProperty("feed.sla.service.store.uri");
- storePath = new Path(uri);
- filePath = new Path(storePath, "feedSLAMonitoringService");
- fileSystem = initializeFileSystem();
-
- String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
- statusCheckFrequencySeconds = Integer.parseInt(freq);
-
- freq = StartupProperties.get().getProperty("feed.sla.lookAheadWindow.millis", "900000");
- lookAheadWindowMillis = Integer.parseInt(freq);
- LOG.debug("No old state exists at: {}, Initializing a clean state.", filePath.toString());
- ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
- executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds, TimeUnit.SECONDS);
- }
-
- public void makeFeedInstanceAvailable(String feedName, String clusterName, Date nominalTime)
- throws FalconException {
- LOG.info("Removing {} feed's instance {} in cluster {} from pendingSLA", feedName,
- clusterName, nominalTime);
- List<Date> instances = (MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName));
- // Slas for feeds not having sla tag are not stored.
- if (CollectionUtils.isEmpty(instances)){
- MONITORING_JDBC_STATE_STORE.deletePendingInstance(feedName, clusterName, nominalTime);
- }
- }
-
- private FileSystem initializeFileSystem() {
- try {
- fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
- if (!fileSystem.exists(storePath)) {
- LOG.info("Creating directory for pending feed instances: {}", storePath);
- // set permissions so config store dir is owned by falcon alone
- HadoopClientFactory.mkdirs(fileSystem, storePath, STORE_PERMISSION);
- }
- return fileSystem;
- } catch (Exception e) {
- throw new RuntimeException("Unable to bring up feed sla store for path: " + storePath, e);
- }
- }
-
- @Override
- public void destroy() throws FalconException {
- }
-
- //Periodically update status of pending instances, add new instances and take backup.
- private class Monitor implements Runnable {
-
- @Override
- public void run() {
- try {
- if (MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed().size() > 0) {
- checkPendingInstanceAvailability();
-
- // add Instances from last checked time to 10 minutes from now(some buffer for status check)
- Date now = new Date();
- Date newCheckPoint = new Date(now.getTime() + lookAheadWindowMillis);
- addNewPendingFeedInstances(newCheckPoint);
- }
- } catch (Throwable e) {
- LOG.error("Feed SLA monitoring failed: ", e);
- }
- }
- }
-
-
- void addNewPendingFeedInstances(Date to) throws FalconException {
- Set<String> currentClusters = DeploymentUtil.getCurrentClusters();
- List<MonitoredFeedsBean> feedsBeanList = MONITORING_JDBC_STATE_STORE.getAllMonitoredFeed();
- for(MonitoredFeedsBean monitoredFeedsBean : feedsBeanList) {
- String feedName = monitoredFeedsBean.getFeedName();
- Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
- for (Cluster feedCluster : feed.getClusters().getClusters()) {
- if (currentClusters.contains(feedCluster.getName())) {
- // get start of instances from the database
- Date nextInstanceTime = MONITORING_JDBC_STATE_STORE.getLastInstanceTime(feedName);
- Pair<String, String> key = new Pair<>(feed.getName(), feedCluster.getName());
- if (nextInstanceTime == null) {
- nextInstanceTime = getInitialStartTime(feed, feedCluster.getName());
- } else {
- nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
- }
-
- Set<Date> instances = new HashSet<>();
- org.apache.falcon.entity.v0.cluster.Cluster currentCluster =
- EntityUtil.getEntity(EntityType.CLUSTER, feedCluster.getName());
- nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime);
- Date endDate = FeedHelper.getClusterValidity(feed, currentCluster.getName()).getEnd();
- while (nextInstanceTime.before(to) && nextInstanceTime.before(endDate)) {
- LOG.debug("Adding instance={} for <feed,cluster>={}", nextInstanceTime, key);
- instances.add(nextInstanceTime);
- nextInstanceTime = new Date(nextInstanceTime.getTime() + ONE_MS);
- nextInstanceTime = EntityUtil.getNextStartTime(feed, currentCluster, nextInstanceTime);
- }
-
- for(Date date:instances){
- MONITORING_JDBC_STATE_STORE.putPendingInstances(feed.getName(), feedCluster.getName(), date);
- }
- }
- }
- }
- }
-
-
- /**
- * Checks the availability of all the pendingInstances and removes the ones which have become available.
- */
- private void checkPendingInstanceAvailability() throws FalconException {
- for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){
- for (Date date : MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getFeedName(),
- pendingInstanceBean.getClusterName())) {
- boolean status = checkFeedInstanceAvailability(pendingInstanceBean.getFeedName(),
- pendingInstanceBean.getClusterName(), date);
- if (status) {
- MONITORING_JDBC_STATE_STORE.deletePendingInstance(pendingInstanceBean.getFeedName(),
- pendingInstanceBean.getClusterName(), date);
- }
- }
- }
- }
-
- // checks whether a given feed instance is available or not
- private boolean checkFeedInstanceAvailability(String feedName, String clusterName, Date nominalTime) throws
- FalconException {
- Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
-
- try {
- LOG.debug("Checking instance availability status for feed:{}, cluster:{}, instanceTime:{}", feed.getName(),
- clusterName, nominalTime);
- FeedInstanceStatus.AvailabilityStatus status = FeedHelper.getFeedInstanceStatus(feed, clusterName,
- nominalTime);
- if (status.equals(FeedInstanceStatus.AvailabilityStatus.AVAILABLE)
- || status.equals(FeedInstanceStatus.AvailabilityStatus.EMPTY)) {
- LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is available.", feed.getName(),
- clusterName, nominalTime);
- return true;
- }
- } catch (Throwable e) {
- LOG.error("Couldn't find status for feed:{}, cluster:{}", feedName, clusterName, e);
- }
- LOG.debug("Feed instance(feed:{}, cluster:{}, instanceTime:{}) is not available.", feed.getName(),
- clusterName, nominalTime);
- return false;
- }
-
-
- /**
- * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances between given time range which have missed
- * slaLow or slaHigh.
- *
- * Only feeds which have defined sla in their definition are considered.
- * Only the feed instances between the given time range are considered.
- * Start time and end time are both inclusive.
- * @param start start time, inclusive
- * @param end end time, inclusive
- * @return Set of pending feed instances belonging to the given range which have missed SLA
- * @throws FalconException
- */
- public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(Date start, Date end)
- throws FalconException {
- Set<SchedulableEntityInstance> result = new HashSet<>();
- for(PendingInstanceBean pendingInstanceBean : MONITORING_JDBC_STATE_STORE.getAllInstances()){
- Pair<String, String> feedClusterPair = new Pair<>(pendingInstanceBean.getFeedName(),
- pendingInstanceBean.getClusterName());
- Feed feed = EntityUtil.getEntity(EntityType.FEED, feedClusterPair.first);
- Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
- Sla sla = FeedHelper.getSLA(cluster, feed);
- if (sla != null) {
- Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end,
- MONITORING_JDBC_STATE_STORE.getNominalInstances(pendingInstanceBean.getFeedName(),
- pendingInstanceBean.getClusterName()));
- for (Pair<Date, String> status : slaStatus) {
- SchedulableEntityInstance instance = new SchedulableEntityInstance(feedClusterPair.first,
- feedClusterPair.second, status.first, EntityType.FEED);
- instance.setTags(status.second);
- result.add(instance);
- }
- }
- }
- return result;
- }
-
- /**
- * Returns all {@link org.apache.falcon.entity.v0.feed.Feed} instances of a given feed between the given time range
- * which missed sla.Only those instances are included which have missed either slaLow or slaHigh.
- * @param feedName name of the feed
- * @param clusterName cluster name
- * @param start start time, inclusive
- * @param end end time, inclusive
- * @return Pending feed instances of the given feed which belong to the given time range and have missed SLA.
- * @throws FalconException
- */
- public Set<SchedulableEntityInstance> getFeedSLAMissPendingAlerts(String feedName, String clusterName,
- Date start, Date end) throws FalconException {
-
- Set<SchedulableEntityInstance> result = new HashSet<>();
- Pair<String, String> feedClusterPair = new Pair<>(feedName, clusterName);
- List<Date> missingInstances = MONITORING_JDBC_STATE_STORE.getNominalInstances(feedName, clusterName);
- Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
- Cluster cluster = FeedHelper.getCluster(feed, feedClusterPair.second);
- Sla sla = FeedHelper.getSLA(cluster, feed);
- if (missingInstances != null && sla != null) {
- Set<Pair<Date, String>> slaStatus = getSLAStatus(sla, start, end, missingInstances);
- for (Pair<Date, String> status : slaStatus){
- SchedulableEntityInstance instance = new SchedulableEntityInstance(feedName, clusterName, status.first,
- EntityType.FEED);
- instance.setTags(status.second);
- result.add(instance);
- }
- }
- return result;
- }
-
- Set<Pair<Date, String>> getSLAStatus(Sla sla, Date start, Date end, List<Date> missingInstances)
- throws FalconException {
- Date now = new Date();
- Frequency slaLow = sla.getSlaLow();
- Frequency slaHigh = sla.getSlaHigh();
- Set<Pair<Date, String>> result = new HashSet<>();
- for (Date nominalTime : missingInstances) {
- if (!nominalTime.before(start) && !nominalTime.after(end)) {
- ExpressionHelper.setReferenceDate(nominalTime);
- ExpressionHelper evaluator = ExpressionHelper.get();
- Long slaHighDuration = evaluator.evaluate(slaHigh.toString(), Long.class);
- Long slaLowDuration = evaluator.evaluate(slaLow.toString(), Long.class);
- Date slaCriticalTime = new Date(nominalTime.getTime() + slaHighDuration);
- Date slaWarnTime = new Date(nominalTime.getTime() + slaLowDuration);
- if (slaCriticalTime.before(now)) {
- result.add(new Pair<>(nominalTime, TAG_CRITICAL));
- } else if (slaWarnTime.before(now)) {
- result.add(new Pair<>(nominalTime, TAG_WARN));
- }
- }
- }
- return result;
- }
-
- @VisibleForTesting
- Date getInitialStartTime(Feed feed, String clusterName) throws FalconException {
- Sla sla = FeedHelper.getSLA(clusterName, feed);
- if (sla == null) {
- throw new IllegalStateException("InitialStartTime can not be determined as the feed: "
- + feed.getName() + " and cluster: " + clusterName + " does not have any sla");
- }
- Date startTime = FeedHelper.getFeedValidityStart(feed, clusterName);
- Frequency slaLow = sla.getSlaLow();
- Date slaTime = new Date(DateUtil.now().getTime() - DateUtil.getFrequencyInMillis(slaLow));
- return startTime.before(slaTime) ? startTime : slaTime;
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
index a4a95be..8cf2b2d 100644
--- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
+++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
@@ -25,6 +25,7 @@ import javax.persistence.Query;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.service.FalconJPAService;
import org.apache.falcon.tools.FalconStateStoreDBCLI;
@@ -85,40 +86,44 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
@Test
public void testInsertRetrieveAndUpdate() throws Exception {
- monitoringJdbcStateStore.putMonitoredFeed("test_feed1");
- monitoringJdbcStateStore.putMonitoredFeed("test_feed2");
- Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredFeed("test_feed1").getFeedName());
+ monitoringJdbcStateStore.putMonitoredEntity("test_feed1", EntityType.FEED.toString());
+ monitoringJdbcStateStore.putMonitoredEntity("test_feed2", EntityType.FEED.toString());
+ Assert.assertEquals("test_feed1", monitoringJdbcStateStore.getMonitoredEntity("test_feed1",
+ EntityType.FEED.toString()).getFeedName());
Assert.assertEquals(monitoringJdbcStateStore.getAllMonitoredFeed().size(), 2);
- monitoringJdbcStateStore.deleteMonitoringFeed("test_feed1");
- monitoringJdbcStateStore.deleteMonitoringFeed("test_feed2");
+ monitoringJdbcStateStore.deleteMonitoringEntity("test_feed1", EntityType.FEED.toString());
+ monitoringJdbcStateStore.deleteMonitoringEntity("test_feed2", EntityType.FEED.toString());
Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
Date dateTwo = SchemaHelper.parseDateUTC("2015-11-20T01:00Z");
- monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateOne);
- monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateTwo);
-
- Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 2);
- monitoringJdbcStateStore.deletePendingInstance("test_feed1", "test_cluster", dateOne);
- Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster").size(), 1);
- monitoringJdbcStateStore.deletePendingInstances("test_feed1", "test_cluster");
+ monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateOne, EntityType.FEED.toString());
+ monitoringJdbcStateStore.putPendingInstances("test_feed1", "test_cluster", dateTwo, EntityType.FEED.toString());
+
+ Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster",
+ EntityType.FEED.toString()).size(), 2);
+ monitoringJdbcStateStore.deletePendingInstance("test_feed1", "test_cluster", dateOne,
+ EntityType.FEED.toString());
+ Assert.assertEquals(monitoringJdbcStateStore.getNominalInstances("test_feed1", "test_cluster",
+ EntityType.FEED.toString()).size(), 1);
+ monitoringJdbcStateStore.deletePendingInstances("test_feed1", "test_cluster", EntityType.FEED.toString());
}
@Test
public void testEmptyLatestInstance() throws Exception {
MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
- store.putMonitoredFeed("test-feed1");
- store.putMonitoredFeed("test-feed2");
- Assert.assertNull(store.getLastInstanceTime("test-feed1"));
+ store.putMonitoredEntity("test-feed1", EntityType.FEED.toString());
+ store.putMonitoredEntity("test-feed2", EntityType.FEED.toString());
+ Assert.assertNull(store.getLastInstanceTime("test-feed1", EntityType.FEED.toString()));
Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
Date dateTwo = SchemaHelper.parseDateUTC("2015-11-20T01:00Z");
- store.putPendingInstances("test-feed1", "test_cluster", dateTwo);
- store.putPendingInstances("test-feed1", "test_cluster", dateOne);
- store.putPendingInstances("test-feed2", "test_cluster", dateOne);
+ store.putPendingInstances("test-feed1", "test_cluster", dateTwo, EntityType.FEED.toString());
+ store.putPendingInstances("test-feed1", "test_cluster", dateOne, EntityType.FEED.toString());
+ store.putPendingInstances("test-feed2", "test_cluster", dateOne, EntityType.FEED.toString());
- Assert.assertTrue(dateTwo.equals(store.getLastInstanceTime("test-feed1")));
- Assert.assertTrue(dateOne.equals(store.getLastInstanceTime("test-feed2")));
+ Assert.assertTrue(dateTwo.equals(store.getLastInstanceTime("test-feed1", EntityType.FEED.toString())));
+ Assert.assertTrue(dateOne.equals(store.getLastInstanceTime("test-feed2", EntityType.FEED.toString())));
}
@@ -126,14 +131,15 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
public void testputSLALowCandidate() throws Exception{
MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
- store.putSLAAlertInstance("test-feed1", "test-cluster", dateOne, Boolean.TRUE, Boolean.FALSE);
- Assert.assertEquals(Boolean.TRUE, store.getFeedAlertInstance("test-feed1",
- "test-cluster", dateOne).getIsSLALowMissed());
- Assert.assertTrue(dateOne.equals(store.getFeedAlertInstance("test-feed1",
- "test-cluster", dateOne).getNominalTime()));
- store.updateSLAAlertInstance("test-feed1", "test-cluster", dateOne);
- Assert.assertEquals(Boolean.TRUE, store.getFeedAlertInstance("test-feed1",
- "test-cluster", dateOne).getIsSLAHighMissed());
+ store.putSLAAlertInstance("test-feed1", "test-cluster", EntityType.FEED.toString(),
+ dateOne, Boolean.TRUE, Boolean.FALSE);
+ Assert.assertEquals(Boolean.TRUE, store.getEntityAlertInstance("test-feed1",
+ "test-cluster", dateOne, EntityType.FEED.toString()).getIsSLALowMissed());
+ Assert.assertTrue(dateOne.equals(store.getEntityAlertInstance("test-feed1",
+ "test-cluster", dateOne, EntityType.FEED.toString()).getNominalTime()));
+ store.updateSLAAlertInstance("test-feed1", "test-cluster", dateOne, EntityType.FEED.toString());
+ Assert.assertEquals(Boolean.TRUE, store.getEntityAlertInstance("test-feed1",
+ "test-cluster", dateOne, EntityType.FEED.toString()).getIsSLAHighMissed());
}
@Test
@@ -141,21 +147,22 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
Date dateOne = SchemaHelper.parseDateUTC("2015-11-20T00:00Z");
- store.putSLAAlertInstance("test-feed1", "test-cluster", dateOne, Boolean.TRUE, Boolean.FALSE);
- store.updateSLAAlertInstance("test-feed1", "test-cluster", dateOne);
- Assert.assertEquals(Boolean.TRUE, store.getFeedAlertInstance("test-feed1",
- "test-cluster", dateOne).getIsSLAHighMissed());
+ store.putSLAAlertInstance("test-process", "test-cluster", EntityType.PROCESS.toString(),
+ dateOne, Boolean.TRUE, Boolean.FALSE);
+ store.updateSLAAlertInstance("test-process", "test-cluster", dateOne, EntityType.PROCESS.toString());
+ Assert.assertEquals(Boolean.TRUE, store.getEntityAlertInstance("test-process",
+ "test-cluster", dateOne, EntityType.PROCESS.toString()).getIsSLAHighMissed());
}
private void clear() {
EntityManager em = FalconJPAService.get().getEntityManager();
em.getTransaction().begin();
try {
- Query query = em.createNativeQuery("delete from MONITORED_FEEDS");
+ Query query = em.createNativeQuery("delete from PENDING_INSTANCES");
query.executeUpdate();
- query = em.createNativeQuery("delete from PENDING_INSTANCES");
+ query = em.createNativeQuery("delete from MONITORED_ENTITY");
query.executeUpdate();
- query = em.createNativeQuery("delete from FEED_SLA_ALERTS");
+ query = em.createNativeQuery("delete from ENTITY_SLA_ALERTS");
query.executeUpdate();
} finally {
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
new file mode 100644
index 0000000..c8b4f5e
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/service/EntitySLAAlertServiceTest.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Sla;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
+import org.apache.falcon.tools.FalconStateStoreDBCLI;
+import org.apache.falcon.util.StateStoreProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.io.File;
+import java.util.Date;
+
+/**
+ * Test for EntitySLAAlertService.
+ */
+public class EntitySLAAlertServiceTest extends AbstractTestBase {
+ private static final String DB_BASE_DIR = "target/test-data/persistancedb";
+ protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
+ protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
+ protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
+ protected LocalFileSystem fs = new LocalFileSystem();
+
+ private static MonitoringJdbcStateStore monitoringJdbcStateStore;
+ private static FalconJPAService falconJPAService = FalconJPAService.get();
+
+    /**
+     * Runs the Falcon state store DB command line tool with the given arguments.
+     *
+     * @param args arguments handed to {@code FalconStateStoreDBCLI.run}
+     * @return the integer status returned by the tool
+     */
+    protected int execDBCLICommands(String[] args) {
+        FalconStateStoreDBCLI dbCli = new FalconStateStoreDBCLI();
+        return dbCli.run(args);
+    }
+
+    /**
+     * Creates the state store schema by running the DB CLI "create" command and
+     * verifies that the command succeeded and that the SQL file exists afterwards.
+     *
+     * @param file path of the SQL file passed to the CLI via {@code -sqlfile}
+     */
+    public void createDB(String file) {
+        File sqlFile = new File(file);
+        String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
+        int result = execDBCLICommands(argsCreate);
+        // TestNG's assertEquals takes (actual, expected); this order keeps failure messages accurate.
+        Assert.assertEquals(result, 0);
+        Assert.assertTrue(sqlFile.exists());
+    }
+
+    /**
+     * One-time fixture: points the state store at the Derby test database, creates the
+     * schema, initializes the JPA service, starts an embedded cluster and creates the
+     * monitoring state store used by the tests.
+     *
+     * @throws Exception if any part of the fixture cannot be set up
+     */
+    @BeforeClass
+    public void setup() throws Exception {
+        StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
+
+        Configuration localConf = new Configuration();
+        fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
+        Path dbBaseDir = new Path(DB_BASE_DIR);
+        fs.mkdirs(dbBaseDir);
+
+        createDB(DB_SQL_FILE);
+        falconJPAService.init();
+
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.conf = this.dfsCluster.getConf();
+        monitoringJdbcStateStore = new MonitoringJdbcStateStore();
+    }
+
+ /**
+ * Runs before every test method and empties the SLA monitoring tables via clear().
+ */
+ @BeforeMethod
+ public void init() {
+ clear();
+ }
+
+    /**
+     * Deletes all rows from the PENDING_INSTANCES, MONITORED_ENTITY and ENTITY_SLA_ALERTS tables.
+     * The deletes are committed only when all of them succeed; if one fails the transaction
+     * is rolled back so the original exception is not masked by a commit of a failed transaction.
+     */
+    private void clear() {
+        EntityManager em = FalconJPAService.get().getEntityManager();
+        try {
+            em.getTransaction().begin();
+            for (String table : new String[]{"PENDING_INSTANCES", "MONITORED_ENTITY", "ENTITY_SLA_ALERTS"}) {
+                em.createNativeQuery("delete from " + table).executeUpdate();
+            }
+            em.getTransaction().commit();
+        } finally {
+            // Still active here means a delete or the commit failed: undo instead of committing.
+            if (em.getTransaction().isActive()) {
+                em.getTransaction().rollback();
+            }
+            em.close();
+        }
+    }
+
+    /**
+     * Stores a pending feed instance dated 100 seconds in the past, publishes the feed and its
+     * cluster if they are not in the configuration store yet, gives the feed an SLA of
+     * slaLow = 1 minute / slaHigh = 2 minutes, starts the alert service, waits 10 seconds and
+     * expects the stored alert instance to report a missed low SLA.
+     */
+    @Test
+    public static void processSLALowCandidates() throws FalconException, InterruptedException {
+        final String feedName = "test-feed";
+        final String clusterName = "test-cluster";
+        final String feedType = EntityType.FEED.toString();
+
+        Date nominalTime = new Date(System.currentTimeMillis() - 100000);
+        monitoringJdbcStateStore.putPendingInstances(feedName, clusterName, nominalTime, feedType);
+
+        org.apache.falcon.entity.v0.cluster.Cluster clusterDef = new org.apache.falcon.entity.v0.cluster.Cluster();
+        clusterDef.setName(clusterName);
+        clusterDef.setColo(clusterName);
+
+        Cluster feedCluster = new Cluster();
+        feedCluster.setName(clusterName);
+        org.apache.falcon.entity.v0.feed.Clusters feedClusters = new org.apache.falcon.entity.v0.feed.Clusters();
+        feedClusters.getClusters().add(feedCluster);
+
+        Feed feed = new Feed();
+        feed.setName(feedName);
+        feed.setClusters(feedClusters);
+
+        ConfigurationStore configStore = ConfigurationStore.get();
+        if (configStore.get(EntityType.FEED, feed.getName()) == null) {
+            configStore.publish(EntityType.FEED, feed);
+        }
+        if (configStore.get(EntityType.CLUSTER, clusterDef.getName()) == null) {
+            configStore.publish(EntityType.CLUSTER, clusterDef);
+        }
+
+        // The SLA is attached only after the publish attempt, same order as before.
+        Sla feedSla = new Sla();
+        feedSla.setSlaLow(new Frequency("1", Frequency.TimeUnit.minutes));
+        feedSla.setSlaHigh(new Frequency("2", Frequency.TimeUnit.minutes));
+        feed.setSla(feedSla);
+
+        EntitySLAAlertService.get().init();
+        Thread.sleep(10 * 1000);
+        Assert.assertTrue(monitoringJdbcStateStore.getEntityAlertInstance(feedName, clusterName,
+                nominalTime, feedType).getIsSLALowMissed());
+    }
+
+    /**
+     * Stores a pending process instance dated 130 seconds in the past, starts the alert service,
+     * publishes the process and its cluster if they are not in the configuration store yet, gives
+     * the process an SLA with shouldEndIn = 1 minute, waits 10 seconds and expects the stored
+     * alert instance to report a missed high SLA.
+     */
+    @Test
+    public static void processSLACandidateProcess() throws FalconException, InterruptedException {
+        final String processName = "test-process";
+        final String clusterName = "test-cluster";
+        final String processType = EntityType.PROCESS.name();
+
+        Date nominalTime = new Date(System.currentTimeMillis() - 130000);
+        monitoringJdbcStateStore.putPendingInstances(processName, clusterName, nominalTime, processType);
+        // The service is started before the process is published, same order as before.
+        EntitySLAAlertService.get().init();
+
+        org.apache.falcon.entity.v0.cluster.Cluster clusterDef = new org.apache.falcon.entity.v0.cluster.Cluster();
+        clusterDef.setName(clusterName);
+        clusterDef.setColo(clusterName);
+
+        org.apache.falcon.entity.v0.process.Cluster procCluster = new org.apache.falcon.entity.v0.process.Cluster();
+        procCluster.setName(clusterName);
+        org.apache.falcon.entity.v0.process.Clusters clusters = new org.apache.falcon.entity.v0.process.Clusters();
+        clusters.getClusters().add(procCluster);
+
+        Process process = new Process();
+        process.setName(processName);
+        process.setClusters(clusters);
+
+        ConfigurationStore configStore = ConfigurationStore.get();
+        if (configStore.get(EntityType.PROCESS, process.getName()) == null) {
+            configStore.publish(EntityType.PROCESS, process);
+        }
+        if (configStore.get(EntityType.CLUSTER, clusterDef.getName()) == null) {
+            configStore.publish(EntityType.CLUSTER, clusterDef);
+        }
+
+        org.apache.falcon.entity.v0.process.Sla processSla = new org.apache.falcon.entity.v0.process.Sla();
+        processSla.setShouldEndIn(new Frequency("1", Frequency.TimeUnit.minutes));
+        process.setSla(processSla);
+
+        Thread.sleep(10 * 1000);
+        Assert.assertTrue(monitoringJdbcStateStore.getEntityAlertInstance(processName, clusterName, nominalTime,
+                processType).getIsSLAHighMissed());
+    }
+
+    /**
+     * Stores a pending feed instance dated 130 seconds in the past, publishes the feed and its
+     * cluster if they are not in the configuration store yet, gives the feed an SLA of
+     * slaLow = 1 minute / slaHigh = 2 minutes, starts the alert service and waits 10 seconds.
+     * The test is declared to end with a {@code javax.persistence.NoResultException}.
+     */
+    @Test(expectedExceptions = javax.persistence.NoResultException.class)
+    public static void processSLAHighCandidates() throws FalconException, InterruptedException {
+        final String feedName = "test-feed";
+        final String clusterName = "test-cluster";
+        final String feedType = EntityType.FEED.toString();
+
+        Date nominalTime = new Date(System.currentTimeMillis() - 130000);
+        monitoringJdbcStateStore.putPendingInstances(feedName, clusterName, nominalTime, feedType);
+
+        org.apache.falcon.entity.v0.cluster.Cluster clusterDef = new org.apache.falcon.entity.v0.cluster.Cluster();
+        clusterDef.setName(clusterName);
+        clusterDef.setColo(clusterName);
+
+        Cluster feedCluster = new Cluster();
+        feedCluster.setName(clusterName);
+        org.apache.falcon.entity.v0.feed.Clusters feedClusters = new org.apache.falcon.entity.v0.feed.Clusters();
+        feedClusters.getClusters().add(feedCluster);
+
+        Feed feed = new Feed();
+        feed.setName(feedName);
+        feed.setClusters(feedClusters);
+
+        ConfigurationStore configStore = ConfigurationStore.get();
+        if (configStore.get(EntityType.FEED, feed.getName()) == null) {
+            configStore.publish(EntityType.FEED, feed);
+        }
+        if (configStore.get(EntityType.CLUSTER, clusterDef.getName()) == null) {
+            configStore.publish(EntityType.CLUSTER, clusterDef);
+        }
+
+        // The SLA is attached only after the publish attempt, same order as before.
+        Sla feedSla = new Sla();
+        feedSla.setSlaLow(new Frequency("1", Frequency.TimeUnit.minutes));
+        feedSla.setSlaHigh(new Frequency("2", Frequency.TimeUnit.minutes));
+        feed.setSla(feedSla);
+
+        EntitySLAAlertService.get().init();
+        Thread.sleep(10 * 1000);
+        Assert.assertTrue(monitoringJdbcStateStore.getEntityAlertInstance(feedName, clusterName,
+                nominalTime, feedType).getIsSLAHighMissed());
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java
deleted file mode 100644
index 7c886c1..0000000
--- a/prism/src/test/java/org/apache/falcon/service/FeedSLAAlertServiceTest.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.service;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.AbstractTestBase;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Sla;
-import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
-import org.apache.falcon.tools.FalconStateStoreDBCLI;
-import org.apache.falcon.util.StateStoreProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-import java.io.File;
-import java.util.Date;
-
-/**
- * Test for FeedSLAMonitoringService.
- */
-public class FeedSLAAlertServiceTest extends AbstractTestBase {
- private static final String DB_BASE_DIR = "target/test-data/persistancedb";
- protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
- protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
- protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
- protected LocalFileSystem fs = new LocalFileSystem();
-
- private static MonitoringJdbcStateStore monitoringJdbcStateStore;
- private static FalconJPAService falconJPAService = FalconJPAService.get();
-
- protected int execDBCLICommands(String[] args) {
- return new FalconStateStoreDBCLI().run(args);
- }
-
- public void createDB(String file) {
- File sqlFile = new File(file);
- String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
- int result = execDBCLICommands(argsCreate);
- Assert.assertEquals(0, result);
- Assert.assertTrue(sqlFile.exists());
-
- }
-
- @BeforeClass
- public void setup() throws Exception{
- StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
- Configuration localConf = new Configuration();
- fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
- fs.mkdirs(new Path(DB_BASE_DIR));
- createDB(DB_SQL_FILE);
- falconJPAService.init();
- this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
- this.conf = dfsCluster.getConf();
- monitoringJdbcStateStore = new MonitoringJdbcStateStore();
- }
-
- @BeforeMethod
- public void init() {
- clear();
- }
-
- private void clear() {
- EntityManager em = FalconJPAService.get().getEntityManager();
- em.getTransaction().begin();
- try {
- Query query = em.createNativeQuery("delete from MONITORED_FEEDS");
- query.executeUpdate();
- query = em.createNativeQuery("delete from PENDING_INSTANCES");
- query.executeUpdate();
- query = em.createNativeQuery("delete from FEED_SLA_ALERTS");
- query.executeUpdate();
-
- } finally {
- em.getTransaction().commit();
- em.close();
- }
- }
-
- @Test
- public static void processSLALowCandidates() throws FalconException, InterruptedException{
-
- Date dateOne = new Date(System.currentTimeMillis()-100000);
- monitoringJdbcStateStore.putPendingInstances("test-feed", "test-cluster", dateOne);
- org.apache.falcon.entity.v0.feed.Clusters cluster = new org.apache.falcon.entity.v0.feed.Clusters();
- Cluster testCluster = new Cluster();
- testCluster.setName("test-cluster");
- cluster.getClusters().add(testCluster);
- Feed mockEntity = new Feed();
- mockEntity.setName("test-feed");
- mockEntity.setClusters(cluster);
- if (ConfigurationStore.get().get(EntityType.FEED, mockEntity.getName()) == null) {
- ConfigurationStore.get().publish(EntityType.FEED, mockEntity);
- }
- Sla sla = new Sla();
- Frequency frequencyLow = new Frequency("1", Frequency.TimeUnit.minutes);
- Frequency frequencyHigh = new Frequency("2", Frequency.TimeUnit.minutes);
- sla.setSlaLow(frequencyLow);
- sla.setSlaHigh(frequencyHigh);
- mockEntity.setSla(sla);
-
- FeedSLAAlertService.get().init();
- Thread.sleep(10*1000);
- Assert.assertTrue(monitoringJdbcStateStore.getFeedAlertInstance("test-feed", "test-cluster",
- dateOne).getIsSLALowMissed());
- }
-
- @Test(expectedExceptions = javax.persistence.NoResultException.class)
- public static void processSLAHighCandidates() throws FalconException, InterruptedException{
-
- Date dateOne = new Date(System.currentTimeMillis()-130000);
- monitoringJdbcStateStore.putPendingInstances("test-feed", "test-cluster", dateOne);
- org.apache.falcon.entity.v0.feed.Clusters cluster = new org.apache.falcon.entity.v0.feed.Clusters();
- Cluster testCluster = new Cluster();
- testCluster.setName("test-cluster");
- cluster.getClusters().add(testCluster);
- Feed mockEntity = new Feed();
- mockEntity.setName("test-feed");
- mockEntity.setClusters(cluster);
- if (ConfigurationStore.get().get(EntityType.FEED, mockEntity.getName()) == null) {
- ConfigurationStore.get().publish(EntityType.FEED, mockEntity);
- }
- Sla sla = new Sla();
- Frequency frequencyLow = new Frequency("1", Frequency.TimeUnit.minutes);
- Frequency frequencyHigh = new Frequency("2", Frequency.TimeUnit.minutes);
- sla.setSlaLow(frequencyLow);
- sla.setSlaHigh(frequencyHigh);
- mockEntity.setSla(sla);
-
- FeedSLAAlertService.get().init();
- Thread.sleep(10*1000);
- Assert.assertTrue(monitoringJdbcStateStore.getFeedAlertInstance("test-feed", "test-cluster",
- dateOne).getIsSLAHighMissed());
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
index 97cc459..9cf50c2 100644
--- a/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
+++ b/prism/src/test/java/org/apache/falcon/service/FeedSLAMonitoringTest.java
@@ -54,7 +54,7 @@ public class FeedSLAMonitoringTest extends AbstractTestBase {
private static final String CLUSTER_NAME = "testCluster";
private static final String FEED_NAME = "testFeed";
private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
- private static final String TAG_CRITICAL = FeedSLAMonitoringService.get().TAG_CRITICAL;
+ private static final String TAG_CRITICAL = EntitySLAMonitoringService.get().TAG_CRITICAL;
@Test
public void testSLAStatus() throws FalconException {
@@ -74,7 +74,8 @@ public class FeedSLAMonitoringTest extends AbstractTestBase {
missingInstances.add(SchemaHelper.parseDateUTC("2015-05-05T00:00Z")); // equal to end time
missingInstances.add(SchemaHelper.parseDateUTC("2015-05-06T00:00Z")); // after end time
- Set<Pair<Date, String>> result = FeedSLAMonitoringService.get().getSLAStatus(sla, start, end, missingInstances);
+ Set<Pair<Date, String>> result = EntitySLAMonitoringService.get().getFeedSLAStatus(sla, start, end,
+ missingInstances);
Set<Pair<Date, String>> expected = new HashSet<>();
expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-05T00:00Z"), TAG_CRITICAL));
expected.add(new Pair<>(SchemaHelper.parseDateUTC("2014-05-06T00:00Z"), TAG_CRITICAL));
[3/3] falcon git commit: FALCON-2052 Process SLA monitoring
Posted by aj...@apache.org.
FALCON-2052 Process SLA monitoring
Author: Praveen Adlakha <ad...@gmail.com>
Reviewers: Ajay Yadava <aj...@apache.org>, Pallavi Rao
Closes #202 from PraveenAdlakha/2052
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/60e2f68b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/60e2f68b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/60e2f68b
Branch: refs/heads/master
Commit: 60e2f68b867476f600ba43dc4dafb97971a78b2e
Parents: bd32b61
Author: Praveen Adlakha <ad...@gmail.com>
Authored: Wed Jul 6 08:51:03 2016 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Wed Jul 6 08:51:03 2016 +0530
----------------------------------------------------------------------
client/src/main/resources/process-0.1.xsd | 4 +-
.../org/apache/falcon/entity/ProcessHelper.java | 36 ++
.../falcon/persistence/EntitySLAAlertBean.java | 168 +++++
.../falcon/persistence/FeedSLAAlertBean.java | 134 ----
.../falcon/persistence/MonitoredEntityBean.java | 103 +++
.../falcon/persistence/MonitoredFeedsBean.java | 73 ---
.../falcon/persistence/PendingInstanceBean.java | 58 +-
.../persistence/PersistenceConstants.java | 10 +-
.../falcon/tools/FalconStateStoreDBCLI.java | 4 +-
.../src/main/resources/META-INF/persistence.xml | 18 +-
common/src/main/resources/startup.properties | 4 +-
.../apache/falcon/entity/AbstractTestBase.java | 2 +-
.../entity/store/FeedLocationStoreTest.java | 2 +-
docs/src/site/twiki/FalconNativeScheduler.twiki | 2 +-
docs/src/site/twiki/FeedSLAMonitoring.twiki | 2 +-
.../falcon/handler/SLAMonitoringHandler.java | 13 +-
.../falcon/jdbc/MonitoringJdbcStateStore.java | 134 ++--
.../AbstractSchedulableEntityManager.java | 8 +-
.../proxy/SchedulableEntityManagerProxy.java | 2 +-
.../falcon/service/EntitySLAAlertService.java | 168 +++++
.../falcon/service/EntitySLAListener.java | 3 +-
.../service/EntitySLAMonitoringService.java | 644 +++++++++++++++++++
.../falcon/service/FeedSLAAlertService.java | 163 -----
.../service/FeedSLAMonitoringService.java | 450 -------------
.../jdbc/MonitoringJdbcStateStoreTest.java | 77 ++-
.../service/EntitySLAAlertServiceTest.java | 213 ++++++
.../falcon/service/FeedSLAAlertServiceTest.java | 162 -----
.../falcon/service/FeedSLAMonitoringTest.java | 5 +-
src/build/findbugs-exclude.xml | 4 +-
src/conf/startup.properties | 6 +-
30 files changed, 1544 insertions(+), 1128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/client/src/main/resources/process-0.1.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/process-0.1.xsd b/client/src/main/resources/process-0.1.xsd
index 0d01e33..7ed8474 100644
--- a/client/src/main/resources/process-0.1.xsd
+++ b/client/src/main/resources/process-0.1.xsd
@@ -204,8 +204,7 @@
</xs:documentation>
</xs:annotation>
<xs:sequence>
- <xs:element type="cluster" name="cluster" maxOccurs="unbounded" minOccurs="1">
- </xs:element>
+ <xs:element type="cluster" name="cluster" maxOccurs="unbounded" minOccurs="1"/>
</xs:sequence>
</xs:complexType>
@@ -218,6 +217,7 @@
</xs:annotation>
<xs:sequence>
<xs:element type="validity" name="validity"/>
+ <xs:element type="sla" name="sla" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
<xs:attribute type="IDENTIFIER" name="name" use="required"/>
<xs:attribute type="xs:int" name="version" use="optional" default="0"/>
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
index bbfca68..e563d18 100644
--- a/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ProcessHelper.java
@@ -26,6 +26,8 @@ import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Validity;
+import org.apache.falcon.entity.v0.process.Sla;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.resource.SchedulableEntityInstance;
@@ -185,4 +187,38 @@ public final class ProcessHelper {
}
return result;
}
+
+    /**
+     * Returns the validity configured for a process on the named cluster.
+     *
+     * @param process the process to inspect
+     * @param clusterName name of the cluster to look up in the process
+     * @return the validity of the matching process cluster
+     * @throws FalconException if the cluster lookup returns null for {@code clusterName}
+     */
+    public static Validity getClusterValidity(Process process, String clusterName) throws FalconException {
+        Cluster processCluster = getCluster(process, clusterName);
+        if (processCluster != null) {
+            return processCluster.getValidity();
+        }
+        throw new FalconException("Invalid cluster: " + clusterName + " for process: " + process.getName());
+    }
+
+    /**
+     * Resolves the SLA that applies to a process on the named cluster.
+     *
+     * @param clusterName name of the cluster to look up in the process
+     * @param process the process to inspect
+     * @return the SLA chosen by {@code getSLA(Cluster, Process)} for the matching cluster
+     * @throws FalconException if the cluster lookup returns null for {@code clusterName}
+     */
+    public static Sla getSLA(String clusterName, Process process) throws FalconException {
+        Cluster processCluster = getCluster(process, clusterName);
+        if (processCluster != null) {
+            return getSLA(processCluster, process);
+        }
+        throw new FalconException("Invalid cluster: " + clusterName + " for process: " + process.getName());
+    }
+
+    /**
+     * Picks the SLA for a process cluster: the cluster-level SLA when one is set,
+     * otherwise the SLA defined on the process itself.
+     *
+     * @param cluster the process cluster whose SLA takes precedence
+     * @param process the process providing the fallback SLA
+     * @return the cluster SLA if non-null, else the process SLA (which may itself be null)
+     */
+    public static Sla getSLA(Cluster cluster, Process process) {
+        Sla slaOnCluster = cluster.getSla();
+        return slaOnCluster == null ? process.getSla() : slaOnCluster;
+    }
+
+    /**
+     * Returns the start of the validity window of a process on the named cluster.
+     *
+     * @param process the process to inspect
+     * @param clusterName name of the cluster to look up in the process
+     * @return the validity start date of the matching process cluster
+     * @throws FalconException if the cluster lookup returns null for {@code clusterName}
+     */
+    public static Date getProcessValidityStart(Process process, String clusterName) throws FalconException {
+        Cluster processCluster = getCluster(process, clusterName);
+        if (processCluster == null) {
+            // The space before "found" keeps the cluster name from running into the next word.
+            throw new FalconException("No matching cluster " + clusterName
+                    + " found for process " + process.getName());
+        }
+        return processCluster.getValidity().getStart();
+    }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
new file mode 100644
index 0000000..e2096fe
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/EntitySLAAlertBean.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.persistence;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
+
+import java.util.Date;
+
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * Persistence bean holding the SLA alert state of one entity instance, identified by
+ * entity name, cluster name, entity type and nominal time. Stored in the ENTITY_SLA_ALERTS table.
+ * The entity type setter accepts only the feed and process entity types.
+ * */
+@Entity
+@NamedQueries({
+@NamedQuery(name = PersistenceConstants.GET_ENTITY_ALERTS, query = "select OBJECT(a) from EntitySLAAlertBean a where a.entityName = :entityName and a.entityType = :entityType"),
+@NamedQuery(name = PersistenceConstants.GET_ALL_ENTITY_ALERTS, query = "select OBJECT(a) from EntitySLAAlertBean a "),
+@NamedQuery(name = PersistenceConstants.GET_SLA_HIGH_CANDIDATES, query = "select OBJECT(a) from EntitySLAAlertBean a where a.isSLALowMissed = true and a.isSLAHighMissed = false "),
+@NamedQuery(name = PersistenceConstants.UPDATE_SLA_HIGH, query = "update EntitySLAAlertBean a set a.isSLAHighMissed = true where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
+@NamedQuery(name = PersistenceConstants.GET_ENTITY_ALERT_INSTANCE, query = "select OBJECT(a) from EntitySLAAlertBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
+@NamedQuery(name = PersistenceConstants.DELETE_ENTITY_ALERT_INSTANCE, query = "delete from EntitySLAAlertBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
+})
+@Table(name = "ENTITY_SLA_ALERTS")
+//RESUME CHECKSTYLE CHECK LineLengthCheck
+public class EntitySLAAlertBean {
+    /** Name of the {@code entityName} query parameter used by the named queries. */
+    public static final String ENTITYNAME = "entityName";
+
+    /** Name of the {@code clusterName} query parameter used by the named queries. */
+    public static final String CLUSTERNAME = "clusterName";
+
+    /** Name of the {@code entityType} query parameter used by the named queries. */
+    public static final String ENTITYTYPE = "entityType";
+
+    /** Name of the {@code nominalTime} query parameter used by the named queries. */
+    public static final String NOMINALTIME = "nominalTime";
+
+    @NotNull
+    @GeneratedValue(strategy = GenerationType.AUTO)
+    @Id
+    private String id;
+
+    @Basic
+    @NotNull
+    @Column(name = "entity_name")
+    private String entityName;
+
+    @Basic
+    @NotNull
+    @Column(name = "cluster_name")
+    private String clusterName;
+
+    @Basic
+    @NotNull
+    @Column(name = "entity_type")
+    private String entityType;
+
+    @Basic
+    @NotNull
+    @Column(name = "nominal_time")
+    private Date nominalTime;
+
+    @Basic
+    @Column(name = "sla_low_missed")
+    private Boolean isSLALowMissed = false;
+
+    @Basic
+    @Column(name = "sla_high_missed")
+    private Boolean isSLAHighMissed = false;
+
+    @Basic
+    @Column(name = "sla_low_alert_sent")
+    private Boolean slaLowAlertSent;
+
+    @Basic
+    @Column(name = "sla_high_alert_sent")
+    private Boolean slaHighAlertSent;
+
+    public String getEntityType() {
+        return entityType;
+    }
+
+    /**
+     * Sets the entity type after validating it.
+     *
+     * @param entityType string form of the entity type
+     * @throws FalconException if the value is neither the feed nor the process entity type
+     */
+    public void setEntityType(String entityType) throws FalconException {
+        checkEntityType(entityType);
+        this.entityType = entityType;
+    }
+
+    /** Returns a defensive copy of the nominal time. */
+    public Date getNominalTime() {
+        return new Date(nominalTime.getTime());
+    }
+
+    /** Stores a defensive copy of the given nominal time. */
+    public void setNominalTime(Date nominalTime) {
+        this.nominalTime = new Date(nominalTime.getTime());
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getEntityName() {
+        return entityName;
+    }
+
+    public void setEntityName(String entityName) {
+        this.entityName = entityName;
+    }
+
+    public Boolean getIsSLALowMissed() {
+        return isSLALowMissed;
+    }
+
+    public void setIsSLALowMissed(Boolean isSLALowMissed) {
+        this.isSLALowMissed = isSLALowMissed;
+    }
+
+    public Boolean getIsSLAHighMissed() {
+        return isSLAHighMissed;
+    }
+
+    public void setIsSLAHighMissed(Boolean isSLAHighMissed) {
+        this.isSLAHighMissed = isSLAHighMissed;
+    }
+
+    /**
+     * Verifies that the given string names the process or the feed entity type.
+     *
+     * @param entityType value to validate; null is rejected like any other invalid value
+     * @throws FalconException if the value matches neither entity type
+     */
+    void checkEntityType(String entityType) throws FalconException {
+        // Constant-first equals keeps a null argument from raising a NullPointerException.
+        if (EntityType.PROCESS.toString().equals(entityType) || EntityType.FEED.toString().equals(entityType)) {
+            return;
+        }
+        throw new FalconException("EntityType " + entityType
+                + " is not valid, Feed and Process are the valid input type.");
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java b/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java
deleted file mode 100644
index 4ea3454..0000000
--- a/common/src/main/java/org/apache/falcon/persistence/FeedSLAAlertBean.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.persistence;
-
-import java.util.Date;
-
-import javax.persistence.Basic;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.NamedQueries;
-import javax.persistence.NamedQuery;
-import javax.persistence.Table;
-import javax.validation.constraints.NotNull;
-
-//SUSPEND CHECKSTYLE CHECK LineLengthCheck
-/**
- * Feed SLA monitoring.
- * */
-@Entity
-@NamedQueries({
-@NamedQuery(name = PersistenceConstants.GET_FEED_ALERTS, query = "select OBJECT(a) from FeedSLAAlertBean a where a.feedName = :feedName"),
-@NamedQuery(name = PersistenceConstants.GET_ALL_FEED_ALERTS, query = "OBJECT(a) from PendingInstanceBean a "),
-@NamedQuery(name = PersistenceConstants.GET_SLA_HIGH_CANDIDATES, query = "select OBJECT(a) from FeedSLAAlertBean a where a.isSLALowMissed = true and a.isSLAHighMissed = false "),
- @NamedQuery(name = PersistenceConstants.UPDATE_SLA_HIGH, query = "update FeedSLAAlertBean a set a.isSLAHighMissed = true where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"),
-@NamedQuery(name = PersistenceConstants.GET_FEED_ALERT_INSTANCE, query = "select OBJECT(a) from FeedSLAAlertBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime "),
- @NamedQuery(name = PersistenceConstants.DELETE_FEED_ALERT_INSTANCE, query = "delete from FeedSLAAlertBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime")
-})
-@Table(name = "FEED_SLA_ALERTS")
-//RESUME CHECKSTYLE CHECK LineLengthCheck
-public class FeedSLAAlertBean {
- @NotNull
- @GeneratedValue(strategy = GenerationType.AUTO)
- @Id
- private String id;
-
- @Basic
- @NotNull
- @Column(name = "feed_name")
- private String feedName;
-
- @Basic
- @NotNull
- @Column(name = "cluster_name")
- private String clusterName;
-
- @Basic
- @NotNull
- @Column(name = "nominal_time")
- private Date nominalTime;
-
- @Basic
- @Column(name = "sla_low_missed")
- private Boolean isSLALowMissed = false;
-
- @Basic
- @Column(name = "sla_high_missed")
- private Boolean isSLAHighMissed = false;
-
- @Basic
- @Column(name = "sla_low_alert_sent")
- private Boolean slaLowAlertSent;
-
-
- @Basic
- @Column(name = "sla_high_alert_sent")
- private Boolean slaHighAlertSent;
-
- public Date getNominalTime() {
- return new Date(nominalTime.getTime());
- }
-
- public void setNominalTime(Date nominalTime) {
- this.nominalTime = new Date(nominalTime.getTime());
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getClusterName() {
- return clusterName;
- }
-
- public void setClusterName(String clusterName) {
- this.clusterName = clusterName;
- }
-
- public String getFeedName() {
- return feedName;
- }
-
- public void setFeedName(String feedName) {
- this.feedName = feedName;
- }
-
- public Boolean getIsSLALowMissed() {
- return isSLALowMissed;
- }
-
- public void setIsSLALowMissed(Boolean isSLALowMissed) {
- this.isSLALowMissed = isSLALowMissed;
- }
-
- public Boolean getIsSLAHighMissed() {
- return isSLAHighMissed;
- }
-
- public void setIsSLAHighMissed(Boolean isSLAHighMissed) {
- this.isSLAHighMissed = isSLAHighMissed;
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
new file mode 100644
index 0000000..20ce537
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/MonitoredEntityBean.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
+
+import javax.persistence.Entity;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Column;
+import javax.persistence.Basic;
+import javax.validation.constraints.NotNull;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * Persistent record of an entity (feed or process) that is to be monitored,
+ * stored in the MONITORED_ENTITY table.
+ */
+@Entity
+@NamedQueries({
+        @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from "
+                + "MonitoredEntityBean a where a.entityName = :entityName and a.entityType = :entityType"),
+        @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredEntityBean "
+                + "a where a.entityName = :entityName and a.entityType = :entityType"),
+        @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) "
+                + "from MonitoredEntityBean a")
+})
+@Table(name="MONITORED_ENTITY")
+//RESUME CHECKSTYLE CHECK LineLengthCheck
+public class MonitoredEntityBean {
+
+    /** Name of the ":entityName" parameter used by this bean's named queries. */
+    public static final String ENTITYNAME = "entityName";
+
+    /** Name of the ":entityType" parameter used by this bean's named queries. */
+    public static final String ENTITYTYPE = "entityType";
+
+    @NotNull
+    @GeneratedValue(strategy = GenerationType.AUTO)
+    @Id
+    private String id;
+
+    @Basic
+    @NotNull
+    @Column(name = "entity_name")
+    private String entityName;
+
+    @Basic
+    @NotNull
+    @Column(name = "entity_type")
+    private String entityType;
+
+    /** Returns the stored entity type string. */
+    public String getEntityType() {
+        return entityType;
+    }
+
+    /**
+     * Sets the entity type after validating it.
+     *
+     * @param entityType string form of EntityType.PROCESS or EntityType.FEED
+     * @throws FalconException if the value is null or any other type
+     */
+    public void setEntityType(String entityType) throws FalconException {
+        checkEntityType(entityType);
+        this.entityType = entityType;
+    }
+
+    /** Returns the name of the monitored entity. */
+    public String getEntityName() {
+        return entityName;
+    }
+
+    /**
+     * Legacy accessor kept so existing callers keep compiling; returns the same
+     * value as {@link #getEntityName()}, which new code should prefer.
+     */
+    public String getFeedName() {
+        return getEntityName();
+    }
+
+    /** Sets the name of the monitored entity. */
+    public void setEntityName(String entityName) {
+        this.entityName = entityName;
+    }
+
+    /** Returns the id of this bean. */
+    public String getId() {
+        return id;
+    }
+
+    /** Sets the id of this bean. */
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * Validates that the given entity type is one this bean can store.
+     *
+     * @param entityType type string to validate
+     * @throws FalconException if entityType is null or is neither the PROCESS nor the FEED type string
+     */
+    void checkEntityType(String entityType) throws FalconException {
+        // Constant-first equals() so a null type is reported as FalconException rather than an NPE.
+        boolean supported = EntityType.PROCESS.toString().equals(entityType)
+                || EntityType.FEED.toString().equals(entityType);
+        if (!supported) {
+            throw new FalconException("EntityType " + entityType
+                    + " is not valid, Feed and Process are the valid input type.");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java b/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
deleted file mode 100644
index 2b48569..0000000
--- a/common/src/main/java/org/apache/falcon/persistence/MonitoredFeedsBean.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.persistence;
-
-import javax.persistence.Entity;
-import javax.persistence.NamedQueries;
-import javax.persistence.NamedQuery;
-import javax.persistence.Table;
-import javax.persistence.GeneratedValue;
-import javax.persistence.GenerationType;
-import javax.persistence.Id;
-import javax.persistence.Column;
-import javax.persistence.Basic;
-import javax.validation.constraints.NotNull;
-
-//SUSPEND CHECKSTYLE CHECK LineLengthCheck
-/**
-* The Feeds that are to be monitered will be stored in the db.
-* */
-
-@Entity
-@NamedQueries({
- @NamedQuery(name = PersistenceConstants.GET_MONITERED_INSTANCE, query = "select OBJECT(a) from "
- + "MonitoredFeedsBean a where a.feedName = :feedName"),
- @NamedQuery(name = PersistenceConstants.DELETE_MONITORED_INSTANCES, query = "delete from MonitoredFeedsBean "
- + "a where a.feedName = :feedName"),
- @NamedQuery(name = PersistenceConstants.GET_ALL_MONITORING_FEEDS, query = "select OBJECT(a) "
- + "from MonitoredFeedsBean a")
-})
-@Table(name="MONITORED_FEEDS")
-//RESUME CHECKSTYLE CHECK LineLengthCheck
-public class MonitoredFeedsBean {
- @NotNull
- @GeneratedValue(strategy = GenerationType.AUTO)
- @Id
- private String id;
-
- @Basic
- @NotNull
- @Column(name = "feed_name")
- private String feedName;
-
- public String getFeedName() {
- return feedName;
- }
-
- public void setFeedName(String feedName) {
- this.feedName = feedName;
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
index 41eb048..863abdc 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PendingInstanceBean.java
@@ -17,6 +17,9 @@
*/
package org.apache.falcon.persistence;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
+
import javax.persistence.Entity;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
@@ -35,13 +38,13 @@ import java.util.Date;
* */
@Entity
@NamedQueries({
- @NamedQuery(name = PersistenceConstants.GET_LATEST_INSTANCE_TIME, query = "select max(a.nominalTime) from PendingInstanceBean a where a.feedName = :feedName"),
- @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName"),
- @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime"),
- @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED, query = "delete from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
- @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName"),
+ @NamedQuery(name = PersistenceConstants.GET_LATEST_INSTANCE_TIME, query = "select max(a.nominalTime) from PendingInstanceBean a where a.entityName = :entityName and a.entityType = :entityType"),
+ @NamedQuery(name = PersistenceConstants.GET_PENDING_INSTANCES, query = "select OBJECT(a) from PendingInstanceBean a where a.entityName = :entityName and a.entityType = :entityType"),
+ @NamedQuery(name = PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES , query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType"),
+ @NamedQuery(name = PersistenceConstants.DELETE_ALL_INSTANCES_FOR_ENTITY, query = "delete from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"),
+ @NamedQuery(name = PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES , query = "select a.nominalTime from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.entityType = :entityType"),
@NamedQuery(name= PersistenceConstants.GET_ALL_PENDING_INSTANCES , query = "select OBJECT(a) from PendingInstanceBean a "),
- @NamedQuery(name= PersistenceConstants.GET_PENDING_INSTANCE , query = "select OBJECT(a) from PendingInstanceBean a where a.feedName = :feedName and a.clusterName = :clusterName and a.nominalTime = :nominalTime")
+ @NamedQuery(name= PersistenceConstants.GET_PENDING_INSTANCE , query = "select OBJECT(a) from PendingInstanceBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
})
@Table(name = "PENDING_INSTANCES")
//RESUME CHECKSTYLE CHECK LineLengthCheck
@@ -53,8 +56,8 @@ public class PendingInstanceBean {
@Basic
@NotNull
- @Column(name = "feed_name")
- private String feedName;
+ @Column(name = "entity_name")
+ private String entityName;
@Basic
@NotNull
@@ -66,6 +69,20 @@ public class PendingInstanceBean {
@Column(name = "nominal_time")
private Date nominalTime;
+ /** Returns the stored entity type string. */
+ public String getEntityType() {
+ return entityType;
+ }
+
+ /**
+ * Sets the entity type after validating it with checkEntityType.
+ *
+ * @throws FalconException if checkEntityType rejects the value, i.e. it is neither the
+ * PROCESS nor the FEED type string
+ */
+ public void setEntityType(String entityType) throws FalconException {
+ checkEntityType(entityType);
+ this.entityType = entityType;
+ }
+
+ @Basic
+ @NotNull
+ @Column(name = "entity_type")
+ private String entityType;
+
public Date getNominalTime() {
return nominalTime;
}
@@ -90,11 +107,28 @@ public class PendingInstanceBean {
this.clusterName = clusterName;
}
- public String getFeedName() {
- return feedName;
+ /** Returns the stored entity name. */
+ public String getEntityName() {
+ return entityName;
+ }
+
+ /** Sets the entity name. */
+ public void setEntityName(String entityName) {
+ this.entityName = entityName;
}
- public void setFeedName(String feedName) {
- this.feedName = feedName;
+ // String constants whose values match the parameter names (:entityName, :clusterName,
+ // :nominalTime, :entityType) used by this bean's named queries.
+ public static final String ENTITYNAME = "entityName";
+
+ public static final String CLUSTERNAME = "clusterName";
+
+ public static final String NOMINALTIME = "nominalTime";
+
+ public static final String ENTITYTYPE = "entityType";
+
+ /**
+  * Validates that the given entity type is one this bean can store.
+  *
+  * @param entityType type string to validate; must equal the string form of
+  *                   EntityType.PROCESS or EntityType.FEED
+  * @throws FalconException if entityType is null or any other value
+  */
+ void checkEntityType(String entityType) throws FalconException {
+     // Constant-first equals() so a null type is reported as FalconException rather than an NPE.
+     boolean supported = EntityType.PROCESS.toString().equals(entityType)
+             || EntityType.FEED.toString().equals(entityType);
+     if (!supported) {
+         throw new FalconException("EntityType " + entityType
+                 + " is not valid, Feed and Process are the valid input type.");
+     }
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index 72c382e..f9aa1f5 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -30,7 +30,7 @@ public final class PersistenceConstants {
public static final String GET_PENDING_INSTANCES = "GET_PENDING_INSTANCES";
public static final String GET_PENDING_INSTANCE = "GET_PENDING_INSTANCE";
public static final String DELETE_PENDING_NOMINAL_INSTANCES = "DELETE_PENDING_NOMINAL_INSTANCES";
- public static final String DELETE_ALL_INSTANCES_FOR_FEED = "DELETE_ALL_INSTANCES_FOR_FEED";
+ public static final String DELETE_ALL_INSTANCES_FOR_ENTITY = "DELETE_ALL_INSTANCES_FOR_ENTITY";
public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES";
public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES";
public static final String GET_ENTITY = "GET_ENTITY";
@@ -55,10 +55,10 @@ public final class PersistenceConstants {
public static final String DELETE_INSTANCES_TABLE = "DELETE_INSTANCES_TABLE";
public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE";
public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME";
- public static final String GET_FEED_ALERTS = "GET_FEED_ALERTS";
- public static final String GET_ALL_FEED_ALERTS = "GET_ALL_FEED_ALERTS";
+ public static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS";
+ public static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS";
public static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES";
public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH";
- public static final String GET_FEED_ALERT_INSTANCE = "GET_FEED_ALERT_INSTANCE";
- public static final String DELETE_FEED_ALERT_INSTANCE = "DELETE_FEED_ALERT_INSTANCE";
+ public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE";
+ public static final String DELETE_ENTITY_ALERT_INSTANCE = "DELETE_ENTITY_ALERT_INSTANCE";
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
index 1bdfc25..102b986 100644
--- a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
+++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -244,8 +244,8 @@ public class FalconStateStoreDBCLI {
args.add("org.apache.falcon.persistence.EntityBean");
args.add("org.apache.falcon.persistence.InstanceBean");
args.add("org.apache.falcon.persistence.PendingInstanceBean");
- args.add("org.apache.falcon.persistence.MonitoredFeedsBean");
- args.add("org.apache.falcon.persistence.FeedSLAAlertBean");
+ args.add("org.apache.falcon.persistence.MonitoredEntityBean");
+ args.add("org.apache.falcon.persistence.EntitySLAAlertBean");
return args.toArray(new String[args.size()]);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml
index c9b444d..ac2f397 100644
--- a/common/src/main/resources/META-INF/persistence.xml
+++ b/common/src/main/resources/META-INF/persistence.xml
@@ -26,8 +26,8 @@
<class>org.apache.falcon.persistence.EntityBean</class>
<class>org.apache.falcon.persistence.InstanceBean</class>
<class>org.apache.falcon.persistence.PendingInstanceBean</class>
- <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
- <class>org.apache.falcon.persistence.FeedSLAAlertBean</class>
+ <class>org.apache.falcon.persistence.MonitoredEntityBean</class>
+ <class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
<properties>
<property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -37,7 +37,7 @@
<property name="openjpa.MetaDataFactory"
value="jpa(Types=org.apache.falcon.persistence.EntityBean;
org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
- org.apache.falcon.persistence.MonitoredFeedsBean;org.apache.falcon.persistence.FeedSLAAlertBean)"></property>
+ org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean)"></property>
<property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
<property name="openjpa.LockManager" value="pessimistic"/>
@@ -58,8 +58,8 @@
<class>org.apache.falcon.persistence.EntityBean</class>
<class>org.apache.falcon.persistence.InstanceBean</class>
<class>org.apache.falcon.persistence.PendingInstanceBean</class>
- <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
- <class>org.apache.falcon.persistence.FeedSLAAlertBean</class>
+ <class>org.apache.falcon.persistence.MonitoredEntityBean</class>
+ <class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
<properties>
<property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -69,7 +69,7 @@
<property name="openjpa.MetaDataFactory"
value="jpa(Types=org.apache.falcon.persistence.EntityBean;
org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
- org.apache.falcon.persistence.MonitoredFeedsBean;org.apache.falcon.persistence.FeedSLAAlertBean)"></property>
+ org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean)"></property>
<property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
<property name="openjpa.LockManager" value="pessimistic"/>
@@ -88,9 +88,9 @@
<class>org.apache.falcon.persistence.EntityBean</class>
<class>org.apache.falcon.persistence.InstanceBean</class>
- <class>org.apache.falcon.persistence.MonitoredFeedsBean</class>
+ <class>org.apache.falcon.persistence.MonitoredEntityBean</class>
<class>org.apache.falcon.persistence.PendingInstanceBean</class>
- <class>org.apache.falcon.persistence.FeedSLAAlertBean</class>
+ <class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
<properties>
<property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -100,7 +100,7 @@
<property name="openjpa.MetaDataFactory"
value="jpa(Types=org.apache.falcon.persistence.EntityBean;
org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
- org.apache.falcon.persistence.MonitoredFeedsBean;org.apache.falcon.persistence.FeedSLAAlertBean)"></property>
+ org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean)"></property>
<property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
<property name="openjpa.LockManager" value="pessimistic"/>
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 374ff17..de24621 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -35,7 +35,7 @@
org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
org.apache.falcon.service.ProcessSubscriberService,\
org.apache.falcon.extensions.ExtensionService,\
- org.apache.falcon.service.FeedSLAMonitoringService,\
+ org.apache.falcon.service.EntitySLAMonitoringService,\
org.apache.falcon.service.LifecyclePolicyMap,\
org.apache.falcon.entity.store.ConfigurationStore,\
org.apache.falcon.rerun.service.RetryService,\
@@ -67,7 +67,7 @@
org.apache.falcon.entity.ColoClusterRelation,\
org.apache.falcon.group.FeedGroupMap,\
org.apache.falcon.entity.store.FeedLocationStore,\
- org.apache.falcon.service.FeedSLAMonitoringService,\
+ org.apache.falcon.service.EntitySLAMonitoringService,\
org.apache.falcon.service.SharedLibraryHostingService
## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ##
# org.apache.falcon.state.store.jdbc.JdbcStateStore
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
index 3817056..afd9307 100644
--- a/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
+++ b/common/src/test/java/org/apache/falcon/entity/AbstractTestBase.java
@@ -79,7 +79,7 @@ public class AbstractTestBase {
cleanupStore();
String listeners = StartupProperties.get().getProperty("configstore.listeners");
listeners = listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", "");
- listeners = listeners.replace("org.apache.falcon.service.FeedSLAMonitoringService", "");
+ listeners = listeners.replace("org.apache.falcon.service.EntitySLAMonitoringService", "");
StartupProperties.get().setProperty("configstore.listeners", listeners);
store = ConfigurationStore.get();
store.init();
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
index 40c077e..d13769e 100644
--- a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
@@ -63,7 +63,7 @@ public class FeedLocationStoreTest extends AbstractTestBase {
cleanupStore();
String listeners = StartupProperties.get().getProperty("configstore.listeners");
listeners = listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", "");
- listeners = listeners.replace("org.apache.falcon.service.FeedSLAMonitoringService", "");
+ listeners = listeners.replace("org.apache.falcon.service.EntitySLAMonitoringService", "");
StartupProperties.get().setProperty("configstore.listeners", listeners);
store = ConfigurationStore.get();
store.init();
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/docs/src/site/twiki/FalconNativeScheduler.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconNativeScheduler.twiki b/docs/src/site/twiki/FalconNativeScheduler.twiki
index 1f51739..b15fd5b 100644
--- a/docs/src/site/twiki/FalconNativeScheduler.twiki
+++ b/docs/src/site/twiki/FalconNativeScheduler.twiki
@@ -27,7 +27,7 @@ You can enable native scheduler by making changes to __$FALCON_HOME/conf/startup
*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
org.apache.falcon.service.ProcessSubscriberService,\
- org.apache.falcon.service.FeedSLAMonitoringService,\
+ org.apache.falcon.service.EntitySLAMonitoringService,\
org.apache.falcon.service.LifecyclePolicyMap,\
org.apache.falcon.service.FalconJPAService,\
org.apache.falcon.entity.store.ConfigurationStore,\
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/docs/src/site/twiki/FeedSLAMonitoring.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FeedSLAMonitoring.twiki b/docs/src/site/twiki/FeedSLAMonitoring.twiki
index 88132ce..469c0aa 100644
--- a/docs/src/site/twiki/FeedSLAMonitoring.twiki
+++ b/docs/src/site/twiki/FeedSLAMonitoring.twiki
@@ -6,7 +6,7 @@ Feed SLA monitoring service requires FalconJPAService to be up.Following are the
In startup.properties :
*.application.services= org.apache.falcon.state.store.service.FalconJPAService,
- org.apache.falcon.service.FeedSLAMonitoringService
+ org.apache.falcon.service.EntitySLAMonitoringService
These properties are required for FalconJPAService in statestore.properties:
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java b/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java
index df2a1e0..56376fc 100644
--- a/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java
+++ b/prism/src/main/java/org/apache/falcon/handler/SLAMonitoringHandler.java
@@ -26,7 +26,7 @@ import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.service.FeedSLAMonitoringService;
+import org.apache.falcon.service.EntitySLAMonitoringService;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.WorkflowExecutionListener;
import org.apache.hadoop.fs.Path;
@@ -45,8 +45,14 @@ public class SLAMonitoringHandler implements WorkflowExecutionListener {
@Override
public void onSuccess(WorkflowExecutionContext context) throws FalconException {
if (context.hasWorkflowSucceeded()) {
- updateSLAMonitoring(context.getClusterName(), context.getOutputFeedNamesList(),
+ // Feed workflow: pass the cluster, output feed names and output instance paths to updateSLAMonitoring.
+ if (context.getEntityType().toString().equals(EntityType.FEED.name())){
+ updateSLAMonitoring(context.getClusterName(), context.getOutputFeedNamesList(),
context.getOutputFeedInstancePathsList());
+ }
+ // Process workflow: report this process instance (cluster, entity name, nominal time, entity type)
+ // as available to the SLA monitoring service.
+ if (context.getEntityType().toString().equals(EntityType.PROCESS.name())){
+ EntitySLAMonitoringService.get().makeProcessInstanceAvailable(context.getClusterName(),
+ context.getEntityName(), context.getNominalTimeAsISO8601(), context.getEntityType());
+ }
}
}
@@ -60,7 +66,8 @@ public class SLAMonitoringHandler implements WorkflowExecutionListener {
String templatePath = new Path(storage.getUriTemplate(LocationType.DATA)).toUri().getPath();
Date date = FeedHelper.getDate(templatePath, new Path(outputFeedInstancePathsList[index]),
EntityUtil.getTimeZone(feed));
- FeedSLAMonitoringService.get().makeFeedInstanceAvailable(outputFeedNamesList[index], clusterName, date);
+ EntitySLAMonitoringService.get().makeFeedInstanceAvailable(outputFeedNamesList[index],
+ clusterName, date);
}
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
index 4fd1b53..c1f818a 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -18,11 +18,13 @@
package org.apache.falcon.jdbc;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.falcon.persistence.MonitoredFeedsBean;
-import org.apache.falcon.persistence.FeedSLAAlertBean;
-import org.apache.falcon.persistence.PersistenceConstants;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.persistence.MonitoredEntityBean;
import org.apache.falcon.persistence.PendingInstanceBean;
+import org.apache.falcon.persistence.PersistenceConstants;
import org.apache.falcon.persistence.ResultNotFoundException;
+import org.apache.falcon.persistence.EntitySLAAlertBean;
import org.apache.falcon.service.FalconJPAService;
import javax.persistence.EntityManager;
@@ -32,7 +34,7 @@ import java.util.Date;
import java.util.List;
/**
-* StateStore for MonitoringFeeds and PendingFeedInstances.
+* StateStore for MonitoringEntity and PendingEntityInstances.
*/
public class MonitoringJdbcStateStore {
@@ -42,23 +44,25 @@ public class MonitoringJdbcStateStore {
}
- public void putMonitoredFeed(String feedName){
+ public void putMonitoredEntity(String entityName, String entityType) throws FalconException{
- MonitoredFeedsBean monitoredFeedsBean = new MonitoredFeedsBean();
- monitoredFeedsBean.setFeedName(feedName);
+ MonitoredEntityBean monitoredEntityBean = new MonitoredEntityBean();
+ monitoredEntityBean.setEntityName(entityName);
+ monitoredEntityBean.setEntityType(entityType);
EntityManager entityManager = getEntityManager();
try {
beginTransaction(entityManager);
- entityManager.persist(monitoredFeedsBean);
+ entityManager.persist(monitoredEntityBean);
} finally {
commitAndCloseTransaction(entityManager);
}
}
- public MonitoredFeedsBean getMonitoredFeed(String feedName){
+ public MonitoredEntityBean getMonitoredEntity(String entityName, String entityType){
EntityManager entityManager = getEntityManager();
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_MONITERED_INSTANCE);
- q.setParameter("feedName", feedName);
+ q.setParameter(MonitoredEntityBean.ENTITYNAME, entityName);
+ q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType);
List result = q.getResultList();
try {
if (result.isEmpty()) {
@@ -67,14 +71,15 @@ public class MonitoringJdbcStateStore {
} finally {
entityManager.close();
}
- return ((MonitoredFeedsBean)result.get(0));
+ return ((MonitoredEntityBean)result.get(0));
}
- public void deleteMonitoringFeed(String feedName) {
+ public void deleteMonitoringEntity(String entityName, String entityType) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_MONITORED_INSTANCES);
- q.setParameter("feedName", feedName);
+ q.setParameter(MonitoredEntityBean.ENTITYNAME, entityName);
+ q.setParameter(MonitoredEntityBean.ENTITYTYPE, entityType);
try{
q.executeUpdate();
} finally {
@@ -82,7 +87,7 @@ public class MonitoringJdbcStateStore {
}
}
- public List<MonitoredFeedsBean> getAllMonitoredFeed() throws ResultNotFoundException {
+ public List<MonitoredEntityBean> getAllMonitoredFeed() throws ResultNotFoundException {
EntityManager entityManager = getEntityManager();
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_MONITORING_FEEDS);
List result = q.getResultList();
@@ -90,22 +95,24 @@ public class MonitoringJdbcStateStore {
return result;
}
- public Date getLastInstanceTime(String feedName) throws ResultNotFoundException {
+ public Date getLastInstanceTime(String entityName , String entityType) throws ResultNotFoundException {
EntityManager entityManager = getEntityManager();
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_LATEST_INSTANCE_TIME, Date.class);
- q.setParameter("feedName", feedName);
+ q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
+ q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
Date result = (Date)q.getSingleResult();
entityManager.close();
return result;
}
- public void deletePendingInstance(String feedName, String clusterName , Date nominalTime){
+ public void deletePendingInstance(String entityName, String clusterName , Date nominalTime, String entityType){
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_PENDING_NOMINAL_INSTANCES);
- q.setParameter("feedName", feedName);
- q.setParameter("clusterName", clusterName);
- q.setParameter("nominalTime", nominalTime);
+ q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
+ q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
+ q.setParameter(PendingInstanceBean.NOMINALTIME, nominalTime);
+ q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
try{
q.executeUpdate();
} finally {
@@ -113,12 +120,13 @@ public class MonitoringJdbcStateStore {
}
}
- public void deletePendingInstances(String feedName, String clusterName){
+ public void deletePendingInstances(String entityName, String clusterName, String entityType){
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
- Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_FEED);
- q.setParameter("feedName", feedName);
- q.setParameter("clusterName", clusterName);
+ Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_INSTANCES_FOR_ENTITY);
+ q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
+ q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
+ q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
try{
q.executeUpdate();
} finally {
@@ -126,23 +134,26 @@ public class MonitoringJdbcStateStore {
}
}
- public void putPendingInstances(String feed, String clusterName, Date nominalTime){
+ public void putPendingInstances(String entity, String clusterName, Date nominalTime, String entityType)
+ throws FalconException{
EntityManager entityManager = getEntityManager();
PendingInstanceBean pendingInstanceBean = new PendingInstanceBean();
- pendingInstanceBean.setFeedName(feed);
+ pendingInstanceBean.setEntityName(entity);
pendingInstanceBean.setClusterName(clusterName);
pendingInstanceBean.setNominalTime(nominalTime);
+ pendingInstanceBean.setEntityType(entityType);
beginTransaction(entityManager);
entityManager.persist(pendingInstanceBean);
commitAndCloseTransaction(entityManager);
}
- public List<Date> getNominalInstances(String feedName, String clusterName) {
+ public List<Date> getNominalInstances(String entityName, String clusterName, String entityType) {
EntityManager entityManager = getEntityManager();
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_DATE_FOR_PENDING_INSTANCES);
- q.setParameter("feedName", feedName);
- q.setParameter("clusterName", clusterName);
+ q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
+ q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
+ q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
List result = q.getResultList();
entityManager.close();
return result;
@@ -168,15 +179,17 @@ public class MonitoringJdbcStateStore {
entityManager.close();
}
- public PendingInstanceBean getPendingInstance(String feedName, String clusterName, Date nominalTime) {
+ public PendingInstanceBean getPendingInstance(String entityName, String clusterName, Date nominalTime,
+ String entityType) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
TypedQuery<PendingInstanceBean> q = entityManager.createNamedQuery(PersistenceConstants.GET_PENDING_INSTANCE,
PendingInstanceBean.class);
- q.setParameter("feedName", feedName);
+ q.setParameter(PendingInstanceBean.ENTITYNAME, entityName);
- q.setParameter("clusterName", clusterName);
- q.setParameter("nominalTime", nominalTime);
+ q.setParameter(PendingInstanceBean.CLUSTERNAME, clusterName);
+ q.setParameter(PendingInstanceBean.NOMINALTIME, nominalTime);
+ q.setParameter(PendingInstanceBean.ENTITYTYPE, entityType);
try {
return q.getSingleResult();
} finally {
@@ -184,14 +197,16 @@ public class MonitoringJdbcStateStore {
}
}
- public FeedSLAAlertBean getFeedAlertInstance(String feedName, String clusterName, Date nominalTime) {
+ public EntitySLAAlertBean getEntityAlertInstance(String entityName, String clusterName, Date nominalTime,
+ String entityType) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
- TypedQuery<FeedSLAAlertBean> q = entityManager.createNamedQuery(PersistenceConstants.GET_FEED_ALERT_INSTANCE,
- FeedSLAAlertBean.class);
- q.setParameter("feedName", feedName);
- q.setParameter("clusterName", clusterName);
- q.setParameter("nominalTime", nominalTime);
+ TypedQuery<EntitySLAAlertBean> q = entityManager.createNamedQuery(PersistenceConstants.
+ GET_ENTITY_ALERT_INSTANCE, EntitySLAAlertBean.class);
+ q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName);
+ q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName);
+ q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime);
+ q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType);
try {
return q.getSingleResult();
} finally {
@@ -199,30 +214,32 @@ public class MonitoringJdbcStateStore {
}
}
- public void putSLAAlertInstance(String feedName, String cluster, Date nominalTime, Boolean isSLALowMissed,
- Boolean isSLAHighMissed) {
+ public void putSLAAlertInstance(String entityName, String cluster, String entityType, Date nominalTime,
+ Boolean isSLALowMissed, Boolean isSLAHighMissed) throws FalconException{
EntityManager entityManager = getEntityManager();
- FeedSLAAlertBean feedSLAAlertBean = new FeedSLAAlertBean();
- feedSLAAlertBean.setFeedName(feedName);
- feedSLAAlertBean.setClusterName(cluster);
- feedSLAAlertBean.setNominalTime(nominalTime);
- feedSLAAlertBean.setIsSLALowMissed(isSLALowMissed);
- feedSLAAlertBean.setIsSLAHighMissed(isSLAHighMissed);
+ EntitySLAAlertBean entitySLAAlertBean = new EntitySLAAlertBean();
+ entitySLAAlertBean.setEntityName(entityName);
+ entitySLAAlertBean.setClusterName(cluster);
+ entitySLAAlertBean.setNominalTime(nominalTime);
+ entitySLAAlertBean.setIsSLALowMissed(isSLALowMissed);
+ entitySLAAlertBean.setIsSLAHighMissed(isSLAHighMissed);
+ entitySLAAlertBean.setEntityType(entityType);
try {
beginTransaction(entityManager);
- entityManager.persist(feedSLAAlertBean);
+ entityManager.persist(entitySLAAlertBean);
} finally {
commitAndCloseTransaction(entityManager);
}
}
- public void updateSLAAlertInstance(String feedName, String clusterName, Date nominalTime) {
+ public void updateSLAAlertInstance(String entityName, String clusterName, Date nominalTime, String entityType) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.UPDATE_SLA_HIGH);
- q.setParameter("feedName", feedName);
- q.setParameter("clusterName", clusterName);
- q.setParameter("nominalTime", nominalTime);
+ q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName);
+ q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName);
+ q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime);
+ q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType);
try{
q.executeUpdate();
} finally {
@@ -230,13 +247,14 @@ public class MonitoringJdbcStateStore {
}
}
- public void deleteFeedAlertInstance(String feedName, String clusterName, Date nominalTime){
+ public void deleteEntityAlertInstance(String entityName, String clusterName, Date nominalTime, String entityType){
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
- Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_FEED_ALERT_INSTANCE);
- q.setParameter("feedName", feedName);
- q.setParameter("clusterName", clusterName);
- q.setParameter("nominalTime", nominalTime);
+ Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ENTITY_ALERT_INSTANCE);
+ q.setParameter(EntitySLAAlertBean.ENTITYNAME, entityName);
+ q.setParameter(EntitySLAAlertBean.CLUSTERNAME, clusterName);
+ q.setParameter(EntitySLAAlertBean.NOMINALTIME, nominalTime);
+ q.setParameter(EntitySLAAlertBean.ENTITYTYPE, entityType);
try{
q.executeUpdate();
} finally {
@@ -245,7 +263,7 @@ public class MonitoringJdbcStateStore {
}
- public List<FeedSLAAlertBean> getSLAHighCandidates() {
+ public List<EntitySLAAlertBean> getSLAHighCandidates() {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_SLA_HIGH_CANDIDATES);
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index c6903a4..895f8b2 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -31,7 +31,7 @@ import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.UnschedulableEntityException;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.monitors.Dimension;
-import org.apache.falcon.service.FeedSLAMonitoringService;
+import org.apache.falcon.service.EntitySLAMonitoringService;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -162,11 +162,11 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM
Date end = (endStr == null) ? new Date() : EntityUtil.parseDateUTC(endStr);
if (StringUtils.isBlank(feedName)) {
- instances.addAll(FeedSLAMonitoringService.get().getFeedSLAMissPendingAlerts(start, end));
+ instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(start, end));
} else {
for (String clusterName : DeploymentUtil.getCurrentClusters()) {
- instances.addAll(FeedSLAMonitoringService.get().getFeedSLAMissPendingAlerts(feedName,
- clusterName, start, end));
+ instances.addAll(EntitySLAMonitoringService.get().getEntitySLAMissPendingAlerts(feedName,
+ clusterName, start, end, EntityType.FEED.toString()));
}
}
} catch (FalconException e) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 53a9de1..249c273 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -134,7 +134,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@Override
protected SchedulableEntityInstanceResult doExecute(String colo) throws FalconException {
- return getEntityManager(colo).invoke("getFeedSLAMissPendingAlerts", entityType, entityName,
+ return getEntityManager(colo).invoke("getEntitySLAMissPendingAlerts", entityType, entityName,
start, end, colo);
}
}.execute();
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
new file mode 100644
index 0000000..f023c35
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.service;
+
+
+import java.util.Date;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
+import org.apache.falcon.persistence.PendingInstanceBean;
+import org.apache.falcon.resource.SchedulableEntityInstance;
+import org.apache.falcon.util.ReflectionUtils;
+import org.apache.falcon.util.StartupProperties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Service that periodically walks the pending entity instances kept in the
+ * monitoring state store, records SLA misses for them and notifies the
+ * registered {@link EntitySLAListener}s once an instance has missed its high SLA.
+ */
+public final class EntitySLAAlertService implements FalconService, EntitySLAListener {
+
+    private static final String NAME = "EntitySLAAlertService";
+
+    private static final Logger LOG = LoggerFactory.getLogger(EntitySLAAlertService.class);
+
+    private MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
+
+    private Set<EntitySLAListener> listeners = new LinkedHashSet<EntitySLAListener>();
+
+    /** Runs the periodic {@link Monitor}; created by {@link #init()}, stopped by {@link #destroy()}. */
+    private ScheduledThreadPoolExecutor executor;
+
+    private static final EntitySLAAlertService SERVICE = new EntitySLAAlertService();
+
+    /**
+     * @return the singleton instance of this service
+     */
+    public static EntitySLAAlertService get() {
+        return SERVICE;
+    }
+
+    private EntitySLAAlertService(){}
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    /**
+     * Registers every listener named in the comma-separated {@code feedAlert.listeners}
+     * startup property and schedules the periodic SLA check.
+     *
+     * @throws FalconException declared by the service contract; presumably raised when a
+     *                         configured listener cannot be instantiated
+     */
+    @Override
+    public void init() throws FalconException {
+        // Default to "" so that an unset property means "no listeners" instead of a
+        // NullPointerException on split().
+        String listenerClassNames = StartupProperties.get().getProperty("feedAlert.listeners", "");
+        for (String listenerClassName : listenerClassNames.split(",")) {
+            listenerClassName = listenerClassName.trim();
+            if (listenerClassName.isEmpty()) {
+                continue;
+            }
+            EntitySLAListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName);
+            registerListener(listener);
+        }
+
+        String freq = StartupProperties.get().getProperty("feed.sla.statusCheck.frequency.seconds", "600");
+        int statusCheckFrequencySeconds = Integer.parseInt(freq);
+
+        // Keep a handle on the executor so that destroy() can stop it.
+        executor = new ScheduledThreadPoolExecutor(1);
+        executor.scheduleWithFixedDelay(new Monitor(), 0, statusCheckFrequencySeconds + 10, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Adds a listener to be notified when an entity instance misses its high SLA.
+     *
+     * @param listener listener to register
+     */
+    public void registerListener(EntitySLAListener listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Stops the periodic SLA check started by {@link #init()}; a check that is already
+     * running is allowed to finish.
+     */
+    @Override
+    public void destroy() throws FalconException {
+        if (executor != null) {
+            executor.shutdown();
+            executor = null;
+        }
+    }
+
+    /**
+     * Periodic task that triggers one pass over the pending instances.
+     */
+    private class Monitor implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                processSLACandidates();
+            } catch (RuntimeException e) {
+                // An exception escaping run() would make the executor suppress all later runs.
+                LOG.error("SLA candidate processing failed, will retry on the next run:", e);
+            }
+        }
+    }
+
+    /**
+     * Evaluates every pending instance in the state store against its SLA. A failure on
+     * one instance is logged and does not prevent the remaining instances from being checked.
+     */
+    void processSLACandidates(){
+        //Get all entity instances to be monitored
+        List<PendingInstanceBean> pendingInstanceBeanList = store.getAllInstances();
+        if (pendingInstanceBeanList == null || pendingInstanceBeanList.isEmpty()){
+            return;
+        }
+
+        LOG.debug("In processSLACandidates :" + pendingInstanceBeanList.size());
+        for (PendingInstanceBean pendingInstanceBean : pendingInstanceBeanList) {
+            try {
+                processPendingInstance(pendingInstanceBean);
+            } catch (FalconException e){
+                LOG.error("Exception in EntitySLAAlertService for entity " + pendingInstanceBean.getEntityName()
+                        + ":", e);
+            }
+        }
+    }
+
+    /**
+     * Checks one pending instance: clears its alert record when no SLA alert is pending,
+     * otherwise records the miss and, for a critical miss, notifies the listeners.
+     */
+    private void processPendingInstance(PendingInstanceBean pendingInstanceBean) throws FalconException {
+        String entityName = pendingInstanceBean.getEntityName();
+        String clusterName = pendingInstanceBean.getClusterName();
+        Date nominalTime = pendingInstanceBean.getNominalTime();
+        String entityType = pendingInstanceBean.getEntityType();
+
+        org.apache.falcon.entity.v0.cluster.Cluster cluster = ClusterHelper.getCluster(clusterName);
+
+        Set<SchedulableEntityInstance> schedulableEntityInstances = EntitySLAMonitoringService.get().
+                getEntitySLAMissPendingAlerts(entityName, cluster.getName(), nominalTime, nominalTime,
+                        entityType);
+        if (schedulableEntityInstances.isEmpty()){
+            store.deleteEntityAlertInstance(entityName, cluster.getName(), nominalTime, entityType);
+            // Nothing pending for this instance; the caller moves on to the next one.
+            return;
+        }
+        List<SchedulableEntityInstance> schedulableEntityList = new ArrayList<>(schedulableEntityInstances);
+        SchedulableEntityInstance schedulableEntityInstance = schedulableEntityList.get(0);
+
+        String instanceDescription = "Entity: " + entityName + " Cluster: " + clusterName
+                + " Nominal Time: " + nominalTime + " EntityType: " + entityType;
+        if (schedulableEntityInstance.getTags().contains(EntitySLAMonitoringService.get().TAG_WARN)) {
+            //Mark in DB as SLA missed
+            store.putSLAAlertInstance(entityName, clusterName, entityType, nominalTime, true, false);
+            LOG.info(instanceDescription + " missed SLALow");
+        } else if (schedulableEntityInstance.getTags().contains(EntitySLAMonitoringService.get().TAG_CRITICAL)){
+            if (entityType.equals(EntityType.PROCESS.name())){
+                // For processes an alert record is persisted first, then updated like any other entity.
+                store.putSLAAlertInstance(entityName, clusterName, entityType, nominalTime, true, false);
+            }
+            store.updateSLAAlertInstance(entityName, clusterName, nominalTime, entityType);
+            LOG.info(instanceDescription + " missed SLAHigh");
+            highSLAMissed(entityName, clusterName, entityType, nominalTime);
+        }
+    }
+
+    /**
+     * Forwards a high-SLA miss to every registered listener and removes the alert record
+     * from the state store after each notification.
+     */
+    @Override
+    public void highSLAMissed(String entityName, String clusterName, String entityType , Date nominalTime
+                              ) throws FalconException {
+        for (EntitySLAListener listener : listeners) {
+            listener.highSLAMissed(entityName, clusterName, entityType, nominalTime);
+            // NOTE(review): the delete runs once per listener and never when no listener is registered.
+            store.deleteEntityAlertInstance(entityName, clusterName, nominalTime, entityType);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/60e2f68b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
index 991052f..421ea38 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
@@ -18,7 +18,6 @@
package org.apache.falcon.service;
import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.EntityType;
import java.util.Date;
@@ -26,6 +25,6 @@ import java.util.Date;
* Interface for FeedSLAAlert to be used by Listeners.
*/
public interface EntitySLAListener {
- void highSLAMissed(String enityName, EntityType entityType, String clusterName, Date nominalTime)
+ void highSLAMissed(String entityName, String clusterName, String entityType, Date nominalTime)
throws FalconException;
}