You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by pr...@apache.org on 2016/09/06 09:48:11 UTC

lens git commit: LENS-1298: Thread should have ability to wait for Events to be processed

Repository: lens
Updated Branches:
  refs/heads/master 5db78ce18 -> d9911f618


LENS-1298: Thread should have ability to wait for Events to be processed


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/d9911f61
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/d9911f61
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/d9911f61

Branch: refs/heads/master
Commit: d9911f618f345347f62c20184dc2023e07498bc8
Parents: 5db78ce
Author: Lavkesh Lahngir <la...@linux.com>
Authored: Tue Sep 6 15:17:34 2016 +0530
Committer: Rajat Khandelwal <ra...@gmail.com>
Committed: Tue Sep 6 15:17:34 2016 +0530

----------------------------------------------------------------------
 .../server/api/events/LensEventService.java     |  7 +++
 .../server/api/events/SchedulerAlarmEvent.java  |  7 ++-
 .../apache/lens/server/EventServiceImpl.java    | 16 ++++++-
 .../lens/server/scheduler/AlarmService.java     | 46 +++++++++++++++++---
 .../server/scheduler/SchedulerServiceImpl.java  |  1 +
 .../lens/server/query/TestEventService.java     | 33 +++++++++++++-
 6 files changed, 100 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server-api/src/main/java/org/apache/lens/server/api/events/LensEventService.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/events/LensEventService.java b/lens-server-api/src/main/java/org/apache/lens/server/api/events/LensEventService.java
index 4536a18..7000dcc 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/events/LensEventService.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/events/LensEventService.java
@@ -73,4 +73,11 @@ public interface LensEventService extends LensService {
    */
   <T extends LensEvent> Collection<LensEventListener> getListeners(Class<T> changeType);
 
+  /**
+   * Process an event synchronously.
+   * It does not return until the processing is finished.
+   * @param event
+   * @throws LensException
+   */
+  void notifyEventSync(LensEvent event) throws LensException;
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java b/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java
index 0f2dabe..7f4ec6b 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java
@@ -40,7 +40,7 @@ public class SchedulerAlarmEvent extends LensEvent {
   private SchedulerJobInstanceHandle previousInstance;
 
   public SchedulerAlarmEvent(SchedulerJobHandle jobHandle, DateTime nominalTime, EventType type,
-      SchedulerJobInstanceHandle previousInstance) {
+    SchedulerJobInstanceHandle previousInstance) {
     super(nominalTime.getMillis());
     this.jobHandle = jobHandle;
     this.nominalTime = nominalTime;
@@ -53,6 +53,11 @@ public class SchedulerAlarmEvent extends LensEvent {
     return jobHandle.getHandleIdString();
   }
 
+  @Override
+  public String toString() {
+    return "Job Handle : " + jobHandle + ", Nominal Time :" + nominalTime + ", type : " + type;
+  }
+
   /**
    * Event type to know what kind of operations we want.
    */

http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java
index 369885d..ba12040 100644
--- a/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java
@@ -118,7 +118,9 @@ public class EventServiceImpl extends AbstractService implements LensEventServic
    */
   private final class EventHandler implements Runnable {
 
-    /** The event. */
+    /**
+     * The event.
+     */
     final LensEvent event;
 
     /**
@@ -180,6 +182,18 @@ public class EventServiceImpl extends AbstractService implements LensEventServic
   }
 
   @Override
+  public void notifyEventSync(LensEvent event) throws LensException {
+    if (getServiceState() != STATE.STARTED) {
+      throw new LensException("Event service is not in STARTED state. Current state is " + getServiceState());
+    }
+    if (event == null) {
+      return;
+    }
+    // Call the run() method directly and not submit to Executor Service.
+    new EventHandler(event).run();
+  }
+
+  @Override
   public HealthStatus getHealthStatus() {
     return (this.getServiceState().equals(STATE.STARTED)
         && !eventHandlerPool.isShutdown()

http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java
index 6487323..269acd9 100644
--- a/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java
+++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java
@@ -18,6 +18,9 @@
 
 package org.apache.lens.server.scheduler;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.lens.api.scheduler.SchedulerJobHandle;
 import org.apache.lens.api.scheduler.XFrequency;
 import org.apache.lens.api.scheduler.XFrequencyEnum;
@@ -36,6 +39,8 @@ import org.joda.time.DateTime;
 import org.quartz.*;
 import org.quartz.impl.StdSchedulerFactory;
 
+import lombok.Getter;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
 /**
@@ -53,6 +58,11 @@ public class AlarmService extends AbstractService implements LensService {
 
   public static final String LENS_JOBS = "LensJobs";
   public static final String ALARM_SERVICE = "AlarmService";
+  private static final String JOB_HANDLE = "jobHandle";
+  private static final String SHOULD_WAIT = "shouldWaitForProcessing";
+  @Getter
+  @Setter
+  private boolean shouldWaitForScheduleEventProcessing = false;
 
   private Scheduler scheduler;
 
@@ -116,6 +126,15 @@ public class AlarmService extends AbstractService implements LensService {
     }
   }
 
+  public List<JobExecutionContext> getCurrentlyExecutingJobs() {
+    try {
+      return scheduler.getCurrentlyExecutingJobs();
+    } catch (SchedulerException e) {
+      log.error("Failed to get currently executing jobs");
+    }
+    return new ArrayList<>();
+  }
+
   /**
    * This method can be used by any consumer who wants to receive notifications during a time range at a given
    * frequency.
@@ -132,8 +151,8 @@ public class AlarmService extends AbstractService implements LensService {
   public void schedule(DateTime start, DateTime end, XFrequency frequency, String jobHandle) throws LensException {
     // accept the schedule and then keep on sending the notifications for that schedule
     JobDataMap map = new JobDataMap();
-    map.put("jobHandle", jobHandle);
-
+    map.put(JOB_HANDLE, jobHandle);
+    map.put(SHOULD_WAIT, shouldWaitForScheduleEventProcessing);
     JobDetail job = JobBuilder.newJob(LensJob.class).withIdentity(jobHandle, LENS_JOBS).usingJobData(map).build();
 
     Trigger trigger;
@@ -230,24 +249,37 @@ public class AlarmService extends AbstractService implements LensService {
 
   public static class LensJob implements Job {
 
+    private void notifyEventService(SchedulerAlarmEvent alarmEvent, boolean shouldWait)
+      throws LensException, InterruptedException {
+      LensEventService eventService = LensServices.get().getService(LensEventService.NAME);
+      if (shouldWait) {
+        eventService.notifyEventSync(alarmEvent);
+      } else {
+        eventService.notifyEvent(alarmEvent);
+      }
+    }
+
     @Override
     public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
       JobDataMap data = jobExecutionContext.getMergedJobDataMap();
       DateTime nominalTime = new DateTime(jobExecutionContext.getScheduledFireTime());
-      SchedulerJobHandle jobHandle = SchedulerJobHandle.fromString(data.getString("jobHandle"));
+      SchedulerJobHandle jobHandle = SchedulerJobHandle.fromString(data.getString(JOB_HANDLE));
+      boolean shouldWait = data.getBoolean(SHOULD_WAIT);
       SchedulerAlarmEvent alarmEvent = new SchedulerAlarmEvent(jobHandle, nominalTime,
         SchedulerAlarmEvent.EventType.SCHEDULE, null);
       try {
-        LensEventService eventService = LensServices.get().getService(LensEventService.NAME);
-        eventService.notifyEvent(alarmEvent);
+        notifyEventService(alarmEvent, shouldWait);
         if (jobExecutionContext.getNextFireTime() == null) {
-          eventService
-            .notifyEvent(new SchedulerAlarmEvent(jobHandle, nominalTime, SchedulerAlarmEvent.EventType.EXPIRE, null));
+          SchedulerAlarmEvent expireEvent = (new SchedulerAlarmEvent(jobHandle, nominalTime,
+            SchedulerAlarmEvent.EventType.EXPIRE, null));
+          notifyEventService(expireEvent, shouldWait);
         }
       } catch (LensException e) {
         log.error("Failed to notify SchedulerAlarmEvent for jobHandle: {} and scheduleTime: {}",
           jobHandle.getHandleIdString(), nominalTime.toString());
         throw new JobExecutionException("Failed to notify alarmEvent", e);
+      } catch (InterruptedException e) {
+        log.error("Job execution tread interrupted", e);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
index 323e053..9cee0c2 100644
--- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
@@ -74,6 +74,7 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe
   @Getter
   @VisibleForTesting
   protected SchedulerDAO schedulerDAO;
+  @Getter
   private AlarmService alarmService;
 
   /**

http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
index 94011d2..31b6625 100644
--- a/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
+++ b/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
@@ -543,6 +543,38 @@ public class TestEventService {
       "DummyAsncEventListener_AsyncThread-5")));
   }
 
+  /**
+   * Test synchronous events
+   * @throws Exception
+   */
+  @Test
+  public void testNotifySync() throws Exception {
+    service.addListenerForType(new TestEventHandler(), TestEvent.class);
+    TestEvent testEvent = new TestEvent("ID");
+    service.notifyEventSync(testEvent);
+    assertTrue(testEvent.processed);
+  }
+
+  private static class TestEvent extends LensEvent{
+    String id;
+    boolean processed = false;
+    public TestEvent(String id) {
+      super(System.currentTimeMillis());
+      this.id = id;
+    }
+    @Override
+    public String getEventId() {
+      return id;
+    }
+  }
+
+  private static class TestEventHandler extends AsyncEventListener<TestEvent> {
+
+    @Override
+    public void process(TestEvent event) {
+      event.processed = true;
+    }
+  }
   private static class DummyAsncEventListener extends AsyncEventListener<QuerySuccess> {
     public DummyAsncEventListener(){
       super(5); //core pool = 5
@@ -552,5 +584,4 @@ public class TestEventService {
       throw new RuntimeException("Simulated Exception");
     }
   }
-
 }