You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2019/06/28 22:12:53 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-807]
TimingEvent is now closeable, extends GobblinEventBuilder
This is an automated email from the ASF dual-hosted git repository.
ibuenros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2c81c25 [GOBBLIN-807] TimingEvent is now closeable, extends GobblinEventBuilder
2c81c25 is described below
commit 2c81c25b78bacde44fb2bfd543f6795fe2fd7d6a
Author: vbohra <vb...@linkedin.com>
AuthorDate: Fri Jun 28 15:12:46 2019 -0700
[GOBBLIN-807] TimingEvent is now closeable, extends GobblinEventBuilder
Closes #2678 from vikrambohra/master
---
.../gobblin/metrics/event/CountEventBuilder.java | 95 +++++++++++++++++++++
.../gobblin/metrics/event/EventSubmitter.java | 46 +++++++++-
.../gobblin/metrics/event/FailureEventBuilder.java | 47 +++++++++--
.../gobblin/metrics/event/GobblinEventBuilder.java | 12 ++-
.../apache/gobblin/metrics/event/TimingEvent.java | 97 ++++++++++++++++++----
.../metrics/event/lineage/LineageEventBuilder.java | 4 +
.../metrics/event/CountEventBuilderTest.java | 82 ++++++++++++++++++
.../gobblin/metrics/event/TimingEventTest.java | 86 +++++++++++++++++++
.../apache/gobblin/runtime/SafeDatasetCommit.java | 3 +-
9 files changed, 446 insertions(+), 26 deletions(-)
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java
new file mode 100644
index 0000000..bcaa732
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/CountEventBuilder.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.metrics.event;
+
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+
+/**
+ * The builder builds builds a specific {@link GobblinTrackingEvent} whose metadata has
+ * {@value GobblinEventBuilder#EVENT_TYPE} to be {@value #COUNT_EVENT_TYPE}
+ *
+ * <p>
+ * Note: A {@link CountEventBuilder} instance is not reusable
+ */
+public class CountEventBuilder extends GobblinEventBuilder {
+
+ public static final String COUNT_EVENT_TYPE = "CountEvent";
+ public static final String COUNT_KEY = "count";
+ @Setter
+ @Getter
+ private int count;
+
+ public CountEventBuilder(String name, int count) {
+ this(name, NAMESPACE, count);
+ }
+
+ public CountEventBuilder(String name, String namespace, int count) {
+ super(name, namespace);
+ this.metadata.put(EVENT_TYPE, COUNT_EVENT_TYPE);
+ this.count = count;
+ }
+
+ /**
+ *
+ * @return {@link GobblinTrackingEvent}
+ */
+ @Override
+ public GobblinTrackingEvent build() {
+ this.metadata.put(COUNT_KEY, Integer.toString(count));
+ return super.build();
+ }
+
+ /**
+ * Check if the given {@link GobblinTrackingEvent} is a {@link CountEventBuilder}
+ */
+ public static boolean isCountEvent(GobblinTrackingEvent event) {
+ String eventType = (event.getMetadata() == null) ? "" : event.getMetadata().get(EVENT_TYPE);
+ return StringUtils.isNotEmpty(eventType) && eventType.equals(COUNT_EVENT_TYPE);
+ }
+ /**
+ * Create a {@link CountEventBuilder} from a {@link GobblinTrackingEvent}. An inverse function
+ * to {@link CountEventBuilder#build()}
+ */
+ public static CountEventBuilder fromEvent(GobblinTrackingEvent event) {
+ if(!isCountEvent(event)) {
+ return null;
+ }
+
+ Map<String, String> metadata = event.getMetadata();
+ int count = Integer.parseInt(metadata.getOrDefault(COUNT_KEY, "0"));
+ CountEventBuilder countEventBuilder = new CountEventBuilder(event.getName(), count);
+ metadata.forEach((key, value) -> {
+ switch (key) {
+ case COUNT_KEY:
+ countEventBuilder.setCount(Integer.parseInt(value));
+ break;
+ default:
+ countEventBuilder.addMetadata(key, value);
+ break;
+ }
+ });
+
+ return countEventBuilder;
+ }
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
index 16ee2de..d3c0d1f 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/EventSubmitter.java
@@ -36,9 +36,7 @@ import lombok.Getter;
* Instances of this class are immutable. Calling set* methods returns a copy of the calling instance.
* </p>
*
- * @deprecated Use {@link GobblinEventBuilder}
*/
-@Deprecated
public class EventSubmitter {
public static final String EVENT_TYPE = "eventType";
@@ -79,23 +77,55 @@ public class EventSubmitter {
}
}
+ /**
+ * GobblinEventBuilder namespace trumps over the namespace of the EventSubmitter unless it's null
+ *
+ * @param eventBuilder
+ */
+ public void submit(GobblinEventBuilder eventBuilder) {
+ eventBuilder.addAdditionalMetadata(this.metadata);
+ if(eventBuilder.namespace == null) {
+ eventBuilder.setNamespace(this.namespace);
+ }
+ this.metricContext.get().submitEvent(eventBuilder.build());
+ }
+
+ /**
+ * This is a convenient way to submit an Event without using an EventSubmitter
+ * namespace should never be null and is defaulted to {@link GobblinEventBuilder#NAMESPACE}
+ * @param context
+ * @param builder
+ */
+ public static void submit(MetricContext context, GobblinEventBuilder builder) {
+ if(builder.namespace == null) {
+ builder.setNamespace(GobblinEventBuilder.NAMESPACE);
+ }
+ context.submitEvent(builder.build());
+ }
+
private EventSubmitter(Builder builder) {
this.metadata = builder.metadata;
this.namespace = builder.namespace;
this.metricContext = builder.metricContext;
}
+
/**
* Submits the {@link org.apache.gobblin.metrics.GobblinTrackingEvent} to the {@link org.apache.gobblin.metrics.MetricContext}.
* @param name Name of the event.
+ * @deprecated Use {{@link #submit(GobblinEventBuilder)}}
*/
+ @Deprecated
public void submit(String name) {
submit(name, ImmutableMap.<String, String>of());
}
+
/**
- * Calls submit on submitter if present.
+ * Calls submit on submitter if present.timing
+ * @deprecated Use {{@link #submit(GobblinEventBuilder)}}
*/
+ @Deprecated
public static void submit(Optional<EventSubmitter> submitter, String name) {
if (submitter.isPresent()) {
submitter.get().submit(name);
@@ -106,7 +136,9 @@ public class EventSubmitter {
* Submits the {@link org.apache.gobblin.metrics.GobblinTrackingEvent} to the {@link org.apache.gobblin.metrics.MetricContext}.
* @param name Name of the event.
* @param metadataEls List of keys and values for metadata of the form key1, value2, key2, value2, ...
+ * @deprecated Use {{@link #submit(GobblinEventBuilder)}}
*/
+ @Deprecated
public void submit(String name, String... metadataEls) {
if(metadataEls.length % 2 != 0) {
throw new IllegalArgumentException("Unmatched keys in metadata elements.");
@@ -121,7 +153,9 @@ public class EventSubmitter {
/**
* Calls submit on submitter if present.
+ * @deprecated Use {{@link #submit(GobblinEventBuilder)}}
*/
+ @Deprecated
public static void submit(Optional<EventSubmitter> submitter, String name, String... metadataEls) {
if (submitter.isPresent()) {
submitter.get().submit(name, metadataEls);
@@ -132,7 +166,9 @@ public class EventSubmitter {
* Submits the {@link org.apache.gobblin.metrics.GobblinTrackingEvent} to the {@link org.apache.gobblin.metrics.MetricContext}.
* @param name Name of the event.
* @param additionalMetadata Additional metadata to be added to the event.
+ * @deprecated Use {{@link #submit(GobblinEventBuilder)}}
*/
+ @Deprecated
public void submit(String name, Map<String, String> additionalMetadata) {
if(this.metricContext.isPresent()) {
Map<String, String> finalMetadata = Maps.newHashMap(this.metadata);
@@ -147,7 +183,9 @@ public class EventSubmitter {
/**
* Calls submit on submitter if present.
+ * @deprecated Use {{@link #submit(GobblinEventBuilder)}}
*/
+ @Deprecated
public static void submit(Optional<EventSubmitter> submitter, String name, Map<String, String> additionalMetadata) {
if (submitter.isPresent()) {
submitter.get().submit(name, additionalMetadata);
@@ -158,7 +196,9 @@ public class EventSubmitter {
* Get a {@link org.apache.gobblin.metrics.event.TimingEvent} attached to this {@link org.apache.gobblin.metrics.event.EventSubmitter}.
* @param name Name of the {@link org.apache.gobblin.metrics.GobblinTrackingEvent} that will be generated.
* @return a {@link org.apache.gobblin.metrics.event.TimingEvent}.
+ * @deprecated Use {{@link TimingEvent)}}
*/
+ @Deprecated
public TimingEvent getTimingEvent(String name) {
return new TimingEvent(this, name);
}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
index d1ce681..dd6cf38 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/FailureEventBuilder.java
@@ -18,9 +18,16 @@
package org.apache.gobblin.metrics.event;
+import java.util.Map;
+
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
+
+import lombok.Getter;
+
+import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
/**
@@ -34,7 +41,8 @@ public class FailureEventBuilder extends GobblinEventBuilder {
private static final String FAILURE_EVENT_TYPE = "FailureEvent";
private static final String ROOT_CAUSE = "rootException";
- private Throwable rootCause;
+ @Getter
+ private String rootCause;
public FailureEventBuilder(String name) {
this(name, NAMESPACE);
@@ -49,15 +57,18 @@ public class FailureEventBuilder extends GobblinEventBuilder {
* Given an throwable, get its root cause and set as a metadata
*/
public void setRootCause(Throwable t) {
- rootCause = getRootCause(t);
+ Throwable rootCause = getRootCause(t);
+ if(rootCause != null) {
+ this.rootCause = ExceptionUtils.getStackTrace(rootCause);
+ }
}
/**
* Build as {@link GobblinTrackingEvent}
*/
public GobblinTrackingEvent build() {
- if (rootCause != null) {
- metadata.put(ROOT_CAUSE, ExceptionUtils.getStackTrace(rootCause));
+ if (this.rootCause != null) {
+ metadata.put(ROOT_CAUSE, this.rootCause);
}
return new GobblinTrackingEvent(0L, namespace, name, metadata);
}
@@ -66,7 +77,7 @@ public class FailureEventBuilder extends GobblinEventBuilder {
* Check if the given {@link GobblinTrackingEvent} is a failure event
*/
public static boolean isFailureEvent(GobblinTrackingEvent event) {
- String eventType = event.getMetadata().get(EVENT_TYPE);
+ String eventType = (event.getMetadata() == null) ? "" : event.getMetadata().get(EVENT_TYPE);
return StringUtils.isNotEmpty(eventType) && eventType.equals(FAILURE_EVENT_TYPE);
}
@@ -77,4 +88,30 @@ public class FailureEventBuilder extends GobblinEventBuilder {
}
return rootCause;
}
+
+ /**
+ * Create a {@link FailureEventBuilder} from a {@link GobblinTrackingEvent}. An inverse function
+ * to {@link FailureEventBuilder#build()}
+ */
+ public static FailureEventBuilder fromEvent(GobblinTrackingEvent event) {
+ if(!isFailureEvent(event)) {
+ return null;
+ }
+
+ Map<String, String> metadata = event.getMetadata();
+ FailureEventBuilder failureEvent = new FailureEventBuilder(event.getName());
+
+ metadata.forEach((key, value) -> {
+ switch (key) {
+ case ROOT_CAUSE:
+ failureEvent.rootCause = value;
+ break;
+ default:
+ failureEvent.addMetadata(key, value);
+ break;
+ }
+ });
+
+ return failureEvent;
+ }
}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
index 6b82342..0f03e93 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/GobblinEventBuilder.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import lombok.Getter;
+import lombok.Setter;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
@@ -40,11 +41,12 @@ public class GobblinEventBuilder {
@Getter
protected final String name;
@Getter
- protected final String namespace;
+ @Setter
+ protected String namespace;
protected final Map<String, String> metadata;
public GobblinEventBuilder(String name) {
- this(name, NAMESPACE);
+ this(name, null);
}
public GobblinEventBuilder(String name, String namespace) {
@@ -79,8 +81,14 @@ public class GobblinEventBuilder {
}
/**
* Submit the event
+ * @deprecated Use {@link EventSubmitter#submit(MetricContext, GobblinEventBuilder)}
*/
+ @Deprecated
public void submit(MetricContext context) {
+ if(namespace == null) {
+ namespace = NAMESPACE;
+ }
context.submitEvent(build());
}
+
}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 8a1e6ef..d699238 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -17,15 +17,23 @@
package org.apache.gobblin.metrics.event;
+import java.io.Closeable;
import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+
import com.google.common.collect.Maps;
+import lombok.Getter;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+
+
/**
* Event to time actions in the program. Automatically reports start time, end time, and duration from the time
* the {@link org.apache.gobblin.metrics.event.TimingEvent} was created to the time {@link #stop} is called.
*/
-public class TimingEvent {
+public class TimingEvent extends GobblinEventBuilder implements Closeable {
public static class LauncherTimings {
public static final String FULL_JOB_EXECUTION = "FullJobExecutionTimer";
@@ -51,7 +59,7 @@ public class TimingEvent {
public static final String MR_DISTRIBUTED_CACHE_SETUP = "JobMrDistributedCacheSetupTimer";
public static final String MR_JOB_SETUP = "JobMrSetupTimer";
public static final String MR_JOB_RUN = "JobMrRunTimer";
- public static final String HELIX_JOB_SUBMISSION= "JobHelixSubmissionTimer";
+ public static final String HELIX_JOB_SUBMISSION = "JobHelixSubmissionTimer";
public static final String HELIX_JOB_RUN = "JobHelixRunTimer";
}
@@ -84,14 +92,18 @@ public class TimingEvent {
public static final String JOB_START_TIME = "jobStartTime";
public static final String JOB_END_TIME = "jobEndTime";
- private final String name;
- private final Long startTime;
+ @Getter
+ private Long startTime;
+ @Getter
+ private Long endTime;
+ @Getter
+ private Long duration;
private final EventSubmitter submitter;
private boolean stopped;
public TimingEvent(EventSubmitter submitter, String name) {
+ super(name);
this.stopped = false;
- this.name = name;
this.submitter = submitter;
this.startTime = System.currentTimeMillis();
}
@@ -100,7 +112,7 @@ public class TimingEvent {
* Stop the timer and submit the event. If the timer was already stopped before, this is a no-op.
*/
public void stop() {
- stop(Maps.<String, String> newHashMap());
+ stop(Maps.<String, String>newHashMap());
}
/**
@@ -108,22 +120,77 @@ public class TimingEvent {
* before, this is a no-op.
*
* @param additionalMetadata a {@link Map} of additional metadata that should be submitted along with this event
+ * @deprecated Use {@link #close()}
*/
+ @Deprecated
public void stop(Map<String, String> additionalMetadata) {
if (this.stopped) {
return;
}
+
+ this.metadata.putAll(additionalMetadata);
+ doStop();
+ this.submitter.submit(name, this.metadata);
+ }
+
+ public void doStop() {
+ if (this.stopped) {
+ return;
+ }
+
this.stopped = true;
- long endTime = System.currentTimeMillis();
- long duration = endTime - this.startTime;
+ this.endTime = System.currentTimeMillis();
+ this.duration = this.endTime - this.startTime;
+
+ this.metadata.put(EventSubmitter.EVENT_TYPE, METADATA_TIMING_EVENT);
+ this.metadata.put(METADATA_START_TIME, Long.toString(this.startTime));
+ this.metadata.put(METADATA_END_TIME, Long.toString(this.endTime));
+ this.metadata.put(METADATA_DURATION, Long.toString(this.duration));
+ }
+
+ @Override
+ public void close() {
+ doStop();
+ submitter.submit(this);
+ }
+
+ /**
+ * Check if the given {@link GobblinTrackingEvent} is a {@link TimingEvent}
+ */
+ public static boolean isTimingEvent(GobblinTrackingEvent event) {
+ String eventType = (event.getMetadata() == null) ? "" : event.getMetadata().get(EVENT_TYPE);
+ return StringUtils.isNotEmpty(eventType) && eventType.equals(METADATA_TIMING_EVENT);
+ }
+
+ /**
+ * Create a {@link TimingEvent} from a {@link GobblinTrackingEvent}. An inverse function
+ * to {@link TimingEvent#build()}
+ */
+ public static TimingEvent fromEvent(GobblinTrackingEvent event) {
+ if(!isTimingEvent(event)) {
+ return null;
+ }
+
+ Map<String, String> metadata = event.getMetadata();
+ TimingEvent timingEvent = new TimingEvent(null, event.getName());
- Map<String, String> finalMetadata = Maps.newHashMap();
- finalMetadata.putAll(additionalMetadata);
- finalMetadata.put(EventSubmitter.EVENT_TYPE, METADATA_TIMING_EVENT);
- finalMetadata.put(METADATA_START_TIME, Long.toString(this.startTime));
- finalMetadata.put(METADATA_END_TIME, Long.toString(endTime));
- finalMetadata.put(METADATA_DURATION, Long.toString(duration));
+ metadata.forEach((key, value) -> {
+ switch (key) {
+ case METADATA_START_TIME:
+ timingEvent.startTime = Long.parseLong(value);
+ break;
+ case METADATA_END_TIME:
+ timingEvent.endTime = Long.parseLong(value);
+ break;
+ case METADATA_DURATION:
+ timingEvent.duration = Long.parseLong(value);
+ break;
+ default:
+ timingEvent.addMetadata(key, value);
+ break;
+ }
+ });
- this.submitter.submit(this.name, finalMetadata);
+ return timingEvent;
}
}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
index 63d7237..210c2b9 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
@@ -118,6 +118,10 @@ public final class LineageEventBuilder extends GobblinEventBuilder {
* to {@link LineageEventBuilder#build()}
*/
public static LineageEventBuilder fromEvent(GobblinTrackingEvent event) {
+ if(!isLineageEvent(event)) {
+ return null;
+ }
+
Map<String, String> metadata = event.getMetadata();
LineageEventBuilder lineageEvent = new LineageEventBuilder(event.getName());
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/CountEventBuilderTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/CountEventBuilderTest.java
new file mode 100644
index 0000000..2754502
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/CountEventBuilderTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event;
+
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import javax.annotation.Nullable;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.notification.EventNotification;
+import org.apache.gobblin.metrics.notification.Notification;
+
+
+public class CountEventBuilderTest {
+
+ @Test
+ public void test() {
+ String name = "TestName";
+ int count = 10;
+ MetricContext context = new MetricContext.Builder("name").build();
+ CountEventBuilder countEventBuilder = new CountEventBuilder(name, count);
+ context.addNotificationTarget(new com.google.common.base.Function<Notification, Void>() {
+ @Nullable
+ @Override
+ public Void apply(@Nullable Notification input) {
+ if (input instanceof EventNotification) {
+ GobblinTrackingEvent event = ((EventNotification) input).getEvent();
+ Map<String, String> metadata = event.getMetadata();
+ Assert.assertEquals(metadata.containsKey(GobblinEventBuilder.EVENT_TYPE), true);
+ Assert.assertEquals(metadata.containsKey(CountEventBuilder.COUNT_KEY), true);
+ Assert.assertEquals(metadata.get(GobblinEventBuilder.EVENT_TYPE), CountEventBuilder.COUNT_EVENT_TYPE);
+ Assert.assertEquals(event.getName(), name);
+ Assert.assertEquals(event.getNamespace(), GobblinEventBuilder.NAMESPACE);
+ Assert.assertEquals(Integer.parseInt(metadata.get(CountEventBuilder.COUNT_KEY)), count);
+ }
+ return null;
+ }
+ });
+ EventSubmitter.submit(context, countEventBuilder);
+ }
+
+ @Test
+ public void fromEventTest() {
+ String name = "TestName";
+ int count = 10;
+ CountEventBuilder countEventBuilder = new CountEventBuilder(name, count);
+ GobblinTrackingEvent event = countEventBuilder.build();
+
+ //Count Event
+ CountEventBuilder builderFromEvent = CountEventBuilder.fromEvent(event);
+ Assert.assertEquals(CountEventBuilder.isCountEvent(event), true);
+ Assert.assertNotNull(builderFromEvent);
+ Assert.assertEquals(builderFromEvent.getName(), name);
+ Assert.assertEquals(builderFromEvent.getCount(), count);
+
+ // General Event
+ event = new GobblinTrackingEvent();
+ countEventBuilder = CountEventBuilder.fromEvent(event);
+ Assert.assertEquals(CountEventBuilder.isCountEvent(event), false);
+ Assert.assertEquals(countEventBuilder, null);
+ }
+
+}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/TimingEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/TimingEventTest.java
new file mode 100644
index 0000000..284b93a
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/TimingEventTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event;
+
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import javax.annotation.Nullable;
+
+import org.apache.gobblin.metrics.GobblinTrackingEvent;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.notification.EventNotification;
+import org.apache.gobblin.metrics.notification.Notification;
+
+
+public class TimingEventTest {
+
+ @Test
+ public void test() {
+ String name = "TestName";
+ String namepace = "TestNamespace";
+ MetricContext context = new MetricContext.Builder("name").build();
+ context.addNotificationTarget(new com.google.common.base.Function<Notification, Void>() {
+ @Nullable
+ @Override
+ public Void apply(@Nullable Notification input) {
+ if (input instanceof EventNotification) {
+ GobblinTrackingEvent event = ((EventNotification) input).getEvent();
+ Map<String, String> metadata = event.getMetadata();
+ Assert.assertEquals(event.getNamespace(), namepace);
+ Assert.assertEquals(metadata.containsKey(GobblinEventBuilder.EVENT_TYPE), true);
+ Assert.assertEquals(metadata.containsKey(TimingEvent.METADATA_START_TIME), true);
+ Assert.assertEquals(metadata.containsKey(TimingEvent.METADATA_END_TIME), true);
+ Assert.assertEquals(metadata.containsKey(TimingEvent.METADATA_DURATION), true);
+ Assert.assertEquals(metadata.get(GobblinEventBuilder.EVENT_TYPE), TimingEvent.METADATA_TIMING_EVENT);
+ Assert.assertEquals(event.getName(), name);
+ }
+ return null;
+ }
+ });
+ TimingEvent timingEvent = new TimingEvent(new EventSubmitter.Builder(context, namepace).build(), name);
+ timingEvent.close();
+ }
+
+ @Test
+ public void fromEventTest() {
+ String name = "TestName";
+ String namepace = "TestNamespace";
+ MetricContext context = new MetricContext.Builder("name").build();
+ TimingEvent timingEventBuilder = new TimingEvent(new EventSubmitter.Builder(context, namepace).build(), name);
+ GobblinTrackingEvent event = timingEventBuilder.build();
+ timingEventBuilder.close();
+
+ //Timing Event
+ TimingEvent builderFromEvent = TimingEvent.fromEvent(event);
+ Assert.assertEquals(TimingEvent.isTimingEvent(event), true);
+ Assert.assertNotNull(builderFromEvent);
+ Assert.assertEquals(builderFromEvent.getName(), name);
+ Assert.assertTrue(builderFromEvent.getStartTime() <= System.currentTimeMillis());
+ Assert.assertTrue(builderFromEvent.getEndTime() >= builderFromEvent.getStartTime());
+ Assert.assertTrue(builderFromEvent.getDuration() >= 0);
+
+ // General Event
+ event = new GobblinTrackingEvent();
+ timingEventBuilder = TimingEvent.fromEvent(event);
+ Assert.assertEquals(TimingEvent.isTimingEvent(event), false);
+ Assert.assertEquals(timingEventBuilder, null);
+ }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index a80d832..b271652 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -38,6 +38,7 @@ import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.FailureEventBuilder;
@@ -244,7 +245,7 @@ final class SafeDatasetCommit implements Callable<Void> {
private void submitLineageEvent(String dataset, Collection<TaskState> states) {
Collection<LineageEventBuilder> events = LineageInfo.load(states);
// Send events
- events.forEach(event -> event.submit(metricContext));
+ events.forEach(event -> EventSubmitter.submit(this.metricContext, event));
log.info(String.format("Submitted %d lineage events for dataset %s", events.size(), dataset));
}