You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2018/07/11 23:10:35 UTC

[35/56] [abbrv] hadoop git commit: Revert "HDDS-224. Create metrics for Event Watcher."

Revert "HDDS-224. Create metrics for Event Watcher."

This reverts commit cb5e225868a069d6d16244b462ebada44465dce8.
The JIRA number is wrong, reverting to fix it.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3c0a66ab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3c0a66ab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3c0a66ab

Branch: refs/heads/HDFS-12943
Commit: 3c0a66abe632277e89fccd8dced9e71ca5d87df0
Parents: cb5e225
Author: Anu Engineer <ae...@apache.org>
Authored: Mon Jul 9 13:03:57 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Mon Jul 9 13:03:57 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hdds/server/events/EventQueue.java   | 108 ++++++++-----------
 .../server/events/SingleThreadExecutor.java     |  35 ++----
 .../hdds/server/events/TestEventQueue.java      |  35 +++++-
 3 files changed, 87 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c0a66ab/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
index 7e29223..44d85f5 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
@@ -18,11 +18,7 @@
 package org.apache.hadoop.hdds.server.events;
 
 import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
-
-import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,8 +42,6 @@ public class EventQueue implements EventPublisher, AutoCloseable {
   private static final Logger LOG =
       LoggerFactory.getLogger(EventQueue.class);
 
-  private static final String EXECUTOR_NAME_SEPARATOR = "For";
-
   private final Map<Event, Map<EventExecutor, List<EventHandler>>> executors =
       new HashMap<>();
 
@@ -57,73 +51,37 @@ public class EventQueue implements EventPublisher, AutoCloseable {
 
   public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
       EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
-    this.addHandler(event, handler, generateHandlerName(handler));
-  }
-
-  /**
-   * Add new handler to the event queue.
-   * <p>
-   * By default a separated single thread executor will be dedicated to
-   * deliver the events to the registered event handler.
-   *
-   * @param event        Triggering event.
-   * @param handler      Handler of event (will be called from a separated
-   *                     thread)
-   * @param handlerName  The name of handler (should be unique together with
-   *                     the event name)
-   * @param <PAYLOAD>    The type of the event payload.
-   * @param <EVENT_TYPE> The type of the event identifier.
-   */
-  public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
-      EVENT_TYPE event, EventHandler<PAYLOAD> handler, String handlerName) {
-    validateEvent(event);
-    Preconditions.checkNotNull(handler, "Handler name should not be null.");
-    String executorName =
-        StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR
-            + handlerName;
-    this.addHandler(event, new SingleThreadExecutor<>(executorName), handler);
-  }
-
-  private <EVENT_TYPE extends Event<?>> void validateEvent(EVENT_TYPE event) {
-    Preconditions
-        .checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR),
-            "Event name should not contain " + EXECUTOR_NAME_SEPARATOR
-                + " string.");
 
+    this.addHandler(event, new SingleThreadExecutor<>(
+        event.getName()), handler);
   }
 
-  private <PAYLOAD> String generateHandlerName(EventHandler<PAYLOAD> handler) {
-    if (!"".equals(handler.getClass().getSimpleName())) {
-      return handler.getClass().getSimpleName();
-    } else {
-      return handler.getClass().getName();
-    }
-  }
-
-  /**
-   * Add event handler with custom executor.
-   *
-   * @param event        Triggering event.
-   * @param executor     The executor imlementation to deliver events from a
-   *                     separated threads. Please keep in your mind that
-   *                     registering metrics is the responsibility of the
-   *                     caller.
-   * @param handler      Handler of event (will be called from a separated
-   *                     thread)
-   * @param <PAYLOAD>    The type of the event payload.
-   * @param <EVENT_TYPE> The type of the event identifier.
-   */
   public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
-      EVENT_TYPE event, EventExecutor<PAYLOAD> executor,
+      EVENT_TYPE event,
+      EventExecutor<PAYLOAD> executor,
       EventHandler<PAYLOAD> handler) {
-    validateEvent(event);
+
     executors.putIfAbsent(event, new HashMap<>());
     executors.get(event).putIfAbsent(executor, new ArrayList<>());
 
-    executors.get(event).get(executor).add(handler);
+    executors.get(event)
+        .get(executor)
+        .add(handler);
   }
 
+  /**
+   * Creates one executor with multiple event handlers.
+   */
+  public void addHandlerGroup(String name, HandlerForEvent<?>...
+      eventsAndHandlers) {
+    SingleThreadExecutor sharedExecutor =
+        new SingleThreadExecutor(name);
+    for (HandlerForEvent handlerForEvent : eventsAndHandlers) {
+      addHandler(handlerForEvent.event, sharedExecutor,
+          handlerForEvent.handler);
+    }
 
+  }
 
   /**
    * Route an event with payload to the right listener(s).
@@ -225,5 +183,31 @@ public class EventQueue implements EventPublisher, AutoCloseable {
     });
   }
 
+  /**
+   * Event identifier together with the handler.
+   *
+   * @param <PAYLOAD>
+   */
+  public static class HandlerForEvent<PAYLOAD> {
+
+    private final Event<PAYLOAD> event;
+
+    private final EventHandler<PAYLOAD> handler;
+
+    public HandlerForEvent(
+        Event<PAYLOAD> event,
+        EventHandler<PAYLOAD> handler) {
+      this.event = event;
+      this.handler = handler;
+    }
+
+    public Event<PAYLOAD> getEvent() {
+      return event;
+    }
+
+    public EventHandler<PAYLOAD> getHandler() {
+      return handler;
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c0a66ab/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
index 3253f2d..a64e3d7 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
@@ -23,18 +23,13 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Simple EventExecutor to call all the event handler one-by-one.
  *
  * @param <T>
  */
-@Metrics(context = "EventQueue")
 public class SingleThreadExecutor<T> implements EventExecutor<T> {
 
   public static final String THREAD_NAME_PREFIX = "EventQueue";
@@ -46,24 +41,14 @@ public class SingleThreadExecutor<T> implements EventExecutor<T> {
 
   private final ThreadPoolExecutor executor;
 
-  @Metric
-  private MutableCounterLong queued;
+  private final AtomicLong queuedCount = new AtomicLong(0);
 
-  @Metric
-  private MutableCounterLong done;
+  private final AtomicLong successfulCount = new AtomicLong(0);
 
-  @Metric
-  private MutableCounterLong failed;
+  private final AtomicLong failedCount = new AtomicLong(0);
 
-  /**
-   * Create SingleThreadExecutor.
-   *
-   * @param name Unique name used in monitoring and metrics.
-   */
   public SingleThreadExecutor(String name) {
     this.name = name;
-    DefaultMetricsSystem.instance()
-        .register("EventQueue" + name, "Event Executor metrics ", this);
 
     LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
     executor =
@@ -79,31 +64,31 @@ public class SingleThreadExecutor<T> implements EventExecutor<T> {
   @Override
   public void onMessage(EventHandler<T> handler, T message, EventPublisher
       publisher) {
-    queued.incr();
+    queuedCount.incrementAndGet();
     executor.execute(() -> {
       try {
         handler.onMessage(message, publisher);
-        done.incr();
+        successfulCount.incrementAndGet();
       } catch (Exception ex) {
         LOG.error("Error on execution message {}", message, ex);
-        failed.incr();
+        failedCount.incrementAndGet();
       }
     });
   }
 
   @Override
   public long failedEvents() {
-    return failed.value();
+    return failedCount.get();
   }
 
   @Override
   public long successfulEvents() {
-    return done.value();
+    return successfulCount.get();
   }
 
   @Override
   public long queuedEvents() {
-    return queued.value();
+    return queuedCount.get();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c0a66ab/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
index 2bdf705..3944409 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
@@ -25,8 +25,6 @@ import org.junit.Test;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-
 /**
  * Testing the basic functionality of the event queue.
  */
@@ -46,13 +44,11 @@ public class TestEventQueue {
 
   @Before
   public void startEventQueue() {
-    DefaultMetricsSystem.initialize(getClass().getSimpleName());
     queue = new EventQueue();
   }
 
   @After
   public void stopEventQueue() {
-    DefaultMetricsSystem.shutdown();
     queue.close();
   }
 
@@ -83,4 +79,35 @@ public class TestEventQueue {
 
   }
 
+  @Test
+  public void handlerGroup() {
+    final long[] result = new long[2];
+    queue.addHandlerGroup(
+        "group",
+        new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) ->
+            result[0] = payload),
+        new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) ->
+            result[1] = payload)
+    );
+
+    queue.fireEvent(EVENT3, 23L);
+    queue.fireEvent(EVENT4, 42L);
+
+    queue.processAll(1000);
+
+    Assert.assertEquals(23, result[0]);
+    Assert.assertEquals(42, result[1]);
+
+    Set<String> eventQueueThreadNames =
+        Thread.getAllStackTraces().keySet()
+            .stream()
+            .filter(t -> t.getName().startsWith(SingleThreadExecutor
+                .THREAD_NAME_PREFIX))
+            .map(Thread::getName)
+            .collect(Collectors.toSet());
+    System.out.println(eventQueueThreadNames);
+    Assert.assertEquals(1, eventQueueThreadNames.size());
+
+  }
+
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org