You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by de...@apache.org on 2016/02/03 13:10:28 UTC

[08/51] [abbrv] lens git commit: LENS-887 : Add exception handling over event process threads and increase pool size for QueryEndNotifier and ResultFormatter

LENS-887 : Add exception handling over event process threads and increase pool size for QueryEndNotifier and ResultFormatter


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/73f92430
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/73f92430
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/73f92430

Branch: refs/heads/current-release-line
Commit: 73f92430c70664cf5b8c63ec9b174a4a1b27d2ad
Parents: 36166a2
Author: Puneet Gupta <pu...@gmail.com>
Authored: Tue Dec 15 18:22:40 2015 +0530
Committer: Amareshwari Sriramadasu <am...@apache.org>
Committed: Tue Dec 15 18:22:40 2015 +0530

----------------------------------------------------------------------
 .../server/api/events/AsyncEventListener.java   | 50 +++++++++-----
 .../apache/lens/server/EventServiceImpl.java    |  9 ++-
 .../lens/server/query/QueryEndNotifier.java     | 72 +++++++++++---------
 .../lens/server/query/ResultFormatter.java      |  5 ++
 .../lens/server/query/TestEventService.java     | 45 ++++++++++++
 5 files changed, 131 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/73f92430/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java
----------------------------------------------------------------------
diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java b/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java
index 547c008..84728e5 100644
--- a/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java
+++ b/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java
@@ -22,12 +22,18 @@ import java.util.concurrent.*;
 
 import org.apache.lens.server.api.error.LensException;
 
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 /**
  * Event listeners should implement this class if they wish to process events asynchronously. This should be used when
  * event processing can block, or is computationally intensive.
  *
  * @param <T> the generic type
  */
+@Slf4j
 public abstract class AsyncEventListener<T extends LensEvent> implements LensEventListener<T> {
 
   /**
@@ -41,49 +47,57 @@ public abstract class AsyncEventListener<T extends LensEvent> implements LensEve
   protected final BlockingQueue<Runnable> eventQueue;
 
   /**
+   * Name of this asynchronous event listener. It is used for logging and for naming the threads in the thread pool
+   * that handles events asynchronously. If required, subclasses can override the <code>getName</code> method to
+   * provide a more appropriate name.
+   *
+   * The default value is the simple class name (for example QueryEndNotifier, ResultFormatter, etc.).
+   */
+  @Getter(AccessLevel.PROTECTED)
+  private final String name = this.getClass().getSimpleName();
+
+  /**
    * Create a single threaded event listener with an unbounded queue, with daemon threads.
    */
   public AsyncEventListener() {
-    this(1);
+    this(1, 1);
   }
 
   /**
    * Create a event listener with poolSize threads with an unbounded queue and daemon threads.
    *
    * @param poolSize the pool size
+   * @param maxPoolSize the max pool size; it is never reached here, because the unbounded queue never fills up
    */
-  public AsyncEventListener(int poolSize) {
-    this(poolSize, -1, 10, true);
+  public AsyncEventListener(int poolSize, int maxPoolSize) {
+    this(poolSize, maxPoolSize, -1, 10, true);
   }
 
   /**
    * Create an asynchronous event listener which uses a thread poool to process events.
    *
    * @param poolSize       size of the event processing pool
+   * @param maxPoolSize    max size of the event processing pool, reached only when a bounded event queue is full
    * @param maxQueueSize   max size of the event queue, if this is non positive, then the queue is unbounded
    * @param timeOutSeconds time out in seconds when an idle thread is destroyed
    * @param isDaemon       if the threads used to process should be daemon threads,
    *                       if false, then implementation should call stop()
    *                       to stop the thread pool
    */
-  public AsyncEventListener(int poolSize, int maxQueueSize, long timeOutSeconds, final boolean isDaemon) {
+  public AsyncEventListener(int poolSize, int maxPoolSize, int maxQueueSize, long timeOutSeconds,
+      final boolean isDaemon) {
     if (maxQueueSize <= 0) {
       eventQueue = new LinkedBlockingQueue<Runnable>();
     } else {
       eventQueue = new ArrayBlockingQueue<Runnable>(maxQueueSize);
     }
 
-    processor = new ThreadPoolExecutor(poolSize, poolSize, timeOutSeconds, TimeUnit.SECONDS, eventQueue,
-      new ThreadFactory() {
-        @Override
-        public Thread newThread(Runnable runnable) {
-          Thread th = new Thread(runnable);
-          th.setName("event_processor_thread");
-          th.setDaemon(isDaemon);
-          return th;
-        }
-      });
-    processor.allowCoreThreadTimeOut(true);
+    ThreadFactory factory = new BasicThreadFactory.Builder()
+      .namingPattern(getName()+"_AsyncThread-%d")
+      .daemon(isDaemon)
+      .priority(Thread.NORM_PRIORITY)
+      .build();
+    processor = new ThreadPoolExecutor(poolSize, maxPoolSize, timeOutSeconds, TimeUnit.SECONDS, eventQueue, factory);
   }
 
   /**
@@ -98,7 +112,11 @@ public abstract class AsyncEventListener<T extends LensEvent> implements LensEve
       processor.execute(new Runnable() {
         @Override
         public void run() {
-          process(event);
+          try {
+            process(event);
+          } catch (Throwable e) {
+            log.error("{} Failed to process event {}", getName(), event, e);
+          }
         }
       });
     } catch (RejectedExecutionException rejected) {

http://git-wip-us.apache.org/repos/asf/lens/blob/73f92430/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java
index a276828..369885d 100644
--- a/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java
@@ -21,6 +21,7 @@ package org.apache.lens.server;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.lens.server.api.LensConfConstants;
 import org.apache.lens.server.api.error.LensException;
@@ -29,6 +30,7 @@ import org.apache.lens.server.api.events.LensEventListener;
 import org.apache.lens.server.api.events.LensEventService;
 import org.apache.lens.server.api.health.HealthStatus;
 
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.service.AbstractService;
 
@@ -64,8 +66,13 @@ public class EventServiceImpl extends AbstractService implements LensEventServic
   @Override
   public synchronized void init(HiveConf hiveConf) {
     int numProcs = Runtime.getRuntime().availableProcessors();
+    ThreadFactory factory = new BasicThreadFactory.Builder()
+      .namingPattern("Event_Service_Thread-%d")
+      .daemon(false)
+      .priority(Thread.NORM_PRIORITY)
+      .build();
     eventHandlerPool = Executors.newFixedThreadPool(hiveConf.getInt(LensConfConstants.EVENT_SERVICE_THREAD_POOL_SIZE,
-      numProcs));
+      numProcs), factory);
     super.init(hiveConf);
   }
 

http://git-wip-us.apache.org/repos/asf/lens/blob/73f92430/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
index 110624a..ca00b4d 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java
@@ -77,12 +77,17 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> {
 
   private final LogSegregationContext logSegregationContext;
 
+  /** QueryEndNotifier core and max pool size */
+  private static final int CORE_POOL_SIZE = 2;
+  private static final int MAX_POOL_SIZE = 5;
+
   /** Instantiates a new query end notifier.
    *
    * @param queryService the query service
    * @param hiveConf     the hive conf */
   public QueryEndNotifier(QueryExecutionServiceImpl queryService, HiveConf hiveConf,
     @NonNull final LogSegregationContext logSegregationContext) {
+    super(CORE_POOL_SIZE, MAX_POOL_SIZE);
     this.queryService = queryService;
     HiveConf conf = hiveConf;
     from = conf.get(MAIL_FROM_ADDRESS);
@@ -113,23 +118,30 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> {
 
     boolean whetherMailNotify = Boolean.parseBoolean(queryContext.getConf().get(QUERY_MAIL_NOTIFY,
       WHETHER_MAIL_NOTIFY_DEFAULT));
-
     if (!whetherMailNotify) {
       return;
     }
 
-    String queryName = queryContext.getQueryName();
-    String mailSubject = "Query " + (StringUtils.isBlank(queryName) ? "" : (queryName + " "))
-      + queryContext.getStatus().getStatus() + ": " + event.getQueryHandle();
+    try {
+      //Create and Send EMAIL
+      String queryName = queryContext.getQueryName();
+      String mailSubject = "Query " + (StringUtils.isBlank(queryName) ? "" : (queryName + " "))
+        + queryContext.getStatus().getStatus() + ": " + event.getQueryHandle();
 
-    String mailMessage = createMailMessage(queryContext);
+      String mailMessage = createMailMessage(queryContext);
 
-    String to = queryContext.getSubmittedUser() + "@" + queryService.getServerDomain();
+      String to = queryContext.getSubmittedUser() + "@" + queryService.getServerDomain();
 
-    String cc = queryContext.getConf().get(QUERY_RESULT_EMAIL_CC, QUERY_RESULT_DEFAULT_EMAIL_CC);
+      String cc = queryContext.getConf().get(QUERY_RESULT_EMAIL_CC, QUERY_RESULT_DEFAULT_EMAIL_CC);
 
-    log.info("Sending completion email for query handle: {}", event.getQueryHandle());
-    sendMail(host, port, new Email(from, to, cc, mailSubject, mailMessage), mailSmtpTimeout, mailSmtpConnectionTimeout);
+      log.info("Sending completion email for query handle: {}", event.getQueryHandle());
+      sendMail(host, port, new Email(from, to, cc, mailSubject, mailMessage), mailSmtpTimeout,
+          mailSmtpConnectionTimeout);
+    } catch (Exception e) {
+      MetricsService metricsService = LensServices.get().getService(MetricsService.NAME);
+      metricsService.incrCounter(QueryEndNotifier.class, EMAIL_ERROR_COUNTER);
+      log.error("Error sending query end email", e);
+    }
   }
 
   /** Creates the mail message.
@@ -184,38 +196,32 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> {
    * @param mailSmtpTimeout           the mail smtp timeout
    * @param mailSmtpConnectionTimeout the mail smtp connection timeout */
   public static void sendMail(String host, String port,
-    Email email, int mailSmtpTimeout, int mailSmtpConnectionTimeout) {
+    Email email, int mailSmtpTimeout, int mailSmtpConnectionTimeout) throws Exception{
     Properties props = System.getProperties();
     props.put("mail.smtp.host", host);
     props.put("mail.smtp.port", port);
     props.put("mail.smtp.timeout", mailSmtpTimeout);
     props.put("mail.smtp.connectiontimeout", mailSmtpConnectionTimeout);
     Session session = Session.getDefaultInstance(props, null);
-    try {
-      MimeMessage message = new MimeMessage(session);
-      message.setFrom(new InternetAddress(email.getFrom()));
-      for (String recipient : email.getTo().trim().split("\\s*,\\s*")) {
-        message.addRecipients(Message.RecipientType.TO, InternetAddress.parse(recipient));
-      }
-      if (email.getCc() != null && email.getCc().length() > 0) {
-        for (String recipient : email.getCc().trim().split("\\s*,\\s*")) {
-          message.addRecipients(Message.RecipientType.CC, InternetAddress.parse(recipient));
-        }
+    MimeMessage message = new MimeMessage(session);
+    message.setFrom(new InternetAddress(email.getFrom()));
+    for (String recipient : email.getTo().trim().split("\\s*,\\s*")) {
+      message.addRecipients(Message.RecipientType.TO, InternetAddress.parse(recipient));
+    }
+    if (email.getCc() != null && email.getCc().length() > 0) {
+      for (String recipient : email.getCc().trim().split("\\s*,\\s*")) {
+        message.addRecipients(Message.RecipientType.CC, InternetAddress.parse(recipient));
       }
-      message.setSubject(email.getSubject());
-      message.setSentDate(new Date());
+    }
+    message.setSubject(email.getSubject());
+    message.setSentDate(new Date());
 
-      MimeBodyPart messagePart = new MimeBodyPart();
-      messagePart.setText(email.getMessage());
-      Multipart multipart = new MimeMultipart();
+    MimeBodyPart messagePart = new MimeBodyPart();
+    messagePart.setText(email.getMessage());
+    Multipart multipart = new MimeMultipart();
 
-      multipart.addBodyPart(messagePart);
-      message.setContent(multipart);
-      Transport.send(message);
-    } catch (Exception e) {
-      MetricsService metricsService = LensServices.get().getService(MetricsService.NAME);
-      metricsService.incrCounter(QueryEndNotifier.class, EMAIL_ERROR_COUNTER);
-      log.error("Error sending query end email", e);
-    }
+    multipart.addBodyPart(messagePart);
+    message.setContent(multipart);
+    Transport.send(message);
   }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/73f92430/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
index f568b17..9955278 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java
@@ -46,6 +46,10 @@ public class ResultFormatter extends AsyncEventListener<QueryExecuted> {
   /** The query service. */
   QueryExecutionServiceImpl queryService;
 
+  /** ResultFormatter core and max pool size */
+  private static final int CORE_POOL_SIZE = 5;
+  private static final int MAX_POOL_SIZE = 10;
+
   private final LogSegregationContext logSegregationContext;
 
   /**
@@ -54,6 +58,7 @@ public class ResultFormatter extends AsyncEventListener<QueryExecuted> {
    * @param queryService the query service
    */
   public ResultFormatter(QueryExecutionServiceImpl queryService, @NonNull LogSegregationContext logSegregationContext) {
+    super(CORE_POOL_SIZE, MAX_POOL_SIZE);
     this.queryService = queryService;
     this.logSegregationContext = logSegregationContext;
   }

http://git-wip-us.apache.org/repos/asf/lens/blob/73f92430/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
----------------------------------------------------------------------
diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
index 702a529..a2ca17f 100644
--- a/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
+++ b/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java
@@ -20,6 +20,9 @@ package org.apache.lens.server.query;
 
 import static org.testng.Assert.*;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -503,4 +506,46 @@ public class TestEventService {
 
   }
 
+  @Test
+  public void testAysncEventListenerPoolThreads(){
+    AsyncEventListener<QuerySuccess> ayncListener = new DummyAsncEventListener();
+    for(int i=0; i<10; i++){
+      try {
+        //A pool thread is created each time an event is submitted, until the core pool size (5 for this test
+        //case) is reached. @see org.apache.lens.server.api.events.AsyncEventListener.processor
+        ayncListener.onEvent(null);
+      } catch (LensException e) {
+        assert(false); //Not Expected
+      }
+    }
+
+    //Verify the core pool Threads after the events have been fired
+    ThreadGroup currentTG = Thread.currentThread().getThreadGroup();
+    int count = currentTG.activeCount();
+    Thread[] threads = new Thread[count];
+    currentTG.enumerate(threads);
+    Set<String> aysncThreadNames = new HashSet<String>();
+    for(Thread t : threads){
+      if (t.getName().contains("DummyAsncEventListener_AsyncThread")){
+        aysncThreadNames.add(t.getName());
+      }
+    }
+    assertTrue(aysncThreadNames.containsAll(Arrays.asList(
+      "DummyAsncEventListener_AsyncThread-1",
+      "DummyAsncEventListener_AsyncThread-2",
+      "DummyAsncEventListener_AsyncThread-3",
+      "DummyAsncEventListener_AsyncThread-4",
+      "DummyAsncEventListener_AsyncThread-5")));
+  }
+
+  private static class DummyAsncEventListener extends AsyncEventListener<QuerySuccess> {
+    public DummyAsncEventListener(){
+      super(5, 10); //core pool = 5 and max Pool size =10
+    }
+    @Override
+    public void process(QuerySuccess event) {
+      throw new RuntimeException("Simulated Exception");
+    }
+  }
+
 }