You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2016/11/29 18:09:45 UTC
falcon git commit: FALCON-2044 Persist Process stats in db
Repository: falcon
Updated Branches:
refs/heads/master 49fa46e29 -> 1f28bde6f
FALCON-2044 Persist Process stats in db
Author: Praveen Adlakha <ad...@gmail.com>
Reviewers: Ajay Yadava <aj...@apache.org>
Closes #196 from PraveenAdlakha/2044
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/1f28bde6
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/1f28bde6
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/1f28bde6
Branch: refs/heads/master
Commit: 1f28bde6f49aedd2ca95181f483c609a5304aecc
Parents: 49fa46e
Author: Praveen Adlakha <ad...@gmail.com>
Authored: Tue Nov 29 13:09:23 2016 -0500
Committer: Ajay Yadava <aj...@apache.org>
Committed: Tue Nov 29 13:09:23 2016 -0500
----------------------------------------------------------------------
.../persistence/PersistenceConstants.java | 2 +-
.../persistence/ProcessInstanceInfoBean.java | 131 +++++++++++++++++++
.../falcon/tools/FalconStateStoreDBCLI.java | 1 +
.../src/main/resources/META-INF/persistence.xml | 16 ++-
common/src/main/resources/startup.properties | 3 +-
.../site/twiki/GraphiteMetricCollection.twiki | 24 ----
docs/src/site/twiki/MetricCollection.twiki | 37 ++++++
docs/src/site/twiki/Operability.twiki | 6 +-
.../falcon/jdbc/MonitoringJdbcStateStore.java | 36 +++++
.../plugin/ProcessExecutionStatsPlugin.java | 76 +++++++++++
.../jdbc/MonitoringJdbcStateStoreTest.java | 11 ++
src/build/findbugs-exclude.xml | 5 +
src/conf/startup.properties | 5 +-
13 files changed, 317 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index fc82ae7..26a5cd4 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -79,5 +79,5 @@ public final class PersistenceConstants {
public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS";
public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB";
public static final String GET_EXTENSION_JOB = "GET_EXTENSION_JOB";
-
+ public static final String GET_ALL_PROCESS_INFO_INSTANCES = "GET_ALL_PROCESS_INFO_INSTANCES";
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/common/src/main/java/org/apache/falcon/persistence/ProcessInstanceInfoBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/ProcessInstanceInfoBean.java b/common/src/main/java/org/apache/falcon/persistence/ProcessInstanceInfoBean.java
new file mode 100644
index 0000000..c408510
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/ProcessInstanceInfoBean.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.persistence;
+
+import javax.persistence.Entity;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Column;
+import javax.validation.constraints.NotNull;
+import java.util.Date;
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * Class to store info regarding process history.
+ */
+@Entity
+@NamedQueries({
+ @NamedQuery(name= PersistenceConstants.GET_ALL_PROCESS_INFO_INSTANCES , query = "select OBJECT(a) from ProcessInstanceInfoBean a ")
+})
+@Table(name = "ProcessInstanceInfo")
+//RESUME CHECKSTYLE CHECK LineLengthCheck
+public class ProcessInstanceInfoBean {
+
+    /** Generated identifier of the row. */
+    @Id
+    @GeneratedValue(strategy = GenerationType.AUTO)
+    @NotNull
+    private String id;
+
+    /** Name of the process, mapped to column {@code process_name}. */
+    @Column(name = "process_name")
+    @NotNull
+    private String processName;
+
+    /** Colo value of the instance, mapped to column {@code colo}. */
+    @Column(name = "colo")
+    @NotNull
+    private String colo;
+
+    /** Pipeline value of the instance, mapped to column {@code pipeline}. */
+    @Column(name = "pipeline")
+    @NotNull
+    private String pipeline;
+
+    /** Status of the instance, mapped to column {@code status}. */
+    @Column(name = "status")
+    @NotNull
+    private String status;
+
+    /** Nominal time of the instance, mapped to column {@code nominal_time}. */
+    @Column(name = "nominal_time")
+    @NotNull
+    private Date nominalTime;
+
+    /** Start delay of the instance, mapped to column {@code start_delay}. */
+    @Column(name = "start_delay")
+    @NotNull
+    private long startDelay;
+
+    /** Processing time of the instance, mapped to column {@code processing_time}. */
+    @Column(name = "processing_time")
+    @NotNull
+    private long processingTime;
+
+    /** @return the process name */
+    public String getProcessName() {
+        return processName;
+    }
+
+    /** @param processName the process name to store */
+    public void setProcessName(String processName) {
+        this.processName = processName;
+    }
+
+    /** @return the colo value */
+    public String getColo() {
+        return colo;
+    }
+
+    /** @param colo the colo value to store */
+    public void setColo(String colo) {
+        this.colo = colo;
+    }
+
+    /** @return the pipeline value */
+    public String getPipeline() {
+        return pipeline;
+    }
+
+    /** @param pipeline the pipeline value to store */
+    public void setPipeline(String pipeline) {
+        this.pipeline = pipeline;
+    }
+
+    /** @return the status value */
+    public String getStatus() {
+        return status;
+    }
+
+    /** @param status the status value to store */
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+    /** @return the nominal time */
+    public Date getNominalTime() {
+        return nominalTime;
+    }
+
+    /** @param nominalTime the nominal time to store */
+    public void setNominalTime(Date nominalTime) {
+        this.nominalTime = nominalTime;
+    }
+
+    /** @return the start delay */
+    public long getStartDelay() {
+        return startDelay;
+    }
+
+    /** @param startDelay the start delay to store */
+    public void setStartDelay(long startDelay) {
+        this.startDelay = startDelay;
+    }
+
+    /** @return the processing time */
+    public long getProcessingTime() {
+        return processingTime;
+    }
+
+    /** @param processingTime the processing time to store */
+    public void setProcessingTime(long processingTime) {
+        this.processingTime = processingTime;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
index 0c04da3..6ad887e 100644
--- a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
+++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -249,6 +249,7 @@ public class FalconStateStoreDBCLI {
args.add("org.apache.falcon.persistence.BacklogMetricBean");
args.add("org.apache.falcon.persistence.ExtensionBean");
args.add("org.apache.falcon.persistence.ExtensionJobsBean");
+ args.add("org.apache.falcon.persistence.ProcessInstanceInfoBean");
return args.toArray(new String[args.size()]);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/common/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml
index 0f20103..8d0bd25 100644
--- a/common/src/main/resources/META-INF/persistence.xml
+++ b/common/src/main/resources/META-INF/persistence.xml
@@ -31,7 +31,7 @@
<class>org.apache.falcon.persistence.BacklogMetricBean</class>
<class>org.apache.falcon.persistence.ExtensionBean</class>
<class>org.apache.falcon.persistence.ExtensionJobsBean</class>
-
+ <class>org.apache.falcon.persistence.ProcessInstanceInfoBean</class>
<properties>
<property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -41,8 +41,8 @@
value="jpa(Types=org.apache.falcon.persistence.EntityBean;
org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean;
- org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean)"/>
-
+ org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean;
+ org.apache.falcon.persistence.ProcessInstanceInfoBean)"/>
<property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
<property name="openjpa.LockManager" value="pessimistic"/>
<property name="openjpa.ReadLockLevel" value="read"/>
@@ -67,6 +67,7 @@
<class>org.apache.falcon.persistence.BacklogMetricBean</class>
<class>org.apache.falcon.persistence.ExtensionBean</class>
<class>org.apache.falcon.persistence.ExtensionJobsBean</class>
+ <class>org.apache.falcon.persistence.ProcessInstanceInfoBean</class>
<properties>
<property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -76,8 +77,8 @@
value="jpa(Types=org.apache.falcon.persistence.EntityBean;
org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean;
- org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean)"/>
-
+ org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean;
+ org.apache.falcon.persistence.ProcessInstanceInfoBean)"/>
<property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
<property name="openjpa.LockManager" value="pessimistic"/>
<property name="openjpa.ReadLockLevel" value="read"/>
@@ -101,6 +102,7 @@
<class>org.apache.falcon.persistence.BacklogMetricBean</class>
<class>org.apache.falcon.persistence.ExtensionBean</class>
<class>org.apache.falcon.persistence.ExtensionJobsBean</class>
+ <class>org.apache.falcon.persistence.ProcessInstanceInfoBean</class>
<properties>
<property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -110,8 +112,8 @@
value="jpa(Types=org.apache.falcon.persistence.EntityBean;
org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean;
- org.apache.falcon.persistence.BacklogMetricBean;org.apache.falcon.persistence.ExtensionBean;
- org.apache.falcon.persistence.ExtensionJobsBean)"/>
+ org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean;
+ org.apache.falcon.persistence.ProcessInstanceInfoBean)"/>
<property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
<property name="openjpa.LockManager" value="pessimistic"/>
<property name="openjpa.ReadLockLevel" value="read"/>
http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 5d5da5a..f91f3b6 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -313,7 +313,8 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
# Setting monitoring plugin, if SMTP parameters is defined
#*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\
-# org.apache.falcon.plugin.EmailNotificationPlugin
+# org.apache.falcon.plugin.EmailNotificationPlugin,\
+# org.apache.falcon.plugin.ProcessExecutionStatsPlugin
######### StateStore Properties #####
#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/docs/src/site/twiki/GraphiteMetricCollection.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/GraphiteMetricCollection.twiki b/docs/src/site/twiki/GraphiteMetricCollection.twiki
deleted file mode 100644
index 0ae0498..0000000
--- a/docs/src/site/twiki/GraphiteMetricCollection.twiki
+++ /dev/null
@@ -1,24 +0,0 @@
----++Graphite Metric Collection
-
-Graphite Metric Collection currently allows to collect the following metrics at process level :
-
-1. Processing time the process spent in the running state in seconds (workflow_end_time - workflow_start_time)
-2. Wait time that the process spent in the waiting/ready state. (workflow_start_time - workflow_nominal_time)
-3. Number of instances that are failed for a process.
-
-To send data to graphite we need to intialize metricNotificationService in startup.properties:
-
-<verbatim>
-*.application.services= org.apache.falcon.metrics.MetricNotificationService,
-</verbatim>
-
-Add following properties for graphiteNotificationPlugin :
-
-*Graphite properties*
-<verbatim>
- * *.falcon.graphite.hostname=localhost
- * *.falcon.graphite.port=2003
- * *.falcon.graphite.frequency=1
- * *.falcon.graphite.prefix=falcon
-</verbatim>
-The falcon.graphite.frequency is in seconds and all the time that is being sent to graphite is in seconds.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/docs/src/site/twiki/MetricCollection.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/MetricCollection.twiki b/docs/src/site/twiki/MetricCollection.twiki
new file mode 100644
index 0000000..636c739
--- /dev/null
+++ b/docs/src/site/twiki/MetricCollection.twiki
@@ -0,0 +1,37 @@
+---++Metric Collection
+
+Metric Collection currently allows collecting the following metrics at the process level:
+
+ 1. Processing time the process spent in the running state in seconds (workflow_end_time - workflow_start_time)
+ 1. Wait time that the process spent in the waiting/ready state. (workflow_start_time - workflow_nominal_time)
+ 1. Number of failed instances for a process.
+
+To send data to *Graphite*
+
+Falcon needs to initialize the metric notification service in startup.properties:
+
+<verbatim>
+*.application.services= org.apache.falcon.metrics.MetricNotificationService,
+</verbatim>
+
+Add following properties for graphiteNotificationPlugin :
+
+*Graphite properties*
+<verbatim>
+ * *.falcon.graphite.hostname=localhost
+ * *.falcon.graphite.port=2003
+ * *.falcon.graphite.frequency=1
+ * *.falcon.graphite.prefix=falcon
+</verbatim>
+The falcon.graphite.frequency is in seconds, and all time values sent to Graphite are in seconds.
+
+
+To send data to *Falcon DB*
+
+Falcon needs the *!ProcessInstanceInfo* table in the database; have a look at [[FalconDatabase]] to know how to create it.
+
+Add the following property to startup.properties:
+
+<verbatim>
+#*.monitoring.plugins=org.apache.falcon.plugin.ProcessExecutionStatsPlugin
+</verbatim>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/docs/src/site/twiki/Operability.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Operability.twiki b/docs/src/site/twiki/Operability.twiki
index f01c235..e21ada8 100644
--- a/docs/src/site/twiki/Operability.twiki
+++ b/docs/src/site/twiki/Operability.twiki
@@ -228,6 +228,8 @@ Users may also extend the Falcon Audit plugin to send audits to systems like Apa
extending org.apache.falcon.plugin.AuditingPlugin interface.
----++ Metrics Collection In Graphite
+---++ Metrics Collection In Graphite and Database
-Falcon has support to send metrics to graphite more details regarding this can be found on [[GraphiteMetricCollection][Graphite Metric Collection]]
\ No newline at end of file
+Falcon has support to send process metrics like waiting time, execution time and number of failures to Graphite and the Falcon DB.
+
+For details go through [[MetricCollection][Metric Collection]]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
index 552ebde..669e18d 100644
--- a/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
+++ b/prism/src/main/java/org/apache/falcon/jdbc/MonitoringJdbcStateStore.java
@@ -25,6 +25,7 @@ import org.apache.falcon.persistence.PendingInstanceBean;
import org.apache.falcon.persistence.PersistenceConstants;
import org.apache.falcon.persistence.ResultNotFoundException;
import org.apache.falcon.persistence.EntitySLAAlertBean;
+import org.apache.falcon.persistence.ProcessInstanceInfoBean;
import org.apache.falcon.service.FalconJPAService;
import javax.persistence.EntityManager;
@@ -198,6 +199,41 @@ public class MonitoringJdbcStateStore {
return result;
}
+    /**
+     * Persists one process instance record as a {@link ProcessInstanceInfoBean}.
+     *
+     * The transaction is committed only when the persist call succeeds. If anything
+     * fails, a still-active transaction is rolled back, the entity manager is closed
+     * and the original exception propagates to the caller.
+     *
+     * @param processName    name of the process
+     * @param colo           colo value stored with the instance
+     * @param nominalTime    nominal time as epoch milliseconds; must not be null (it is unboxed)
+     * @param startDelay     start delay of the instance; must not be null (it is unboxed)
+     * @param processingTime processing time of the instance; must not be null (it is unboxed)
+     * @param pipeline       pipeline value stored with the instance
+     * @param status         status value stored with the instance
+     */
+    public void putProcessInstance(String processName, String colo, Long nominalTime, Long startDelay,
+                                   Long processingTime, String pipeline, String status){
+        ProcessInstanceInfoBean processInstanceInfoBean = new ProcessInstanceInfoBean();
+        processInstanceInfoBean.setProcessName(processName);
+        processInstanceInfoBean.setColo(colo);
+        processInstanceInfoBean.setNominalTime(new Date(nominalTime));
+        processInstanceInfoBean.setStartDelay(startDelay);
+        processInstanceInfoBean.setProcessingTime(processingTime);
+        processInstanceInfoBean.setPipeline(pipeline);
+        processInstanceInfoBean.setStatus(status);
+
+        EntityManager entityManager = getEntityManager();
+        try {
+            beginTransaction(entityManager);
+            entityManager.persist(processInstanceInfoBean);
+            // Commit on the success path only, so a persist failure is never masked by a commit failure.
+            entityManager.getTransaction().commit();
+        } finally {
+            try {
+                // Still active here means persist or commit did not complete: undo the work.
+                if (entityManager.getTransaction().isActive()) {
+                    entityManager.getTransaction().rollback();
+                }
+            } finally {
+                entityManager.close();
+            }
+        }
+    }
+
+    /**
+     * Fetches every stored {@link ProcessInstanceInfoBean} using the
+     * {@code GET_ALL_PROCESS_INFO_INSTANCES} named query.
+     *
+     * The query runs inside the try block so that the entity manager is closed
+     * even when creating or executing the query fails.
+     *
+     * @return all stored process instance records, or {@code null} when there are none
+     */
+    public List<ProcessInstanceInfoBean> getAllInstancesProcessInstance(){
+        EntityManager entityManager = getEntityManager();
+        try {
+            Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_PROCESS_INFO_INSTANCES);
+            List result = q.getResultList();
+            // An empty result is reported as null, which existing callers may rely on.
+            if (CollectionUtils.isEmpty(result)) {
+                return null;
+            }
+            return result;
+        } finally{
+            entityManager.close();
+        }
+    }
+
private void commitAndCloseTransaction(EntityManager entityManager) {
entityManager.getTransaction().commit();
entityManager.close();
http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/prism/src/main/java/org/apache/falcon/plugin/ProcessExecutionStatsPlugin.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/plugin/ProcessExecutionStatsPlugin.java b/prism/src/main/java/org/apache/falcon/plugin/ProcessExecutionStatsPlugin.java
new file mode 100644
index 0000000..676c17b
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/plugin/ProcessExecutionStatsPlugin.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.plugin;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.aspect.ResourceMessage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.jdbc.MonitoringJdbcStateStore;
+import org.joda.time.DateTime;
+import org.joda.time.Seconds;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This plugin writes process completion time, number of failures and wait time to DB.
+ */
+public class ProcessExecutionStatsPlugin implements MonitoringPlugin {
+    private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutionStatsPlugin.class);
+
+    private static final MonitoringJdbcStateStore MONITORING_JDBC_STATE_STORE = new MonitoringJdbcStateStore();
+
+    /** Pipeline name recorded when the process declares no pipelines. */
+    private static final String UNTAGGED_PIPELINE = "__untagged";
+
+    /** Divisor applied to the message execution time (presumably nanoseconds - confirm) to get seconds. */
+    private static final long NANOS_PER_SECOND = 1000000000L;
+
+    /**
+     * Stores one row per pipeline of the process for succeeded and failed workflow
+     * instances. Messages with any other action, messages that are not for a process,
+     * and messages for a process unknown to the configuration store are ignored.
+     * Any exception is logged and not propagated.
+     *
+     * @param message the monitoring message to record
+     */
+    @Override
+    public void monitor(ResourceMessage message) {
+        try {
+            String action = message.getAction();
+            LOG.debug("message:{}", action);
+
+            // Only succeeded/failed instances are stored; bail out before doing any lookups.
+            String status = toStatus(action);
+            if (status == null) {
+                return;
+            }
+
+            String entityType = getDimension(message, "entityType", "entity-type");
+            String entityName = getDimension(message, "entityName", "entity-name");
+            // Constant-first comparison keeps a missing entity type from raising an NPE.
+            if (!EntityType.PROCESS.name().equalsIgnoreCase(entityType)) {
+                return;
+            }
+
+            Process process = ConfigurationStore.get().get(EntityType.PROCESS, entityName);
+            if (process == null) {
+                return;
+            }
+
+            String pipelines = StringUtils.isNotBlank(process.getPipelines()) ? process.getPipelines()
+                    : UNTAGGED_PIPELINE;
+            String cluster = message.getDimensions().get("cluster");
+            DateTime nominalTime = new DateTime(message.getDimensions().get("nominal-time"));
+            DateTime startTime = new DateTime(message.getDimensions().get("start-time"));
+            long startDelay = Seconds.secondsBetween(nominalTime, startTime).getSeconds();
+            long timeTaken = message.getExecutionTime() / NANOS_PER_SECOND;
+
+            for (String name : pipelines.split(",")) {
+                MONITORING_JDBC_STATE_STORE.putProcessInstance(entityName, cluster, nominalTime.getMillis(),
+                        startDelay, timeTaken, name, status);
+            }
+        } catch (Exception e) {
+            LOG.error("Exception in sending metrics to FalconDB:", e);
+        }
+    }
+
+    /** Maps a workflow action to the status string to store, or null when the action is not recorded. */
+    private static String toStatus(String action) {
+        if ("wf-instance-succeeded".equals(action)) {
+            return "succeeded";
+        }
+        if ("wf-instance-failed".equals(action)) {
+            return "failed";
+        }
+        return null;
+    }
+
+    /** Reads a dimension by its primary key, falling back to the alternate key when the value is blank. */
+    private static String getDimension(ResourceMessage message, String key, String fallbackKey) {
+        String value = message.getDimensions().get(key);
+        return StringUtils.isNotBlank(value) ? value : message.getDimensions().get(fallbackKey);
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
index a64b654..860fbfc 100644
--- a/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
+++ b/prism/src/test/java/org/apache/falcon/jdbc/MonitoringJdbcStateStoreTest.java
@@ -19,6 +19,7 @@ package org.apache.falcon.jdbc;
import java.io.File;
import java.util.Date;
+import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.Query;
@@ -27,6 +28,7 @@ import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.AbstractTestBase;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.persistence.ProcessInstanceInfoBean;
import org.apache.falcon.service.FalconJPAService;
import org.apache.falcon.tools.FalconStateStoreDBCLI;
import org.apache.falcon.util.StateStoreProperties;
@@ -173,6 +175,15 @@ public class MonitoringJdbcStateStoreTest extends AbstractTestBase {
"test-cluster", dateOne, EntityType.PROCESS.toString()).getIsSLAHighMissed());
}
+    /**
+     * Stores one process instance and checks that it can be read back with the
+     * string and numeric values that were written.
+     */
+    @Test
+    public void putProcessInstance() throws Exception{
+        MonitoringJdbcStateStore store = new MonitoringJdbcStateStore();
+        store.putProcessInstance("test-process", "test-colo", 1466602429423L, 99999999L, 99999999L, "test", "failed");
+        List<ProcessInstanceInfoBean> list = store.getAllInstancesProcessInstance();
+        // The store returns null for an empty table; fail clearly instead of with an NPE.
+        Assert.assertNotNull(list);
+        ProcessInstanceInfoBean processInstanceInfoBean = list.get(0);
+        Assert.assertEquals("test-process", processInstanceInfoBean.getProcessName());
+        Assert.assertEquals("test-colo", processInstanceInfoBean.getColo());
+        Assert.assertEquals("test", processInstanceInfoBean.getPipeline());
+        Assert.assertEquals("failed", processInstanceInfoBean.getStatus());
+        Assert.assertEquals(99999999L, processInstanceInfoBean.getStartDelay());
+        Assert.assertEquals(99999999L, processInstanceInfoBean.getProcessingTime());
+    }
+
private void clear() {
EntityManager em = FalconJPAService.get().getEntityManager();
em.getTransaction().begin();
http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/src/build/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml
index 04e267f..189f2f8 100644
--- a/src/build/findbugs-exclude.xml
+++ b/src/build/findbugs-exclude.xml
@@ -75,6 +75,11 @@
</Match>
<Match>
+ <Class name="org.apache.falcon.persistence.ProcessInstanceInfoBean" />
+ <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" />
+ </Match>
+
+ <Match>
<Class name="org.apache.falcon.persistence.ExtensionJobsBean" />
<Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" />
</Match>
http://git-wip-us.apache.org/repos/asf/falcon/blob/1f28bde6/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 6a95cce..901c3a9 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -358,8 +358,11 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
#*.falcon.email.smtp.password=""
# Setting monitoring plugin, if SMTP parameters is defined
+# DefaultMonitoringPlugin
#*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\
-# org.apache.falcon.plugin.EmailNotificationPlugin
+# org.apache.falcon.plugin.EmailNotificationPlugin,\
+# org.apache.falcon.plugin.ProcessExecutionStatsPlugin
+
# Graphite properties
#*.falcon.graphite.hostname=localhost
#*.falcon.graphite.port=2003