You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/07/29 03:11:02 UTC
falcon git commit: FALCON-2059 BacklogMetricEmitter Service for
Falcon Processes
Repository: falcon
Updated Branches:
refs/heads/master e94dd72fa -> c7996deb7
FALCON-2059 BacklogMetricEmitter Service for Falcon Processes
Author: pavan.kolamuri <pa...@gmail.com>
Author: Pavan Kolamuri <pa...@appdynamics.com>
Reviewers: @pallavi-rao
Closes #212 from pavankumar526/master and squashes the following commits:
ad84f0f [Pavan Kolamuri] Fixed checkstyle issues
dea8f93 [pavan.kolamuri] Added doc in startup.properties
dbe3a7f [pavan.kolamuri] Added more log statements
d72d228 [pavan.kolamuri] Exception changed to throwable
46dcef8 [pavan.kolamuri] Fixed bug in oozieworkflowengine FALCON-2059
e92d3bc [pavan.kolamuri] Add isMissing method FALCON-2059
6d6cf81 [pavan.kolamuri] Handled when entity was deleted FALCON-2059
6c03701 [pavan.kolamuri] Fixed User authentication issue in oozie
81f0b03 [pavan.kolamuri] Rebased the patch
e3dbe88 [pavan.kolamuri] Handled multiple pipelines processes FALCON-2059
b5d9e70 [pavan.kolamuri] Addressed based on comments FALCON-2059
fb78fba [pavan.kolamuri] Refactored changes based on EntitySLAAlert service
80c015a [pavan.kolamuri] FALCON-2059 BacklogMetricEmitter Service for Falcon Processes
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c7996deb
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c7996deb
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c7996deb
Branch: refs/heads/master
Commit: c7996deb76819fc7ef98dd28fd4dff19474b7602
Parents: e94dd72
Author: pavan.kolamuri <pa...@gmail.com>
Authored: Fri Jul 29 08:40:49 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Fri Jul 29 08:40:49 2016 +0530
----------------------------------------------------------------------
.../falcon/persistence/BacklogMetricBean.java | 116 ++++++
.../persistence/PersistenceConstants.java | 2 +
.../falcon/tools/FalconStateStoreDBCLI.java | 1 +
.../workflow/engine/AbstractWorkflowEngine.java | 2 +
.../src/main/resources/META-INF/persistence.xml | 9 +-
common/src/main/resources/startup.properties | 4 +
.../entity/parser/ProcessEntityParserTest.java | 2 +-
.../resources/config/process/process-0.1.xml | 2 +-
.../workflow/engine/OozieWorkflowEngine.java | 26 +-
.../apache/falcon/jdbc/BacklogMetricStore.java | 121 +++++++
.../falcon/resource/channel/HTTPChannel.java | 5 +-
.../service/BacklogMetricEmitterService.java | 356 +++++++++++++++++++
.../falcon/service/EntitySLAAlertService.java | 6 +-
.../falcon/service/EntitySLAListener.java | 3 +-
.../java/org/apache/falcon/util/MetricInfo.java | 79 ++++
.../BacklogMetricEmitterServiceTest.java | 133 +++++++
prism/src/test/resources/startup.properties | 338 ++++++++++++++++++
.../workflow/engine/FalconWorkflowEngine.java | 5 +
src/build/findbugs-exclude.xml | 6 +
src/conf/startup.properties | 16 +
20 files changed, 1217 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java b/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java
new file mode 100644
index 0000000..b563da7
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/BacklogMetricBean.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import org.apache.openjpa.persistence.jdbc.Index;
+
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+import java.util.Date;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * Backlog Metric Object stored in DB.
+ *
+ * <p>JPA entity mapped to the {@code BACKLOG_METRIC} table. A row is identified for deletion by
+ * entity name, cluster name, nominal time and entity type, the four fields the
+ * {@code DELETE_BACKLOG_METRIC_INSTANCE} named query filters on, while
+ * {@code GET_ALL_BACKLOG_INSTANCES} selects every row.
+ */
+@Entity
+@NamedQueries({
+ @NamedQuery(name = PersistenceConstants.GET_ALL_BACKLOG_INSTANCES, query = "select OBJECT(a) from BacklogMetricBean a "),
+ @NamedQuery(name = PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE, query = "delete from BacklogMetricBean a where a.entityName = :entityName and a.clusterName = :clusterName and a.nominalTime = :nominalTime and a.entityType = :entityType")
+})
+//RESUME CHECKSTYLE CHECK LineLengthCheck
+
+@Table(name = "BACKLOG_METRIC")
+public class BacklogMetricBean {
+
+ @NotNull
+ @GeneratedValue(strategy = GenerationType.AUTO)
+ @Id
+ private String id; // primary key; generation strategy is left to the provider (AUTO)
+
+ @Basic
+ @NotNull
+ @Index
+ @Column(name = "entity_name")
+ private String entityName; // name of the entity this backlog row belongs to
+
+ @Basic
+ @NotNull
+ @Column(name = "cluster_name")
+ private String clusterName; // cluster name of the backlog instance
+
+ @Basic
+ @NotNull
+ @Index
+ @Column(name = "nominal_time")
+ private Date nominalTime; // nominal time of the backlog instance
+
+ @Basic
+ @NotNull
+ @Index
+ @Column(name = "entity_type")
+ private String entityType; // entity type kept as a string
+
+
+ public String getId() {
+ return id;
+ }
+
+ public String getEntityName() {
+ return entityName;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public Date getNominalTime() {
+ return nominalTime;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public void setEntityName(String entityName) {
+ this.entityName = entityName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public void setNominalTime(Date nominalTime) {
+ this.nominalTime = nominalTime;
+ }
+
+ public String getEntityType() {
+ return entityType;
+ }
+
+ public void setEntityType(String entityType) {
+ this.entityType = entityType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index 7c2479d..5c3de51 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -61,5 +61,7 @@ public final class PersistenceConstants {
public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH";
public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE";
public static final String DELETE_ENTITY_ALERT_INSTANCE = "DELETE_ENTITY_ALERT_INSTANCE";
+ public static final String DELETE_BACKLOG_METRIC_INSTANCE = "DELETE_BACKLOG_METRIC_INSTANCE";
+ public static final String GET_ALL_BACKLOG_INSTANCES = "GET_ALL_BACKLOG_INSTANCES";
public static final String GET_ALL_MONITORING_ENTITY = "GET_ALL_MONITORING_ENTITY";
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
index 102b986..9c6e8b3 100644
--- a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
+++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -246,6 +246,7 @@ public class FalconStateStoreDBCLI {
args.add("org.apache.falcon.persistence.PendingInstanceBean");
args.add("org.apache.falcon.persistence.MonitoredEntityBean");
args.add("org.apache.falcon.persistence.EntitySLAAlertBean");
+ args.add("org.apache.falcon.persistence.BacklogMetricBean");
return args.toArray(new String[args.size()]);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
index 4d8402a..0db7e9b 100644
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
@@ -72,6 +72,8 @@ public abstract class AbstractWorkflowEngine {
public abstract boolean isCompleted(Entity entity) throws FalconException;
+ public abstract boolean isMissing(Entity entity) throws FalconException;
+
public abstract InstancesResult getRunningInstances(Entity entity,
List<LifeCycle> lifeCycles) throws FalconException;
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml
index ac2f397..d58e21c 100644
--- a/common/src/main/resources/META-INF/persistence.xml
+++ b/common/src/main/resources/META-INF/persistence.xml
@@ -28,6 +28,7 @@
<class>org.apache.falcon.persistence.PendingInstanceBean</class>
<class>org.apache.falcon.persistence.MonitoredEntityBean</class>
<class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
+ <class>org.apache.falcon.persistence.BacklogMetricBean</class>
<properties>
<property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -60,7 +61,7 @@
<class>org.apache.falcon.persistence.PendingInstanceBean</class>
<class>org.apache.falcon.persistence.MonitoredEntityBean</class>
<class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
-
+ <class>org.apache.falcon.persistence.BacklogMetricBean</class>
<properties>
<property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -91,7 +92,7 @@
<class>org.apache.falcon.persistence.MonitoredEntityBean</class>
<class>org.apache.falcon.persistence.PendingInstanceBean</class>
<class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
-
+ <class>org.apache.falcon.persistence.BacklogMetricBean</class>
<properties>
<property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -100,8 +101,8 @@
<property name="openjpa.MetaDataFactory"
value="jpa(Types=org.apache.falcon.persistence.EntityBean;
org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
- org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean)"></property>
-
+ org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean;
+ org.apache.falcon.persistence.BacklogMetricBean)"/>
<property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
<property name="openjpa.LockManager" value="pessimistic"/>
<property name="openjpa.ReadLockLevel" value="read"/>
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index de24621..4b692a2 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -332,3 +332,7 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
#*.falcon.graphite.port=2003
#*.falcon.graphite.frequency=1
#*.falcon.graphite.prefix=falcon
+
+# Backlog Metric Properties
+#*.falcon.backlog.metricservice.emit.interval.millisecs=60000
+#*.falcon.backlog.metricservice.recheck.interval.millisecs=600000
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
index c4bfff6..3398c26 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/ProcessEntityParserTest.java
@@ -103,7 +103,7 @@ public class ProcessEntityParserTest extends AbstractTestBase {
Assert.assertEquals(process.getTags(),
"consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting");
- Assert.assertEquals(process.getPipelines(), "testPipeline,dataReplication_Pipeline");
+ Assert.assertEquals(process.getPipelines(), "testPipeline");
Assert.assertEquals(process.getInputs().getInputs().get(0).getName(), "impression");
Assert.assertEquals(process.getInputs().getInputs().get(0).getFeed(), "impressionFeed");
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/common/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/common/src/test/resources/config/process/process-0.1.xml b/common/src/test/resources/config/process/process-0.1.xml
index 4ce7ad1..1550101 100644
--- a/common/src/test/resources/config/process/process-0.1.xml
+++ b/common/src/test/resources/config/process/process-0.1.xml
@@ -18,7 +18,7 @@
-->
<process name="sample" version="0" xmlns="uri:falcon:process:0.1">
<tags>consumer=consumer@xyz.com, owner=producer@xyz.com, _department_type=forecasting</tags>
- <pipelines>testPipeline,dataReplication_Pipeline</pipelines>
+ <pipelines>testPipeline</pipelines>
<clusters>
<cluster name="testCluster">
<validity start="2011-11-02T00:00Z" end="2091-12-30T00:00Z"/>
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 9a09f18..38a6c00 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -272,6 +272,24 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
|| isBundleInState(bundles, BundleStatus.KILLED));
}
+ /**
+  * Tells whether the entity has no active bundle on any cluster.
+  *
+  * @param entity entity whose latest bundles are looked up
+  * @return true when every cluster reported by findLatestBundle maps to MISSING (or none is reported)
+  * @throws FalconException propagated from findLatestBundle
+  */
+ @Override
+ public boolean isMissing(Entity entity) throws FalconException {
+     Map<String, BundleJob> latestBundles = findLatestBundle(entity);
+     // Collect first and remove afterwards so the map is never modified while it is iterated.
+     List<String> clustersWithoutBundle = new ArrayList<>();
+     for (Map.Entry<String, BundleJob> latest : latestBundles.entrySet()) {
+         if (latest.getValue() == MISSING) { // There is no active bundle for this cluster
+             clustersWithoutBundle.add(latest.getKey());
+         }
+     }
+     for (String cluster : clustersWithoutBundle) {
+         latestBundles.remove(cluster);
+     }
+     return latestBundles.isEmpty();
+ }
+
private enum BundleStatus {
ACTIVE, RUNNING, SUSPENDED, FAILED, KILLED, SUCCEEDED
}
@@ -1216,9 +1234,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private boolean isCoordApplicable(String appName, List<LifeCycle> lifeCycles) {
- for (LifeCycle lifeCycle : lifeCycles) {
- if (appName.contains(lifeCycle.getTag().name())) {
- return true;
+ if (lifeCycles != null && !lifeCycles.isEmpty()) {
+ for (LifeCycle lifeCycle : lifeCycles) {
+ if (appName.contains(lifeCycle.getTag().name())) {
+ return true;
+ }
}
}
return false;
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
new file mode 100644
index 0000000..ef9a396
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/jdbc/BacklogMetricStore.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.jdbc;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.persistence.BacklogMetricBean;
+import org.apache.falcon.persistence.PersistenceConstants;
+import org.apache.falcon.service.BacklogMetricEmitterService;
+import org.apache.falcon.service.FalconJPAService;
+import org.apache.falcon.util.MetricInfo;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Backlog Metric Store for entitties.
+ */
+public class BacklogMetricStore {
+
+ private EntityManager getEntityManager() {
+ return FalconJPAService.get().getEntityManager();
+ }
+
+
+ public void addInstance(String entityName, String cluster, Date nominalTime, EntityType entityType) {
+ BacklogMetricBean backlogMetricBean = new BacklogMetricBean();
+ backlogMetricBean.setClusterName(cluster);
+ backlogMetricBean.setEntityName(entityName);
+ backlogMetricBean.setNominalTime(nominalTime);
+ backlogMetricBean.setEntityType(entityType.name());
+ EntityManager entityManager = getEntityManager();
+ try {
+ beginTransaction(entityManager);
+ entityManager.persist(backlogMetricBean);
+ } finally {
+ commitAndCloseTransaction(entityManager);
+ }
+ }
+
+ public synchronized void deleteMetricInstance(String entityName, String cluster, Date nominalTime,
+ EntityType entityType) {
+ EntityManager entityManager = getEntityManager();
+ beginTransaction(entityManager);
+ Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_BACKLOG_METRIC_INSTANCE);
+ q.setParameter("entityName", entityName);
+ q.setParameter("clusterName", cluster);
+ q.setParameter("nominalTime", nominalTime);
+ q.setParameter("entityType", entityType.name());
+ try{
+ q.executeUpdate();
+ } finally {
+ commitAndCloseTransaction(entityManager);
+ }
+ }
+
+
+ private void beginTransaction(EntityManager entityManager) {
+ entityManager.getTransaction().begin();
+ }
+
+ private void commitAndCloseTransaction(EntityManager entityManager) {
+ if (entityManager != null) {
+ entityManager.getTransaction().commit();
+ entityManager.close();
+ }
+ }
+
+ public Map<Entity, List<MetricInfo>> getAllInstances() throws FalconException {
+ EntityManager entityManager = getEntityManager();
+ Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_BACKLOG_INSTANCES);
+ List<BacklogMetricBean> result = q.getResultList();
+
+ try {
+ if (CollectionUtils.isEmpty(result)) {
+ return null;
+ }
+ } finally{
+ entityManager.close();
+ }
+
+ Map<Entity, List<MetricInfo>> backlogMetrics = new HashMap<>();
+ for (BacklogMetricBean backlogMetricBean : result) {
+ Entity entity = EntityUtil.getEntity(backlogMetricBean.getEntityType(),
+ backlogMetricBean.getEntityName());
+ if (!backlogMetrics.containsKey(entity)) {
+ backlogMetrics.put(entity, new ArrayList<MetricInfo>());
+ }
+ List<MetricInfo> metrics = backlogMetrics.get(entity);
+ MetricInfo metricInfo = new MetricInfo(BacklogMetricEmitterService.DATE_FORMAT.get()
+ .format(backlogMetricBean.getNominalTime()),
+ backlogMetricBean.getClusterName());
+ metrics.add(metricInfo);
+ backlogMetrics.put(entity, metrics);
+ }
+ return backlogMetrics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
index a63ae63..187d6c7 100644
--- a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
+++ b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
@@ -46,6 +46,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status.Family;
import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.List;
@@ -138,8 +139,8 @@ public class HTTPChannel extends AbstractChannel {
if (incomingRequest != null) {
incomingRequest.getInputStream().reset();
}
- } catch (Exception ignore) {
- // nothing to be done;
+ } catch (IOException e) {
+ LOG.error("Error in HTTPChannel", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
new file mode 100644
index 0000000..801ab36
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/service/BacklogMetricEmitterService.java
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.service;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.jdbc.BacklogMetricStore;
+import org.apache.falcon.metrics.MetricNotificationService;
+import org.apache.falcon.resource.InstancesResult;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.MetricInfo;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowExecutionListener;
+import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.falcon.workflow.WorkflowEngineFactory.getWorkflowEngine;
+
+/**
+ * Backlog Metric Emitter Service to publish metrics to Graphite.
+ */
+public final class BacklogMetricEmitterService implements FalconService,
+ EntitySLAListener, WorkflowExecutionListener {
+
+ private static final String METRIC_PREFIX = "falcon"; // first segment of every published metric name
+ private static final String METRIC_SEPARATOR = "."; // joins the segments of a metric name
+ private static final String BACKLOG_METRIC_EMIT_INTERVAL = "falcon.backlog.metricservice.emit.interval.millisecs";
+ private static final String BACKLOG_METRIC_RECHECK_INTERVAL = "falcon.backlog.metricservice."
+ + "recheck.interval.millisecs"; // both are startup property names read in init(), in milliseconds
+ private static final String DEFAULT_PIPELINE = "DEFAULT"; // pipeline segment used when a process declares none
+
+ private static final Logger LOG = LoggerFactory.getLogger(BacklogMetricEmitterService.class);
+
+ private static BacklogMetricStore backlogMetricStore = new BacklogMetricStore(); // persistent copy of the backlog
+
+ private static final BacklogMetricEmitterService SERVICE = new BacklogMetricEmitterService(); // returned by get()
+
+ private static MetricNotificationService metricNotificationService =
+ Services.get().getService(MetricNotificationService.SERVICE_NAME); // looked up once, at class initialisation
+
+ public static BacklogMetricEmitterService get() {
+ return SERVICE; // the single instance held in the SERVICE constant
+ }
+
+ private BacklogMetricEmitterService() { // private: the instance is obtained through get()
+ }
+
+ private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor1 = new ScheduledThreadPoolExecutor(1); // runs BacklogMetricEmitter
+ private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor2 = new ScheduledThreadPoolExecutor(1); // runs BacklogCheckService
+
+
+ public static final ThreadLocal<SimpleDateFormat> DATE_FORMAT = new ThreadLocal<SimpleDateFormat>() {
+ @Override
+ protected SimpleDateFormat initialValue() { // each thread gets its own formatter instance
+ SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH-mm'Z'");
+ format.setTimeZone(TimeZone.getTimeZone("UTC")); // nominal times are formatted and parsed in UTC
+ return format;
+ }
+ };
+
+ private static ConcurrentHashMap<Entity, List<MetricInfo>> entityBacklogs = new ConcurrentHashMap<>(); // in-memory backlog per entity
+
+ /**
+  * Records a process instance that missed its high SLA as a backlog entry, both in the backlog
+  * store and in the in-memory backlog map. Entities that are not processes are ignored, and an
+  * instance that is already tracked is not added a second time.
+  *
+  * @param entityName  name of the entity whose instance missed its SLA
+  * @param clusterName cluster of the instance
+  * @param entityType  type of the entity; only PROCESS is handled
+  * @param nominalTime nominal time of the instance
+  * @throws FalconException propagated from the entity lookup
+  */
+ @Override
+ public void highSLAMissed(String entityName, String clusterName, EntityType entityType, Date nominalTime)
+     throws FalconException {
+
+     if (entityType != EntityType.PROCESS) {
+         return;
+     }
+     Entity entity = EntityUtil.getEntity(entityType, entityName);
+     MetricInfo metricInfo = new MetricInfo(DATE_FORMAT.get().format(nominalTime), clusterName);
+     while (true) {
+         // Use the value returned by putIfAbsent instead of a second get(): the entry may be
+         // removed by another thread between the two calls, which would yield null.
+         List<MetricInfo> candidate = Collections.synchronizedList(new ArrayList<MetricInfo>());
+         List<MetricInfo> existing = entityBacklogs.putIfAbsent(entity, candidate);
+         List<MetricInfo> metricInfoList = existing != null ? existing : candidate;
+         synchronized (metricInfoList) {
+             if (entityBacklogs.get(entity) != metricInfoList) {
+                 // The list was dropped from the map while we waited for its lock; adding to it
+                 // would lose the entry, so start over with the list that is registered now.
+                 continue;
+             }
+             // The duplicate check and the insertion happen under one lock, so two concurrent
+             // notifications for the same instance cannot both add it.
+             if (!metricInfoList.contains(metricInfo)) {
+                 backlogMetricStore.addInstance(entityName, clusterName, nominalTime, entityType);
+                 metricInfoList.add(metricInfo);
+             }
+             return;
+         }
+     }
+ }
+
+ @Override
+ public String getName() {
+     // The simple (unqualified) class name doubles as the service name.
+     return getClass().getSimpleName();
+ }
+
+ /**
+  * Reads the two scheduling intervals, loads the persisted backlog into memory and schedules
+  * BacklogMetricEmitter and BacklogCheckService at fixed rates.
+  *
+  * @throws FalconException if an interval property is not a positive integer, or propagated
+  *                         from loading the persisted backlog
+  */
+ @Override
+ public void init() throws FalconException {
+     // Validate the configuration before touching the state store.
+     int emitInterval = readIntervalMillis(BACKLOG_METRIC_EMIT_INTERVAL);
+     // NOTE(review): the commented sample in startup.properties shows 600000 for the recheck
+     // interval while the default applied here is 60000 - confirm which one is intended.
+     int recheckInterval = readIntervalMillis(BACKLOG_METRIC_RECHECK_INTERVAL);
+     initInstances();
+     scheduledThreadPoolExecutor1.scheduleAtFixedRate(new BacklogMetricEmitter(),
+             1, emitInterval, TimeUnit.MILLISECONDS);
+     scheduledThreadPoolExecutor2.scheduleAtFixedRate(new BacklogCheckService(),
+             1, recheckInterval, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+  * Reads an interval in milliseconds from the startup properties, defaulting to 60000.
+  *
+  * @param propertyName name of the startup property
+  * @return the configured interval, always greater than zero
+  * @throws FalconException if the value is not an integer or is not positive; scheduleAtFixedRate
+  *                         would otherwise reject it with an unchecked exception that does not
+  *                         name the property
+  */
+ private static int readIntervalMillis(String propertyName) throws FalconException {
+     String configured = StartupProperties.get().getProperty(propertyName, "60000");
+     int interval;
+     try {
+         interval = Integer.parseInt(configured.trim());
+     } catch (NumberFormatException e) {
+         throw new FalconException("Invalid value '" + configured + "' for property " + propertyName, e);
+     }
+     if (interval <= 0) {
+         throw new FalconException("Property " + propertyName + " must be positive but was " + interval);
+     }
+     return interval;
+ }
+
+ /**
+  * Seeds the in-memory backlog map with the instances found in the backlog store. Each stored
+  * list is wrapped in a synchronized list before it is registered.
+  *
+  * @throws FalconException propagated from the backlog store
+  */
+ private void initInstances() throws FalconException {
+     LOG.info("Initializing backlog instances from state store");
+     Map<Entity, List<MetricInfo>> persisted = backlogMetricStore.getAllInstances();
+     if (persisted == null || persisted.isEmpty()) {
+         return;
+     }
+     for (Map.Entry<Entity, List<MetricInfo>> persistedEntry : persisted.entrySet()) {
+         Entity entity = persistedEntry.getKey();
+         List<MetricInfo> metricInfoList = Collections.synchronizedList(persistedEntry.getValue());
+         entityBacklogs.put(entity, metricInfoList);
+         LOG.debug("Backlog of entity " + entity.getName() + " for instances " + metricInfoList);
+     }
+ }
+
+ @Override
+ public void destroy() throws FalconException {
+ scheduledThreadPoolExecutor1.shutdown(); // executor on which init() schedules BacklogMetricEmitter
+ scheduledThreadPoolExecutor2.shutdown(); // executor on which init() schedules BacklogCheckService
+ }
+
+ /**
+  * Removes a succeeded process instance from the backlog store and from the in-memory backlog.
+  * When the entity has no backlog entries left, its list is dropped from the map.
+  *
+  * @param context execution context of the succeeded workflow
+  * @throws FalconException propagated from the entity lookup or from parsing the nominal time
+  */
+ @Override
+ public synchronized void onSuccess(WorkflowExecutionContext context) throws FalconException {
+     Entity entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName());
+     if (entity.getEntityType() != EntityType.PROCESS) {
+         return;
+     }
+     // A single get() replaces containsKey() + get(): if the entry disappeared between the two
+     // calls, get() returned null and synchronizing on it threw a NullPointerException.
+     List<MetricInfo> metrics = entityBacklogs.get(entity);
+     if (metrics == null) {
+         return;
+     }
+     synchronized (metrics) {
+         Date date = SchemaHelper.parseDateUTC(context.getNominalTimeAsISO8601());
+         backlogMetricStore.deleteMetricInstance(entity.getName(), context.getClusterName(),
+                 date, entity.getEntityType());
+         metrics.remove(new MetricInfo(DATE_FORMAT.get().format(date), context.getClusterName()));
+         if (metrics.isEmpty()) {
+             // Conditional remove: leave the mapping alone if it now holds a list with entries.
+             entityBacklogs.remove(entity, metrics);
+         }
+     }
+ }
+
+ @Override
+ public void onFailure(WorkflowExecutionContext context) throws FalconException {
+ // Do Nothing: a failed run leaves the backlog untouched; entries are removed in onSuccess.
+ }
+
+ @Override
+ public void onStart(WorkflowExecutionContext context) throws FalconException {
+ // Do Nothing: a started run leaves the backlog untouched; entries are removed in onSuccess.
+ }
+
+ @Override
+ public void onSuspend(WorkflowExecutionContext context) throws FalconException {
+ // Do Nothing: a suspended run leaves the backlog untouched; entries are removed in onSuccess.
+ }
+
+ @Override
+ public void onWait(WorkflowExecutionContext context) throws FalconException {
+ // Do Nothing: a waiting run leaves the backlog untouched; entries are removed in onSuccess.
+ }
+
+ /**
+ * Service which executes backlog evaluation and publishing metrics to Graphite parallel for entities.
+ */
+ public static class BacklogMetricEmitter implements Runnable {
+ private ThreadPoolExecutor executor;
+
+ @Override
+ public void run() {
+ LOG.debug("BacklogMetricEmitter running for entities");
+ executor = new ScheduledThreadPoolExecutor(10);
+ List<Future> futures = new ArrayList<>();
+ try {
+ for (Entity entity : entityBacklogs.keySet()) {
+ futures.add(executor.submit(new BacklogCalcService(entity, entityBacklogs.get(entity))));
+ }
+ waitForFuturesToComplete(futures);
+ } finally {
+ executor.shutdown();
+ }
+ }
+
+ private void waitForFuturesToComplete(List<Future> futures) {
+ try {
+ for (Future future : futures) {
+ future.get();
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Interruption while executing tasks " + e);
+ } catch (ExecutionException e) {
+ LOG.error("Error in executing threads " + e);
+ }
+ }
+ }
+
+ /**
+ * Service which calculates backlog for given entity and publish to graphite.
+ */
+ public static class BacklogCalcService implements Runnable {
+
+ private Entity entityObj;
+ private List<MetricInfo> metrics;
+
+ BacklogCalcService(Entity entity, List<MetricInfo> metricInfoList) {
+ this.entityObj = entity;
+ this.metrics = metricInfoList;
+ }
+
+ @Override
+ public void run() {
+
+ MetricInfo metricInfo = null;
+ HashMap<String, Long> backLogsCluster = new HashMap<>();
+ synchronized (metrics) {
+ long currentTime = System.currentTimeMillis();
+ Iterator iter = metrics.iterator();
+ while (iter.hasNext()) {
+ try {
+ metricInfo = (MetricInfo) iter.next();
+ long time = DATE_FORMAT.get().parse(metricInfo.getNominalTime()).getTime();
+ long backlog = backLogsCluster.containsKey(metricInfo.getCluster())
+ ? backLogsCluster.get(metricInfo.getCluster()) : 0;
+ backlog += (currentTime - time);
+ backLogsCluster.put(metricInfo.getCluster(), backlog);
+ } catch (ParseException e) {
+ LOG.error("Unable to parse nominal time" + metricInfo.getNominalTime());
+ }
+ }
+
+ }
+ org.apache.falcon.entity.v0.process.Process process = (Process) entityObj;
+
+ if (backLogsCluster != null && !backLogsCluster.isEmpty()) {
+ for (Map.Entry<String, Long> entry : backLogsCluster.entrySet()) {
+ String clusterName = entry.getKey();
+ String pipelinesStr = process.getPipelines();
+ String metricName;
+ Long backlog = entry.getValue() / (60 * 1000L); // Converting to minutes
+ if (pipelinesStr != null && !pipelinesStr.isEmpty()) {
+ String[] pipelines = pipelinesStr.split(",");
+ for (String pipeline : pipelines) {
+ metricName = METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
+ + pipeline + METRIC_SEPARATOR + LifeCycle.EXECUTION.name()
+ + METRIC_SEPARATOR + entityObj.getName() + METRIC_SEPARATOR
+ + "backlogInMins";
+ metricNotificationService.publish(metricName, backlog);
+ }
+ } else {
+ metricName = METRIC_PREFIX + METRIC_SEPARATOR + clusterName + METRIC_SEPARATOR
+ + DEFAULT_PIPELINE + METRIC_SEPARATOR + LifeCycle.EXECUTION.name()
+ + METRIC_SEPARATOR + entityObj.getName() + METRIC_SEPARATOR
+ + "backlogInMins";
+ metricNotificationService.publish(metricName, backlog);
+ }
+ }
+ }
+ }
+ }
+
+
+ /**
+ * Service runs periodically and removes succeeded instances from backlog list.
+ */
+ public static class BacklogCheckService implements Runnable {
+
+ @Override
+ public void run() {
+ LOG.debug("BacklogCheckService running for entities");
+ try {
+ AbstractWorkflowEngine wfEngine = getWorkflowEngine();
+ for (Entity entity : entityBacklogs.keySet()) {
+ List<MetricInfo> metrics = entityBacklogs.get(entity);
+ if (!metrics.isEmpty()) {
+ synchronized (metrics) {
+ Iterator iterator = metrics.iterator();
+ while (iterator.hasNext()) {
+ MetricInfo metricInfo = (MetricInfo) iterator.next();
+ String nominalTimeStr = metricInfo.getNominalTime();
+ Date nominalTime;
+ try {
+ nominalTime = DATE_FORMAT.get().parse(nominalTimeStr);
+ if (entity.getACL().getOwner() != null && !entity.getACL().getOwner().isEmpty()) {
+ CurrentUser.authenticate(entity.getACL().getOwner());
+ } else {
+ CurrentUser.authenticate(System.getProperty("user.name"));
+ }
+ if (wfEngine.isMissing(entity)) {
+ LOG.info("Entity of name {} was deleted so removing instance of "
+ + "nominaltime {} ", entity.getName(), nominalTimeStr);
+ backlogMetricStore.deleteMetricInstance(entity.getName(),
+ metricInfo.getCluster(), nominalTime, entity.getEntityType());
+ iterator.remove();
+ continue;
+ }
+ InstancesResult status = wfEngine.getStatus(entity, nominalTime,
+ nominalTime, null, null);
+ if (status.getInstances().length > 0
+ && status.getInstances()[0].status == InstancesResult.
+ WorkflowStatus.SUCCEEDED) {
+ LOG.debug("Instance of nominaltime {} of entity {} was succeeded, removing "
+ + "from backlog entries", nominalTimeStr, entity.getName());
+ backlogMetricStore.deleteMetricInstance(entity.getName(),
+ metricInfo.getCluster(), nominalTime, entity.getEntityType());
+ iterator.remove();
+ }
+ } catch (ParseException e) {
+ LOG.error("Unable to parse date " + nominalTimeStr);
+ }
+ }
+ }
+ }
+ }
+ } catch (Throwable e) {
+ LOG.error("Error while checking backlog metrics" + e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
index 57e46b7..a7cafeb 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAAlertService.java
@@ -150,7 +150,7 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
LOG.info("Entity :"+ entityName
+ "Cluster:" + clusterName + "Nominal Time:" + nominalTime + "EntityType:"+ entityType
+ "missed SLAHigh");
- highSLAMissed(entityName, clusterName, entityType, nominalTime);
+ highSLAMissed(entityName, clusterName, EntityType.valueOf(entityType), nominalTime);
}
}
} catch (FalconException e){
@@ -160,12 +160,12 @@ public final class EntitySLAAlertService implements FalconService, EntitySLAList
}
@Override
- public void highSLAMissed(String entityName, String clusterName, String entityType , Date nominalTime
+ public void highSLAMissed(String entityName, String clusterName, EntityType entityType , Date nominalTime
) throws FalconException {
LOG.debug("Listners called...");
for (EntitySLAListener listener : listeners) {
listener.highSLAMissed(entityName, clusterName, entityType, nominalTime);
- store.deleteEntityAlertInstance(entityName, clusterName, nominalTime, entityType);
+ store.deleteEntityAlertInstance(entityName, clusterName, nominalTime, entityType.name());
}
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
index 421ea38..73d383b 100644
--- a/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
+++ b/prism/src/main/java/org/apache/falcon/service/EntitySLAListener.java
@@ -18,6 +18,7 @@
package org.apache.falcon.service;
import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
import java.util.Date;
@@ -25,6 +26,6 @@ import java.util.Date;
* Interface for FeedSLAAlert to be used by Listeners.
*/
public interface EntitySLAListener {
- void highSLAMissed(String entityName, String clusterName, String entityType, Date nominalTime)
+ void highSLAMissed(String entityName, String clusterName, EntityType entityType, Date nominalTime)
throws FalconException;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/main/java/org/apache/falcon/util/MetricInfo.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/util/MetricInfo.java b/prism/src/main/java/org/apache/falcon/util/MetricInfo.java
new file mode 100644
index 0000000..694bb87
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/util/MetricInfo.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.util;
+
+/**
+ * Identifies one backlog entry by the nominal time of an instance and the cluster it belongs to.
+ * Two entries are equal when both values match; either value may be null.
+ */
+public class MetricInfo {
+
+    private String nominalTime;
+    private String cluster;
+
+    public MetricInfo(String nominalTimeStr, String clusterName) {
+        this.nominalTime = nominalTimeStr;
+        this.cluster = clusterName;
+    }
+
+    public String getNominalTime() {
+        return nominalTime;
+    }
+
+    public String getCluster() {
+        return cluster;
+    }
+
+    public void setCluster(String cluster) {
+        this.cluster = cluster;
+    }
+
+    /** Null-safe string comparison: two nulls are equal, null never equals non-null. */
+    private static boolean sameValue(String left, String right) {
+        return left == null ? right == null : left.equals(right);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || !o.getClass().equals(this.getClass())) {
+            return false;
+        }
+
+        MetricInfo that = (MetricInfo) o;
+        boolean sameNominalTime = sameValue(this.getNominalTime(), that.getNominalTime());
+        boolean sameCluster = sameValue(this.getCluster(), that.getCluster());
+        return sameNominalTime && sameCluster;
+    }
+
+    @Override
+    public int hashCode() {
+        int nominalTimeHash = nominalTime == null ? 0 : nominalTime.hashCode();
+        int clusterHash = cluster == null ? 0 : cluster.hashCode();
+        return 31 * nominalTimeHash + clusterHash;
+    }
+
+    @Override
+    public String toString() {
+        return "Nominaltime: " + this.getNominalTime() + " cluster: " + this.getCluster();
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java b/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
new file mode 100644
index 0000000..67d256e
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/service/BacklogMetricEmitterServiceTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.service;
+
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.AbstractTestBase;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.jdbc.BacklogMetricStore;
+import org.apache.falcon.metrics.MetricNotificationService;
+import org.apache.falcon.tools.FalconStateStoreDBCLI;
+import org.apache.falcon.util.StateStoreProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test cases for BacklogMetricEmitterService: a missed SLA must be published as a backlog metric,
+ * and publishing must stop once the instance succeeds.
+ */
+public class BacklogMetricEmitterServiceTest extends AbstractTestBase{
+    private static final String DB_BASE_DIR = "target/test-data/backlogmetricdb";
+    protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
+    protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
+    protected static final String DB_SQL_FILE = DB_BASE_DIR + File.separator + "out.sql";
+    protected LocalFileSystem fs = new LocalFileSystem();
+
+    private static BacklogMetricStore backlogMetricStore;
+    private static FalconJPAService falconJPAService = FalconJPAService.get();
+    private static BacklogMetricEmitterService backlogMetricEmitterService;
+    private MetricNotificationService mockMetricNotificationService;
+
+    protected int execDBCLICommands(String[] args) {
+        return new FalconStateStoreDBCLI().run(args);
+    }
+
+    public void createDB(String file) {
+        File sqlFile = new File(file);
+        String[] argsCreate = { "create", "-sqlfile", sqlFile.getAbsolutePath(), "-run" };
+        int result = execDBCLICommands(argsCreate);
+        // TestNG's assertEquals takes (actual, expected).
+        Assert.assertEquals(result, 0);
+        Assert.assertTrue(sqlFile.exists());
+
+    }
+
+    @AfterClass
+    public void cleanup() throws IOException {
+        cleanupDB();
+    }
+
+    private void cleanupDB() throws IOException {
+        fs.delete(new Path(DB_BASE_DIR), true);
+    }
+
+    @BeforeClass
+    public void setup() throws Exception{
+        StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
+        Configuration localConf = new Configuration();
+        fs.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
+        fs.mkdirs(new Path(DB_BASE_DIR));
+        createDB(DB_SQL_FILE);
+        falconJPAService.init();
+        this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
+        this.conf = dfsCluster.getConf();
+        backlogMetricStore = new BacklogMetricStore();
+        mockMetricNotificationService = Mockito.mock(MetricNotificationService.class);
+        Mockito.when(mockMetricNotificationService.getName()).thenReturn("MetricNotificationService");
+        Services.get().register(mockMetricNotificationService);
+        Services.get().register(BacklogMetricEmitterService.get());
+        backlogMetricEmitterService = BacklogMetricEmitterService.get();
+
+    }
+
+
+    @Test
+    public void testBacklogEmitter() throws Exception {
+        backlogMetricEmitterService.init();
+        storeEntity(EntityType.PROCESS, "entity1");
+        backlogMetricEmitterService.highSLAMissed("entity1", "cluster1", EntityType.PROCESS,
+                BacklogMetricEmitterService.DATE_FORMAT.get().parse("2016-06-30T00-00Z"));
+        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<Long> valueCaptor = ArgumentCaptor.forClass(Long.class);
+        // Poll for the first publish instead of relying on a fixed 10 ms sleep, which races
+        // with the emitter's schedule.
+        Mockito.verify(mockMetricNotificationService, Mockito.timeout(1000).atLeastOnce())
+                .publish(captor.capture(), valueCaptor.capture());
+        Assert.assertEquals(captor.getValue(), "falcon.cluster1.testPipeline.EXECUTION.entity1.backlogInMins");
+        WorkflowExecutionContext workflowExecutionContext = getWorkflowExecutionContext();
+        backlogMetricEmitterService.onSuccess(workflowExecutionContext);
+        // Let any emit cycle that was already in flight finish before clearing recorded calls.
+        Thread.sleep(100);
+        Mockito.reset(mockMetricNotificationService);
+        // Give the emitter time to run again; verifying right after reset() would always pass.
+        Thread.sleep(100);
+        Mockito.verify(mockMetricNotificationService, Mockito.times(0)).publish(Mockito.any(String.class),
+                Mockito.any(Long.class));
+
+    }
+
+    /** Builds the success notification context for entity1 / cluster1 / 2016-06-30-00-00. */
+    private WorkflowExecutionContext getWorkflowExecutionContext() {
+        Map<WorkflowExecutionArgs, String> args = new HashMap<>();
+        args.put(WorkflowExecutionArgs.ENTITY_TYPE, "process");
+        args.put(WorkflowExecutionArgs.CLUSTER_NAME, "cluster1");
+        args.put(WorkflowExecutionArgs.ENTITY_NAME, "entity1");
+        args.put(WorkflowExecutionArgs.NOMINAL_TIME, "2016-06-30-00-00");
+        args.put(WorkflowExecutionArgs.OPERATION, "GENERATE");
+        return new WorkflowExecutionContext(args);
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/prism/src/test/resources/startup.properties
----------------------------------------------------------------------
diff --git a/prism/src/test/resources/startup.properties b/prism/src/test/resources/startup.properties
new file mode 100644
index 0000000..d72dbba
--- /dev/null
+++ b/prism/src/test/resources/startup.properties
@@ -0,0 +1,338 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+*.domain=debug
+
+######### Implementation classes #########
+## DONT MODIFY UNLESS SURE ABOUT CHANGE ##
+
+*.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine
+*.lifecycle.engine.impl=org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory
+*.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder
+*.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder
+*.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager
+*.ConfigSyncService.impl=org.apache.falcon.resource.ConfigSyncService
+*.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager
+*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
+
+##### Falcon Services #####
+*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
+ org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
+ org.apache.falcon.service.ProcessSubscriberService,\
+ org.apache.falcon.extensions.ExtensionService,\
+ org.apache.falcon.service.EntitySLAMonitoringService,\
+ org.apache.falcon.service.LifecyclePolicyMap,\
+ org.apache.falcon.entity.store.ConfigurationStore,\
+ org.apache.falcon.rerun.service.RetryService,\
+ org.apache.falcon.rerun.service.LateRunService,\
+ org.apache.falcon.metadata.MetadataMappingService,\
+ org.apache.falcon.service.LogCleanupService,\
+ org.apache.falcon.service.GroupsService,\
+ org.apache.falcon.service.ProxyUserService,\
+ org.apache.falcon.service.FalconJPAService
+##Add if you want to send data to graphite
+# org.apache.falcon.metrics.MetricNotificationService\
+## Add if you want to use Falcon Azure integration ##
+# org.apache.falcon.adfservice.ADFProviderService
+## If you wish to use Falcon native scheduler add the commented out services below to application.services ##
+# org.apache.falcon.notification.service.impl.JobCompletionService,\
+# org.apache.falcon.notification.service.impl.SchedulerService,\
+# org.apache.falcon.notification.service.impl.AlarmService,\
+# org.apache.falcon.notification.service.impl.DataAvailabilityService,\
+# org.apache.falcon.execution.FalconExecutionService,\
+
+
+
+# List of Lifecycle policies configured.
+*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
+# List of builders for the policies.
+*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder
+##### Falcon Configuration Store Change listeners #####
+*.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
+ org.apache.falcon.entity.ColoClusterRelation,\
+ org.apache.falcon.group.FeedGroupMap,\
+ org.apache.falcon.entity.store.FeedLocationStore,\
+ org.apache.falcon.service.EntitySLAMonitoringService,\
+ org.apache.falcon.service.SharedLibraryHostingService
+## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ##
+# org.apache.falcon.state.store.jdbc.JdbcStateStore
+
+## If you wish to use Feed Alert to know when a feed misses a high SLA register your class here
+*.feedAlert.listeners=
+
+##### JMS MQ Broker Implementation class #####
+*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
+
+##### List of shared libraries for Falcon workflows #####
+*.shared.libs=activemq-all,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el
+
+##### Workflow Job Execution Completion listeners #####
+*.workflow.execution.listeners=
+
+######### Implementation classes #########
+
+
+######### System startup parameters #########
+
+# Location of libraries that is shipped to Hadoop
+*.system.lib.location=${FALCON_HOME}/sharedlibs
+
+# Location to store user entity configurations
+
+#Configurations used in UTs
+debug.config.store.uri=file://${user.dir}/target/store
+#Location to store state of Feed SLA monitoring service
+debug.feed.sla.service.store.uri= file://${user.dir}/target/data/sla/pendingfeedinstances
+debug.config.oozie.conf.uri=${user.dir}/target/oozie
+debug.system.lib.location=${system.lib.location}
+debug.broker.url=vm://localhost
+debug.retry.recorder.path=${user.dir}/target/retry
+debug.libext.feed.retention.paths=${falcon.libext}
+debug.libext.feed.replication.paths=${falcon.libext}
+debug.libext.process.paths=${falcon.libext}
+
+debug.extension.store.uri=file://${user.dir}/target/extension/store
+
+#Configurations used in ITs
+it.config.store.uri=file://${user.dir}/target/store
+it.config.oozie.conf.uri=${user.dir}/target/oozie
+it.system.lib.location=${system.lib.location}
+it.broker.url=tcp://localhost:61616
+it.retry.recorder.path=${user.dir}/target/retry
+it.libext.feed.retention.paths=${falcon.libext}
+it.libext.feed.replication.paths=${falcon.libext}
+it.libext.process.paths=${falcon.libext}
+it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandler
+
+*.falcon.cleanup.service.frequency=minutes(5)
+
+######### Properties for Feed SLA Monitoring #########
+# frequency of serialization for the state of FeedSLAMonitoringService - 1 hour
+*.feed.sla.serialization.frequency.millis=3600000
+
+# Maximum number of pending instances per feed that will be recorded. After this older instances will be removed in
+# a FIFO fashion.
+*.feed.sla.queue.size=288
+
+# Do not change unless really sure
+# Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60
+*.feed.sla.statusCheck.frequency.seconds=600
+
+# Do not change unless really sure
+# Time Duration (in milliseconds) in future for generating pending feed instances.
+# In every cycle pending feed instances are added for monitoring, till this time in future.
+# It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000
+*.feed.sla.lookAheadWindow.millis=900000
+
+
+######### Properties for configuring JMS provider - activemq #########
+# Default Active MQ url
+*.broker.url=tcp://localhost:61616
+
+# default time-to-live for a JMS message 3 days (time in minutes)
+*.broker.ttlInMins=4320
+*.entity.topic=FALCON.ENTITY.TOPIC
+*.max.retry.failure.count=1
+*.retry.recorder.path=${user.dir}/logs/retry
+
+######### Properties for configuring iMon client and metric #########
+*.internal.queue.size=1000
+
+
+######### Graph Database Properties #########
+# Graph implementation
+*.falcon.graph.blueprints.graph=com.thinkaurelius.titan.core.TitanFactory
+
+# Graph Storage
+# IMPORTANT: Please enable one of the graph db backend: hbase or berkeleydb, per instructions below.
+
+# Enable the following for Berkeley DB. Make sure je-5.0.73.jar is downloaded and available
+# under Falcon webapp directory or under falcon server classpath.
+#*.falcon.graph.storage.backend=berkeleyje
+#*.falcon.graph.storage.directory=/${falcon.home}/data/graphdb
+#*.falcon.graph.serialize.path=${user.dir}/target/graphdb
+
+# Enable the following for HBase
+#*.falcon.graph.storage.backend=hbase
+# For standalone mode , set hostname to localhost; for distributed mode, set to the zookeeper quorum
+# @see http://s3.thinkaurelius.com/docs/titan/current/hbase.html#_remote_server_mode_2
+#*.falcon.graph.storage.hostname=localhost
+#*.falcon.graph.storage.hbase.table=falcon_titan
+
+# Avoid acquiring read lock when iterating over large graphs
+# See http://s3.thinkaurelius.com/docs/titan/0.5.4/bdb.html
+*.falcon.graph.storage.transactions=false
+
+# Uncomment and override the following properties for enabling metrics for titan db and pushing them to graphite. You
+# can use other reporters like ganglia also.
+# Refer (http://thinkaurelius.github.io/titan/wikidoc/0.4.2/Titan-Performance-and-Monitoring)for finding the
+# relevant configurations for your use case. NOTE: you have to prefix all the properties with "*.falcon.graph."
+# *.falcon.graph.storage.enable-basic-metrics = true
+# Required; IP or hostname string
+# *.falcon.graph.metrics.graphite.hostname = 192.168.0.1
+# Required; specify logging interval in milliseconds
+# *.falcon.graph.metrics.graphite.interval = 60000
+
+######### Authentication Properties #########
+
+# Authentication type must be specified: simple|kerberos
+*.falcon.authentication.type=simple
+
+##### Service Configuration
+
+# Indicates the Kerberos principal to be used in Falcon Service.
+*.falcon.service.authentication.kerberos.principal=
+
+# Location of the keytab file with the credentials for the Service principal.
+*.falcon.service.authentication.kerberos.keytab=
+
+# name node principal to talk to config store
+*.dfs.namenode.kerberos.principal=
+
+##### SPNEGO Configuration
+
+# Authentication type must be specified: simple|kerberos|<class>
+# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility
+*.falcon.http.authentication.type=simple
+
+# Indicates how long (in seconds) an authentication token is valid before it has to be renewed.
+*.falcon.http.authentication.token.validity=36000
+
+# The signature secret for signing the authentication tokens.
+*.falcon.http.authentication.signature.secret=falcon
+
+# The domain to use for the HTTP cookie that stores the authentication token.
+*.falcon.http.authentication.cookie.domain=
+
+# Indicates if anonymous requests are allowed when using 'simple' authentication.
+*.falcon.http.authentication.simple.anonymous.allowed=false
+
+# Indicates the Kerberos principal to be used for HTTP endpoint.
+# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
+*.falcon.http.authentication.kerberos.principal=
+
+# Location of the keytab file with the credentials for the HTTP principal.
+*.falcon.http.authentication.kerberos.keytab=
+
+# The kerberos names rules is to resolve kerberos principal names, refer to Hadoop's KerberosName for more details.
+*.falcon.http.authentication.kerberos.name.rules=DEFAULT
+
+# Comma separated list of black listed users
+*.falcon.http.authentication.blacklisted.users=
+
+######### Authentication Properties #########
+
+
+######### Authorization Properties #########
+
+# Authorization Enabled flag: false (default)|true
+*.falcon.security.authorization.enabled=false
+
+# The name of the group of super-users
+*.falcon.security.authorization.superusergroup=falcon
+
+# Admin Users, comma separated users
+*.falcon.security.authorization.admin.users=falcon,ambari-qa
+
+# Admin Group Membership, comma separated groups
+*.falcon.security.authorization.admin.groups=falcon,staff
+
+# Authorization Provider Implementation Fully Qualified Class Name
+*.falcon.security.authorization.provider=org.apache.falcon.security.DefaultAuthorizationProvider
+
+######### Authorization Properties #########
+
+######### ADF Configurations start #########
+
+# A String object that represents the namespace
+*.microsoft.windowsazure.services.servicebus.namespace=
+
+# Request and status queues on the namespace
+*.microsoft.windowsazure.services.servicebus.requestqueuename=
+*.microsoft.windowsazure.services.servicebus.statusqueuename=
+
+# A String object that contains the SAS key name
+*.microsoft.windowsazure.services.servicebus.sasKeyName=
+
+# A String object that contains the SAS key
+*.microsoft.windowsazure.services.servicebus.sasKey=
+
+# A String object containing the base URI that is added to your Service Bus namespace to form the URI to connect
+# to the Service Bus service. To access the default public Azure service, pass ".servicebus.windows.net"
+*.microsoft.windowsazure.services.servicebus.serviceBusRootUri=
+
+# Service bus polling frequency
+*.microsoft.windowsazure.services.servicebus.polling.frequency=
+
+# Super user
+*.microsoft.windowsazure.services.servicebus.superuser=
+
+######### ADF Configurations end ###########
+
+######### SMTP Properties ########
+
+# Setting SMTP hostname
+#*.falcon.email.smtp.host=localhost
+
+# Setting SMTP port number
+#*.falcon.email.smtp.port=25
+
+# Setting email from address
+#*.falcon.email.from.address=falcon@localhost
+
+# Setting email Auth
+#*.falcon.email.smtp.auth=false
+
+#Setting user name
+#*.falcon.email.smtp.user=""
+
+#Setting password
+#*.falcon.email.smtp.password=""
+
+# Setting monitoring plugin, if SMTP parameters are defined
+#*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\
+# org.apache.falcon.plugin.EmailNotificationPlugin
+
+######### StateStore Properties #####
+#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
+#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
+#*.falcon.statestore.jdbc.url=jdbc:derby:data/statestore.db;create=true
+#*.falcon.statestore.jdbc.username=sa
+#*.falcon.statestore.jdbc.password=
+#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
+## Maximum number of active connections that can be allocated from this pool at the same time.
+#*.falcon.statestore.pool.max.active.conn=10
+#*.falcon.statestore.connection.properties=
+## Indicates the interval (in milliseconds) between eviction runs.
+#*.falcon.statestore.validate.db.connection.eviction.interval=300000
+## The number of objects to examine during each run of the idle object evictor thread.
+#*.falcon.statestore.validate.db.connection.eviction.num=10
+## Creates Falcon DB.
+## If set to true, it creates the DB schema if it does not exist. If the DB schema already exists, this is a no-op.
+## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
+#*.falcon.statestore.create.db.schema=true
+
+# Graphite properties
+*.falcon.graphite.hostname=localhost
+*.falcon.graphite.port=2003
+*.falcon.graphite.frequency=1
+*.falcon.graphite.prefix=falcon
+
+# Backlog Metric Properties
+*.falcon.backlog.metricservice.emit.interval.millisecs=10
+*.falcon.backlog.metricservice.recheck.interval.millisecs=1000
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index 6dbec0c..7b7da0a 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -128,6 +128,11 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
}
@Override
+ public boolean isMissing(Entity entity) throws FalconException {
+ // An entity is missing when the state store has no entry for its EntityID.
+ return !STATE_STORE.entityExists(new EntityID(entity));
+ }
+
+ @Override
public String suspend(Entity entity) throws FalconException {
EXECUTION_SERVICE.suspend(entity);
return "SUCCESS";
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/src/build/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml
index 5c35b8c..346583d 100644
--- a/src/build/findbugs-exclude.xml
+++ b/src/build/findbugs-exclude.xml
@@ -57,6 +57,12 @@
<Bug pattern="UWF_UNWRITTEN_FIELD,NP_BOOLEAN_RETURN_NULL" />
</Match>
+ <Match>
+ <Class name="org.apache.falcon.persistence.BacklogMetricBean" />
+ <Bug pattern="UWF_UNWRITTEN_FIELD,NP_BOOLEAN_RETURN_NULL" />
+ </Match>
+
+
<Match>
<Class name="org.apache.falcon.persistence.MonitoredEntityBean" />
http://git-wip-us.apache.org/repos/asf/falcon/blob/c7996deb/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index a107eca..ef07e57 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -58,6 +58,15 @@
##Add if you want to send data to graphite
# org.apache.falcon.metrics.MetricNotificationService\
+
+## Add the following application services if you want to enable BacklogMetricEmitterService
+# org.apache.falcon.service.FalconJPAService,\
+# org.apache.falcon.metrics.MetricNotificationService,\
+# org.apache.falcon.service.EntitySLAMonitoringService,\
+# org.apache.falcon.service.EntitySLAAlertService,\
+# org.apache.falcon.service.BacklogMetricEmitterService
+
+
## Add if you want to use Falcon Azure integration ##
# org.apache.falcon.adfservice.ADFProviderService
## If you wish to use Falcon native scheduler uncomment out below application services and comment out above application services ##
@@ -160,6 +169,9 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
# It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000
*.feed.sla.lookAheadWindow.millis=900000
+## Uncomment if you want to enable BacklogMetricEmitterService
+#*.feedAlert.listeners=org.apache.falcon.service.BacklogMetricEmitterService
+
######### Properties for configuring JMS provider - activemq #########
# Default Active MQ url
*.broker.url=tcp://localhost:61616
@@ -337,3 +349,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
#*.falcon.graphite.port=2003
#*.falcon.graphite.frequency=1
#*.falcon.graphite.prefix=falcon
+
+# Backlog Metric Properties
+#*.falcon.backlog.metricservice.emit.interval.millisecs=60000
+#*.falcon.backlog.metricservice.recheck.interval.millisecs=600000