You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/03/04 07:51:00 UTC

falcon git commit: FALCON-1826 Execution order not honoured when instances are KILLED

Repository: falcon
Updated Branches:
  refs/heads/master 7e554e791 -> 3251e3aa1


FALCON-1826 Execution order not honoured when instances are KILLED

The problem was that unregistering a scheduled instance did not remove it immediately. Instead, the instance was added to instancesToIgnore and was skipped later, at scheduling time. This causes issues because instancesToIgnore then needs to be checked in more than one place.

The fix is to remove the instance from the "awaitingSchedule" list synchronously when it is unregistered.

Author: Pallavi Rao <pa...@inmobi.com>

Reviewers: "Pavan Kumar Kolamuri <pa...@gmail.com>"

Closes #58 from pallavi-rao/1826 and squashes the following commits:

9ace53d [Pallavi Rao] FALCON-1826 Addressed review comments
dec29c0 [Pallavi Rao] FALCON-1826 Execution order not honoured when instances are KILLED


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3251e3aa
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3251e3aa
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3251e3aa

Branch: refs/heads/master
Commit: 3251e3aa17d1f6f15e49e6d2c7c4e75c7260b387
Parents: 7e554e7
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Fri Mar 4 12:18:26 2016 +0530
Committer: pavankumar526 <pa...@gmail.com>
Committed: Fri Mar 4 12:18:26 2016 +0530

----------------------------------------------------------------------
 .../service/impl/SchedulerService.java          | 35 ++++++++------------
 .../service/SchedulerServiceTest.java           | 11 +++---
 2 files changed, 20 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/3251e3aa/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
index f5a7c86..635fec4 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
@@ -17,7 +17,6 @@
  */
 package org.apache.falcon.notification.service.impl;
 
-import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -86,7 +85,6 @@ public class SchedulerService implements FalconNotificationService, Notification
 
     private static final StateStore STATE_STORE = AbstractStateStore.get();
 
-    private Cache<InstanceID, Object> instancesToIgnore;
     // TODO : limit the no. of awaiting instances per entity
     private LoadingCache<EntityClusterID, SortedMap<Integer, ExecutionInstance>> executorAwaitedInstances;
 
@@ -96,22 +94,28 @@ public class SchedulerService implements FalconNotificationService, Notification
         if (request.getInstance() == null) {
             throw new NotificationServiceException("Request must contain an instance.");
         }
-        // When the instance is getting rescheduled for run. As in the case of suspend and resume.
-        Object obj = instancesToIgnore.getIfPresent(request.getInstance().getId());
-        if (obj != null) {
-            instancesToIgnore.invalidate(request.getInstance().getId());
-        }
         LOG.debug("Received request to schedule instance {} with sequence {}.", request.getInstance().getId(),
                 request.getInstance().getInstanceSequence());
         runQueue.execute(new InstanceRunner(request));
     }
 
     @Override
-    public void unregister(NotificationHandler handler, ID listenerID) {
+    public void unregister(NotificationHandler handler, ID listenerID) throws NotificationServiceException {
         // If ID is that of an entity, do nothing
         if (listenerID instanceof InstanceID) {
-            // Not efficient to iterate over elements to remove this. Add to ignore list.
-            instancesToIgnore.put((InstanceID) listenerID, new Object());
+            try {
+                InstanceID instanceID = (InstanceID) listenerID;
+                SortedMap<Integer, ExecutionInstance> instances = executorAwaitedInstances.get(instanceID
+                            .getEntityClusterID());
+                if (instances != null && !instances.isEmpty()) {
+                    synchronized (instances) {
+                        instances.remove(STATE_STORE.getExecutionInstance(instanceID)
+                                .getInstance().getInstanceSequence());
+                    }
+                }
+            } catch (Exception e) {
+                throw new NotificationServiceException(e);
+            }
         }
     }
 
@@ -155,10 +159,6 @@ public class SchedulerService implements FalconNotificationService, Notification
                 .removalListener(this)
                 .build(instanceCacheLoader);
 
-        instancesToIgnore = CacheBuilder.newBuilder()
-                .expireAfterWrite(1, TimeUnit.HOURS)
-                .concurrencyLevel(1)
-                .build();
         // Interested in all job completion events.
         JobCompletionNotificationRequest completionRequest = (JobCompletionNotificationRequest)
                 NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_COMPLETION)
@@ -243,7 +243,6 @@ public class SchedulerService implements FalconNotificationService, Notification
     @Override
     public void destroy() throws FalconException {
         runQueue.shutdownNow();
-        instancesToIgnore.invalidateAll();
     }
 
     private void notifyFailureEvent(JobScheduleNotificationRequest request) throws FalconException {
@@ -290,12 +289,6 @@ public class SchedulerService implements FalconNotificationService, Notification
         @Override
         public void run() {
             try {
-                // If de-registered
-                if (instancesToIgnore.getIfPresent(instance.getId()) != null) {
-                    LOG.debug("Instance {} has been deregistered. Ignoring.", instance.getId());
-                    instancesToIgnore.invalidate(instance.getId());
-                    return;
-                }
                 LOG.debug("Received request to run instance {}", instance.getId());
                 if (checkConditions()) {
                     String externalId = instance.getExternalID();

http://git-wip-us.apache.org/repos/asf/falcon/blob/3251e3aa/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
index a7ce748..a442738 100644
--- a/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
+++ b/scheduler/src/test/java/org/apache/falcon/notification/service/SchedulerServiceTest.java
@@ -242,7 +242,7 @@ public class SchedulerServiceTest extends AbstractTestBase {
     public void testDeRegistration() throws Exception {
         storeEntity(EntityType.PROCESS, "summarize4");
         Process mockProcess = getStore().get(EntityType.PROCESS, "summarize4");
-        mockProcess.setParallel(3);
+        mockProcess.setParallel(2);
         Date startTime = EntityUtil.getStartTime(mockProcess, cluster);
         ExecutionInstance instance1 = new ProcessExecutionInstance(mockProcess, new DateTime(startTime), cluster);
         // Schedule 3 instances.
@@ -263,14 +263,15 @@ public class SchedulerServiceTest extends AbstractTestBase {
         request3.setInstance(instance3);
         scheduler.register(request3.build());
 
-        // Abort second instance
-        scheduler.unregister(handler, instance2.getId());
+        // Abort third instance
+        stateStore.putExecutionInstance(new InstanceState(instance3));
+        scheduler.unregister(handler, instance3.getId());
 
         Thread.sleep(100);
         Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance1), new Integer(1));
-        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance3), new Integer(1));
+        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), new Integer(1));
         // Second instance should not run.
-        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance2), null);
+        Assert.assertEquals(((MockDAGEngine) mockDagEngine).getTotalRuns(instance3), null);
     }
 
     @Test