You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2018/07/09 20:20:15 UTC

[39/50] [abbrv] hadoop git commit: HDDS-224. Create metrics for Event Watcher. Contributed by Elek, Marton.

HDDS-224. Create metrics for Event Watcher.
Contributed by Elek, Marton.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e12d93bf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e12d93bf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e12d93bf

Branch: refs/heads/HDDS-48
Commit: e12d93bfc1a0efd007bc84758e60b5149c3aa663
Parents: 895845e
Author: Anu Engineer <ae...@apache.org>
Authored: Mon Jul 9 12:02:20 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Mon Jul 9 12:10:12 2018 -0700

----------------------------------------------------------------------
 hadoop-hdds/framework/pom.xml                   |   5 +
 .../hadoop/hdds/server/events/EventWatcher.java |  43 +++++++-
 .../hdds/server/events/EventWatcherMetrics.java |  79 ++++++++++++++
 .../hdds/server/events/TestEventWatcher.java    | 107 ++++++++++++++++---
 4 files changed, 220 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12d93bf/hadoop-hdds/framework/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/pom.xml b/hadoop-hdds/framework/pom.xml
index a497133..6e1927d 100644
--- a/hadoop-hdds/framework/pom.xml
+++ b/hadoop-hdds/framework/pom.xml
@@ -39,6 +39,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>hadoop-hdds-common</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12d93bf/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
index 19fddde..8c5605a 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
@@ -26,12 +26,17 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.lease.Lease;
 import org.apache.hadoop.ozone.lease.LeaseAlreadyExistException;
 import org.apache.hadoop.ozone.lease.LeaseExpiredException;
 import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.apache.hadoop.ozone.lease.LeaseNotFoundException;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.map.HashedMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,18 +63,39 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
 
   private final LeaseManager<UUID> leaseManager;
 
+  private final EventWatcherMetrics metrics;
+
+  private final String name;
+
   protected final Map<UUID, TIMEOUT_PAYLOAD> trackedEventsByUUID =
       new ConcurrentHashMap<>();
 
   protected final Set<TIMEOUT_PAYLOAD> trackedEvents = new HashSet<>();
 
-  public EventWatcher(Event<TIMEOUT_PAYLOAD> startEvent,
+  private final Map<UUID, Long> startTrackingTimes = new HashedMap();
+
+  public EventWatcher(String name, Event<TIMEOUT_PAYLOAD> startEvent,
       Event<COMPLETION_PAYLOAD> completionEvent,
       LeaseManager<UUID> leaseManager) {
     this.startEvent = startEvent;
     this.completionEvent = completionEvent;
     this.leaseManager = leaseManager;
+    this.metrics = new EventWatcherMetrics();
+    Preconditions.checkNotNull(name);
+    if (name.equals("")) {
+      name = getClass().getSimpleName();
+    }
+    if (name.equals("")) {
+      //for anonymous inner classes
+      name = getClass().getName();
+    }
+    this.name = name;
+  }
 
+  public EventWatcher(Event<TIMEOUT_PAYLOAD> startEvent,
+      Event<COMPLETION_PAYLOAD> completionEvent,
+      LeaseManager<UUID> leaseManager) {
+    this("", startEvent, completionEvent, leaseManager);
   }
 
   public void start(EventQueue queue) {
@@ -87,11 +113,16 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
       }
     });
 
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.register(name, "EventWatcher metrics", metrics);
   }
 
   private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload,
       EventPublisher publisher) {
+    metrics.incrementTrackedEvents();
     UUID identifier = payload.getUUID();
+    startTrackingTimes.put(identifier, System.currentTimeMillis());
+
     trackedEventsByUUID.put(identifier, payload);
     trackedEvents.add(payload);
     try {
@@ -112,16 +143,21 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
 
   private synchronized void handleCompletion(UUID uuid,
       EventPublisher publisher) throws LeaseNotFoundException {
+    metrics.incrementCompletedEvents();
     leaseManager.release(uuid);
     TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(uuid);
     trackedEvents.remove(payload);
+    long originalTime = startTrackingTimes.remove(uuid);
+    metrics.updateFinishingTime(System.currentTimeMillis() - originalTime);
     onFinished(publisher, payload);
   }
 
   private synchronized void handleTimeout(EventPublisher publisher,
       UUID identifier) {
+    metrics.incrementTimedOutEvents();
     TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(identifier);
     trackedEvents.remove(payload);
+    startTrackingTimes.remove(payload.getUUID());
     onTimeout(publisher, payload);
   }
 
@@ -154,4 +190,9 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
     return trackedEventsByUUID.values().stream().filter(predicate)
         .collect(Collectors.toList());
   }
+
+  @VisibleForTesting
+  protected EventWatcherMetrics getMetrics() {
+    return metrics;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12d93bf/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java
new file mode 100644
index 0000000..1db81a9
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Metrics for any event watcher.
+ */
+public class EventWatcherMetrics {
+
+  @Metric()
+  private MutableCounterLong trackedEvents;
+
+  @Metric()
+  private MutableCounterLong timedOutEvents;
+
+  @Metric()
+  private MutableCounterLong completedEvents;
+
+  @Metric()
+  private MutableRate completionTime;
+
+  public void incrementTrackedEvents() {
+    trackedEvents.incr();
+  }
+
+  public void incrementTimedOutEvents() {
+    timedOutEvents.incr();
+  }
+
+  public void incrementCompletedEvents() {
+    completedEvents.incr();
+  }
+
+  @VisibleForTesting
+  public void updateFinishingTime(long duration) {
+    completionTime.add(duration);
+  }
+
+  @VisibleForTesting
+  public MutableCounterLong getTrackedEvents() {
+    return trackedEvents;
+  }
+
+  @VisibleForTesting
+  public MutableCounterLong getTimedOutEvents() {
+    return timedOutEvents;
+  }
+
+  @VisibleForTesting
+  public MutableCounterLong getCompletedEvents() {
+    return completedEvents;
+  }
+
+  @VisibleForTesting
+  public MutableRate getCompletionTime() {
+    return completionTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12d93bf/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
index 1731350..38e1554 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
@@ -21,8 +21,13 @@ import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
 
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.test.MetricsAsserts;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -46,6 +51,7 @@ public class TestEventWatcher {
 
   @Before
   public void startLeaseManager() {
+    DefaultMetricsSystem.instance();
     leaseManager = new LeaseManager<>(2000l);
     leaseManager.start();
   }
@@ -53,12 +59,12 @@ public class TestEventWatcher {
   @After
   public void stopLeaseManager() {
     leaseManager.shutdown();
+    DefaultMetricsSystem.shutdown();
   }
 
 
   @Test
   public void testEventHandling() throws InterruptedException {
-
     EventQueue queue = new EventQueue();
 
     EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>
@@ -139,26 +145,101 @@ public class TestEventWatcher {
     Assert.assertEquals(0, c1todo.size());
     Assert.assertFalse(replicationWatcher.contains(event1));
 
+  }
+
+  @Test
+  public void testMetrics() throws InterruptedException {
+
+    DefaultMetricsSystem.initialize("test");
+
+    EventQueue queue = new EventQueue();
+
+    EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>
+        replicationWatcher = createEventWatcher();
+
+    EventHandlerStub<UnderreplicatedEvent> underReplicatedEvents =
+        new EventHandlerStub<>();
+
+    queue.addHandler(UNDER_REPLICATED, underReplicatedEvents);
+
+    replicationWatcher.start(queue);
+
+    //send 3 event to track 3 in-progress activity
+    UnderreplicatedEvent event1 =
+        new UnderreplicatedEvent(UUID.randomUUID(), "C1");
+
+    UnderreplicatedEvent event2 =
+        new UnderreplicatedEvent(UUID.randomUUID(), "C2");
+
+    UnderreplicatedEvent event3 =
+        new UnderreplicatedEvent(UUID.randomUUID(), "C1");
+
+    queue.fireEvent(WATCH_UNDER_REPLICATED, event1);
+
+    queue.fireEvent(WATCH_UNDER_REPLICATED, event2);
+
+    queue.fireEvent(WATCH_UNDER_REPLICATED, event3);
+
+    //1st event is completed, don't need to track any more
+    ReplicationCompletedEvent event1Completed =
+        new ReplicationCompletedEvent(event1.UUID, "C1", "D1");
+
+    queue.fireEvent(REPLICATION_COMPLETED, event1Completed);
+
+
+    Thread.sleep(2200l);
+
+    //until now: 3 in-progress activities are tracked with three
+    // UnderreplicatedEvents. The first one is completed, the remaining two
+    // are timed out (as the timeout -- defined in the leasmanager -- is 2000ms.
 
+    EventWatcherMetrics metrics = replicationWatcher.getMetrics();
+
+    //3 events are received
+    Assert.assertEquals(3, metrics.getTrackedEvents().value());
+
+    //one is finished. doesn't need to be resent
+    Assert.assertEquals(1, metrics.getCompletedEvents().value());
+
+    //Other two are timed out and resent
+    Assert.assertEquals(2, metrics.getTimedOutEvents().value());
+
+    DefaultMetricsSystem.shutdown();
   }
 
   private EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>
   createEventWatcher() {
-    return new EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>(
-        WATCH_UNDER_REPLICATED, REPLICATION_COMPLETED, leaseManager) {
+    return new CommandWatcherExample(WATCH_UNDER_REPLICATED,
+        REPLICATION_COMPLETED, leaseManager);
+  }
 
-      @Override
-      void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
-        publisher.fireEvent(UNDER_REPLICATED, payload);
-      }
+  private class CommandWatcherExample
+      extends EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent> {
 
-      @Override
-      void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
-        //Good job. We did it.
-      }
-    };
+    public CommandWatcherExample(Event<UnderreplicatedEvent> startEvent,
+        Event<ReplicationCompletedEvent> completionEvent,
+        LeaseManager<UUID> leaseManager) {
+      super("TestCommandWatcher", startEvent, completionEvent, leaseManager);
+    }
+
+    @Override
+    void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
+      publisher.fireEvent(UNDER_REPLICATED, payload);
+    }
+
+    @Override
+    void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
+      //Good job. We did it.
+    }
+
+    @Override
+    public EventWatcherMetrics getMetrics() {
+      return super.getMetrics();
+    }
   }
 
+  ;
+
   private static class ReplicationCompletedEvent
       implements IdentifiableEventPayload {
 
@@ -217,4 +298,4 @@ public class TestEventWatcher {
     }
   }
 
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org