You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by de...@apache.org on 2016/02/03 13:10:28 UTC
[08/51] [abbrv] lens git commit: LENS-887 : Add exception handling
over event process threads and increase pool size for QueryEndNotifier and
ResultFormatter
LENS-887 : Add exception handling over event process threads and increase pool size for QueryEndNotifier and ResultFormatter
Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/73f92430
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/73f92430
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/73f92430
Branch: refs/heads/current-release-line
Commit: 73f92430c70664cf5b8c63ec9b174a4a1b27d2ad
Parents: 36166a2
Author: Puneet Gupta <pu...@gmail.com>
Authored: Tue Dec 15 18:22:40 2015 +0530
Committer: Amareshwari Sriramadasu <am...@apache.org>
Committed: Tue Dec 15 18:22:40 2015 +0530
----------------------------------------------------------------------
.../server/api/events/AsyncEventListener.java | 50 +++++++++-----
.../apache/lens/server/EventServiceImpl.java | 9 ++-
.../lens/server/query/QueryEndNotifier.java | 72 +++++++++++---------
.../lens/server/query/ResultFormatter.java | 5 ++
.../lens/server/query/TestEventService.java | 45 ++++++++++++
5 files changed, 131 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lens/blob/73f92430/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java b/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java
index 547c008..84728e5 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java
@@ -22,12 +22,18 @@ import java.util.concurrent.*;
import org.apache.lens.server.api.error.LensException;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
/**
* Event listeners should implement this class if they wish to process events asynchronously. This should be used when
* event processing can block, or is computationally intensive.
*
* @param <T> the generic type
*/
+@Slf4j
public abstract class AsyncEventListener<T extends LensEvent> implements LensEventListener<T> {
/**
@@ -41,49 +47,57 @@ public abstract class AsyncEventListener<T extends LensEvent> implements LensEve
protected final BlockingQueue<Runnable> eventQueue;
/**
+ * Name of this asynchronous event listener. It is used for logging and to name the threads in the thread pool
+ * that handle events asynchronously. If required, subclasses can override the <code>getName</code> method to
+ * provide a more appropriate name.
+ *
+ * The default value is the simple class name (for example, QueryEndNotifier or ResultFormatter).
+ */
+ @Getter(AccessLevel.PROTECTED)
+ private final String name = this.getClass().getSimpleName();
+
+ /**
* Create a single threaded event listener with an unbounded queue, with daemon threads.
*/
public AsyncEventListener() {
- this(1);
+ this(1, 1);
}
/**
* Create an event listener with poolSize threads with an unbounded queue and daemon threads.
*
* @param poolSize the pool size
+ * @param maxPoolSize the max pool size
*/
- public AsyncEventListener(int poolSize) {
- this(poolSize, -1, 10, true);
+ public AsyncEventListener(int poolSize, int maxPoolSize) {
+ this(poolSize, maxPoolSize, -1, 10, true);
}
/**
* Create an asynchronous event listener which uses a thread pool to process events.
*
* @param poolSize size of the event processing pool
+ * @param maxPoolSize the max pool size
* @param maxQueueSize max size of the event queue, if this is non positive, then the queue is unbounded
* @param timeOutSeconds time out in seconds when an idle thread is destroyed
* @param isDaemon if the threads used to process should be daemon threads,
* if false, then implementation should call stop()
* to stop the thread pool
*/
- public AsyncEventListener(int poolSize, int maxQueueSize, long timeOutSeconds, final boolean isDaemon) {
+ public AsyncEventListener(int poolSize, int maxPoolSize, int maxQueueSize, long timeOutSeconds,
+ final boolean isDaemon) {
if (maxQueueSize <= 0) {
eventQueue = new LinkedBlockingQueue<Runnable>();
} else {
eventQueue = new ArrayBlockingQueue<Runnable>(maxQueueSize);
}
- processor = new ThreadPoolExecutor(poolSize, poolSize, timeOutSeconds, TimeUnit.SECONDS, eventQueue,
- new ThreadFactory() {
- @Override
- public Thread newThread(Runnable runnable) {
- Thread th = new Thread(runnable);
- th.setName("event_processor_thread");
- th.setDaemon(isDaemon);
- return th;
- }
- });
- processor.allowCoreThreadTimeOut(true);
+ ThreadFactory factory = new BasicThreadFactory.Builder()
+ .namingPattern(getName()+"_AsyncThread-%d")
+ .daemon(isDaemon)
+ .priority(Thread.NORM_PRIORITY)
+ .build();
+ processor = new ThreadPoolExecutor(poolSize, maxPoolSize, timeOutSeconds, TimeUnit.SECONDS, eventQueue, factory);
}
/**
@@ -98,7 +112,11 @@ public abstract class AsyncEventListener<T extends LensEvent> implements LensEve
processor.execute(new Runnable() {
@Override
public void run() {
- process(event);
+ try {
+ process(event);
+ } catch (Throwable e) {
+ log.error("{} Failed to process event {}", getName(), event, e);
+ }
}
});
} catch (RejectedExecutionException rejected) {
http://git-wip-us.apache.org/repos/asf/lens/blob/73f92430/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java
index a276828..369885d 100644
--- a/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java
@@ -21,6 +21,7 @@ package org.apache.lens.server;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import org.apache.lens.server.api.LensConfConstants;
import org.apache.lens.server.api.error.LensException;
@@ -29,6 +30,7 @@ import org.apache.lens.server.api.events.LensEventListener;
import org.apache.lens.server.api.events.LensEventService;
import org.apache.lens.server.api.health.HealthStatus;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.AbstractService;
@@ -64,8 +66,13 @@ public class EventServiceImpl extends AbstractService implements LensEventServic
@Override
public synchronized void init(HiveConf hiveConf) {
int numProcs = Runtime.getRuntime().availableProcessors();
+ ThreadFactory factory = new BasicThreadFactory.Builder()
+ .namingPattern("Event_Service_Thread-%d")
+ .daemon(false)
+ .priority(Thread.NORM_PRIORITY)
+ .build();
eventHandlerPool = Executors.newFixedThreadPool(hiveConf.getInt(LensConfConstants.EVENT_SERVICE_THREAD_POOL_SIZE,
- numProcs));
+ numProcs), factory);
super.init(hiveConf);
}
http://git-wip-us.apache.org/repos/asf/lens/blob/73f92430/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
index 110624a..ca00b4d 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
@@ -77,12 +77,17 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> {
private final LogSegregationContext logSegregationContext;
+ /** QueryEndNotifier core and max pool size */
+ private static final int CORE_POOL_SIZE = 2;
+ private static final int MAX_POOL_SIZE = 5;
+
/** Instantiates a new query end notifier.
*
* @param queryService the query service
* @param hiveConf the hive conf */
public QueryEndNotifier(QueryExecutionServiceImpl queryService, HiveConf hiveConf,
@NonNull final LogSegregationContext logSegregationContext) {
+ super(CORE_POOL_SIZE, MAX_POOL_SIZE);
this.queryService = queryService;
HiveConf conf = hiveConf;
from = conf.get(MAIL_FROM_ADDRESS);
@@ -113,23 +118,30 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> {
boolean whetherMailNotify = Boolean.parseBoolean(queryContext.getConf().get(QUERY_MAIL_NOTIFY,
WHETHER_MAIL_NOTIFY_DEFAULT));
-
if (!whetherMailNotify) {
return;
}
- String queryName = queryContext.getQueryName();
- String mailSubject = "Query " + (StringUtils.isBlank(queryName) ? "" : (queryName + " "))
- + queryContext.getStatus().getStatus() + ": " + event.getQueryHandle();
+ try {
+ //Create and Send EMAIL
+ String queryName = queryContext.getQueryName();
+ String mailSubject = "Query " + (StringUtils.isBlank(queryName) ? "" : (queryName + " "))
+ + queryContext.getStatus().getStatus() + ": " + event.getQueryHandle();
- String mailMessage = createMailMessage(queryContext);
+ String mailMessage = createMailMessage(queryContext);
- String to = queryContext.getSubmittedUser() + "@" + queryService.getServerDomain();
+ String to = queryContext.getSubmittedUser() + "@" + queryService.getServerDomain();
- String cc = queryContext.getConf().get(QUERY_RESULT_EMAIL_CC, QUERY_RESULT_DEFAULT_EMAIL_CC);
+ String cc = queryContext.getConf().get(QUERY_RESULT_EMAIL_CC, QUERY_RESULT_DEFAULT_EMAIL_CC);
- log.info("Sending completion email for query handle: {}", event.getQueryHandle());
- sendMail(host, port, new Email(from, to, cc, mailSubject, mailMessage), mailSmtpTimeout, mailSmtpConnectionTimeout);
+ log.info("Sending completion email for query handle: {}", event.getQueryHandle());
+ sendMail(host, port, new Email(from, to, cc, mailSubject, mailMessage), mailSmtpTimeout,
+ mailSmtpConnectionTimeout);
+ } catch (Exception e) {
+ MetricsService metricsService = LensServices.get().getService(MetricsService.NAME);
+ metricsService.incrCounter(QueryEndNotifier.class, EMAIL_ERROR_COUNTER);
+ log.error("Error sending query end email", e);
+ }
}
/** Creates the mail message.
@@ -184,38 +196,32 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> {
* @param mailSmtpTimeout the mail smtp timeout
* @param mailSmtpConnectionTimeout the mail smtp connection timeout */
public static void sendMail(String host, String port,
- Email email, int mailSmtpTimeout, int mailSmtpConnectionTimeout) {
+ Email email, int mailSmtpTimeout, int mailSmtpConnectionTimeout) throws Exception{
Properties props = System.getProperties();
props.put("mail.smtp.host", host);
props.put("mail.smtp.port", port);
props.put("mail.smtp.timeout", mailSmtpTimeout);
props.put("mail.smtp.connectiontimeout", mailSmtpConnectionTimeout);
Session session = Session.getDefaultInstance(props, null);
- try {
- MimeMessage message = new MimeMessage(session);
- message.setFrom(new InternetAddress(email.getFrom()));
- for (String recipient : email.getTo().trim().split("\\s*,\\s*")) {
- message.addRecipients(Message.RecipientType.TO, InternetAddress.parse(recipient));
- }
- if (email.getCc() != null && email.getCc().length() > 0) {
- for (String recipient : email.getCc().trim().split("\\s*,\\s*")) {
- message.addRecipients(Message.RecipientType.CC, InternetAddress.parse(recipient));
- }
+ MimeMessage message = new MimeMessage(session);
+ message.setFrom(new InternetAddress(email.getFrom()));
+ for (String recipient : email.getTo().trim().split("\\s*,\\s*")) {
+ message.addRecipients(Message.RecipientType.TO, InternetAddress.parse(recipient));
+ }
+ if (email.getCc() != null && email.getCc().length() > 0) {
+ for (String recipient : email.getCc().trim().split("\\s*,\\s*")) {
+ message.addRecipients(Message.RecipientType.CC, InternetAddress.parse(recipient));
}
- message.setSubject(email.getSubject());
- message.setSentDate(new Date());
+ }
+ message.setSubject(email.getSubject());
+ message.setSentDate(new Date());
- MimeBodyPart messagePart = new MimeBodyPart();
- messagePart.setText(email.getMessage());
- Multipart multipart = new MimeMultipart();
+ MimeBodyPart messagePart = new MimeBodyPart();
+ messagePart.setText(email.getMessage());
+ Multipart multipart = new MimeMultipart();
- multipart.addBodyPart(messagePart);
- message.setContent(multipart);
- Transport.send(message);
- } catch (Exception e) {
- MetricsService metricsService = LensServices.get().getService(MetricsService.NAME);
- metricsService.incrCounter(QueryEndNotifier.class, EMAIL_ERROR_COUNTER);
- log.error("Error sending query end email", e);
- }
+ multipart.addBodyPart(messagePart);
+ message.setContent(multipart);
+ Transport.send(message);
}
}
http://git-wip-us.apache.org/repos/asf/lens/blob/73f92430/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
index f568b17..9955278 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
@@ -46,6 +46,10 @@ public class ResultFormatter extends AsyncEventListener<QueryExecuted> {
/** The query service. */
QueryExecutionServiceImpl queryService;
+ /** ResultFormatter core and max pool size */
+ private static final int CORE_POOL_SIZE = 5;
+ private static final int MAX_POOL_SIZE = 10;
+
private final LogSegregationContext logSegregationContext;
/**
@@ -54,6 +58,7 @@ public class ResultFormatter extends AsyncEventListener<QueryExecuted> {
* @param queryService the query service
*/
public ResultFormatter(QueryExecutionServiceImpl queryService, @NonNull LogSegregationContext logSegregationContext) {
+ super(CORE_POOL_SIZE, MAX_POOL_SIZE);
this.queryService = queryService;
this.logSegregationContext = logSegregationContext;
}
http://git-wip-us.apache.org/repos/asf/lens/blob/73f92430/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
index 702a529..a2ca17f 100644
--- a/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
+++ b/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
@@ -20,6 +20,9 @@ package org.apache.lens.server.query;
import static org.testng.Assert.*;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -503,4 +506,46 @@ public class TestEventService {
}
+ @Test
+ public void testAysncEventListenerPoolThreads(){
+ AsyncEventListener<QuerySuccess> ayncListener = new DummyAsncEventListener();
+ for(int i=0; i<10; i++){
+ try {
+ //A pool thread is created each time an event is submitted until core pool size is reached which is 5
+ //for this test case. @see org.apache.lens.server.api.events.AsyncEventListener.processor
+ ayncListener.onEvent(null);
+ } catch (LensException e) {
+ assert(false); //Not Expected
+ }
+ }
+
+ //Verify the core pool Threads after the events have been fired
+ ThreadGroup currentTG = Thread.currentThread().getThreadGroup();
+ int count = currentTG.activeCount();
+ Thread[] threads = new Thread[count];
+ currentTG.enumerate(threads);
+ Set<String> aysncThreadNames = new HashSet<String>();
+ for(Thread t : threads){
+ if (t.getName().contains("DummyAsncEventListener_AsyncThread")){
+ aysncThreadNames.add(t.getName());
+ }
+ }
+ assertTrue(aysncThreadNames.containsAll(Arrays.asList(
+ "DummyAsncEventListener_AsyncThread-1",
+ "DummyAsncEventListener_AsyncThread-2",
+ "DummyAsncEventListener_AsyncThread-3",
+ "DummyAsncEventListener_AsyncThread-4",
+ "DummyAsncEventListener_AsyncThread-5")));
+ }
+
+ private static class DummyAsncEventListener extends AsyncEventListener<QuerySuccess> {
+ public DummyAsncEventListener(){
+ super(5, 10); //core pool = 5 and max Pool size =10
+ }
+ @Override
+ public void process(QuerySuccess event) {
+ throw new RuntimeException("Simulated Exception");
+ }
+ }
+
}