You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2019/06/28 22:12:53 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-807] TimingEvent is now closeable, extends GobblinEventBuilder

This is an automated email from the ASF dual-hosted git repository.

ibuenros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c81c25  [GOBBLIN-807] TimingEvent is now closeable, extends GobblinEventBuilder
2c81c25 is described below

commit 2c81c25b78bacde44fb2bfd543f6795fe2fd7d6a
Author: vbohra <vb...@linkedin.com>
AuthorDate: Fri Jun 28 15:12:46 2019 -0700

    [GOBBLIN-807] TimingEvent is now closeable, extends GobblinEventBuilder
    
    Closes #2678 from vikrambohra/master
---
 .../gobblin/metrics/event/CountEventBuilder.java   | 95 +++++++++++++++++++++
 .../gobblin/metrics/event/EventSubmitter.java      | 46 +++++++++-
 .../gobblin/metrics/event/FailureEventBuilder.java | 47 +++++++++--
 .../gobblin/metrics/event/GobblinEventBuilder.java | 12 ++-
 .../apache/gobblin/metrics/event/TimingEvent.java  | 97 ++++++++++++++++++----
 .../metrics/event/lineage/LineageEventBuilder.java |  4 +
 .../metrics/event/CountEventBuilderTest.java       | 82 ++++++++++++++++++
 .../gobblin/metrics/event/TimingEventTest.java     | 86 +++++++++++++++++++
 .../apache/gobblin/runtime/SafeDatasetCommit.java  |  3 +-
 9 files changed, 446 insertions(+), 26 deletions(-)

diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java
new file mode 100644
index 0000000..bcaa732
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.metrics.event;
+
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+
+/**
+ * The builder builds builds a specific {@link GobblinTrackingEvent} whose metadata has
+ * {@value GobblinEventBuilder#EVENT_TYPE} to be {@value #COUNT_EVENT_TYPE}
+ *
+ * <p>
+ * Note: A {@link CountEventBuilder} instance is not reusable
+ */
+public class CountEventBuilder extends GobblinEventBuilder {
+
+  public static final String COUNT_EVENT_TYPE = "CountEvent";
+  public static final String COUNT_KEY = "count";
+  @Setter
+  @Getter
+  private int count;
+
+  public CountEventBuilder(String name, int count) {
+    this(name, NAMESPACE, count);
+  }
+
+  public CountEventBuilder(String name, String namespace, int count) {
+    super(name, namespace);
+    this.metadata.put(EVENT_TYPE, COUNT_EVENT_TYPE);
+    this.count = count;
+  }
+
+  /**
+   *
+   * @return {@link GobblinTrackingEvent}
+   */
+  @Override
+  public GobblinTrackingEvent build() {
+    this.metadata.put(COUNT_KEY, Integer.toString(count));
+    return super.build();
+  }
+
+  /**
+   * Check if the given {@link GobblinTrackingEvent} is a {@link CountEventBuilder}
+   */
+  public static boolean isCountEvent(GobblinTrackingEvent event) {
+    String eventType = (event.getMetadata() == null) ? "" : event.getMetadata().get(EVENT_TYPE);
+    return StringUtils.isNotEmpty(eventType) && eventType.equals(COUNT_EVENT_TYPE);
+  }
+  /**
+   * Create a {@link CountEventBuilder} from a {@link GobblinTrackingEvent}. An inverse function
+   * to {@link CountEventBuilder#build()}
+   */
+  public static CountEventBuilder fromEvent(GobblinTrackingEvent event) {
+    if(!isCountEvent(event)) {
+      return null;
+    }
+
+    Map<String, String> metadata = event.getMetadata();
+    int count = Integer.parseInt(metadata.getOrDefault(COUNT_KEY, "0"));
+    CountEventBuilder countEventBuilder = new CountEventBuilder(event.getName(), count);
+    metadata.forEach((key, value) -> {
+      switch (key) {
+        case COUNT_KEY:
+          countEventBuilder.setCount(Integer.parseInt(value));
+          break;
+        default:
+          countEventBuilder.addMetadata(key, value);
+          break;
+      }
+    });
+
+    return countEventBuilder;
+  }
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
index 16ee2de..d3c0d1f 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
@@ -36,9 +36,7 @@ import lombok.Getter;
  *   Instances of this class are immutable. Calling set* methods returns a copy of the calling instance.
  * </p>
  *
- * @deprecated Use {@link GobblinEventBuilder}
  */
-@Deprecated
 public class EventSubmitter {
 
   public static final String EVENT_TYPE = "eventType";
@@ -79,23 +77,55 @@ public class EventSubmitter {
     }
   }
 
+  /**
+   * GobblinEventBuilder namespace trumps over the namespace of the EventSubmitter unless it's null
+   *
+   * @param eventBuilder
+   */
+  public void submit(GobblinEventBuilder eventBuilder) {
+    eventBuilder.addAdditionalMetadata(this.metadata);
+    if(eventBuilder.namespace == null) {
+      eventBuilder.setNamespace(this.namespace);
+    }
+    this.metricContext.get().submitEvent(eventBuilder.build());
+  }
+
+  /**
+   * This is a convenient way to submit an Event without using an EventSubmitter
+   * namespace should never be null and is defaulted to {@link GobblinEventBuilder#NAMESPACE}
+   * @param context
+   * @param builder
+   */
+  public static void submit(MetricContext context, GobblinEventBuilder builder) {
+    if(builder.namespace == null) {
+      builder.setNamespace(GobblinEventBuilder.NAMESPACE);
+    }
+    context.submitEvent(builder.build());
+  }
+
   private EventSubmitter(Builder builder) {
     this.metadata = builder.metadata;
     this.namespace = builder.namespace;
     this.metricContext = builder.metricContext;
   }
 
+
   /**
    * Submits the {@link org.apache.gobblin.metrics.GobblinTrackingEvent} to the {@link org.apache.gobblin.metrics.MetricContext}.
    * @param name Name of the event.
+   * @deprecated Use {{@link #submit(GobblinEventBuilder)}}
    */
+  @Deprecated
   public void submit(String name) {
     submit(name, ImmutableMap.<String, String>of());
   }
 
+
   /**
-   * Calls submit on submitter if present.
+   * Calls submit on submitter if present.timing
+   * @deprecated Use {{@link #submit(GobblinEventBuilder)}}
    */
+  @Deprecated
   public static void submit(Optional<EventSubmitter> submitter, String name) {
     if (submitter.isPresent()) {
       submitter.get().submit(name);
@@ -106,7 +136,9 @@ public class EventSubmitter {
    * Submits the {@link org.apache.gobblin.metrics.GobblinTrackingEvent} to the {@link org.apache.gobblin.metrics.MetricContext}.
    * @param name Name of the event.
    * @param metadataEls List of keys and values for metadata of the form key1, value2, key2, value2, ...
+   * @deprecated Use {{@link #submit(GobblinEventBuilder)}}
    */
+  @Deprecated
   public void submit(String name, String... metadataEls) {
     if(metadataEls.length % 2 != 0) {
       throw new IllegalArgumentException("Unmatched keys in metadata elements.");
@@ -121,7 +153,9 @@ public class EventSubmitter {
 
   /**
    * Calls submit on submitter if present.
+   * @deprecated Use {{@link #submit(GobblinEventBuilder)}}
    */
+  @Deprecated
   public static void submit(Optional<EventSubmitter> submitter, String name, String... metadataEls) {
     if (submitter.isPresent()) {
       submitter.get().submit(name, metadataEls);
@@ -132,7 +166,9 @@ public class EventSubmitter {
    * Submits the {@link org.apache.gobblin.metrics.GobblinTrackingEvent} to the {@link org.apache.gobblin.metrics.MetricContext}.
    * @param name Name of the event.
    * @param additionalMetadata Additional metadata to be added to the event.
+   * @deprecated Use {{@link #submit(GobblinEventBuilder)}}
    */
+  @Deprecated
   public void submit(String name, Map<String, String> additionalMetadata) {
     if(this.metricContext.isPresent()) {
       Map<String, String> finalMetadata = Maps.newHashMap(this.metadata);
@@ -147,7 +183,9 @@ public class EventSubmitter {
 
   /**
    * Calls submit on submitter if present.
+   * @deprecated Use {{@link #submit(GobblinEventBuilder)}}
    */
+  @Deprecated
   public static void submit(Optional<EventSubmitter> submitter, String name, Map<String, String> additionalMetadata) {
     if (submitter.isPresent()) {
       submitter.get().submit(name, additionalMetadata);
@@ -158,7 +196,9 @@ public class EventSubmitter {
    * Get a {@link org.apache.gobblin.metrics.event.TimingEvent} attached to this {@link org.apache.gobblin.metrics.event.EventSubmitter}.
    * @param name Name of the {@link org.apache.gobblin.metrics.GobblinTrackingEvent} that will be generated.
    * @return a {@link org.apache.gobblin.metrics.event.TimingEvent}.
+   * @deprecated Use {{@link TimingEvent)}}
    */
+  @Deprecated
   public TimingEvent getTimingEvent(String name) {
     return new TimingEvent(this, name);
   }
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
index d1ce681..dd6cf38 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
@@ -18,9 +18,16 @@
 package org.apache.gobblin.metrics.event;
 
 
+import java.util.Map;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
+
+import lombok.Getter;
+
+import org.apache.gobblin.dataset.Descriptor;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
 
 
 /**
@@ -34,7 +41,8 @@ public class FailureEventBuilder extends GobblinEventBuilder {
   private static final String FAILURE_EVENT_TYPE = "FailureEvent";
   private static final String ROOT_CAUSE = "rootException";
 
-  private Throwable rootCause;
+  @Getter
+  private String rootCause;
 
   public FailureEventBuilder(String name) {
     this(name, NAMESPACE);
@@ -49,15 +57,18 @@ public class FailureEventBuilder extends GobblinEventBuilder {
    * Given an throwable, get its root cause and set as a metadata
    */
   public void setRootCause(Throwable t) {
-    rootCause = getRootCause(t);
+    Throwable rootCause = getRootCause(t);
+    if(rootCause != null) {
+      this.rootCause = ExceptionUtils.getStackTrace(rootCause);
+    }
   }
 
   /**
    * Build as {@link GobblinTrackingEvent}
    */
   public GobblinTrackingEvent build() {
-    if (rootCause != null) {
-      metadata.put(ROOT_CAUSE, ExceptionUtils.getStackTrace(rootCause));
+    if (this.rootCause != null) {
+      metadata.put(ROOT_CAUSE, this.rootCause);
     }
     return new GobblinTrackingEvent(0L, namespace, name, metadata);
   }
@@ -66,7 +77,7 @@ public class FailureEventBuilder extends GobblinEventBuilder {
    * Check if the given {@link GobblinTrackingEvent} is a failure event
    */
   public static boolean isFailureEvent(GobblinTrackingEvent event) {
-    String eventType = event.getMetadata().get(EVENT_TYPE);
+    String eventType = (event.getMetadata() == null) ? "" : event.getMetadata().get(EVENT_TYPE);
     return StringUtils.isNotEmpty(eventType) && eventType.equals(FAILURE_EVENT_TYPE);
   }
 
@@ -77,4 +88,30 @@ public class FailureEventBuilder extends GobblinEventBuilder {
     }
     return rootCause;
   }
+
+  /**
+   * Create a {@link FailureEventBuilder} from a {@link GobblinTrackingEvent}. An inverse function
+   * to {@link FailureEventBuilder#build()}
+   */
+  public static FailureEventBuilder fromEvent(GobblinTrackingEvent event) {
+    if(!isFailureEvent(event)) {
+      return null;
+    }
+
+    Map<String, String> metadata = event.getMetadata();
+    FailureEventBuilder failureEvent = new FailureEventBuilder(event.getName());
+
+    metadata.forEach((key, value) -> {
+      switch (key) {
+        case ROOT_CAUSE:
+          failureEvent.rootCause = value;
+          break;
+        default:
+          failureEvent.addMetadata(key, value);
+          break;
+      }
+    });
+
+    return failureEvent;
+  }
 }
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
index 6b82342..0f03e93 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
 import lombok.Getter;
+import lombok.Setter;
 
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
@@ -40,11 +41,12 @@ public class GobblinEventBuilder {
   @Getter
   protected final String name;
   @Getter
-  protected final String namespace;
+  @Setter
+  protected String namespace;
   protected final Map<String, String> metadata;
 
   public GobblinEventBuilder(String name) {
-    this(name, NAMESPACE);
+    this(name, null);
   }
 
   public GobblinEventBuilder(String name, String namespace) {
@@ -79,8 +81,14 @@ public class GobblinEventBuilder {
   }
   /**
    * Submit the event
+   * @deprecated Use {@link EventSubmitter#submit(MetricContext, GobblinEventBuilder)}
    */
+  @Deprecated
   public void submit(MetricContext context) {
+    if(namespace == null) {
+      namespace = NAMESPACE;
+    }
     context.submitEvent(build());
   }
+
 }
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 8a1e6ef..d699238 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -17,15 +17,23 @@
 
 package org.apache.gobblin.metrics.event;
 
+import java.io.Closeable;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
+
 import com.google.common.collect.Maps;
 
+import lombok.Getter;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+
+
 /**
  * Event to time actions in the program. Automatically reports start time, end time, and duration from the time
  * the {@link org.apache.gobblin.metrics.event.TimingEvent} was created to the time {@link #stop} is called.
  */
-public class TimingEvent {
+public class TimingEvent extends GobblinEventBuilder implements Closeable {
 
   public static class LauncherTimings {
     public static final String FULL_JOB_EXECUTION = "FullJobExecutionTimer";
@@ -51,7 +59,7 @@ public class TimingEvent {
     public static final String MR_DISTRIBUTED_CACHE_SETUP = "JobMrDistributedCacheSetupTimer";
     public static final String MR_JOB_SETUP = "JobMrSetupTimer";
     public static final String MR_JOB_RUN = "JobMrRunTimer";
-    public static final String HELIX_JOB_SUBMISSION= "JobHelixSubmissionTimer";
+    public static final String HELIX_JOB_SUBMISSION = "JobHelixSubmissionTimer";
     public static final String HELIX_JOB_RUN = "JobHelixRunTimer";
   }
 
@@ -84,14 +92,18 @@ public class TimingEvent {
   public static final String JOB_START_TIME = "jobStartTime";
   public static final String JOB_END_TIME = "jobEndTime";
 
-  private final String name;
-  private final Long startTime;
+  @Getter
+  private Long startTime;
+  @Getter
+  private Long endTime;
+  @Getter
+  private Long duration;
   private final EventSubmitter submitter;
   private boolean stopped;
 
   public TimingEvent(EventSubmitter submitter, String name) {
+    super(name);
     this.stopped = false;
-    this.name = name;
     this.submitter = submitter;
     this.startTime = System.currentTimeMillis();
   }
@@ -100,7 +112,7 @@ public class TimingEvent {
    * Stop the timer and submit the event. If the timer was already stopped before, this is a no-op.
    */
   public void stop() {
-    stop(Maps.<String, String> newHashMap());
+    stop(Maps.<String, String>newHashMap());
   }
 
   /**
@@ -108,22 +120,77 @@ public class TimingEvent {
    * before, this is a no-op.
    *
    * @param additionalMetadata a {@link Map} of additional metadata that should be submitted along with this event
+   * @deprecated Use {@link #close()}
    */
+  @Deprecated
   public void stop(Map<String, String> additionalMetadata) {
     if (this.stopped) {
       return;
     }
+
+    this.metadata.putAll(additionalMetadata);
+    doStop();
+    this.submitter.submit(name, this.metadata);
+  }
+
+  public void doStop() {
+    if (this.stopped) {
+      return;
+    }
+
     this.stopped = true;
-    long endTime = System.currentTimeMillis();
-    long duration = endTime - this.startTime;
+    this.endTime = System.currentTimeMillis();
+    this.duration = this.endTime - this.startTime;
+
+    this.metadata.put(EventSubmitter.EVENT_TYPE, METADATA_TIMING_EVENT);
+    this.metadata.put(METADATA_START_TIME, Long.toString(this.startTime));
+    this.metadata.put(METADATA_END_TIME, Long.toString(this.endTime));
+    this.metadata.put(METADATA_DURATION, Long.toString(this.duration));
+  }
+
+  @Override
+  public void close() {
+    doStop();
+    submitter.submit(this);
+  }
+
+  /**
+   * Check if the given {@link GobblinTrackingEvent} is a {@link TimingEvent}
+   */
+  public static boolean isTimingEvent(GobblinTrackingEvent event) {
+    String eventType = (event.getMetadata() == null) ? "" : event.getMetadata().get(EVENT_TYPE);
+    return StringUtils.isNotEmpty(eventType) && eventType.equals(METADATA_TIMING_EVENT);
+  }
+
+  /**
+   * Create a {@link TimingEvent} from a {@link GobblinTrackingEvent}. An inverse function
+   * to {@link TimingEvent#build()}
+   */
+  public static TimingEvent fromEvent(GobblinTrackingEvent event) {
+    if(!isTimingEvent(event)) {
+      return null;
+    }
+
+    Map<String, String> metadata = event.getMetadata();
+    TimingEvent timingEvent = new TimingEvent(null, event.getName());
 
-    Map<String, String> finalMetadata = Maps.newHashMap();
-    finalMetadata.putAll(additionalMetadata);
-    finalMetadata.put(EventSubmitter.EVENT_TYPE, METADATA_TIMING_EVENT);
-    finalMetadata.put(METADATA_START_TIME, Long.toString(this.startTime));
-    finalMetadata.put(METADATA_END_TIME, Long.toString(endTime));
-    finalMetadata.put(METADATA_DURATION, Long.toString(duration));
+    metadata.forEach((key, value) -> {
+      switch (key) {
+        case METADATA_START_TIME:
+          timingEvent.startTime = Long.parseLong(value);
+          break;
+        case METADATA_END_TIME:
+          timingEvent.endTime = Long.parseLong(value);
+          break;
+        case METADATA_DURATION:
+          timingEvent.duration = Long.parseLong(value);
+          break;
+        default:
+          timingEvent.addMetadata(key, value);
+          break;
+      }
+    });
 
-    this.submitter.submit(this.name, finalMetadata);
+    return timingEvent;
   }
 }
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
index 63d7237..210c2b9 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
@@ -118,6 +118,10 @@ public final class LineageEventBuilder extends GobblinEventBuilder {
    * to {@link LineageEventBuilder#build()}
    */
   public static LineageEventBuilder fromEvent(GobblinTrackingEvent event) {
+    if(!isLineageEvent(event)) {
+      return null;
+    }
+
     Map<String, String> metadata = event.getMetadata();
     LineageEventBuilder lineageEvent = new LineageEventBuilder(event.getName());
 
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/CountEventBuilderTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/CountEventBuilderTest.java
new file mode 100644
index 0000000..2754502
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/CountEventBuilderTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event;
+
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import javax.annotation.Nullable;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.notification.EventNotification;
+import org.apache.gobblin.metrics.notification.Notification;
+
+
+public class CountEventBuilderTest {
+
+  @Test
+  public void test() {
+    String name = "TestName";
+    int count = 10;
+    MetricContext context = new MetricContext.Builder("name").build();
+    CountEventBuilder countEventBuilder = new CountEventBuilder(name, count);
+    context.addNotificationTarget(new com.google.common.base.Function<Notification, Void>() {
+      @Nullable
+      @Override
+      public Void apply(@Nullable Notification input) {
+        if (input instanceof EventNotification) {
+          GobblinTrackingEvent event = ((EventNotification) input).getEvent();
+          Map<String, String> metadata = event.getMetadata();
+          Assert.assertEquals(metadata.containsKey(GobblinEventBuilder.EVENT_TYPE), true);
+          Assert.assertEquals(metadata.containsKey(CountEventBuilder.COUNT_KEY), true);
+          Assert.assertEquals(metadata.get(GobblinEventBuilder.EVENT_TYPE), CountEventBuilder.COUNT_EVENT_TYPE);
+          Assert.assertEquals(event.getName(), name);
+          Assert.assertEquals(event.getNamespace(), GobblinEventBuilder.NAMESPACE);
+          Assert.assertEquals(Integer.parseInt(metadata.get(CountEventBuilder.COUNT_KEY)), count);
+        }
+        return null;
+      }
+    });
+    EventSubmitter.submit(context, countEventBuilder);
+  }
+
+  @Test
+  public void fromEventTest() {
+    String name = "TestName";
+    int count = 10;
+    CountEventBuilder countEventBuilder = new CountEventBuilder(name, count);
+    GobblinTrackingEvent event = countEventBuilder.build();
+
+    //Count Event
+    CountEventBuilder builderFromEvent = CountEventBuilder.fromEvent(event);
+    Assert.assertEquals(CountEventBuilder.isCountEvent(event), true);
+    Assert.assertNotNull(builderFromEvent);
+    Assert.assertEquals(builderFromEvent.getName(), name);
+    Assert.assertEquals(builderFromEvent.getCount(), count);
+
+    // General Event
+    event = new GobblinTrackingEvent();
+    countEventBuilder = CountEventBuilder.fromEvent(event);
+    Assert.assertEquals(CountEventBuilder.isCountEvent(event), false);
+    Assert.assertEquals(countEventBuilder, null);
+  }
+
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/TimingEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/TimingEventTest.java
new file mode 100644
index 0000000..284b93a
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/TimingEventTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event;
+
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import javax.annotation.Nullable;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.notification.EventNotification;
+import org.apache.gobblin.metrics.notification.Notification;
+
+
+public class TimingEventTest {
+
+  @Test
+  public void test() {
+    String name = "TestName";
+    String namepace = "TestNamespace";
+    MetricContext context = new MetricContext.Builder("name").build();
+    context.addNotificationTarget(new com.google.common.base.Function<Notification, Void>() {
+      @Nullable
+      @Override
+      public Void apply(@Nullable Notification input) {
+        if (input instanceof EventNotification) {
+          GobblinTrackingEvent event = ((EventNotification) input).getEvent();
+          Map<String, String> metadata = event.getMetadata();
+          Assert.assertEquals(event.getNamespace(), namepace);
+          Assert.assertEquals(metadata.containsKey(GobblinEventBuilder.EVENT_TYPE), true);
+          Assert.assertEquals(metadata.containsKey(TimingEvent.METADATA_START_TIME), true);
+          Assert.assertEquals(metadata.containsKey(TimingEvent.METADATA_END_TIME), true);
+          Assert.assertEquals(metadata.containsKey(TimingEvent.METADATA_DURATION), true);
+          Assert.assertEquals(metadata.get(GobblinEventBuilder.EVENT_TYPE), TimingEvent.METADATA_TIMING_EVENT);
+          Assert.assertEquals(event.getName(), name);
+        }
+        return null;
+      }
+    });
+    TimingEvent timingEvent = new TimingEvent(new EventSubmitter.Builder(context, namepace).build(), name);
+    timingEvent.close();
+  }
+
+  @Test
+  public void fromEventTest() {
+    String name = "TestName";
+    String namepace = "TestNamespace";
+    MetricContext context = new MetricContext.Builder("name").build();
+    TimingEvent timingEventBuilder = new TimingEvent(new EventSubmitter.Builder(context, namepace).build(), name);
+    GobblinTrackingEvent event = timingEventBuilder.build();
+    timingEventBuilder.close();
+
+    //Timing Event
+    TimingEvent builderFromEvent = TimingEvent.fromEvent(event);
+    Assert.assertEquals(TimingEvent.isTimingEvent(event), true);
+    Assert.assertNotNull(builderFromEvent);
+    Assert.assertEquals(builderFromEvent.getName(), name);
+    Assert.assertTrue(builderFromEvent.getStartTime() <= System.currentTimeMillis());
+    Assert.assertTrue(builderFromEvent.getEndTime() >= builderFromEvent.getStartTime());
+    Assert.assertTrue(builderFromEvent.getDuration() >= 0);
+
+    // General Event
+    event = new GobblinTrackingEvent();
+    timingEventBuilder = TimingEvent.fromEvent(event);
+    Assert.assertEquals(TimingEvent.isTimingEvent(event), false);
+    Assert.assertEquals(timingEventBuilder, null);
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index a80d832..b271652 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -38,6 +38,7 @@ import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.FailureEventBuilder;
@@ -244,7 +245,7 @@ final class SafeDatasetCommit implements Callable<Void> {
   private void submitLineageEvent(String dataset, Collection<TaskState> states) {
     Collection<LineageEventBuilder> events = LineageInfo.load(states);
     // Send events
-    events.forEach(event -> event.submit(metricContext));
+    events.forEach(event -> EventSubmitter.submit(this.metricContext, event));
     log.info(String.format("Submitted %d lineage events for dataset %s", events.size(), dataset));
   }