You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/05/14 17:44:32 UTC
[29/50] [abbrv] hadoop git commit: HDDS-25. Simple async event
processing for SCM. Contributed by Elek, Marton.
HDDS-25. Simple async event processing for SCM.
Contributed by Elek, Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a4ef3514
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a4ef3514
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a4ef3514
Branch: refs/heads/HDDS-4
Commit: a4ef3514621304777bee607114b35e2b8553a53a
Parents: a95bd94
Author: Anu Engineer <ae...@apache.org>
Authored: Fri May 11 11:35:21 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Mon May 14 10:31:09 2018 -0700
----------------------------------------------------------------------
.../apache/hadoop/hdds/server/events/Event.java | 42 ++++
.../hdds/server/events/EventExecutor.java | 68 ++++++
.../hadoop/hdds/server/events/EventHandler.java | 33 +++
.../hdds/server/events/EventPublisher.java | 28 +++
.../hadoop/hdds/server/events/EventQueue.java | 213 +++++++++++++++++++
.../server/events/SingleThreadExecutor.java | 103 +++++++++
.../hadoop/hdds/server/events/TypedEvent.java | 51 +++++
.../hadoop/hdds/server/events/package-info.java | 23 ++
.../hdds/server/events/TestEventQueue.java | 113 ++++++++++
.../hdds/server/events/TestEventQueueChain.java | 79 +++++++
10 files changed, 753 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4ef3514/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/Event.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/Event.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/Event.java
new file mode 100644
index 0000000..810c8b3
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/Event.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+/**
+ * Identifier of an async event.
+ *
+ * @param <PAYLOAD> The message payload type of this event.
+ */
+public interface Event<PAYLOAD> {
+
+ /**
+ * The type of the event payload (all the data required to process it).
+ *
+ * @return the class of the payload type.
+ */
+ Class<PAYLOAD> getPayloadType();
+
+ /**
+ * The human readable name of the event.
+ * <p>
+ * Used for display in thread names and monitoring.
+ *
+ * @return the name of the event.
+ */
+ String getName();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4ef3514/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java
new file mode 100644
index 0000000..4257839
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+/**
+ * Executors define how an EventHandler should be called.
+ * <p>
+ * Executors are used only by the EventQueue and they do the thread separation
+ * between the caller and the EventHandler.
+ * <p>
+ * Executors should guarantee that only one thread is executing one
+ * EventHandler at the same time.
+ *
+ * @param <PAYLOAD> the payload type of the event.
+ */
+public interface EventExecutor<PAYLOAD> extends AutoCloseable {
+
+ /**
+ * Process an event payload.
+ *
+ * @param handler the handler to process the payload
+ * @param eventPayload to be processed.
+ * @param publisher to send response/other message forward to the chain.
+ */
+ void onMessage(EventHandler<PAYLOAD> handler,
+ PAYLOAD eventPayload,
+ EventPublisher
+ publisher);
+
+ /**
+ * Return the number of events whose processing failed.
+ */
+ long failedEvents();
+
+
+ /**
+ * Return the number of the successfully processed events.
+ */
+ long successfulEvents();
+
+ /**
+ * Return the number of events received so far (processed or still waiting).
+ */
+ long queuedEvents();
+
+ /**
+ * The human readable name for the event executor.
+ * <p>
+ * Used in monitoring and logging.
+ * @return the name of the executor.
+ */
+ String getName();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4ef3514/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventHandler.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventHandler.java
new file mode 100644
index 0000000..f40fc9e
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventHandler.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+/**
+ * Processor to react on an event.
+ * <p>
+ * EventExecutors should guarantee that the implementations are called only
+ * from one thread.
+ *
+ * @param <PAYLOAD> the payload type of the handled events.
+ */
+@FunctionalInterface
+public interface EventHandler<PAYLOAD> {
+ /** Handle one payload; the publisher can be used to fire further events. */
+ void onMessage(PAYLOAD payload, EventPublisher publisher);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4ef3514/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventPublisher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventPublisher.java
new file mode 100644
index 0000000..a47fb57
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventPublisher.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+/**
+ * Client interface to send a new event.
+ */
+public interface EventPublisher {
+ /** Fire the given event with the given payload. */
+ <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void
+ fireEvent(EVENT_TYPE event, PAYLOAD payload);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4ef3514/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
new file mode 100644
index 0000000..44d85f5
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Simple async event processing utility.
+ * <p>
+ * Event queue handles a collection of event handlers and routes the incoming
+ * events to one (or more) event handler.
+ */
+public class EventQueue implements EventPublisher, AutoCloseable {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(EventQueue.class);
+
+ private final Map<Event, Map<EventExecutor, List<EventHandler>>> executors =
+ new HashMap<>();
+
+ private final AtomicLong queuedCount = new AtomicLong(0);
+
+ private final AtomicLong eventCount = new AtomicLong(0);
+
+ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
+     EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
+   // Every handler added this way gets its own single-thread executor.
+   String executorName = event.getName();
+   this.addHandler(event, new SingleThreadExecutor<>(executorName), handler);
+ }
+
+ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
+     EVENT_TYPE event,
+     EventExecutor<PAYLOAD> executor,
+     EventHandler<PAYLOAD> handler) {
+   // Look up (or lazily create) the executor map of the event, then the
+   // handler list of the executor, and append the new handler to it.
+   Map<EventExecutor, List<EventHandler>> byExecutor =
+       executors.computeIfAbsent(event, key -> new HashMap<>());
+   List<EventHandler> handlers =
+       byExecutor.computeIfAbsent(executor, key -> new ArrayList<>());
+   handlers.add(handler);
+ }
+
+ /**
+  * Registers several handlers which all share one single-thread executor.
+  *
+  * @param name name of the shared executor.
+  * @param eventsAndHandlers the event/handler pairs to register.
+  */
+ public void addHandlerGroup(String name, HandlerForEvent<?>...
+     eventsAndHandlers) {
+   SingleThreadExecutor groupExecutor = new SingleThreadExecutor(name);
+   for (HandlerForEvent pair : eventsAndHandlers) {
+     addHandler(pair.getEvent(), groupExecutor, pair.getHandler());
+   }
+ }
+
+ /**
+  * Route an event with payload to the right listener(s).
+  * <p>
+  * Each handler is called through the executor it was registered with and
+  * receives this queue as publisher, so it can fire follow-up events.
+  *
+  * @param event The event identifier
+  * @param payload The payload of the event.
+  * @throws IllegalArgumentException If there is no EventHandler for
+  * the specific event.
+  */
+ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
+     EVENT_TYPE event, PAYLOAD payload) {
+
+   Map<EventExecutor, List<EventHandler>> handlersByExecutor =
+       this.executors.get(event);
+
+   // Counted even if nobody is registered for the event.
+   eventCount.incrementAndGet();
+
+   if (handlersByExecutor == null) {
+     throw new IllegalArgumentException(
+         "No event handler registered for event " + event);
+   }
+
+   // Hand the payload over to every handler through its own executor.
+   for (Map.Entry<EventExecutor, List<EventHandler>> registration :
+       handlersByExecutor.entrySet()) {
+     EventExecutor executor = registration.getKey();
+     for (EventHandler handler : registration.getValue()) {
+       queuedCount.incrementAndGet();
+       executor.onMessage(handler, payload, this);
+     }
+   }
+ }
+
+ /**
+  * This is just for unit testing, don't use it for production code.
+  * <p>
+  * It waits for all messages to be processed. If one event handler invokes an
+  * other one, the later one also should be finished.
+  * <p>
+  * Long counter overflow is not handled, therefore it's safe only for unit
+  * testing.
+  * <p>
+  * This method is just eventually consistent. In some cases it could return
+  * even if there are new messages in some of the handler. But in a simple
+  * case (one message) it will return only if the message is processed and
+  * all the dependent messages (messages which are sent by current handlers)
+  * are processed.
+  *
+  * @param timeout Timeout in milliseconds to wait for the processing.
+  * @throws AssertionError if the timeout expires or the wait is interrupted.
+  */
+ @VisibleForTesting
+ public void processAll(long timeout) {
+   long startTime = Time.now();
+   while (true) {
+     Stream<EventExecutor> allExecutor = this.executors.values().stream()
+         .flatMap(handlerMap -> handlerMap.keySet().stream());
+     boolean allIdle =
+         allExecutor.allMatch(executor -> executor.queuedEvents() == executor
+             .successfulEvents() + executor.failedEvents());
+     if (allIdle) {
+       return;
+     }
+     try {
+       Thread.sleep(100);
+     } catch (InterruptedException e) {
+       // Restore the flag and stop waiting instead of swallowing the interrupt.
+       Thread.currentThread().interrupt();
+       throw new AssertionError("Interrupted while waiting for messages", e);
+     }
+     if (Time.now() > startTime + timeout) {
+       // Shared executors are registered for several events: count them once.
+       long processed = this.executors.values().stream()
+           .flatMap(handlerMap -> handlerMap.keySet().stream()).distinct()
+           .mapToLong(ex -> ex.successfulEvents() + ex.failedEvents()).sum();
+       throw new AssertionError(
+           "Messages are not processed in the given timeframe. Queued: "
+               + queuedCount.get() + " Processed: " + processed);
+     }
+   }
+ }
+
+ /** Closes every registered executor; failures are logged, not thrown. */
+ public void close() {
+   // A shared executor is registered for several events: close it only once.
+   Set<EventExecutor> distinctExecutors = this.executors.values().stream()
+       .flatMap(handlerMap -> handlerMap.keySet().stream())
+       .collect(Collectors.toSet());
+   for (EventExecutor executor : distinctExecutors) {
+     try {
+       executor.close();
+     } catch (Exception ex) {
+       LOG.error("Can't close the executor " + executor.getName(), ex);
+     }
+   }
+ }
+
+ /**
+ * Event identifier together with the handler.
+ *
+ * @param <PAYLOAD> the payload type shared by the event and its handler.
+ */
+ public static class HandlerForEvent<PAYLOAD> {
+
+ private final Event<PAYLOAD> event;
+
+ private final EventHandler<PAYLOAD> handler;
+
+ public HandlerForEvent(
+ Event<PAYLOAD> event,
+ EventHandler<PAYLOAD> handler) {
+ this.event = event;
+ this.handler = handler;
+ }
+
+ public Event<PAYLOAD> getEvent() {
+ return event;
+ }
+
+ public EventHandler<PAYLOAD> getHandler() {
+ return handler;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4ef3514/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
new file mode 100644
index 0000000..a64e3d7
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Simple EventExecutor to call all the event handlers one-by-one.
+ *
+ * @param <T> the payload type of the handled events.
+ */
+public class SingleThreadExecutor<T> implements EventExecutor<T> {
+
+ public static final String THREAD_NAME_PREFIX = "EventQueue";
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SingleThreadExecutor.class);
+
+ private final String name;
+
+ private final ThreadPoolExecutor executor;
+
+ private final AtomicLong queuedCount = new AtomicLong(0);
+
+ private final AtomicLong successfulCount = new AtomicLong(0);
+
+ private final AtomicLong failedCount = new AtomicLong(0);
+
+ public SingleThreadExecutor(String name) {
+ this.name = name;
+ // One worker thread and an unbounded queue: handlers never run in parallel.
+ LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
+ executor =
+ new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue,
+ runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setName(THREAD_NAME_PREFIX + "-" + name);
+ return thread;
+ });
+
+ }
+
+ @Override
+ public void onMessage(EventHandler<T> handler, T message, EventPublisher
+ publisher) {
+ queuedCount.incrementAndGet();
+ executor.execute(() -> {
+ try {
+ handler.onMessage(message, publisher);
+ successfulCount.incrementAndGet();
+ } catch (Exception ex) {
+ LOG.error("Error on execution message {}", message, ex);
+ failedCount.incrementAndGet();
+ }
+ });
+ }
+
+ @Override
+ public long failedEvents() {
+ return failedCount.get();
+ }
+
+ @Override
+ public long successfulEvents() {
+ return successfulCount.get();
+ }
+
+ @Override
+ public long queuedEvents() {
+ return queuedCount.get();
+ }
+
+ @Override
+ public void close() {
+ executor.shutdown();
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4ef3514/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java
new file mode 100644
index 0000000..c2159ad
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+/**
+ * Basic event implementation to implement custom events.
+ *
+ * @param <T> the payload type of the event.
+ */
+public class TypedEvent<T> implements Event<T> {
+
+  private final Class<T> payloadType;
+
+  private final String name;
+
+  public TypedEvent(Class<T> payloadType, String name) {
+    this.payloadType = payloadType;
+    this.name = name;
+  }
+
+  public TypedEvent(Class<T> payloadType) {
+    // The simple class name of the payload is used as the event name.
+    this(payloadType, payloadType.getSimpleName());
+  }
+
+  @Override
+  public Class<T> getPayloadType() {
+    return payloadType;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4ef3514/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/package-info.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/package-info.java
new file mode 100644
index 0000000..89999ee
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.server.events;
+
+/**
+ * Simple event queue implementation for hdds/ozone components.
+ */
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4ef3514/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
new file mode 100644
index 0000000..3944409
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Testing the basic functionality of the event queue.
+ */
+public class TestEventQueue {
+
+ private static final Event<Long> EVENT1 =
+ new TypedEvent<>(Long.class, "SCM_EVENT1");
+ private static final Event<Long> EVENT2 =
+ new TypedEvent<>(Long.class, "SCM_EVENT2");
+
+ private static final Event<Long> EVENT3 =
+ new TypedEvent<>(Long.class, "SCM_EVENT3");
+ private static final Event<Long> EVENT4 =
+ new TypedEvent<>(Long.class, "SCM_EVENT4");
+
+ private EventQueue queue;
+
+ @Before
+ public void startEventQueue() {
+ queue = new EventQueue();
+ }
+
+ @After
+ public void stopEventQueue() {
+ queue.close();
+ }
+
+ @Test
+ public void simpleEvent() {
+
+ final long[] result = new long[2];
+
+ queue.addHandler(EVENT1, (payload, publisher) -> result[0] = payload);
+
+ queue.fireEvent(EVENT1, 11L);
+ queue.processAll(1000);
+ Assert.assertEquals(11, result[0]);
+
+ }
+
+ @Test
+ public void multipleSubscriber() {
+ final long[] result = new long[2];
+ queue.addHandler(EVENT2, (payload, publisher) -> result[0] = payload);
+
+ queue.addHandler(EVENT2, (payload, publisher) -> result[1] = payload);
+
+ queue.fireEvent(EVENT2, 23L);
+ queue.processAll(1000);
+ Assert.assertEquals(23, result[0]);
+ Assert.assertEquals(23, result[1]);
+
+ }
+
+ @Test
+ public void handlerGroup() {
+ final long[] result = new long[2];
+ queue.addHandlerGroup(
+ "group",
+ new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) ->
+ result[0] = payload),
+ new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) ->
+ result[1] = payload)
+ );
+
+ queue.fireEvent(EVENT3, 23L);
+ queue.fireEvent(EVENT4, 42L);
+
+ queue.processAll(1000);
+
+ Assert.assertEquals(23, result[0]);
+ Assert.assertEquals(42, result[1]);
+ // Both handlers of the group are expected to share one executor thread.
+ Set<String> eventQueueThreadNames =
+ Thread.getAllStackTraces().keySet()
+ .stream()
+ .filter(t -> t.getName().startsWith(SingleThreadExecutor
+ .THREAD_NAME_PREFIX))
+ .map(Thread::getName)
+ .collect(Collectors.toSet());
+ System.out.println(eventQueueThreadNames);
+ Assert.assertEquals(1, eventQueueThreadNames.size());
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4ef3514/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueueChain.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueueChain.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueueChain.java
new file mode 100644
index 0000000..bb05ef4
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueueChain.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import org.junit.Test;
+
+/**
+ * More realistic event test with sending event from one listener.
+ */
+public class TestEventQueueChain {
+
+  private static final Event<FailedNode> DECOMMISSION =
+      new TypedEvent<>(FailedNode.class);
+
+  private static final Event<FailedNode> DECOMMISSION_START =
+      new TypedEvent<>(FailedNode.class);
+
+  @Test
+  public void simpleEvent() {
+    // Close the queue even if processAll fails, otherwise the executor
+    // threads created by addHandler are never shut down.
+    try (EventQueue queue = new EventQueue()) {
+      queue.addHandler(DECOMMISSION, new PipelineManager());
+      queue.addHandler(DECOMMISSION_START, new NodeWatcher());
+      queue.fireEvent(DECOMMISSION, new FailedNode("node1"));
+      queue.processAll(5000);
+    }
+  }
+
+
+  static class FailedNode {
+    private final String nodeId;
+
+    FailedNode(String nodeId) {
+      this.nodeId = nodeId;
+    }
+
+    String getNodeId() {
+      return nodeId;
+    }
+  }
+
+  private static class PipelineManager implements EventHandler<FailedNode> {
+
+    @Override
+    public void onMessage(FailedNode message, EventPublisher publisher) {
+
+      System.out.println(
+          "Closing pipelines for all pipelines including node: " + message
+              .getNodeId());
+
+      publisher.fireEvent(DECOMMISSION_START, message);
+    }
+
+  }
+
+  private static class NodeWatcher implements EventHandler<FailedNode> {
+
+    @Override
+    public void onMessage(FailedNode message, EventPublisher publisher) {
+      System.out.println("Clear timer");
+    }
+  }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org