You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/03/14 10:30:55 UTC
git commit: FALCON-355 Remove SLAMonitoringService. Contributed by
Shwetha GS
Repository: incubator-falcon
Updated Branches:
refs/heads/master bb55a2c9f -> 5e4352151
FALCON-355 Remove SLAMonitoringService. Contributed by Shwetha GS
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/5e435215
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/5e435215
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/5e435215
Branch: refs/heads/master
Commit: 5e43521519900f7778c7db938aaec85ee4489a52
Parents: bb55a2c
Author: Shwetha GS <sh...@gmail.com>
Authored: Fri Mar 14 15:00:43 2014 +0530
Committer: Shwetha GS <sh...@gmail.com>
Committed: Fri Mar 14 15:00:43 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
common/src/main/resources/startup.properties | 1 -
.../org/apache/falcon/aspect/GenericAlert.java | 9 -
.../workflow/engine/OozieWorkflowEngine.java | 325 ++++++++-----------
.../falcon/service/FalconTopicSubscriber.java | 15 -
.../falcon/service/SLAMonitoringService.java | 237 --------------
src/conf/startup.properties | 1 -
7 files changed, 136 insertions(+), 454 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5e435215/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b1f8cb1..5748d27 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,8 @@ Trunk (Unreleased)
(Venkatesh Seetharam)
IMPROVEMENTS
+ FALCON-355 Remove SLAMonitoringService. (Shwetha GS)
+
FALCON-333 jsp-api dependency is defined twice. (Jean-Baptiste
Onofré via Shaik Idris)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5e435215/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 0b941eb..457b3a6 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -33,7 +33,6 @@
org.apache.falcon.entity.store.ConfigurationStore,\
org.apache.falcon.rerun.service.RetryService,\
org.apache.falcon.rerun.service.LateRunService,\
- org.apache.falcon.service.SLAMonitoringService,\
org.apache.falcon.service.LogCleanupService
*.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
org.apache.falcon.entity.ColoClusterRelation,\
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5e435215/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
index 0b680ba..5ab2f72 100644
--- a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
+++ b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
@@ -102,15 +102,6 @@ public final class GenericAlert {
}
- @Monitored(event = "sla-miss")
- public static String alertOnLikelySLAMiss(
- @Dimension(value = "cluster") String cluster,
- @Dimension(value = "entity-type") String entityType,
- @Dimension(value = "entity-name") String entityName,
- @Dimension(value = "nominal-time") String nominalTime) {
- return "IGNORE";
- }
-
@Monitored(event = "log-cleanup-service-failed")
public static String alertLogCleanupServiceFailed(
@Dimension(value = "message") String message,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5e435215/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index dee77c0..ac8862e 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -60,34 +60,32 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
public static final String ENGINE = "oozie";
private static final BundleJob MISSING = new NullBundleJob();
- private static final List<WorkflowJob.Status> WF_KILL_PRECOND = Arrays.asList(WorkflowJob.Status.PREP,
- WorkflowJob.Status.RUNNING, WorkflowJob.Status.SUSPENDED, WorkflowJob.Status.FAILED);
- private static final List<WorkflowJob.Status> WF_SUSPEND_PRECOND = Arrays
- .asList(WorkflowJob.Status.RUNNING);
- private static final List<WorkflowJob.Status> WF_RESUME_PRECOND = Arrays
- .asList(WorkflowJob.Status.SUSPENDED);
- private static final List<WorkflowJob.Status> WF_RERUN_PRECOND = Arrays.asList(WorkflowJob.Status.FAILED,
- WorkflowJob.Status.KILLED, WorkflowJob.Status.SUCCEEDED);
- private static final List<CoordinatorAction.Status> COORD_RERUN_PRECOND = Arrays
- .asList(CoordinatorAction.Status.TIMEDOUT, CoordinatorAction.Status.FAILED);
-
- private static final List<Job.Status> BUNDLE_ACTIVE_STATUS = Arrays.asList(
- Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUSPENDED,
- Job.Status.PREPSUSPENDED, Job.Status.DONEWITHERROR);
- private static final List<Job.Status> BUNDLE_SUSPENDED_STATUS = Arrays.asList(
- Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED);
- private static final List<Job.Status> BUNDLE_RUNNING_STATUS = Arrays.asList(
- Job.Status.PREP, Job.Status.RUNNING);
-
- private static final List<Job.Status> BUNDLE_SUSPEND_PRECOND = Arrays.asList(
- Job.Status.PREP, Job.Status.RUNNING, Job.Status.DONEWITHERROR);
- private static final List<Job.Status> BUNDLE_RESUME_PRECOND = Arrays.asList(
- Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED);
+ private static final List<WorkflowJob.Status> WF_KILL_PRECOND =
+ Arrays.asList(WorkflowJob.Status.PREP, WorkflowJob.Status.RUNNING, WorkflowJob.Status.SUSPENDED,
+ WorkflowJob.Status.FAILED);
+ private static final List<WorkflowJob.Status> WF_SUSPEND_PRECOND = Arrays.asList(WorkflowJob.Status.RUNNING);
+ private static final List<WorkflowJob.Status> WF_RESUME_PRECOND = Arrays.asList(WorkflowJob.Status.SUSPENDED);
+ private static final List<WorkflowJob.Status> WF_RERUN_PRECOND =
+ Arrays.asList(WorkflowJob.Status.FAILED, WorkflowJob.Status.KILLED, WorkflowJob.Status.SUCCEEDED);
+ private static final List<CoordinatorAction.Status> COORD_RERUN_PRECOND =
+ Arrays.asList(CoordinatorAction.Status.TIMEDOUT, CoordinatorAction.Status.FAILED);
+
+ private static final List<Job.Status> BUNDLE_ACTIVE_STATUS =
+ Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED,
+ Job.Status.DONEWITHERROR);
+ private static final List<Job.Status> BUNDLE_SUSPENDED_STATUS =
+ Arrays.asList(Job.Status.PREPSUSPENDED, Job.Status.SUSPENDED);
+ private static final List<Job.Status> BUNDLE_RUNNING_STATUS = Arrays.asList(Job.Status.PREP, Job.Status.RUNNING);
+
+ private static final List<Job.Status> BUNDLE_SUSPEND_PRECOND =
+ Arrays.asList(Job.Status.PREP, Job.Status.RUNNING, Job.Status.DONEWITHERROR);
+ private static final List<Job.Status> BUNDLE_RESUME_PRECOND =
+ Arrays.asList(Job.Status.SUSPENDED, Job.Status.PREPSUSPENDED);
private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters";
- private static final String[] BUNDLE_UPDATEABLE_PROPS = new String[]{
- "parallel", "clusters.clusters[\\d+].validity.end", };
+ private static final String[] BUNDLE_UPDATEABLE_PROPS =
+ new String[]{"parallel", "clusters.clusters[\\d+].validity.end", };
public OozieWorkflowEngine() {
registerListener(new OozieHouseKeepingService());
@@ -153,8 +151,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
ACTIVE, RUNNING, SUSPENDED
}
- private boolean isBundleInState(Entity entity, BundleStatus status)
- throws FalconException {
+ private boolean isBundleInState(Entity entity, BundleStatus status) throws FalconException {
Map<String, BundleJob> bundles = findLatestBundle(entity);
for (BundleJob bundle : bundles.values()) {
@@ -190,8 +187,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private List<BundleJob> findBundles(Entity entity, String clusterName) throws FalconException {
try {
- List<BundleJob> jobs = OozieClientFactory.get(clusterName).getBundleJobsInfo(
- OozieClient.FILTER_NAME + "=" + EntityUtil.getWorkflowName(entity) + ";", 0, 256);
+ List<BundleJob> jobs = OozieClientFactory.get(clusterName).getBundleJobsInfo(OozieClient.FILTER_NAME
+ + "=" + EntityUtil.getWorkflowName(entity) + ";", 0, 256);
if (jobs != null) {
List<BundleJob> filteredJobs = new ArrayList<BundleJob>();
for (BundleJob job : jobs) {
@@ -276,8 +273,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
return result;
}
- private String doBundleAction(Entity entity, BundleAction action, String cluster)
- throws FalconException {
+ private String doBundleAction(Entity entity, BundleAction action, String cluster) throws FalconException {
List<BundleJob> jobs = findBundles(entity, cluster);
if (jobs.isEmpty()) {
@@ -290,16 +286,16 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
switch (action) {
case SUSPEND:
// not already suspended and preconditions are true
- if (!BUNDLE_SUSPENDED_STATUS.contains(job.getStatus())
- && BUNDLE_SUSPEND_PRECOND.contains(job.getStatus())) {
+ if (!BUNDLE_SUSPENDED_STATUS.contains(job.getStatus()) && BUNDLE_SUSPEND_PRECOND.contains(
+ job.getStatus())) {
suspend(cluster, job.getId());
}
break;
case RESUME:
// not already running and preconditions are true
- if (!BUNDLE_RUNNING_STATUS.contains(job.getStatus())
- && BUNDLE_RESUME_PRECOND.contains(job.getStatus())) {
+ if (!BUNDLE_RUNNING_STATUS.contains(job.getStatus()) && BUNDLE_RESUME_PRECOND.contains(
+ job.getStatus())) {
resume(cluster, job.getId());
}
break;
@@ -327,8 +323,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
//set end time of bundle
- client.change(job.getId(),
- OozieClient.CHANGE_VALUE_ENDTIME + "=" + SchemaHelper.formatDateUTC(new Date()));
+ client.change(job.getId(), OozieClient.CHANGE_VALUE_ENDTIME + "=" + SchemaHelper.formatDateUTC(new Date()));
LOG.debug("Changed end time of bundle " + job.getId() + " on cluster " + clusterName);
//kill bundle
@@ -339,8 +334,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
- private void beforeAction(Entity entity, BundleAction action, String cluster)
- throws FalconException {
+ private void beforeAction(Entity entity, BundleAction action, String cluster) throws FalconException {
for (WorkflowEngineActionListener listener : listeners) {
switch (action) {
@@ -360,8 +354,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
- private void afterAction(Entity entity, BundleAction action, String cluster)
- throws FalconException {
+ private void afterAction(Entity entity, BundleAction action, String cluster) throws FalconException {
for (WorkflowEngineActionListener listener : listeners) {
switch (action) {
@@ -382,18 +375,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
@Override
- public InstancesResult getRunningInstances(Entity entity)
- throws FalconException {
+ public InstancesResult getRunningInstances(Entity entity) throws FalconException {
try {
- WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(
- ENGINE, entity);
+ WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
List<Instance> runInstances = new ArrayList<Instance>();
String[] wfNames = builder.getWorkflowNames(entity);
List<String> coordNames = new ArrayList<String>();
for (String wfName : wfNames) {
- if (EntityUtil.getWorkflowName(Tag.RETENTION, entity)
- .toString().equals(wfName)) {
+ if (EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString().equals(wfName)) {
continue;
}
coordNames.add(wfName);
@@ -409,23 +399,18 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
continue;
}
- CoordinatorAction action = client.getCoordActionInfo(wf
- .getParentId());
- String nominalTimeStr = SchemaHelper
- .formatDateUTC(action.getNominalTime());
- Instance instance = new Instance(cluster,
- nominalTimeStr, WorkflowStatus.RUNNING);
+ CoordinatorAction action = client.getCoordActionInfo(wf.getParentId());
+ String nominalTimeStr = SchemaHelper.formatDateUTC(action.getNominalTime());
+ Instance instance = new Instance(cluster, nominalTimeStr, WorkflowStatus.RUNNING);
instance.startTime = wf.getStartTime();
if (entity.getEntityType() == EntityType.FEED) {
- instance.sourceCluster = getSourceCluster(cluster,
- action, entity);
+ instance.sourceCluster = getSourceCluster(cluster, action, entity);
}
runInstances.add(instance);
}
}
}
- return new InstancesResult("Running Instances",
- runInstances.toArray(new Instance[runInstances.size()]));
+ return new InstancesResult("Running Instances", runInstances.toArray(new Instance[runInstances.size()]));
} catch (OozieClientException e) {
throw new FalconException(e);
@@ -433,39 +418,36 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
@Override
- public InstancesResult killInstances(Entity entity, Date start, Date end,
- Properties props) throws FalconException {
+ public InstancesResult killInstances(Entity entity, Date start, Date end, Properties props) throws FalconException {
return doJobAction(JobAction.KILL, entity, start, end, props);
}
@Override
- public InstancesResult reRunInstances(Entity entity, Date start, Date end,
- Properties props) throws FalconException {
+ public InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props)
+ throws FalconException {
return doJobAction(JobAction.RERUN, entity, start, end, props);
}
@Override
- public InstancesResult suspendInstances(Entity entity, Date start,
- Date end, Properties props) throws FalconException {
+ public InstancesResult suspendInstances(Entity entity, Date start, Date end, Properties props)
+ throws FalconException {
return doJobAction(JobAction.SUSPEND, entity, start, end, props);
}
@Override
- public InstancesResult resumeInstances(Entity entity, Date start, Date end,
- Properties props) throws FalconException {
+ public InstancesResult resumeInstances(Entity entity, Date start, Date end, Properties props)
+ throws FalconException {
return doJobAction(JobAction.RESUME, entity, start, end, props);
}
@Override
- public InstancesResult getStatus(Entity entity, Date start, Date end)
- throws FalconException {
+ public InstancesResult getStatus(Entity entity, Date start, Date end) throws FalconException {
return doJobAction(JobAction.STATUS, entity, start, end, null);
}
@Override
- public InstancesSummaryResult getSummary(Entity entity, Date start, Date end)
- throws FalconException {
+ public InstancesSummaryResult getSummary(Entity entity, Date start, Date end) throws FalconException {
return doSummaryJobAction(entity, start, end, null);
}
@@ -474,8 +456,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
KILL, SUSPEND, RESUME, RERUN, STATUS, SUMMARY
}
- private WorkflowJob getWorkflowInfo(String cluster, String wfId)
- throws FalconException {
+ private WorkflowJob getWorkflowInfo(String cluster, String wfId) throws FalconException {
try {
return OozieClientFactory.get(cluster).getJobInfo(wfId);
} catch (OozieClientException e) {
@@ -483,8 +464,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
- private InstancesResult doJobAction(JobAction action, Entity entity,
- Date start, Date end, Properties props) throws FalconException {
+ private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, Properties props)
+ throws FalconException {
Map<String, List<CoordinatorAction>> actionsMap = getCoordActions(entity, start, end);
List<String> clusterList = getIncludedClusters(props, FALCON_INSTANCE_ACTION_CLUSTERS);
List<String> sourceClusterList = getIncludedClusters(props, FALCON_INSTANCE_SOURCE_CLUSTERS);
@@ -518,8 +499,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
String nominalTimeStr = SchemaHelper.formatDateUTC(coordinatorAction.getNominalTime());
- InstancesResult.Instance instance = new InstancesResult.Instance(
- cluster, nominalTimeStr, WorkflowStatus.valueOf(status));
+ InstancesResult.Instance instance =
+ new InstancesResult.Instance(cluster, nominalTimeStr, WorkflowStatus.valueOf(status));
if (StringUtils.isNotEmpty(coordinatorAction.getExternalId())) {
WorkflowJob jobInfo = getWorkflowInfo(cluster, coordinatorAction.getExternalId());
instance.startTime = jobInfo.getStartTime();
@@ -539,8 +520,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
return instancesResult;
}
- private InstancesSummaryResult doSummaryJobAction(Entity entity,
- Date start, Date end, Properties props) throws FalconException {
+ private InstancesSummaryResult doSummaryJobAction(Entity entity, Date start, Date end, Properties props)
+ throws FalconException {
Map<String, List<BundleJob>> bundlesMap = findBundles(entity);
List<InstanceSummary> instances = new ArrayList<InstanceSummary>();
@@ -565,15 +546,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
TimeZone tz = EntityUtil.getTimeZone(coord.getTimeZone());
Date iterStart = EntityUtil.getNextStartTime(coord.getStartTime(), freq, tz, start);
Date iterEnd = (coord.getLastActionTime() != null && coord.getLastActionTime().before(end)
- ? coord.getLastActionTime() : end);
+ ? coord.getLastActionTime() : end);
if (i == (applicableCoords.size() - 1)) {
isLastCoord = true;
}
int startActionNumber = EntityUtil.getInstanceSequence(coord.getStartTime(), freq, tz, iterStart);
- int lastMaterializedActionNumber = EntityUtil.getInstanceSequence(coord.getStartTime(),
- freq, tz, iterEnd);
+ int lastMaterializedActionNumber =
+ EntityUtil.getInstanceSequence(coord.getStartTime(), freq, tz, iterEnd);
int endActionNumber = EntityUtil.getInstanceSequence(coord.getStartTime(), freq, tz, end);
if (lastMaterializedActionNumber < startActionNumber) {
@@ -587,7 +568,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
CoordinatorJob coordJob;
try {
coordJob = client.getCoordJobInfo(coord.getId(), null, startActionNumber,
- (lastMaterializedActionNumber - startActionNumber));
+ (lastMaterializedActionNumber - startActionNumber));
} catch (OozieClientException e) {
LOG.debug("Unable to get details for coordinator " + coord.getId() + " " + e.getMessage());
throw new FalconException(e);
@@ -602,12 +583,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
instancesSummary.put("UNSCHEDULED", unscheduledInstances);
}
- InstanceSummary summary= new InstanceSummary(cluster, instancesSummary);
+ InstanceSummary summary = new InstanceSummary(cluster, instancesSummary);
instances.add(summary);
}
- InstancesSummaryResult instancesSummaryResult = new InstancesSummaryResult(APIResult.Status.SUCCEEDED,
- JobAction.SUMMARY.name());
+ InstancesSummaryResult instancesSummaryResult =
+ new InstancesSummaryResult(APIResult.Status.SUCCEEDED, JobAction.SUMMARY.name());
instancesSummaryResult.setInstancesSummary(instances.toArray(new InstanceSummary[instances.size()]));
return instancesSummaryResult;
}
@@ -615,10 +596,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private void updateInstanceSummary(CoordinatorJob coordJob, Map<String, Long> instancesSummary) {
List<CoordinatorAction> actions = coordJob.getActions();
- for (CoordinatorAction coordAction : actions) {
+ for (CoordinatorAction coordAction : actions) {
if (instancesSummary.containsKey(coordAction.getStatus().name())) {
instancesSummary.put(coordAction.getStatus().name(),
- instancesSummary.get(coordAction.getStatus().name()) + 1L);
+ instancesSummary.get(coordAction.getStatus().name()) + 1L);
} else {
instancesSummary.put(coordAction.getStatus().name(), 1L);
}
@@ -626,7 +607,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private String performAction(String cluster, JobAction action, CoordinatorAction coordinatorAction,
- Properties props) throws FalconException {
+ Properties props) throws FalconException {
WorkflowJob jobInfo = null;
String status = coordinatorAction.getStatus().name();
if (StringUtils.isNotEmpty(coordinatorAction.getExternalId())) {
@@ -684,30 +665,28 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
return mapActionStatus(status);
}
- private void reRunCoordAction(String cluster, CoordinatorAction coordinatorAction) throws FalconException {
- try{
+ private void reRunCoordAction(String cluster, CoordinatorAction coordinatorAction) throws FalconException {
+ try {
OozieClient client = OozieClientFactory.get(cluster);
- client.reRunCoord(coordinatorAction.getJobId(),
- RestConstants.JOB_COORD_RERUN_ACTION,
- Integer.toString(coordinatorAction.getActionNumber()), true, true);
+ client.reRunCoord(coordinatorAction.getJobId(), RestConstants.JOB_COORD_RERUN_ACTION,
+ Integer.toString(coordinatorAction.getActionNumber()), true, true);
assertCoordActionStatus(cluster, coordinatorAction.getId(),
org.apache.oozie.client.CoordinatorAction.Status.RUNNING,
- org.apache.oozie.client.CoordinatorAction.Status.WAITING,
+ org.apache.oozie.client.CoordinatorAction.Status.WAITING,
org.apache.oozie.client.CoordinatorAction.Status.READY);
LOG.info("Rerun job " + coordinatorAction.getId() + " on cluster " + cluster);
- }catch (Exception e) {
+ } catch (Exception e) {
LOG.error("Unable to rerun workflows", e);
throw new FalconException(e);
}
}
private void assertCoordActionStatus(String cluster, String coordActionId,
- org.apache.oozie.client.CoordinatorAction.Status... statuses)
- throws FalconException, OozieClientException {
+ org.apache.oozie.client.CoordinatorAction.Status... statuses) throws FalconException, OozieClientException {
OozieClient client = OozieClientFactory.get(cluster);
CoordinatorAction actualStatus = client.getCoordActionInfo(coordActionId);
for (int counter = 0; counter < 3; counter++) {
- for(org.apache.oozie.client.CoordinatorAction.Status status : statuses) {
+ for (org.apache.oozie.client.CoordinatorAction.Status status : statuses) {
if (status.equals(actualStatus.getStatus())) {
return;
}
@@ -719,9 +698,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
actualStatus = client.getCoordActionInfo(coordActionId);
}
- throw new FalconException("For Job" + coordActionId + ", actual statuses: "
- +actualStatus + ", expected statuses: "
- + Arrays.toString(statuses));
+ throw new FalconException("For Job" + coordActionId + ", actual statuses: " + actualStatus + ", "
+ + "expected statuses: " + Arrays.toString(statuses));
}
private String getSourceCluster(String cluster, CoordinatorAction coordinatorAction, Entity entity)
@@ -734,10 +712,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
- private List<String> getIncludedClusters(Properties props,
- String clustersType) {
- String clusters = props == null ? "" : props.getProperty(clustersType,
- "");
+ private List<String> getIncludedClusters(Properties props, String clustersType) {
+ String clusters = props == null ? "" : props.getProperty(clustersType, "");
List<String> clusterList = new ArrayList<String>();
for (String cluster : clusters.split(",")) {
if (StringUtils.isNotEmpty(cluster)) {
@@ -749,8 +725,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private String mapActionStatus(String status) {
if (CoordinatorAction.Status.READY.toString().equals(status)
- || CoordinatorAction.Status.WAITING.toString().equals(status)
- || CoordinatorAction.Status.SUBMITTED.toString().equals(status)) {
+ || CoordinatorAction.Status.WAITING.toString().equals(status)
+ || CoordinatorAction.Status.SUBMITTED.toString().equals(status)) {
return InstancesResult.WorkflowStatus.WAITING.name();
} else if (CoordinatorAction.Status.DISCARDED.toString().equals(status)) {
return InstancesResult.WorkflowStatus.KILLED.name();
@@ -763,8 +739,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
- protected Map<String, List<CoordinatorAction>> getCoordActions(
- Entity entity, Date start, Date end) throws FalconException {
+ protected Map<String, List<CoordinatorAction>> getCoordActions(Entity entity, Date start, Date end)
+ throws FalconException {
Map<String, List<BundleJob>> bundlesMap = findBundles(entity);
Map<String, List<CoordinatorAction>> actionsMap = new HashMap<String, List<CoordinatorAction>>();
@@ -804,17 +780,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private Frequency createFrequency(String frequency, Timeunit timeUnit) {
- return new Frequency(frequency, OozieTimeUnit.valueOf(timeUnit.name())
- .getFalconTimeUnit());
+ return new Frequency(frequency, OozieTimeUnit.valueOf(timeUnit.name()).getFalconTimeUnit());
}
/**
* TimeUnit as understood by Oozie.
*/
private enum OozieTimeUnit {
- MINUTE(TimeUnit.minutes), HOUR(TimeUnit.hours), DAY(TimeUnit.days), WEEK(
- null), MONTH(TimeUnit.months), END_OF_DAY(null), END_OF_MONTH(
- null), NONE(null);
+ MINUTE(TimeUnit.minutes), HOUR(TimeUnit.hours), DAY(TimeUnit.days), WEEK(null), MONTH(TimeUnit.months),
+ END_OF_DAY(null), END_OF_MONTH(null), NONE(null);
private TimeUnit falconTimeUnit;
@@ -824,32 +798,27 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
public TimeUnit getFalconTimeUnit() {
if (falconTimeUnit == null) {
- throw new IllegalStateException("Invalid coord frequency: "
- + name());
+ throw new IllegalStateException("Invalid coord frequency: " + name());
}
return falconTimeUnit;
}
}
- private List<CoordinatorJob> getApplicableCoords(Entity entity, ProxyOozieClient client,
- Date start, Date end, List<BundleJob> bundles)
- throws FalconException {
+ private List<CoordinatorJob> getApplicableCoords(Entity entity, ProxyOozieClient client, Date start, Date end,
+ List<BundleJob> bundles) throws FalconException {
List<CoordinatorJob> applicableCoords = new ArrayList<CoordinatorJob>();
try {
for (BundleJob bundle : bundles) {
- List<CoordinatorJob> coords = client.getBundleJobInfo(
- bundle.getId()).getCoordinators();
+ List<CoordinatorJob> coords = client.getBundleJobInfo(bundle.getId()).getCoordinators();
for (CoordinatorJob coord : coords) {
- String coordName = EntityUtil.getWorkflowName(
- Tag.RETENTION, entity).toString();
+ String coordName = EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString();
if (coordName.equals(coord.getAppName())) {
continue;
}
// if end time is before coord-start time or start time is
// after coord-end time ignore.
- if (!(end.compareTo(coord.getStartTime()) <= 0 || start
- .compareTo(coord.getEndTime()) >= 0)) {
+ if (!(end.compareTo(coord.getStartTime()) <= 0 || start.compareTo(coord.getEndTime()) >= 0)) {
applicableCoords.add(coord);
}
}
@@ -894,8 +863,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
LOG.info("Updating entity through Workflow Engine" + newEntity.toShortString());
Date newEndTime = EntityUtil.getEndTime(newEntity, cluster);
if (newEndTime.before(now())) {
- throw new FalconException("New end time for " + newEntity.getName()
- + " is past current time. Entity can't be updated. Use remove and add");
+ throw new FalconException("New end time for " + newEntity.getName() + " is past current time. Entity "
+ + "can't be updated. Use remove and add");
}
LOG.debug("Updating for cluster : " + cluster + ", bundle: " + bundle.getId());
@@ -904,12 +873,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
// only concurrency and endtime are changed. So, change coords
LOG.info("Change operation is adequate! : " + cluster + ", bundle: " + bundle.getId());
updateCoords(cluster, bundle, EntityUtil.getParallel(newEntity),
- EntityUtil.getEndTime(newEntity, cluster));
+ EntityUtil.getEndTime(newEntity, cluster));
return newEndTime;
}
- LOG.debug("Going to update ! : " + newEntity.toShortString() + "for cluster " + cluster + ", bundle: "
- + bundle.getId());
+ LOG.debug("Going to update ! : " + newEntity.toShortString() + "for cluster " + cluster + ", "
+ + "bundle: " + bundle.getId());
effectiveTime = updateInternal(oldEntity, newEntity, cluster, bundle, false, effectiveTime);
LOG.info("Entity update complete : " + newEntity.toShortString() + cluster + ", bundle: " + bundle.getId());
}
@@ -933,22 +902,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
LOG.info("Triggering update for " + cluster + ", " + affectedProcBundle.getId());
- //TODO handle roll forward
-// BundleJob feedBundle = findLatestBundle(newEntity, cluster);
-// if (feedBundle == MISSING) {
-// throw new IllegalStateException("Unable to find feed bundle in " + cluster
-// + " for entity " + newEntity.getName());
-// }
-// boolean processCreated = feedBundle.getCreatedTime().before(
-// affectedProcBundle.getCreatedTime());
-
Date depEndTime =
- updateInternal(affectedEntity, affectedEntity, cluster, affectedProcBundle, false, effectiveTime);
+ updateInternal(affectedEntity, affectedEntity, cluster, affectedProcBundle, false, effectiveTime);
if (effectiveTime == null || effectiveTime.after(depEndTime)) {
effectiveTime = depEndTime;
}
- LOG.info("Entity update complete : " + affectedEntity.toShortString() + cluster
- + ", bundle: " + affectedProcBundle.getId());
+ LOG.info("Entity update complete : " + affectedEntity.toShortString() + cluster + ", "+ "bundle: "
+ + affectedProcBundle.getId());
}
LOG.info("Entity update and all dependent entities updated: " + oldEntity.toShortString());
return effectiveTime;
@@ -979,8 +939,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private Date getCoordLastActionTime(CoordinatorJob coord) {
if (coord.getNextMaterializedTime() != null) {
- Calendar cal = Calendar.getInstance(EntityUtil.getTimeZone(coord
- .getTimeZone()));
+ Calendar cal = Calendar.getInstance(EntityUtil.getTimeZone(coord.getTimeZone()));
cal.setTime(coord.getLastActionTime());
Frequency freq = createFrequency(String.valueOf(coord.getFrequency()), coord.getTimeUnit());
cal.add(freq.getTimeUnit().getCalendarUnit(), -freq.getFrequencyAsInt());
@@ -989,16 +948,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
return null;
}
- private void updateCoords(String cluster, BundleJob bundle, int concurrency,
- Date endTime) throws FalconException {
+ private void updateCoords(String cluster, BundleJob bundle, int concurrency, Date endTime) throws FalconException {
if (endTime.compareTo(now()) <= 0) {
throw new FalconException("End time " + SchemaHelper.formatDateUTC(endTime) + " can't be in the past");
}
// change coords
for (CoordinatorJob coord : bundle.getCoordinators()) {
- LOG.debug("Updating endtime of coord " + coord.getId() + " to "
- + SchemaHelper.formatDateUTC(endTime) + " on cluster " + cluster);
+ LOG.debug("Updating endtime of coord " + coord.getId() + " to " + SchemaHelper.formatDateUTC(endTime)
+ + " on cluster " + cluster);
Date lastActionTime = getCoordLastActionTime(coord);
if (lastActionTime == null) { // nothing is materialized
LOG.info("Nothing is materialized for this coord: " + coord.getId());
@@ -1011,12 +969,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
} else {
LOG.info("Actions have materialized for this coord: " + coord.getId() + ", last action "
- + SchemaHelper.formatDateUTC(lastActionTime));
+ + SchemaHelper.formatDateUTC(lastActionTime));
if (!endTime.after(lastActionTime)) {
Date pauseTime = offsetTime(endTime, -1);
// set pause time which deletes future actions
- LOG.info("Setting pause time on coord : " + coord.getId() + " to "
- + SchemaHelper.formatDateUTC(pauseTime));
+ LOG.info("Setting pause time on coord : " + coord.getId() + " to " + SchemaHelper.formatDateUTC(
+ pauseTime));
change(cluster, coord.getId(), concurrency, null, SchemaHelper.formatDateUTC(pauseTime));
}
change(cluster, coord.getId(), concurrency, endTime, "");
@@ -1037,9 +995,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private Date updateInternal(Entity oldEntity, Entity newEntity, String cluster, BundleJob oldBundle,
- boolean alreadyCreated, Date inEffectiveTime) throws FalconException {
+ boolean alreadyCreated, Date inEffectiveTime) throws FalconException {
OozieWorkflowBuilder<Entity> builder =
- (OozieWorkflowBuilder<Entity>) WorkflowBuilder.getBuilder(ENGINE, oldEntity);
+ (OozieWorkflowBuilder<Entity>) WorkflowBuilder.getBuilder(ENGINE, oldEntity);
Job.Status oldBundleStatus = oldBundle.getStatus();
//Suspend coords as bundle suspend doesn't suspend coords synchronously
@@ -1075,7 +1033,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
} else {
LOG.info("New bundle has already been created. Bundle Id: " + latestBundle.getId() + ", Start: "
- + SchemaHelper.formatDateUTC(latestBundle.getStartTime()) + ", End: " + latestBundle.getEndTime());
+ + SchemaHelper.formatDateUTC(latestBundle.getStartTime()) + ", End: " + latestBundle.getEndTime());
//pick effectiveTime from already created bundle
effectiveTime = getMinStartTime(latestBundle);
@@ -1132,14 +1090,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
- private List<WorkflowJob> getRunningWorkflows(String cluster,
- List<String> wfNames) throws FalconException {
+ private List<WorkflowJob> getRunningWorkflows(String cluster, List<String> wfNames) throws FalconException {
StringBuilder filter = new StringBuilder();
- filter.append(OozieClient.FILTER_STATUS).append('=')
- .append(Job.Status.RUNNING.name());
+ filter.append(OozieClient.FILTER_STATUS).append('=').append(Job.Status.RUNNING.name());
for (String wfName : wfNames) {
- filter.append(';').append(OozieClient.FILTER_NAME).append('=')
- .append(wfName);
+ filter.append(';').append(OozieClient.FILTER_NAME).append('=').append(wfName);
}
try {
@@ -1150,8 +1105,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
@Override
- public void reRun(String cluster, String jobId, Properties props)
- throws FalconException {
+ public void reRun(String cluster, String jobId, Properties props) throws FalconException {
ProxyOozieClient client = OozieClientFactory.get(cluster);
try {
@@ -1175,8 +1129,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
- private void assertStatus(String cluster, String jobId, Status... statuses)
- throws FalconException {
+ private void assertStatus(String cluster, String jobId, Status... statuses) throws FalconException {
String actualStatus = getWorkflowStatus(cluster, jobId);
for (int counter = 0; counter < 3; counter++) {
@@ -1191,9 +1144,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
actualStatus = getWorkflowStatus(cluster, jobId);
}
- throw new FalconException("For Job" + jobId + ", actual statuses: "
- + actualStatus + ", expected statuses: "
- + Arrays.toString(statuses));
+ throw new FalconException("For Job" + jobId + ", actual statuses: " + actualStatus + ", expected statuses: "
+ + Arrays.toString(statuses));
}
private boolean statusEquals(String left, Status... right) {
@@ -1206,8 +1158,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
@Override
- public String getWorkflowStatus(String cluster, String jobId)
- throws FalconException {
+ public String getWorkflowStatus(String cluster, String jobId) throws FalconException {
ProxyOozieClient client = OozieClientFactory.get(cluster);
try {
@@ -1242,8 +1193,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private String run(String cluster, Properties props) throws FalconException {
try {
String jobId = OozieClientFactory.get(cluster).run(props);
- LOG.info("Submitted " + jobId + " on cluster " + cluster
- + " with properties : " + props);
+ LOG.info("Submitted " + jobId + " on cluster " + cluster + " with properties : " + props);
return jobId;
} catch (OozieClientException e) {
LOG.error("Unable to schedule workflows", e);
@@ -1254,8 +1204,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private void suspend(String cluster, String jobId) throws FalconException {
try {
OozieClientFactory.get(cluster).suspend(jobId);
- assertStatus(cluster, jobId, Status.PREPSUSPENDED, Status.SUSPENDED, Status.SUCCEEDED,
- Status.FAILED, Status.KILLED);
+ assertStatus(cluster, jobId, Status.PREPSUSPENDED, Status.SUSPENDED, Status.SUCCEEDED, Status.FAILED,
+ Status.KILLED);
LOG.info("Suspended job " + jobId + " on cluster " + cluster);
} catch (OozieClientException e) {
throw new FalconException(e);
@@ -1275,27 +1225,24 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private void kill(String cluster, String jobId) throws FalconException {
try {
OozieClientFactory.get(cluster).kill(jobId);
- assertStatus(cluster, jobId, Status.KILLED, Status.SUCCEEDED,
- Status.FAILED);
+ assertStatus(cluster, jobId, Status.KILLED, Status.SUCCEEDED, Status.FAILED);
LOG.info("Killed job " + jobId + " on cluster " + cluster);
} catch (OozieClientException e) {
throw new FalconException(e);
}
}
- private void change(String cluster, String jobId, String changeValue)
- throws FalconException {
+ private void change(String cluster, String jobId, String changeValue) throws FalconException {
try {
OozieClientFactory.get(cluster).change(jobId, changeValue);
- LOG.info("Changed bundle/coord " + jobId + ": " + changeValue
- + " on cluster " + cluster);
+ LOG.info("Changed bundle/coord " + jobId + ": " + changeValue + " on cluster " + cluster);
} catch (OozieClientException e) {
throw new FalconException(e);
}
}
- private void change(String cluster, String id, int concurrency,
- Date endTime, String pauseTime) throws FalconException {
+ private void change(String cluster, String id, int concurrency, Date endTime, String pauseTime)
+ throws FalconException {
StringBuilder changeValue = new StringBuilder();
changeValue.append(OozieClient.CHANGE_VALUE_CONCURRENCY).append("=").append(concurrency).append(";");
if (endTime != null) {
@@ -1319,9 +1266,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
CoordinatorJob coord = client.getCoordJobInfo(id);
for (int counter = 0; counter < 3; counter++) {
Date intendedPauseTime = (StringUtils.isEmpty(pauseTime) ? null : SchemaHelper.parseDateUTC(pauseTime));
- if (coord.getConcurrency() != concurrency
- || (endTime != null && !coord.getEndTime().equals(endTime))
- || (intendedPauseTime != null && !intendedPauseTime.equals(coord.getPauseTime()))) {
+ if (coord.getConcurrency() != concurrency || (endTime != null && !coord.getEndTime().equals(endTime))
+ || (intendedPauseTime != null && !intendedPauseTime.equals(coord.getPauseTime()))) {
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {
@@ -1332,12 +1278,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
coord = client.getCoordJobInfo(id);
}
- LOG.error("Failed to change coordinator. Current value "
- + coord.getConcurrency() + ", "
- + SchemaHelper.formatDateUTC(coord.getEndTime()) + ", "
- + SchemaHelper.formatDateUTC(coord.getPauseTime()));
- throw new FalconException("Failed to change coordinator " + id
- + " with change value " + changeValueStr);
+ LOG.error("Failed to change coordinator. Current value " + coord.getConcurrency() + ", "
+ + SchemaHelper.formatDateUTC(coord.getEndTime()) + ", " + SchemaHelper.formatDateUTC(
+ coord.getPauseTime()));
+ throw new FalconException("Failed to change coordinator " + id + " with change value " + changeValueStr);
} catch (OozieClientException e) {
throw new FalconException(e);
}
@@ -1355,8 +1299,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
@Override
- public InstancesResult getJobDetails(String cluster, String jobId)
- throws FalconException {
+ public InstancesResult getJobDetails(String cluster, String jobId) throws FalconException {
Instance[] instances = new Instance[1];
Instance instance = new Instance();
try {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5e435215/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
index e780c18..a23c396 100644
--- a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
+++ b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
@@ -126,7 +126,6 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
entityName, nominalTime, workflowId, workflowUser, runId, operation,
SchemaHelper.formatDateUTC(startTime), duration);
- notifySLAService(cluster, entityName, entityType, nominalTime, duration);
notifyMetadataMappingService(entityName, operation, mapMessage.getString(ARG.logDir.getArgName()));
}
} catch (JMSException e) {
@@ -138,20 +137,6 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
}
}
- private void notifySLAService(String cluster, String entityName,
- String entityType, String nominalTime, Long duration) {
- try {
- getSLAMonitoringService().notifyCompletion(EntityUtil.getEntity(entityType, entityName),
- cluster, SchemaHelper.parseDateUTC(nominalTime), duration);
- } catch (Throwable e) {
- LOG.warn("Unable to notify SLA Service", e);
- }
- }
-
- private SLAMonitoringService getSLAMonitoringService() {
- return Services.get().getService(SLAMonitoringService.SERVICE_NAME);
- }
-
private void notifyMetadataMappingService(String entityName, String operation,
String logDir) throws FalconException {
if (Services.get().isRegistered(MetadataMappingService.SERVICE_NAME)) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5e435215/prism/src/main/java/org/apache/falcon/service/SLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/SLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/SLAMonitoringService.java
deleted file mode 100644
index d3d9e19..0000000
--- a/prism/src/main/java/org/apache/falcon/service/SLAMonitoringService.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.service;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.aspect.GenericAlert;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.workflow.WorkflowEngineFactory;
-import org.apache.falcon.workflow.engine.WorkflowEngineActionListener;
-import org.apache.log4j.Logger;
-
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-
-/**
- * A service implementation for SLA Monitoring.
- */
-public class SLAMonitoringService implements FalconService, WorkflowEngineActionListener {
- private static final Logger LOG = Logger.getLogger(SLAMonitoringService.class);
- public static final String SERVICE_NAME = "SLAMonitor";
-
- private ConcurrentMap<String, Long> monitoredEntities = new ConcurrentHashMap<String, Long>();
-
- private ConcurrentMap<String, ConcurrentMap<Date, Date>> pendingJobs
- = new ConcurrentHashMap<String, ConcurrentMap<Date, Date>>();
-
- private static final long INITIAL_LATENCY_SECS = 12 * 3600;
-
- private static final long POLL_PERIODICITY_SECS = 300;
-
- @Override
- public String getName() {
- return SERVICE_NAME;
- }
-
- @Override
- public void init() throws FalconException {
- WorkflowEngineFactory.getWorkflowEngine().registerListener(this);
- ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
- executor.scheduleWithFixedDelay(new Monitor(), POLL_PERIODICITY_SECS, POLL_PERIODICITY_SECS, TimeUnit.SECONDS);
- }
-
- @Override
- public void destroy() throws FalconException {
- }
-
- @Override
- public void afterSchedule(Entity entity, String cluster) throws FalconException {
- addEntityForMonitoring(entity, cluster);
- }
-
- @Override
- public void afterDelete(Entity entity, String cluster) throws FalconException {
- removeMonitoredEntity(entity, cluster);
- }
-
- @Override
- public void afterSuspend(Entity entity, String cluster) throws FalconException {
- removeMonitoredEntity(entity, cluster);
- }
-
- @Override
- public void afterResume(Entity entity, String cluster) throws FalconException {
- addEntityForMonitoring(entity, cluster);
- }
-
- public void notifyCompletion(Entity entity, String cluster, Date nominalTime, long duration) {
- if (!isEntityMonitored(entity, cluster)) {
- addEntityForMonitoring(entity, cluster);
- }
- updateLatency(entity, cluster, duration);
- removeFromPendingList(entity, cluster, nominalTime);
- }
-
- private String getKey(Entity entity, String cluster) {
- return entity.toShortString() + "/" + cluster;
- }
-
- private void addEntityForMonitoring(Entity entity, String cluster) {
- monitoredEntities.putIfAbsent(getKey(entity, cluster), INITIAL_LATENCY_SECS);
- }
-
- private void removeMonitoredEntity(Entity entity, String cluster) {
- monitoredEntities.remove(getKey(entity, cluster));
- pendingJobs.remove(getKey(entity, cluster));
- }
-
- private boolean isEntityMonitored(Entity entity, String cluster) {
- return monitoredEntities.containsKey(getKey(entity, cluster));
- }
-
- private void updateLatency(Entity entity, String cluster, long duration) {
- long newLatency = (duration + monitoredEntities.get(getKey(entity, cluster))) / 2;
- monitoredEntities.put(getKey(entity, cluster), newLatency);
- }
-
- private void removeFromPendingList(Entity entity, String cluster, Date nominalTime) {
- ConcurrentMap<Date, Date> pendingInstances = pendingJobs.get(getKey(entity, cluster));
- if (pendingInstances != null) {
- LOG.debug("Removing from pending jobs: " + getKey(entity, cluster) + " ---> "
- + SchemaHelper.formatDateUTC(nominalTime));
- pendingInstances.remove(nominalTime);
- }
- }
-
- private class Monitor implements Runnable {
-
- @Override
- public void run() {
- try {
- if (monitoredEntities.isEmpty()) {
- return;
- }
- Set<String> keys = new HashSet<String>(monitoredEntities.keySet());
- checkSLAMissOnPendingEntities(keys);
- addNewPendingEntities(keys);
- } catch (Throwable e) {
- LOG.error("Monitor failed: ", e);
- }
- }
-
- private void checkSLAMissOnPendingEntities(Set<String> keys) throws FalconException {
- Date now = new Date();
- for (String key : keys) {
- ConcurrentMap<Date, Date> pendingInstances = pendingJobs.get(key);
- if (pendingInstances == null) {
- continue;
- }
- ConcurrentMap<Date, Date> interim =
- new ConcurrentHashMap<Date, Date>(pendingInstances);
- for (Map.Entry<Date, Date> entry : interim.entrySet()) {
- if (entry.getValue().before(now)) {
- Entity entity = getEntity(key);
- String cluster = getCluster(key);
- GenericAlert.alertOnLikelySLAMiss(cluster, entity.getEntityType().name(),
- entity.getName(), SchemaHelper.formatDateUTC(entry.getKey()));
- LOG.debug("Removing from pending jobs: " + key + " ---> "
- + SchemaHelper.formatDateUTC(entry.getKey()));
- pendingInstances.remove(entry.getKey());
- }
- }
- interim.clear();
- }
- }
-
- private void addNewPendingEntities(Set<String> keys) throws FalconException {
- Date now = new Date();
- Date windowEndTime = new Date(now.getTime() + POLL_PERIODICITY_SECS * 1000);
- for (String key : keys) {
- Entity entity = getEntity(key);
- String cluster = getCluster(key);
- if (entity == null) {
- LOG.warn("No entity for " + key);
- continue;
- }
- Date startTime = EntityUtil.getStartTime(entity, cluster);
- Frequency frequency = EntityUtil.getFrequency(entity);
- TimeZone timeZone = EntityUtil.getTimeZone(entity);
- Date nextStart = EntityUtil.getNextStartTime(startTime, frequency, timeZone, now);
- if (nextStart.after(windowEndTime)) {
- continue;
- }
- ConcurrentMap<Date, Date> pendingInstances = pendingJobs.get(key);
- while (!nextStart.after(windowEndTime)) {
- if (pendingInstances == null) {
- pendingJobs.putIfAbsent(key, new ConcurrentHashMap<Date, Date>());
- pendingInstances = pendingJobs.get(key);
- }
- Long latency = monitoredEntities.get(key);
- if (latency == null) {
- break;
- }
- // 1.5 times latency is when it is supposed to have breached
- pendingInstances.putIfAbsent(nextStart, new Date(nextStart.getTime() + latency * 1500));
- LOG.debug("Adding to pending jobs: " + key + " ---> " + SchemaHelper.formatDateUTC(nextStart));
- Calendar startCal = Calendar.getInstance(timeZone);
- startCal.setTime(nextStart);
- startCal.add(frequency.getTimeUnit().getCalendarUnit(), frequency.getFrequencyAsInt());
- nextStart = startCal.getTime();
- }
- }
- }
- }
-
- private static final Pattern PATTERN = Pattern.compile("[()\\s/]");
-
- private Entity getEntity(String key) throws FalconException {
- String[] parts = PATTERN.split(key);
- String name = parts[3];
- String type = parts[1];
- return EntityUtil.getEntity(type, name);
- }
-
- private String getCluster(String key) throws FalconException {
- String[] parts = PATTERN.split(key);
- return parts[4];
- }
-
- @Override
- public void beforeSchedule(Entity entity, String cluster) throws FalconException {
- }
-
- @Override
- public void beforeDelete(Entity entity, String cluster) throws FalconException {
- }
-
- @Override
- public void beforeSuspend(Entity entity, String cluster) throws FalconException {
- }
-
- @Override
- public void beforeResume(Entity entity, String cluster) throws FalconException {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5e435215/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index f413019..2457862 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -37,7 +37,6 @@
org.apache.falcon.entity.store.ConfigurationStore,\
org.apache.falcon.rerun.service.RetryService,\
org.apache.falcon.rerun.service.LateRunService,\
- org.apache.falcon.service.SLAMonitoringService,\
org.apache.falcon.metadata.MetadataMappingService,\
org.apache.falcon.service.LogCleanupService
prism.application.services=org.apache.falcon.entity.store.ConfigurationStore