You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2015/11/09 13:13:15 UTC
falcon git commit: FALCON-1592 Code Refactoring: Introduce Event type
for scheduler events (Ajay Yadava)
Repository: falcon
Updated Branches:
refs/heads/master 36a7d6b8b -> aa18bfdae
FALCON-1592 Code Refactoring: Introduce Event type for scheduler events (Ajay Yadava)
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/aa18bfda
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/aa18bfda
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/aa18bfda
Branch: refs/heads/master
Commit: aa18bfdaefd7f771f45ca9314beb0ff8ac4d53f2
Parents: 36a7d6b
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Mon Nov 9 17:37:44 2015 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Mon Nov 9 17:37:44 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 ++
.../execution/ProcessExecutionInstance.java | 8 +++---
.../falcon/execution/ProcessExecutor.java | 15 +++++-----
.../notification/service/event/DataEvent.java | 9 ++----
.../notification/service/event/Event.java | 15 +++++-----
.../notification/service/event/EventType.java | 30 ++++++++++++++++++++
.../service/event/JobCompletedEvent.java | 9 ++----
.../service/event/JobScheduledEvent.java | 9 ++----
.../service/event/TimeElapsedEvent.java | 9 ++----
.../service/impl/SchedulerService.java | 3 +-
.../org/apache/falcon/predicate/Predicate.java | 8 +++---
11 files changed, 66 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f87c7cd..586847e 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,8 @@ Trunk (Unreleased)
FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
IMPROVEMENTS
+ FALCON-1592 Code Refactoring: Introduce Event type for scheduler events (Ajay Yadava via Pallavi Rao)
+
FALCON-1593 Oozie setup failing in setup phase (Praveen Adlakha via Ajay Yadava)
FALCON-1582 Documentation for globally disabling retries (Pallavi Rao)
http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
index 19089c4..8c84f2b 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
@@ -135,16 +135,16 @@ public class ProcessExecutionInstance extends ExecutionInstance {
@Override
public void onEvent(Event event) throws FalconException {
- switch (event.getSource()) {
- case JOB_SCHEDULE:
+ switch (event.getType()) {
+ case JOB_SCHEDULED:
JobScheduledEvent jobScheduleEvent = (JobScheduledEvent) event;
setExternalID(jobScheduleEvent.getExternalID());
setActualStart(jobScheduleEvent.getStartTime());
break;
- case JOB_COMPLETION:
+ case JOB_COMPLETED:
setActualEnd(((JobCompletedEvent)event).getEndTime());
break;
- case DATA:
+ case DATA_AVAILABLE:
// Data has not become available and the wait time has passed
if (((DataEvent) event).getStatus() == DataEvent.STATUS.UNAVAILABLE) {
if (getTimeOutInMillis() <= (System.currentTimeMillis() - getCreationTime().getMillis())) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
index 68c34e7..d10d2fd 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
@@ -29,6 +29,7 @@ import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.exception.InvalidStateTransitionException;
import org.apache.falcon.notification.service.NotificationServicesRegistry;
import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.EventType;
import org.apache.falcon.notification.service.event.JobCompletedEvent;
import org.apache.falcon.notification.service.event.TimeElapsedEvent;
import org.apache.falcon.notification.service.impl.JobCompletionService;
@@ -271,7 +272,7 @@ public class ProcessExecutor extends EntityExecutor {
private ProcessExecutionInstance buildInstance(Event event) throws FalconException {
// If a time triggered instance, use nominal time from event
- if (event.getSource() == NotificationServicesRegistry.SERVICE.TIME) {
+ if (event.getType() == EventType.TIME_ELAPSED) {
TimeElapsedEvent timeEvent = (TimeElapsedEvent) event;
LOG.debug("Creating a new process instance for nominal time {}.", timeEvent.getInstanceTime());
return new ProcessExecutionInstance(process, timeEvent.getInstanceTime(), cluster);
@@ -299,7 +300,7 @@ public class ProcessExecutor extends EntityExecutor {
}
}
} catch (Exception e) {
- throw new FalconException("Unable to handle event with source : " + event.getSource() + " with target:"
+ throw new FalconException("Unable to handle event of type : " + event.getType() + " with target:"
+ event.getTarget(), e);
}
}
@@ -307,14 +308,14 @@ public class ProcessExecutor extends EntityExecutor {
private void handleEvent(Event event) throws FalconException {
ProcessExecutionInstance instance;
try {
- switch (event.getSource()) {
+ switch (event.getType()) {
// TODO : Handle cases where scheduling fails.
- case JOB_SCHEDULE:
+ case JOB_SCHEDULED:
instance = instances.get(event.getTarget());
instance.onEvent(event);
stateService.handleStateChange(instance, InstanceState.EVENT.SCHEDULE, this);
break;
- case JOB_COMPLETION:
+ case JOB_COMPLETED:
instance = instances.get(event.getTarget());
instance.onEvent(event);
switch (((JobCompletedEvent) event).getStatus()) {
@@ -395,8 +396,8 @@ public class ProcessExecutor extends EntityExecutor {
// Or, if it is job run or job complete notifications, so it can handle the instance's state transition.
private boolean shouldHandleEvent(Event event) {
return event.getTarget().equals(id)
- || event.getSource() == NotificationServicesRegistry.SERVICE.JOB_COMPLETION
- || event.getSource() == NotificationServicesRegistry.SERVICE.JOB_SCHEDULE;
+ || event.getType() == EventType.JOB_COMPLETED
+ || event.getType() == EventType.JOB_SCHEDULED;
}
@Override
http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
index 4883fe7..1036339 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
@@ -19,7 +19,6 @@ package org.apache.falcon.notification.service.event;
import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.notification.service.NotificationServicesRegistry;
import org.apache.falcon.state.ID;
import org.apache.hadoop.fs.Path;
@@ -27,7 +26,7 @@ import org.apache.hadoop.fs.Path;
* An event generated by {@link org.apache.falcon.notification.service.impl.DataAvailabilityService}
* indicating availability or non-availability of a dataset.
*/
-public class DataEvent implements Event {
+public class DataEvent extends Event {
private final ID callbackID;
private Path dataLocation;
private LocationType dataType;
@@ -46,6 +45,7 @@ public class DataEvent implements Event {
this.dataLocation = location;
this.dataType = locType;
this.status = availability;
+ this.type = EventType.DATA_AVAILABLE;
}
public STATUS getStatus() {
@@ -65,11 +65,6 @@ public class DataEvent implements Event {
}
@Override
- public NotificationServicesRegistry.SERVICE getSource() {
- return NotificationServicesRegistry.SERVICE.DATA;
- }
-
- @Override
public ID getTarget() {
return callbackID;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
index 140973b..e162b48 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
@@ -17,21 +17,22 @@
*/
package org.apache.falcon.notification.service.event;
-import org.apache.falcon.notification.service.NotificationServicesRegistry;
import org.apache.falcon.state.ID;
/**
* An events that are generated by notification services must implement this interface.
*/
-public interface Event {
+public abstract class Event {
- /**
- * @return The service that generated this event
- */
- NotificationServicesRegistry.SERVICE getSource();
+
+ protected EventType type;
+
+ public EventType getType() {
+ return this.type;
+ }
/**
* @return ID of the notification handler for which this event was meant for.
*/
- ID getTarget();
+ public abstract ID getTarget();
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java
new file mode 100644
index 0000000..59f5cba
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.notification.service.event;
+
+/**
+ * Types of event.
+ */
+public enum EventType {
+ TIME_ELAPSED,
+ DATA_AVAILABLE,
+ JOB_COMPLETED,
+ JOB_SCHEDULED
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
index c587343..df7c621 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
@@ -17,7 +17,6 @@
*/
package org.apache.falcon.notification.service.event;
-import org.apache.falcon.notification.service.NotificationServicesRegistry;
import org.apache.falcon.state.ID;
import org.apache.oozie.client.WorkflowJob;
import org.joda.time.DateTime;
@@ -26,7 +25,7 @@ import org.joda.time.DateTime;
* An event generated by {@link org.apache.falcon.notification.service.impl.JobCompletionService}
* indicating completion of a Job.
*/
-public class JobCompletedEvent implements Event {
+public class JobCompletedEvent extends Event {
private WorkflowJob.Status status;
private final ID callbackID;
@@ -36,6 +35,7 @@ public class JobCompletedEvent implements Event {
this.callbackID = callbackID;
this.status = jobStatus;
this.endTime = end;
+ this.type = EventType.JOB_COMPLETED;
}
public WorkflowJob.Status getStatus() {
@@ -43,11 +43,6 @@ public class JobCompletedEvent implements Event {
}
@Override
- public NotificationServicesRegistry.SERVICE getSource() {
- return NotificationServicesRegistry.SERVICE.JOB_COMPLETION;
- }
-
- @Override
public ID getTarget() {
return callbackID;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
index 55023e7..3f48cdc 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
@@ -17,7 +17,6 @@
*/
package org.apache.falcon.notification.service.event;
-import org.apache.falcon.notification.service.NotificationServicesRegistry;
import org.apache.falcon.state.ID;
import org.joda.time.DateTime;
@@ -25,7 +24,7 @@ import org.joda.time.DateTime;
* An event generated by {@link org.apache.falcon.notification.service.impl.SchedulerService}
* indicating if an instance was scheduled for execution.
*/
-public class JobScheduledEvent implements Event {
+public class JobScheduledEvent extends Event {
private final ID callbackID;
private String externalID;
private STATUS status;
@@ -34,6 +33,7 @@ public class JobScheduledEvent implements Event {
public JobScheduledEvent(ID callbackID, STATUS status) {
this.callbackID = callbackID;
this.status = status;
+ this.type = EventType.JOB_SCHEDULED;
}
public String getExternalID() {
@@ -45,11 +45,6 @@ public class JobScheduledEvent implements Event {
}
@Override
- public NotificationServicesRegistry.SERVICE getSource() {
- return NotificationServicesRegistry.SERVICE.JOB_SCHEDULE;
- }
-
- @Override
public ID getTarget() {
return callbackID;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
index 7ec4de6..84738ad 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
@@ -17,7 +17,6 @@
*/
package org.apache.falcon.notification.service.event;
-import org.apache.falcon.notification.service.NotificationServicesRegistry;
import org.apache.falcon.state.ID;
import org.joda.time.DateTime;
@@ -25,7 +24,7 @@ import org.joda.time.DateTime;
* An event generated by {@link org.apache.falcon.notification.service.impl.AlarmService}
* indicating that a given time duration has elapsed.
*/
-public class TimeElapsedEvent implements Event {
+public class TimeElapsedEvent extends Event {
private DateTime startTime;
private DateTime endTime;
private DateTime instanceTime;
@@ -48,11 +47,7 @@ public class TimeElapsedEvent implements Event {
this.startTime = start;
this.endTime = end;
this.instanceTime = instTime;
- }
-
- @Override
- public NotificationServicesRegistry.SERVICE getSource() {
- return NotificationServicesRegistry.SERVICE.TIME;
+ this.type = EventType.TIME_ELAPSED;
}
@Override
http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
index 848f89c..a70bc3c 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
@@ -34,6 +34,7 @@ import org.apache.falcon.execution.NotificationHandler;
import org.apache.falcon.notification.service.FalconNotificationService;
import org.apache.falcon.notification.service.NotificationServicesRegistry;
import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.EventType;
import org.apache.falcon.notification.service.event.JobScheduledEvent;
import org.apache.falcon.notification.service.request.JobCompletionNotificationRequest;
import org.apache.falcon.notification.service.request.JobScheduleNotificationRequest;
@@ -179,7 +180,7 @@ public class SchedulerService implements FalconNotificationService, Notification
@Override
public void onEvent(Event event) throws FalconException {
// Interested only in job completion events.
- if (event.getSource() == NotificationServicesRegistry.SERVICE.JOB_COMPLETION) {
+ if (event.getType() == EventType.JOB_COMPLETED) {
try {
// Check if the instance is awaited.
ID id = event.getTarget();
http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
index fb4ce82..fb4c8c9 100644
--- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
+++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
@@ -20,9 +20,9 @@ package org.apache.falcon.predicate;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.execution.NotificationHandler;
-import org.apache.falcon.notification.service.NotificationServicesRegistry;
import org.apache.falcon.notification.service.event.DataEvent;
import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.EventType;
import org.apache.falcon.notification.service.event.TimeElapsedEvent;
import org.apache.falcon.state.ID;
@@ -173,7 +173,7 @@ public class Predicate implements Serializable {
* @throws FalconException
*/
public static Predicate getPredicate(Event event) throws FalconException {
- if (event.getSource() == NotificationServicesRegistry.SERVICE.DATA) {
+ if (event.getType() == EventType.DATA_AVAILABLE) {
DataEvent dataEvent = (DataEvent) event;
if (dataEvent.getDataLocation() != null && dataEvent.getDataType() != null) {
Location loc = new Location();
@@ -183,7 +183,7 @@ public class Predicate implements Serializable {
} else {
throw new FalconException("Event does not have enough data to create a predicate");
}
- } else if (event.getSource() == NotificationServicesRegistry.SERVICE.TIME) {
+ } else if (event.getType() == EventType.TIME_ELAPSED) {
TimeElapsedEvent timeEvent = (TimeElapsedEvent) event;
if (timeEvent.getStartTime() != null && timeEvent.getEndTime() != null) {
long instanceTime = (timeEvent.getInstanceTime() == null)? -1 : timeEvent.getInstanceTime().getMillis();
@@ -194,7 +194,7 @@ public class Predicate implements Serializable {
}
} else {
- throw new FalconException("Unhandled event type " + event.getSource());
+ throw new FalconException("Unhandled event type " + event.getType());
}
}