You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by pr...@apache.org on 2016/09/06 09:48:11 UTC
lens git commit: LENS-1298: Thread should have ability to wait for
Events to be processed
Repository: lens
Updated Branches:
refs/heads/master 5db78ce18 -> d9911f618
LENS-1298: Thread should have ability to wait for Events to be processed
Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/d9911f61
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/d9911f61
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/d9911f61
Branch: refs/heads/master
Commit: d9911f618f345347f62c20184dc2023e07498bc8
Parents: 5db78ce
Author: Lavkesh Lahngir <la...@linux.com>
Authored: Tue Sep 6 15:17:34 2016 +0530
Committer: Rajat Khandelwal <ra...@gmail.com>
Committed: Tue Sep 6 15:17:34 2016 +0530
----------------------------------------------------------------------
.../server/api/events/LensEventService.java | 7 +++
.../server/api/events/SchedulerAlarmEvent.java | 7 ++-
.../apache/lens/server/EventServiceImpl.java | 16 ++++++-
.../lens/server/scheduler/AlarmService.java | 46 +++++++++++++++++---
.../server/scheduler/SchedulerServiceImpl.java | 1 +
.../lens/server/query/TestEventService.java | 33 +++++++++++++-
6 files changed, 100 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server-api/src/main/java/org/apache/lens/server/api/events/LensEventService.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/events/LensEventService.java b/lens-server-api/src/main/java/org/apache/lens/server/api/events/LensEventService.java
index 4536a18..7000dcc 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/events/LensEventService.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/events/LensEventService.java
@@ -73,4 +73,11 @@ public interface LensEventService extends LensService {
*/
<T extends LensEvent> Collection<LensEventListener> getListeners(Class<T> changeType);
+ /**
+ * Process an event synchronously.
+ * It does not return until the processing is finished.
+ * @param event
+ * @throws LensException
+ */
+ void notifyEventSync(LensEvent event) throws LensException;
}
http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java b/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java
index 0f2dabe..7f4ec6b 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/events/SchedulerAlarmEvent.java
@@ -40,7 +40,7 @@ public class SchedulerAlarmEvent extends LensEvent {
private SchedulerJobInstanceHandle previousInstance;
public SchedulerAlarmEvent(SchedulerJobHandle jobHandle, DateTime nominalTime, EventType type,
- SchedulerJobInstanceHandle previousInstance) {
+ SchedulerJobInstanceHandle previousInstance) {
super(nominalTime.getMillis());
this.jobHandle = jobHandle;
this.nominalTime = nominalTime;
@@ -53,6 +53,11 @@ public class SchedulerAlarmEvent extends LensEvent {
return jobHandle.getHandleIdString();
}
+ @Override
+ public String toString() {
+ return "Job Handle : " + jobHandle + ", Nominal Time :" + nominalTime + ", type : " + type;
+ }
+
/**
* Event type to know what kind of operations we want.
*/
http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java
index 369885d..ba12040 100644
--- a/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java
@@ -118,7 +118,9 @@ public class EventServiceImpl extends AbstractService implements LensEventServic
*/
private final class EventHandler implements Runnable {
- /** The event. */
+ /**
+ * The event.
+ */
final LensEvent event;
/**
@@ -180,6 +182,18 @@ public class EventServiceImpl extends AbstractService implements LensEventServic
}
@Override
+ public void notifyEventSync(LensEvent event) throws LensException {
+ if (getServiceState() != STATE.STARTED) {
+ throw new LensException("Event service is not in STARTED state. Current state is " + getServiceState());
+ }
+ if (event == null) {
+ return;
+ }
+ // Call the run() method directly and not submit to Executor Service.
+ new EventHandler(event).run();
+ }
+
+ @Override
public HealthStatus getHealthStatus() {
return (this.getServiceState().equals(STATE.STARTED)
&& !eventHandlerPool.isShutdown()
http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java
index 6487323..269acd9 100644
--- a/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java
+++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/AlarmService.java
@@ -18,6 +18,9 @@
package org.apache.lens.server.scheduler;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.lens.api.scheduler.SchedulerJobHandle;
import org.apache.lens.api.scheduler.XFrequency;
import org.apache.lens.api.scheduler.XFrequencyEnum;
@@ -36,6 +39,8 @@ import org.joda.time.DateTime;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
+import lombok.Getter;
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
@@ -53,6 +58,11 @@ public class AlarmService extends AbstractService implements LensService {
public static final String LENS_JOBS = "LensJobs";
public static final String ALARM_SERVICE = "AlarmService";
+ private static final String JOB_HANDLE = "jobHandle";
+ private static final String SHOULD_WAIT = "shouldWaitForProcessing";
+ @Getter
+ @Setter
+ private boolean shouldWaitForScheduleEventProcessing = false;
private Scheduler scheduler;
@@ -116,6 +126,15 @@ public class AlarmService extends AbstractService implements LensService {
}
}
+ public List<JobExecutionContext> getCurrentlyExecutingJobs() {
+ try {
+ return scheduler.getCurrentlyExecutingJobs();
+ } catch (SchedulerException e) {
+ log.error("Failed to get currently executing jobs");
+ }
+ return new ArrayList<>();
+ }
+
/**
* This method can be used by any consumer who wants to receive notifications during a time range at a given
* frequency.
@@ -132,8 +151,8 @@ public class AlarmService extends AbstractService implements LensService {
public void schedule(DateTime start, DateTime end, XFrequency frequency, String jobHandle) throws LensException {
// accept the schedule and then keep on sending the notifications for that schedule
JobDataMap map = new JobDataMap();
- map.put("jobHandle", jobHandle);
-
+ map.put(JOB_HANDLE, jobHandle);
+ map.put(SHOULD_WAIT, shouldWaitForScheduleEventProcessing);
JobDetail job = JobBuilder.newJob(LensJob.class).withIdentity(jobHandle, LENS_JOBS).usingJobData(map).build();
Trigger trigger;
@@ -230,24 +249,37 @@ public class AlarmService extends AbstractService implements LensService {
public static class LensJob implements Job {
+ private void notifyEventService(SchedulerAlarmEvent alarmEvent, boolean shouldWait)
+ throws LensException, InterruptedException {
+ LensEventService eventService = LensServices.get().getService(LensEventService.NAME);
+ if (shouldWait) {
+ eventService.notifyEventSync(alarmEvent);
+ } else {
+ eventService.notifyEvent(alarmEvent);
+ }
+ }
+
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
JobDataMap data = jobExecutionContext.getMergedJobDataMap();
DateTime nominalTime = new DateTime(jobExecutionContext.getScheduledFireTime());
- SchedulerJobHandle jobHandle = SchedulerJobHandle.fromString(data.getString("jobHandle"));
+ SchedulerJobHandle jobHandle = SchedulerJobHandle.fromString(data.getString(JOB_HANDLE));
+ boolean shouldWait = data.getBoolean(SHOULD_WAIT);
SchedulerAlarmEvent alarmEvent = new SchedulerAlarmEvent(jobHandle, nominalTime,
SchedulerAlarmEvent.EventType.SCHEDULE, null);
try {
- LensEventService eventService = LensServices.get().getService(LensEventService.NAME);
- eventService.notifyEvent(alarmEvent);
+ notifyEventService(alarmEvent, shouldWait);
if (jobExecutionContext.getNextFireTime() == null) {
- eventService
- .notifyEvent(new SchedulerAlarmEvent(jobHandle, nominalTime, SchedulerAlarmEvent.EventType.EXPIRE, null));
+ SchedulerAlarmEvent expireEvent = (new SchedulerAlarmEvent(jobHandle, nominalTime,
+ SchedulerAlarmEvent.EventType.EXPIRE, null));
+ notifyEventService(expireEvent, shouldWait);
}
} catch (LensException e) {
log.error("Failed to notify SchedulerAlarmEvent for jobHandle: {} and scheduleTime: {}",
jobHandle.getHandleIdString(), nominalTime.toString());
throw new JobExecutionException("Failed to notify alarmEvent", e);
+ } catch (InterruptedException e) {
+ log.error("Job execution tread interrupted", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
index 323e053..9cee0c2 100644
--- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
@@ -74,6 +74,7 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe
@Getter
@VisibleForTesting
protected SchedulerDAO schedulerDAO;
+ @Getter
private AlarmService alarmService;
/**
http://git-wip-us.apache.org/repos/asf/lens/blob/d9911f61/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
index 94011d2..31b6625 100644
--- a/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
+++ b/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
@@ -543,6 +543,38 @@ public class TestEventService {
"DummyAsncEventListener_AsyncThread-5")));
}
+ /**
+ * Test synchronous events
+ * @throws Exception
+ */
+ @Test
+ public void testNotifySync() throws Exception {
+ service.addListenerForType(new TestEventHandler(), TestEvent.class);
+ TestEvent testEvent = new TestEvent("ID");
+ service.notifyEventSync(testEvent);
+ assertTrue(testEvent.processed);
+ }
+
+ private static class TestEvent extends LensEvent{
+ String id;
+ boolean processed = false;
+ public TestEvent(String id) {
+ super(System.currentTimeMillis());
+ this.id = id;
+ }
+ @Override
+ public String getEventId() {
+ return id;
+ }
+ }
+
+ private static class TestEventHandler extends AsyncEventListener<TestEvent> {
+
+ @Override
+ public void process(TestEvent event) {
+ event.processed = true;
+ }
+ }
private static class DummyAsncEventListener extends AsyncEventListener<QuerySuccess> {
public DummyAsncEventListener(){
super(5); //core pool = 5
@@ -552,5 +584,4 @@ public class TestEventService {
throw new RuntimeException("Simulated Exception");
}
}
-
}