You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/07/09 20:12:06 UTC
[1/2] hadoop git commit: Revert "HDDS-224. Create metrics for Event
Watcher."
Repository: hadoop
Updated Branches:
refs/heads/trunk cb5e22586 -> 2403231c8
Revert "HDDS-224. Create metrics for Event Watcher."
This reverts commit cb5e225868a069d6d16244b462ebada44465dce8.
The JIRA number is wrong, reverting to fix it.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3c0a66ab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3c0a66ab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3c0a66ab
Branch: refs/heads/trunk
Commit: 3c0a66abe632277e89fccd8dced9e71ca5d87df0
Parents: cb5e225
Author: Anu Engineer <ae...@apache.org>
Authored: Mon Jul 9 13:03:57 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Mon Jul 9 13:03:57 2018 -0700
----------------------------------------------------------------------
.../hadoop/hdds/server/events/EventQueue.java | 108 ++++++++-----------
.../server/events/SingleThreadExecutor.java | 35 ++----
.../hdds/server/events/TestEventQueue.java | 35 +++++-
3 files changed, 87 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c0a66ab/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
index 7e29223..44d85f5 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
@@ -18,11 +18,7 @@
package org.apache.hadoop.hdds.server.events;
import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
-
-import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,8 +42,6 @@ public class EventQueue implements EventPublisher, AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(EventQueue.class);
- private static final String EXECUTOR_NAME_SEPARATOR = "For";
-
private final Map<Event, Map<EventExecutor, List<EventHandler>>> executors =
new HashMap<>();
@@ -57,73 +51,37 @@ public class EventQueue implements EventPublisher, AutoCloseable {
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
- this.addHandler(event, handler, generateHandlerName(handler));
- }
-
- /**
- * Add new handler to the event queue.
- * <p>
- * By default a separated single thread executor will be dedicated to
- * deliver the events to the registered event handler.
- *
- * @param event Triggering event.
- * @param handler Handler of event (will be called from a separated
- * thread)
- * @param handlerName The name of handler (should be unique together with
- * the event name)
- * @param <PAYLOAD> The type of the event payload.
- * @param <EVENT_TYPE> The type of the event identifier.
- */
- public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
- EVENT_TYPE event, EventHandler<PAYLOAD> handler, String handlerName) {
- validateEvent(event);
- Preconditions.checkNotNull(handler, "Handler name should not be null.");
- String executorName =
- StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR
- + handlerName;
- this.addHandler(event, new SingleThreadExecutor<>(executorName), handler);
- }
-
- private <EVENT_TYPE extends Event<?>> void validateEvent(EVENT_TYPE event) {
- Preconditions
- .checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR),
- "Event name should not contain " + EXECUTOR_NAME_SEPARATOR
- + " string.");
+ this.addHandler(event, new SingleThreadExecutor<>(
+ event.getName()), handler);
}
- private <PAYLOAD> String generateHandlerName(EventHandler<PAYLOAD> handler) {
- if (!"".equals(handler.getClass().getSimpleName())) {
- return handler.getClass().getSimpleName();
- } else {
- return handler.getClass().getName();
- }
- }
-
- /**
- * Add event handler with custom executor.
- *
- * @param event Triggering event.
- * @param executor The executor imlementation to deliver events from a
- * separated threads. Please keep in your mind that
- * registering metrics is the responsibility of the
- * caller.
- * @param handler Handler of event (will be called from a separated
- * thread)
- * @param <PAYLOAD> The type of the event payload.
- * @param <EVENT_TYPE> The type of the event identifier.
- */
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
- EVENT_TYPE event, EventExecutor<PAYLOAD> executor,
+ EVENT_TYPE event,
+ EventExecutor<PAYLOAD> executor,
EventHandler<PAYLOAD> handler) {
- validateEvent(event);
+
executors.putIfAbsent(event, new HashMap<>());
executors.get(event).putIfAbsent(executor, new ArrayList<>());
- executors.get(event).get(executor).add(handler);
+ executors.get(event)
+ .get(executor)
+ .add(handler);
}
+ /**
+ * Creates one executor with multiple event handlers.
+ */
+ public void addHandlerGroup(String name, HandlerForEvent<?>...
+ eventsAndHandlers) {
+ SingleThreadExecutor sharedExecutor =
+ new SingleThreadExecutor(name);
+ for (HandlerForEvent handlerForEvent : eventsAndHandlers) {
+ addHandler(handlerForEvent.event, sharedExecutor,
+ handlerForEvent.handler);
+ }
+ }
/**
* Route an event with payload to the right listener(s).
@@ -225,5 +183,31 @@ public class EventQueue implements EventPublisher, AutoCloseable {
});
}
+ /**
+ * Event identifier together with the handler.
+ *
+ * @param <PAYLOAD>
+ */
+ public static class HandlerForEvent<PAYLOAD> {
+
+ private final Event<PAYLOAD> event;
+
+ private final EventHandler<PAYLOAD> handler;
+
+ public HandlerForEvent(
+ Event<PAYLOAD> event,
+ EventHandler<PAYLOAD> handler) {
+ this.event = event;
+ this.handler = handler;
+ }
+
+ public Event<PAYLOAD> getEvent() {
+ return event;
+ }
+
+ public EventHandler<PAYLOAD> getHandler() {
+ return handler;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c0a66ab/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
index 3253f2d..a64e3d7 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
@@ -23,18 +23,13 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Simple EventExecutor to call all the event handler one-by-one.
*
* @param <T>
*/
-@Metrics(context = "EventQueue")
public class SingleThreadExecutor<T> implements EventExecutor<T> {
public static final String THREAD_NAME_PREFIX = "EventQueue";
@@ -46,24 +41,14 @@ public class SingleThreadExecutor<T> implements EventExecutor<T> {
private final ThreadPoolExecutor executor;
- @Metric
- private MutableCounterLong queued;
+ private final AtomicLong queuedCount = new AtomicLong(0);
- @Metric
- private MutableCounterLong done;
+ private final AtomicLong successfulCount = new AtomicLong(0);
- @Metric
- private MutableCounterLong failed;
+ private final AtomicLong failedCount = new AtomicLong(0);
- /**
- * Create SingleThreadExecutor.
- *
- * @param name Unique name used in monitoring and metrics.
- */
public SingleThreadExecutor(String name) {
this.name = name;
- DefaultMetricsSystem.instance()
- .register("EventQueue" + name, "Event Executor metrics ", this);
LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
executor =
@@ -79,31 +64,31 @@ public class SingleThreadExecutor<T> implements EventExecutor<T> {
@Override
public void onMessage(EventHandler<T> handler, T message, EventPublisher
publisher) {
- queued.incr();
+ queuedCount.incrementAndGet();
executor.execute(() -> {
try {
handler.onMessage(message, publisher);
- done.incr();
+ successfulCount.incrementAndGet();
} catch (Exception ex) {
LOG.error("Error on execution message {}", message, ex);
- failed.incr();
+ failedCount.incrementAndGet();
}
});
}
@Override
public long failedEvents() {
- return failed.value();
+ return failedCount.get();
}
@Override
public long successfulEvents() {
- return done.value();
+ return successfulCount.get();
}
@Override
public long queuedEvents() {
- return queued.value();
+ return queuedCount.get();
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c0a66ab/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
index 2bdf705..3944409 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
@@ -25,8 +25,6 @@ import org.junit.Test;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-
/**
* Testing the basic functionality of the event queue.
*/
@@ -46,13 +44,11 @@ public class TestEventQueue {
@Before
public void startEventQueue() {
- DefaultMetricsSystem.initialize(getClass().getSimpleName());
queue = new EventQueue();
}
@After
public void stopEventQueue() {
- DefaultMetricsSystem.shutdown();
queue.close();
}
@@ -83,4 +79,35 @@ public class TestEventQueue {
}
+ @Test
+ public void handlerGroup() {
+ final long[] result = new long[2];
+ queue.addHandlerGroup(
+ "group",
+ new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) ->
+ result[0] = payload),
+ new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) ->
+ result[1] = payload)
+ );
+
+ queue.fireEvent(EVENT3, 23L);
+ queue.fireEvent(EVENT4, 42L);
+
+ queue.processAll(1000);
+
+ Assert.assertEquals(23, result[0]);
+ Assert.assertEquals(42, result[1]);
+
+ Set<String> eventQueueThreadNames =
+ Thread.getAllStackTraces().keySet()
+ .stream()
+ .filter(t -> t.getName().startsWith(SingleThreadExecutor
+ .THREAD_NAME_PREFIX))
+ .map(Thread::getName)
+ .collect(Collectors.toSet());
+ System.out.println(eventQueueThreadNames);
+ Assert.assertEquals(1, eventQueueThreadNames.size());
+
+ }
+
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: HDDS-240. Implement metrics for EventQueue.
Contributed by Elek, Marton.
Posted by ae...@apache.org.
HDDS-240. Implement metrics for EventQueue.
Contributed by Elek, Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2403231c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2403231c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2403231c
Branch: refs/heads/trunk
Commit: 2403231c8c3685ba08cd6bdf715d281cae611e45
Parents: 3c0a66a
Author: Anu Engineer <ae...@apache.org>
Authored: Mon Jul 9 13:04:44 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Mon Jul 9 13:04:44 2018 -0700
----------------------------------------------------------------------
.../hadoop/hdds/server/events/EventQueue.java | 108 +++++++++++--------
.../server/events/SingleThreadExecutor.java | 35 ++++--
.../hdds/server/events/TestEventQueue.java | 35 +-----
3 files changed, 91 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2403231c/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
index 44d85f5..7e29223 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
@@ -18,7 +18,11 @@
package org.apache.hadoop.hdds.server.events;
import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
+
+import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +46,8 @@ public class EventQueue implements EventPublisher, AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(EventQueue.class);
+ private static final String EXECUTOR_NAME_SEPARATOR = "For";
+
private final Map<Event, Map<EventExecutor, List<EventHandler>>> executors =
new HashMap<>();
@@ -51,38 +57,74 @@ public class EventQueue implements EventPublisher, AutoCloseable {
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
-
- this.addHandler(event, new SingleThreadExecutor<>(
- event.getName()), handler);
+ this.addHandler(event, handler, generateHandlerName(handler));
}
+ /**
+ * Add new handler to the event queue.
+ * <p>
+ * By default a separated single thread executor will be dedicated to
+ * deliver the events to the registered event handler.
+ *
+ * @param event Triggering event.
+ * @param handler Handler of event (will be called from a separated
+ * thread)
+ * @param handlerName The name of handler (should be unique together with
+ * the event name)
+ * @param <PAYLOAD> The type of the event payload.
+ * @param <EVENT_TYPE> The type of the event identifier.
+ */
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
- EVENT_TYPE event,
- EventExecutor<PAYLOAD> executor,
- EventHandler<PAYLOAD> handler) {
+ EVENT_TYPE event, EventHandler<PAYLOAD> handler, String handlerName) {
+ validateEvent(event);
+ Preconditions.checkNotNull(handler, "Handler name should not be null.");
+ String executorName =
+ StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR
+ + handlerName;
+ this.addHandler(event, new SingleThreadExecutor<>(executorName), handler);
+ }
- executors.putIfAbsent(event, new HashMap<>());
- executors.get(event).putIfAbsent(executor, new ArrayList<>());
+ private <EVENT_TYPE extends Event<?>> void validateEvent(EVENT_TYPE event) {
+ Preconditions
+ .checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR),
+ "Event name should not contain " + EXECUTOR_NAME_SEPARATOR
+ + " string.");
- executors.get(event)
- .get(executor)
- .add(handler);
+ }
+
+ private <PAYLOAD> String generateHandlerName(EventHandler<PAYLOAD> handler) {
+ if (!"".equals(handler.getClass().getSimpleName())) {
+ return handler.getClass().getSimpleName();
+ } else {
+ return handler.getClass().getName();
+ }
}
/**
- * Creates one executor with multiple event handlers.
+ * Add event handler with custom executor.
+ *
+ * @param event Triggering event.
+ * @param executor The executor imlementation to deliver events from a
+ * separated threads. Please keep in your mind that
+ * registering metrics is the responsibility of the
+ * caller.
+ * @param handler Handler of event (will be called from a separated
+ * thread)
+ * @param <PAYLOAD> The type of the event payload.
+ * @param <EVENT_TYPE> The type of the event identifier.
*/
- public void addHandlerGroup(String name, HandlerForEvent<?>...
- eventsAndHandlers) {
- SingleThreadExecutor sharedExecutor =
- new SingleThreadExecutor(name);
- for (HandlerForEvent handlerForEvent : eventsAndHandlers) {
- addHandler(handlerForEvent.event, sharedExecutor,
- handlerForEvent.handler);
- }
+ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
+ EVENT_TYPE event, EventExecutor<PAYLOAD> executor,
+ EventHandler<PAYLOAD> handler) {
+ validateEvent(event);
+ executors.putIfAbsent(event, new HashMap<>());
+ executors.get(event).putIfAbsent(executor, new ArrayList<>());
+ executors.get(event).get(executor).add(handler);
}
+
+
/**
* Route an event with payload to the right listener(s).
*
@@ -183,31 +225,5 @@ public class EventQueue implements EventPublisher, AutoCloseable {
});
}
- /**
- * Event identifier together with the handler.
- *
- * @param <PAYLOAD>
- */
- public static class HandlerForEvent<PAYLOAD> {
-
- private final Event<PAYLOAD> event;
-
- private final EventHandler<PAYLOAD> handler;
-
- public HandlerForEvent(
- Event<PAYLOAD> event,
- EventHandler<PAYLOAD> handler) {
- this.event = event;
- this.handler = handler;
- }
-
- public Event<PAYLOAD> getEvent() {
- return event;
- }
-
- public EventHandler<PAYLOAD> getHandler() {
- return handler;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2403231c/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
index a64e3d7..3253f2d 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
@@ -23,13 +23,18 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
/**
* Simple EventExecutor to call all the event handler one-by-one.
*
* @param <T>
*/
+@Metrics(context = "EventQueue")
public class SingleThreadExecutor<T> implements EventExecutor<T> {
public static final String THREAD_NAME_PREFIX = "EventQueue";
@@ -41,14 +46,24 @@ public class SingleThreadExecutor<T> implements EventExecutor<T> {
private final ThreadPoolExecutor executor;
- private final AtomicLong queuedCount = new AtomicLong(0);
+ @Metric
+ private MutableCounterLong queued;
- private final AtomicLong successfulCount = new AtomicLong(0);
+ @Metric
+ private MutableCounterLong done;
- private final AtomicLong failedCount = new AtomicLong(0);
+ @Metric
+ private MutableCounterLong failed;
+ /**
+ * Create SingleThreadExecutor.
+ *
+ * @param name Unique name used in monitoring and metrics.
+ */
public SingleThreadExecutor(String name) {
this.name = name;
+ DefaultMetricsSystem.instance()
+ .register("EventQueue" + name, "Event Executor metrics ", this);
LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
executor =
@@ -64,31 +79,31 @@ public class SingleThreadExecutor<T> implements EventExecutor<T> {
@Override
public void onMessage(EventHandler<T> handler, T message, EventPublisher
publisher) {
- queuedCount.incrementAndGet();
+ queued.incr();
executor.execute(() -> {
try {
handler.onMessage(message, publisher);
- successfulCount.incrementAndGet();
+ done.incr();
} catch (Exception ex) {
LOG.error("Error on execution message {}", message, ex);
- failedCount.incrementAndGet();
+ failed.incr();
}
});
}
@Override
public long failedEvents() {
- return failedCount.get();
+ return failed.value();
}
@Override
public long successfulEvents() {
- return successfulCount.get();
+ return done.value();
}
@Override
public long queuedEvents() {
- return queuedCount.get();
+ return queued.value();
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2403231c/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
index 3944409..2bdf705 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
@@ -25,6 +25,8 @@ import org.junit.Test;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+
/**
* Testing the basic functionality of the event queue.
*/
@@ -44,11 +46,13 @@ public class TestEventQueue {
@Before
public void startEventQueue() {
+ DefaultMetricsSystem.initialize(getClass().getSimpleName());
queue = new EventQueue();
}
@After
public void stopEventQueue() {
+ DefaultMetricsSystem.shutdown();
queue.close();
}
@@ -79,35 +83,4 @@ public class TestEventQueue {
}
- @Test
- public void handlerGroup() {
- final long[] result = new long[2];
- queue.addHandlerGroup(
- "group",
- new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) ->
- result[0] = payload),
- new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) ->
- result[1] = payload)
- );
-
- queue.fireEvent(EVENT3, 23L);
- queue.fireEvent(EVENT4, 42L);
-
- queue.processAll(1000);
-
- Assert.assertEquals(23, result[0]);
- Assert.assertEquals(42, result[1]);
-
- Set<String> eventQueueThreadNames =
- Thread.getAllStackTraces().keySet()
- .stream()
- .filter(t -> t.getName().startsWith(SingleThreadExecutor
- .THREAD_NAME_PREFIX))
- .map(Thread::getName)
- .collect(Collectors.toSet());
- System.out.println(eventQueueThreadNames);
- Assert.assertEquals(1, eventQueueThreadNames.size());
-
- }
-
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org