You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2015/11/09 13:13:15 UTC

falcon git commit: FALCON-1592 Code Refactoring: Introduce Event type for scheduler events (Ajay Yadava)

Repository: falcon
Updated Branches:
  refs/heads/master 36a7d6b8b -> aa18bfdae


FALCON-1592 Code Refactoring: Introduce Event type for scheduler events (Ajay Yadava)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/aa18bfda
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/aa18bfda
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/aa18bfda

Branch: refs/heads/master
Commit: aa18bfdaefd7f771f45ca9314beb0ff8ac4d53f2
Parents: 36a7d6b
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Mon Nov 9 17:37:44 2015 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Mon Nov 9 17:37:44 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../execution/ProcessExecutionInstance.java     |  8 +++---
 .../falcon/execution/ProcessExecutor.java       | 15 +++++-----
 .../notification/service/event/DataEvent.java   |  9 ++----
 .../notification/service/event/Event.java       | 15 +++++-----
 .../notification/service/event/EventType.java   | 30 ++++++++++++++++++++
 .../service/event/JobCompletedEvent.java        |  9 ++----
 .../service/event/JobScheduledEvent.java        |  9 ++----
 .../service/event/TimeElapsedEvent.java         |  9 ++----
 .../service/impl/SchedulerService.java          |  3 +-
 .../org/apache/falcon/predicate/Predicate.java  |  8 +++---
 11 files changed, 66 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f87c7cd..586847e 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,8 @@ Trunk (Unreleased)
     FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
 
   IMPROVEMENTS
+    FALCON-1592 Code Refactoring: Introduce Event type for scheduler events (Ajay Yadava via Pallavi Rao)
+
     FALCON-1593 Oozie setup failing in setup phase (Praveen Adlakha via Ajay Yadava)
 
     FALCON-1582 Documentation for globally disabling retries (Pallavi Rao)

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
index 19089c4..8c84f2b 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
@@ -135,16 +135,16 @@ public class ProcessExecutionInstance extends ExecutionInstance {
 
     @Override
     public void onEvent(Event event) throws FalconException {
-        switch (event.getSource()) {
-        case JOB_SCHEDULE:
+        switch (event.getType()) {
+        case JOB_SCHEDULED:
             JobScheduledEvent jobScheduleEvent = (JobScheduledEvent) event;
             setExternalID(jobScheduleEvent.getExternalID());
             setActualStart(jobScheduleEvent.getStartTime());
             break;
-        case JOB_COMPLETION:
+        case JOB_COMPLETED:
             setActualEnd(((JobCompletedEvent)event).getEndTime());
             break;
-        case DATA:
+        case DATA_AVAILABLE:
             // Data has not become available and the wait time has passed
             if (((DataEvent) event).getStatus() == DataEvent.STATUS.UNAVAILABLE) {
                 if (getTimeOutInMillis() <= (System.currentTimeMillis() - getCreationTime().getMillis())) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
index 68c34e7..d10d2fd 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
@@ -29,6 +29,7 @@ import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.exception.InvalidStateTransitionException;
 import org.apache.falcon.notification.service.NotificationServicesRegistry;
 import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.EventType;
 import org.apache.falcon.notification.service.event.JobCompletedEvent;
 import org.apache.falcon.notification.service.event.TimeElapsedEvent;
 import org.apache.falcon.notification.service.impl.JobCompletionService;
@@ -271,7 +272,7 @@ public class ProcessExecutor extends EntityExecutor {
 
     private ProcessExecutionInstance buildInstance(Event event) throws FalconException {
         // If a time triggered instance, use nominal time from event
-        if (event.getSource() == NotificationServicesRegistry.SERVICE.TIME) {
+        if (event.getType() == EventType.TIME_ELAPSED) {
             TimeElapsedEvent timeEvent = (TimeElapsedEvent) event;
             LOG.debug("Creating a new process instance for nominal time {}.", timeEvent.getInstanceTime());
             return new ProcessExecutionInstance(process, timeEvent.getInstanceTime(), cluster);
@@ -299,7 +300,7 @@ public class ProcessExecutor extends EntityExecutor {
                 }
             }
         } catch (Exception e) {
-            throw new FalconException("Unable to handle event with source : " + event.getSource() + " with target:"
+            throw new FalconException("Unable to handle event of type : " + event.getType() + " with target:"
                     + event.getTarget(), e);
         }
     }
@@ -307,14 +308,14 @@ public class ProcessExecutor extends EntityExecutor {
     private void handleEvent(Event event) throws FalconException {
         ProcessExecutionInstance instance;
         try {
-            switch (event.getSource()) {
+            switch (event.getType()) {
             // TODO : Handle cases where scheduling fails.
-            case JOB_SCHEDULE:
+            case JOB_SCHEDULED:
                 instance = instances.get(event.getTarget());
                 instance.onEvent(event);
                 stateService.handleStateChange(instance, InstanceState.EVENT.SCHEDULE, this);
                 break;
-            case JOB_COMPLETION:
+            case JOB_COMPLETED:
                 instance = instances.get(event.getTarget());
                 instance.onEvent(event);
                 switch (((JobCompletedEvent) event).getStatus()) {
@@ -395,8 +396,8 @@ public class ProcessExecutor extends EntityExecutor {
     // Or, if it is job run or job complete notifications, so it can handle the instance's state transition.
     private boolean shouldHandleEvent(Event event) {
         return event.getTarget().equals(id)
-                || event.getSource() == NotificationServicesRegistry.SERVICE.JOB_COMPLETION
-                || event.getSource() == NotificationServicesRegistry.SERVICE.JOB_SCHEDULE;
+                || event.getType() == EventType.JOB_COMPLETED
+                || event.getType() == EventType.JOB_SCHEDULED;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
index 4883fe7..1036339 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/DataEvent.java
@@ -19,7 +19,6 @@ package org.apache.falcon.notification.service.event;
 
 
 import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.notification.service.NotificationServicesRegistry;
 import org.apache.falcon.state.ID;
 import org.apache.hadoop.fs.Path;
 
@@ -27,7 +26,7 @@ import org.apache.hadoop.fs.Path;
  * An event generated by {@link org.apache.falcon.notification.service.impl.DataAvailabilityService}
  * indicating availability or non-availability of a dataset.
  */
-public class DataEvent implements Event {
+public class DataEvent extends Event {
     private final ID callbackID;
     private Path dataLocation;
     private LocationType dataType;
@@ -46,6 +45,7 @@ public class DataEvent implements Event {
         this.dataLocation = location;
         this.dataType = locType;
         this.status = availability;
+        this.type = EventType.DATA_AVAILABLE;
     }
 
     public STATUS getStatus() {
@@ -65,11 +65,6 @@ public class DataEvent implements Event {
     }
 
     @Override
-    public NotificationServicesRegistry.SERVICE getSource() {
-        return NotificationServicesRegistry.SERVICE.DATA;
-    }
-
-    @Override
     public ID getTarget() {
         return callbackID;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
index 140973b..e162b48 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/Event.java
@@ -17,21 +17,22 @@
  */
 package org.apache.falcon.notification.service.event;
 
-import org.apache.falcon.notification.service.NotificationServicesRegistry;
 import org.apache.falcon.state.ID;
 
 /**
  * An events that are generated by notification services must implement this interface.
  */
-public interface Event {
+public abstract class Event {
 
-    /**
-     * @return The service that generated this event
-     */
-    NotificationServicesRegistry.SERVICE getSource();
+
+    protected EventType type;
+
+    public EventType getType() {
+        return this.type;
+    }
 
     /**
      * @return ID of the notification handler for which this event was meant for.
      */
-    ID getTarget();
+    public abstract ID getTarget();
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java
new file mode 100644
index 0000000..59f5cba
--- /dev/null
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/EventType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.notification.service.event;
+
+/**
+ * Types of event.
+ */
+public enum EventType {
+        TIME_ELAPSED,
+        DATA_AVAILABLE,
+        JOB_COMPLETED,
+        JOB_SCHEDULED
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
index c587343..df7c621 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobCompletedEvent.java
@@ -17,7 +17,6 @@
  */
 package org.apache.falcon.notification.service.event;
 
-import org.apache.falcon.notification.service.NotificationServicesRegistry;
 import org.apache.falcon.state.ID;
 import org.apache.oozie.client.WorkflowJob;
 import org.joda.time.DateTime;
@@ -26,7 +25,7 @@ import org.joda.time.DateTime;
  * An event generated by {@link org.apache.falcon.notification.service.impl.JobCompletionService}
  * indicating completion of a Job.
  */
-public class JobCompletedEvent implements Event {
+public class JobCompletedEvent extends Event {
 
     private WorkflowJob.Status status;
     private final ID callbackID;
@@ -36,6 +35,7 @@ public class JobCompletedEvent implements Event {
         this.callbackID = callbackID;
         this.status = jobStatus;
         this.endTime = end;
+        this.type = EventType.JOB_COMPLETED;
     }
 
     public WorkflowJob.Status getStatus() {
@@ -43,11 +43,6 @@ public class JobCompletedEvent implements Event {
     }
 
     @Override
-    public NotificationServicesRegistry.SERVICE getSource() {
-        return NotificationServicesRegistry.SERVICE.JOB_COMPLETION;
-    }
-
-    @Override
     public ID getTarget() {
         return callbackID;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
index 55023e7..3f48cdc 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/JobScheduledEvent.java
@@ -17,7 +17,6 @@
  */
 package org.apache.falcon.notification.service.event;
 
-import org.apache.falcon.notification.service.NotificationServicesRegistry;
 import org.apache.falcon.state.ID;
 import org.joda.time.DateTime;
 
@@ -25,7 +24,7 @@ import org.joda.time.DateTime;
  * An event generated by {@link org.apache.falcon.notification.service.impl.SchedulerService}
  * indicating if an instance was scheduled for execution.
  */
-public class JobScheduledEvent implements Event {
+public class JobScheduledEvent extends Event {
     private final ID callbackID;
     private String externalID;
     private STATUS status;
@@ -34,6 +33,7 @@ public class JobScheduledEvent implements Event {
     public JobScheduledEvent(ID callbackID, STATUS status) {
         this.callbackID = callbackID;
         this.status = status;
+        this.type = EventType.JOB_SCHEDULED;
     }
 
     public String getExternalID() {
@@ -45,11 +45,6 @@ public class JobScheduledEvent implements Event {
     }
 
     @Override
-    public NotificationServicesRegistry.SERVICE getSource() {
-        return NotificationServicesRegistry.SERVICE.JOB_SCHEDULE;
-    }
-
-    @Override
     public ID getTarget() {
         return callbackID;
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java b/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
index 7ec4de6..84738ad 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/event/TimeElapsedEvent.java
@@ -17,7 +17,6 @@
  */
 package org.apache.falcon.notification.service.event;
 
-import org.apache.falcon.notification.service.NotificationServicesRegistry;
 import org.apache.falcon.state.ID;
 import org.joda.time.DateTime;
 
@@ -25,7 +24,7 @@ import org.joda.time.DateTime;
  * An event generated by {@link org.apache.falcon.notification.service.impl.AlarmService}
  * indicating that a given time duration has elapsed.
  */
-public class TimeElapsedEvent implements Event {
+public class TimeElapsedEvent extends Event {
     private DateTime startTime;
     private DateTime endTime;
     private DateTime instanceTime;
@@ -48,11 +47,7 @@ public class TimeElapsedEvent implements Event {
         this.startTime = start;
         this.endTime = end;
         this.instanceTime = instTime;
-    }
-
-    @Override
-    public NotificationServicesRegistry.SERVICE getSource() {
-        return NotificationServicesRegistry.SERVICE.TIME;
+        this.type = EventType.TIME_ELAPSED;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
index 848f89c..a70bc3c 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
@@ -34,6 +34,7 @@ import org.apache.falcon.execution.NotificationHandler;
 import org.apache.falcon.notification.service.FalconNotificationService;
 import org.apache.falcon.notification.service.NotificationServicesRegistry;
 import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.EventType;
 import org.apache.falcon.notification.service.event.JobScheduledEvent;
 import org.apache.falcon.notification.service.request.JobCompletionNotificationRequest;
 import org.apache.falcon.notification.service.request.JobScheduleNotificationRequest;
@@ -179,7 +180,7 @@ public class SchedulerService implements FalconNotificationService, Notification
     @Override
     public void onEvent(Event event) throws FalconException {
         // Interested only in job completion events.
-        if (event.getSource() == NotificationServicesRegistry.SERVICE.JOB_COMPLETION) {
+        if (event.getType() == EventType.JOB_COMPLETED) {
             try {
                 // Check if the instance is awaited.
                 ID id = event.getTarget();

http://git-wip-us.apache.org/repos/asf/falcon/blob/aa18bfda/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
index fb4ce82..fb4c8c9 100644
--- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
+++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
@@ -20,9 +20,9 @@ package org.apache.falcon.predicate;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.execution.NotificationHandler;
-import org.apache.falcon.notification.service.NotificationServicesRegistry;
 import org.apache.falcon.notification.service.event.DataEvent;
 import org.apache.falcon.notification.service.event.Event;
+import org.apache.falcon.notification.service.event.EventType;
 import org.apache.falcon.notification.service.event.TimeElapsedEvent;
 import org.apache.falcon.state.ID;
 
@@ -173,7 +173,7 @@ public class Predicate implements Serializable {
      * @throws FalconException
      */
     public static Predicate getPredicate(Event event) throws FalconException {
-        if (event.getSource() == NotificationServicesRegistry.SERVICE.DATA) {
+        if (event.getType() == EventType.DATA_AVAILABLE) {
             DataEvent dataEvent = (DataEvent) event;
             if (dataEvent.getDataLocation() != null && dataEvent.getDataType() != null) {
                 Location loc = new Location();
@@ -183,7 +183,7 @@ public class Predicate implements Serializable {
             } else {
                 throw new FalconException("Event does not have enough data to create a predicate");
             }
-        } else if (event.getSource() == NotificationServicesRegistry.SERVICE.TIME) {
+        } else if (event.getType() == EventType.TIME_ELAPSED) {
             TimeElapsedEvent timeEvent = (TimeElapsedEvent) event;
             if (timeEvent.getStartTime() != null && timeEvent.getEndTime() != null) {
                 long instanceTime = (timeEvent.getInstanceTime() == null)? -1 : timeEvent.getInstanceTime().getMillis();
@@ -194,7 +194,7 @@ public class Predicate implements Serializable {
             }
 
         } else {
-            throw new FalconException("Unhandled event type " + event.getSource());
+            throw new FalconException("Unhandled event type " + event.getType());
         }
     }