You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/05/11 18:37:06 UTC

hadoop git commit: HDDS-25. Simple async event processing for SCM. Contributed by Elek, Marton.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 1f10a3602 -> ba12e8805


HDDS-25. Simple async event processing for SCM.
Contributed by Elek, Marton.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ba12e880
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ba12e880
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ba12e880

Branch: refs/heads/trunk
Commit: ba12e8805e2ae6f125042bfb1d6b3cfc10faf9ed
Parents: 1f10a36
Author: Anu Engineer <ae...@apache.org>
Authored: Fri May 11 11:35:21 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Fri May 11 11:36:52 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdds/server/events/Event.java |  42 ++++
 .../hdds/server/events/EventExecutor.java       |  68 ++++++
 .../hadoop/hdds/server/events/EventHandler.java |  33 +++
 .../hdds/server/events/EventPublisher.java      |  28 +++
 .../hadoop/hdds/server/events/EventQueue.java   | 213 +++++++++++++++++++
 .../server/events/SingleThreadExecutor.java     | 103 +++++++++
 .../hadoop/hdds/server/events/TypedEvent.java   |  51 +++++
 .../hadoop/hdds/server/events/package-info.java |  23 ++
 .../hdds/server/events/TestEventQueue.java      | 113 ++++++++++
 .../hdds/server/events/TestEventQueueChain.java |  79 +++++++
 10 files changed, 753 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba12e880/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/Event.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/Event.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/Event.java
new file mode 100644
index 0000000..810c8b3
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/Event.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+/**
+ * Identifier of an async event.
+ *
+ * @param <PAYLOAD> THe message payload type of this event.
+ */
+public interface Event<PAYLOAD> {
+
+  /**
+   * The type of the event payload. Payload contains all the required data
+   * to process the event.
+   *
+   */
+  Class<PAYLOAD> getPayloadType();
+
+  /**
+   * The human readable name of the event.
+   *
+   * Used for display in thread names
+   * and monitoring.
+   *
+   */
+  String getName();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba12e880/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java
new file mode 100644
index 0000000..4257839
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutor.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+/**
+ * Executors defined the  way how an EventHandler should be called.
+ * <p>
+ * Executors are used only by the EventQueue and they do the thread separation
+ * between the caller and the EventHandler.
+ * <p>
+ * Executors should guarantee that only one thread is executing one
+ * EventHandler at the same time.
+ *
+ * @param <PAYLOAD> the payload type of the event.
+ */
+public interface EventExecutor<PAYLOAD> extends AutoCloseable {
+
+  /**
+   * Process an event payload.
+   *
+   * @param handler      the handler to process the payload
+   * @param eventPayload to be processed.
+   * @param publisher    to send response/other message forward to the chain.
+   */
+  void onMessage(EventHandler<PAYLOAD> handler,
+      PAYLOAD eventPayload,
+      EventPublisher
+          publisher);
+
+  /**
+   * Return the number of the failed events.
+   */
+  long failedEvents();
+
+
+  /**
+   * Return the number of the processed events.
+   */
+  long successfulEvents();
+
+  /**
+   * Return the number of the not-yet processed events.
+   */
+  long queuedEvents();
+
+  /**
+   * The human readable name for the event executor.
+   * <p>
+   * Used in monitoring and logging.
+   *
+   */
+  String getName();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba12e880/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventHandler.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventHandler.java
new file mode 100644
index 0000000..f40fc9e
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventHandler.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+/**
+ * Processor to react on an event.
+ *
+ * EventExecutors should guarantee that the implementations are called only
+ * from one thread.
+ *
+ * @param <PAYLOAD>
+ */
+@FunctionalInterface
+public interface EventHandler<PAYLOAD> {
+
+  void onMessage(PAYLOAD payload, EventPublisher publisher);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba12e880/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventPublisher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventPublisher.java
new file mode 100644
index 0000000..a47fb57
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventPublisher.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+/**
+ * Client interface to send a new event.
+ */
+public interface EventPublisher {
+
+  <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void
+      fireEvent(EVENT_TYPE event, PAYLOAD payload);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba12e880/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
new file mode 100644
index 0000000..44d85f5
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Simple async event processing utility.
+ * <p>
+ * Event queue handles a collection of event handlers and routes the incoming
+ * events to one (or more) event handler.
+ */
+public class EventQueue implements EventPublisher, AutoCloseable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(EventQueue.class);
+
+  private final Map<Event, Map<EventExecutor, List<EventHandler>>> executors =
+      new HashMap<>();
+
+  private final AtomicLong queuedCount = new AtomicLong(0);
+
+  private final AtomicLong eventCount = new AtomicLong(0);
+
+  public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
+      EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
+
+    this.addHandler(event, new SingleThreadExecutor<>(
+        event.getName()), handler);
+  }
+
+  public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
+      EVENT_TYPE event,
+      EventExecutor<PAYLOAD> executor,
+      EventHandler<PAYLOAD> handler) {
+
+    executors.putIfAbsent(event, new HashMap<>());
+    executors.get(event).putIfAbsent(executor, new ArrayList<>());
+
+    executors.get(event)
+        .get(executor)
+        .add(handler);
+  }
+
+  /**
+   * Creates one executor with multiple event handlers.
+   */
+  public void addHandlerGroup(String name, HandlerForEvent<?>...
+      eventsAndHandlers) {
+    SingleThreadExecutor sharedExecutor =
+        new SingleThreadExecutor(name);
+    for (HandlerForEvent handlerForEvent : eventsAndHandlers) {
+      addHandler(handlerForEvent.event, sharedExecutor,
+          handlerForEvent.handler);
+    }
+
+  }
+
+  /**
+   * Route an event with payload to the right listener(s).
+   *
+   * @param event   The event identifier
+   * @param payload The payload of the event.
+   * @throws IllegalArgumentException If there is no EventHandler for
+   *                                  the specific event.
+   */
+  public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
+      EVENT_TYPE event, PAYLOAD payload) {
+
+    Map<EventExecutor, List<EventHandler>> eventExecutorListMap =
+        this.executors.get(event);
+
+    eventCount.incrementAndGet();
+    if (eventExecutorListMap != null) {
+
+      for (Map.Entry<EventExecutor, List<EventHandler>> executorAndHandlers :
+          eventExecutorListMap.entrySet()) {
+
+        for (EventHandler handler : executorAndHandlers.getValue()) {
+          queuedCount.incrementAndGet();
+
+          executorAndHandlers.getKey()
+              .onMessage(handler, payload, this);
+
+        }
+      }
+
+    } else {
+      throw new IllegalArgumentException(
+          "No event handler registered for event " + event);
+    }
+
+  }
+
+  /**
+   * This is just for unit testing, don't use it for production code.
+   * <p>
+   * It waits for all messages to be processed. If one event handler invokes an
+   * other one, the later one also should be finished.
+   * <p>
+   * Long counter overflow is not handled, therefore it's safe only for unit
+   * testing.
+   * <p>
+   * This method is just eventually consistent. In some cases it could return
+   * even if there are new messages in some of the handler. But in a simple
+   * case (one message) it will return only if the message is processed and
+   * all the dependent messages (messages which are sent by current handlers)
+   * are processed.
+   *
+   * @param timeout Timeout in seconds to wait for the processing.
+   */
+  @VisibleForTesting
+  public void processAll(long timeout) {
+    long currentTime = Time.now();
+    while (true) {
+
+      long processed = 0;
+
+      Stream<EventExecutor> allExecutor = this.executors.values().stream()
+          .flatMap(handlerMap -> handlerMap.keySet().stream());
+
+      boolean allIdle =
+          allExecutor.allMatch(executor -> executor.queuedEvents() == executor
+              .successfulEvents() + executor.failedEvents());
+
+      if (allIdle) {
+        return;
+      }
+
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+
+      if (Time.now() > currentTime + timeout) {
+        throw new AssertionError(
+            "Messages are not processed in the given timeframe. Queued: "
+                + queuedCount.get() + " Processed: " + processed);
+      }
+    }
+  }
+
+  public void close() {
+
+    Set<EventExecutor> allExecutors = this.executors.values().stream()
+        .flatMap(handlerMap -> handlerMap.keySet().stream())
+        .collect(Collectors.toSet());
+
+    allExecutors.forEach(executor -> {
+      try {
+        executor.close();
+      } catch (Exception ex) {
+        LOG.error("Can't close the executor " + executor.getName(), ex);
+      }
+    });
+  }
+
+  /**
+   * Event identifier together with the handler.
+   *
+   * @param <PAYLOAD>
+   */
+  public static class HandlerForEvent<PAYLOAD> {
+
+    private final Event<PAYLOAD> event;
+
+    private final EventHandler<PAYLOAD> handler;
+
+    public HandlerForEvent(
+        Event<PAYLOAD> event,
+        EventHandler<PAYLOAD> handler) {
+      this.event = event;
+      this.handler = handler;
+    }
+
+    public Event<PAYLOAD> getEvent() {
+      return event;
+    }
+
+    public EventHandler<PAYLOAD> getHandler() {
+      return handler;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba12e880/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
new file mode 100644
index 0000000..a64e3d7
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Simple EventExecutor to call all the event handler one-by-one.
+ *
+ * @param <T>
+ */
+public class SingleThreadExecutor<T> implements EventExecutor<T> {
+
+  public static final String THREAD_NAME_PREFIX = "EventQueue";
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SingleThreadExecutor.class);
+
+  private final String name;
+
+  private final ThreadPoolExecutor executor;
+
+  private final AtomicLong queuedCount = new AtomicLong(0);
+
+  private final AtomicLong successfulCount = new AtomicLong(0);
+
+  private final AtomicLong failedCount = new AtomicLong(0);
+
+  public SingleThreadExecutor(String name) {
+    this.name = name;
+
+    LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
+    executor =
+        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue,
+            runnable -> {
+              Thread thread = new Thread(runnable);
+              thread.setName(THREAD_NAME_PREFIX + "-" + name);
+              return thread;
+            });
+
+  }
+
+  @Override
+  public void onMessage(EventHandler<T> handler, T message, EventPublisher
+      publisher) {
+    queuedCount.incrementAndGet();
+    executor.execute(() -> {
+      try {
+        handler.onMessage(message, publisher);
+        successfulCount.incrementAndGet();
+      } catch (Exception ex) {
+        LOG.error("Error on execution message {}", message, ex);
+        failedCount.incrementAndGet();
+      }
+    });
+  }
+
+  @Override
+  public long failedEvents() {
+    return failedCount.get();
+  }
+
+  @Override
+  public long successfulEvents() {
+    return successfulCount.get();
+  }
+
+  @Override
+  public long queuedEvents() {
+    return queuedCount.get();
+  }
+
+  @Override
+  public void close() {
+    executor.shutdown();
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba12e880/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java
new file mode 100644
index 0000000..c2159ad
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+/**
+ * Basic event implementation to implement custom events.
+ *
+ * @param <T>
+ */
+public class TypedEvent<T> implements Event<T> {
+
+  private final Class<T> payloadType;
+
+  private final String name;
+
+  public TypedEvent(Class<T> payloadType, String name) {
+    this.payloadType = payloadType;
+    this.name = name;
+  }
+
+  public TypedEvent(Class<T> payloadType) {
+    this.payloadType = payloadType;
+    this.name = payloadType.getSimpleName();
+  }
+
+  @Override
+  public Class<T> getPayloadType() {
+    return payloadType;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba12e880/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/package-info.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/package-info.java
new file mode 100644
index 0000000..89999ee
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.server.events;
+
+/**
+ * Simple event queue implementation for hdds/ozone components.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba12e880/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
new file mode 100644
index 0000000..3944409
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Testing the basic functionality of the event queue.
+ */
+public class TestEventQueue {
+
+  private static final Event<Long> EVENT1 =
+      new TypedEvent<>(Long.class, "SCM_EVENT1");
+  private static final Event<Long> EVENT2 =
+      new TypedEvent<>(Long.class, "SCM_EVENT2");
+
+  private static final Event<Long> EVENT3 =
+      new TypedEvent<>(Long.class, "SCM_EVENT3");
+  private static final Event<Long> EVENT4 =
+      new TypedEvent<>(Long.class, "SCM_EVENT4");
+
+  private EventQueue queue;
+
+  @Before
+  public void startEventQueue() {
+    queue = new EventQueue();
+  }
+
+  @After
+  public void stopEventQueue() {
+    queue.close();
+  }
+
+  @Test
+  public void simpleEvent() {
+
+    final long[] result = new long[2];
+
+    queue.addHandler(EVENT1, (payload, publisher) -> result[0] = payload);
+
+    queue.fireEvent(EVENT1, 11L);
+    queue.processAll(1000);
+    Assert.assertEquals(11, result[0]);
+
+  }
+
+  @Test
+  public void multipleSubscriber() {
+    final long[] result = new long[2];
+    queue.addHandler(EVENT2, (payload, publisher) -> result[0] = payload);
+
+    queue.addHandler(EVENT2, (payload, publisher) -> result[1] = payload);
+
+    queue.fireEvent(EVENT2, 23L);
+    queue.processAll(1000);
+    Assert.assertEquals(23, result[0]);
+    Assert.assertEquals(23, result[1]);
+
+  }
+
+  @Test
+  public void handlerGroup() {
+    final long[] result = new long[2];
+    queue.addHandlerGroup(
+        "group",
+        new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) ->
+            result[0] = payload),
+        new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) ->
+            result[1] = payload)
+    );
+
+    queue.fireEvent(EVENT3, 23L);
+    queue.fireEvent(EVENT4, 42L);
+
+    queue.processAll(1000);
+
+    Assert.assertEquals(23, result[0]);
+    Assert.assertEquals(42, result[1]);
+
+    Set<String> eventQueueThreadNames =
+        Thread.getAllStackTraces().keySet()
+            .stream()
+            .filter(t -> t.getName().startsWith(SingleThreadExecutor
+                .THREAD_NAME_PREFIX))
+            .map(Thread::getName)
+            .collect(Collectors.toSet());
+    System.out.println(eventQueueThreadNames);
+    Assert.assertEquals(1, eventQueueThreadNames.size());
+
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ba12e880/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueueChain.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueueChain.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueueChain.java
new file mode 100644
index 0000000..bb05ef4
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueueChain.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import org.junit.Test;
+
+/**
+ * More realistic event test with sending event from one listener.
+ */
+public class TestEventQueueChain {
+
+  private static final Event<FailedNode> DECOMMISSION =
+      new TypedEvent<>(FailedNode.class);
+
+  private static final Event<FailedNode> DECOMMISSION_START =
+      new TypedEvent<>(FailedNode.class);
+
+  @Test
+  public void simpleEvent() {
+    EventQueue queue = new EventQueue();
+
+    queue.addHandler(DECOMMISSION, new PipelineManager());
+    queue.addHandler(DECOMMISSION_START, new NodeWatcher());
+
+    queue.fireEvent(DECOMMISSION, new FailedNode("node1"));
+
+    queue.processAll(5000);
+  }
+
+
+  static class FailedNode {
+    private final String nodeId;
+
+    FailedNode(String nodeId) {
+      this.nodeId = nodeId;
+    }
+
+    String getNodeId() {
+      return nodeId;
+    }
+  }
+
+  private static class PipelineManager implements EventHandler<FailedNode> {
+
+    @Override
+    public void onMessage(FailedNode message, EventPublisher publisher) {
+
+      System.out.println(
+          "Closing pipelines for all pipelines including node: " + message
+              .getNodeId());
+
+      publisher.fireEvent(DECOMMISSION_START, message);
+    }
+
+  }
+
+  private static class NodeWatcher implements EventHandler<FailedNode> {
+
+    @Override
+    public void onMessage(FailedNode message, EventPublisher publisher) {
+      System.out.println("Clear timer");
+    }
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org