You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by af...@apache.org on 2015/11/13 05:07:49 UTC

incubator-reef git commit: [REEF-714] Service to report REEF events

Repository: incubator-reef
Updated Branches:
  refs/heads/master 0a78fcac5 -> 53584db94


[REEF-714] Service to report REEF events

This addresses the issue by
 * Creating an Avro schema that wraps all driver-side event types of the REEF control flow
 * Creating a Watcher that subscribes to control flow events and transfers them to EventStreams
 * Creating LogEventStream and FileEventStream

JIRA:
[REEF-714](https://issues.apache.org/jira/browse/REEF-714)

Pull Request:
  Closes #522


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/53584db9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/53584db9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/53584db9

Branch: refs/heads/master
Commit: 53584db94aa3884093584d092400fac30d79d571
Parents: 0a78fca
Author: Kim_Geon_Woo <gw...@gmail.com>
Authored: Tue Sep 8 00:52:42 2015 +0900
Committer: Andrew Chung <af...@gmail.com>
Committed: Thu Nov 12 20:07:03 2015 -0800

----------------------------------------------------------------------
 .../main/avro/watcher/REEFDriverSideEvents.avsc | 354 +++++++++++++++++++
 .../org/apache/reef/io/watcher/EventStream.java |  40 +++
 .../org/apache/reef/io/watcher/EventType.java   |  43 +++
 .../apache/reef/io/watcher/FileEventStream.java | 105 ++++++
 .../apache/reef/io/watcher/LogEventStream.java  |  43 +++
 .../org/apache/reef/io/watcher/Watcher.java     | 184 ++++++++++
 .../reef/io/watcher/WatcherConfiguration.java   |  62 ++++
 .../apache/reef/io/watcher/package-info.java    |  23 ++
 .../reef/io/watcher/param/EventStreams.java     |  43 +++
 .../reef/io/watcher/param/package-info.java     |  23 ++
 .../watcher/util/RunnableExecutingHandler.java  |  43 +++
 .../reef/io/watcher/util/WatcherAvroUtil.java   | 333 +++++++++++++++++
 .../reef/io/watcher/util/package-info.java      |  23 ++
 lang/java/reef-tests/pom.xml                    |   5 +
 .../tests/watcher/FailedContextHandler.java     |  36 ++
 .../tests/watcher/FailedTaskStartHandler.java   |  36 ++
 .../reef/tests/watcher/IsTaskSuspended.java     |  26 ++
 .../reef/tests/watcher/TestEventStream.java     |  83 +++++
 .../reef/tests/watcher/WatcherTestDriver.java   | 188 ++++++++++
 .../reef/tests/watcher/WatcherTestTask.java     |  81 +++++
 .../apache/reef/tests/watcher/package-info.java |  23 ++
 .../apache/reef/tests/watcher/WatcherTest.java  |  85 +++++
 .../apache/reef/tests/watcher/package-info.java |  23 ++
 23 files changed, 1905 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/avro/watcher/REEFDriverSideEvents.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/avro/watcher/REEFDriverSideEvents.avsc b/lang/java/reef-io/src/main/avro/watcher/REEFDriverSideEvents.avsc
new file mode 100644
index 0000000..37ee7d0
--- /dev/null
+++ b/lang/java/reef-io/src/main/avro/watcher/REEFDriverSideEvents.avsc
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+
+  /***********************/
+  /** Common Interfaces **/
+  /***********************/
+
+  /**
+   * org.apache.reef.common.Failure
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.common",
+    "type": "record",
+    "name": "AvroFailure",
+    "fields": [
+      {"name": "id", "type": "string"},
+      {"name": "message", "type": "string"},
+      {"name": "description", "type": ["string", "null"]},
+      {"name": "reason", "type": ["string", "null"]},
+      {"name": "data", "type": ["bytes", "null"]},
+      {"name": "asError", "type": "string"}
+    ]
+  },
+
+  /**
+   * org.apache.reef.driver.catalog.RackDescriptor
+   *
+   * The rack's nodes are embedded as a reduced record
+   * (AvroNodeDescriptorInRackDescriptor: id, name, inetSocketAddress) that
+   * leaves out the rackDescriptor field carried by AvroNodeDescriptor;
+   * presumably this avoids a rack -> node -> rack cycle (TODO confirm).
+   *
+   * NOTE(review): the "name" attribute on the array type of "nodes" is not
+   * one of the attributes the Avro specification lists for arrays; it looks
+   * like it has no effect on the generated classes - confirm.
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.catalog",
+    "type": "record",
+    "name": "AvroRackDescriptor",
+    "fields": [
+      {"name": "name", "type": "string"},
+      {
+        "name": "nodes",
+        "type": {
+          "type": "array",
+          "name": "AvroNodeDescriptor",
+          "items": {
+            "type": "record",
+            "name": "AvroNodeDescriptorInRackDescriptor",
+            "fields": [
+                  {"name": "id", "type": "string"},
+                  {"name": "name", "type": "string"},
+                  {"name": "inetSocketAddress", "type": "string"}
+            ]
+          }
+        }
+      }
+    ]
+  },
+
+  /**
+   * org.apache.reef.driver.catalog.NodeDescriptor
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.catalog",
+    "type": "record",
+    "name": "AvroNodeDescriptor",
+    "fields": [
+      {"name": "id", "type": "string"},
+      {"name": "name", "type": "string"},
+      {"name": "inetSocketAddress", "type": "string"},
+      {"name": "rackDescriptor", "type": "org.apache.reef.io.watcher.driver.catalog.AvroRackDescriptor"}
+    ]
+  },
+
+  /**
+   * org.apache.reef.driver.evaluator.EvaluatorType
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.evaluator",
+    "type": "enum",
+    "name": "AvroEvaluatorType",
+    "symbols": ["JVM", "CLR", "UNDECIDED"]
+  },
+
+  /**
+   * org.apache.reef.driver.evaluator.EvaluatorProcess
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.evaluator",
+    "type": "record",
+    "name": "AvroEvaluatorProcess",
+    "fields": [
+      {
+        "name": "commandLines",
+        "type": {
+          "type": "array",
+          "items": ["string", "null"]
+        }
+      },
+      {"name": "evaluatorType", "type": "org.apache.reef.io.watcher.driver.evaluator.AvroEvaluatorType"},
+      {"name": "isOptionSet", "type": "boolean"}
+    ]
+  },
+
+  /**
+   * org.apache.reef.driver.evaluator.EvaluatorDescriptor
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.evaluator",
+    "type": "record",
+    "name": "AvroEvaluatorDescriptor",
+    "fields": [
+      {"name": "nodeDescriptor", "type": "org.apache.reef.io.watcher.driver.catalog.AvroNodeDescriptor"},
+      {"name": "process", "type": "org.apache.reef.io.watcher.driver.evaluator.AvroEvaluatorProcess"},
+      {"name": "memory", "type": "int"},
+      {"name": "numberOfCores", "type": "int"}
+    ]
+  },
+
+  /*********************/
+  /** Time Interfaces **/
+  /*********************/
+
+  /**
+   * org.apache.reef.wake.time.runtime.event.RuntimeStart
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.wake.time.runtime.event",
+    "type": "record",
+    "name": "AvroRuntimeStart",
+    "fields": [
+      {"name": "timestamp", "type": "long"}
+    ]
+  },
+
+  /**
+   * org.apache.reef.wake.time.runtime.event.RuntimeStop
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.wake.time.runtime.event",
+    "type": "record",
+    "name": "AvroRuntimeStop",
+    "fields": [
+      {"name": "timestamp", "type": "long"},
+      {"name": "exception", "type": ["string", "null"]}
+    ]
+  },
+
+  /**
+   * org.apache.reef.wake.time.event.StartTime
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.wake.time.event",
+    "type": "record",
+    "name": "AvroStartTime",
+    "fields": [
+      {"name": "timestamp", "type": "long"}
+    ]
+  },
+
+  /**
+   * org.apache.reef.wake.time.event.StopTime
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.wake.time.event",
+    "type": "record",
+    "name": "AvroStopTime",
+    "fields": [
+      {"name": "timestamp", "type": "long"}
+    ]
+  },
+
+  /************************/
+  /** Context Interfaces **/
+  /************************/
+
+  /**
+   * org.apache.reef.driver.context.ContextBase
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.context",
+    "type": "record",
+    "name": "AvroContextBase",
+    "fields": [
+      {"name": "id", "type": "string"},
+      {"name": "evaluatorId", "type": "string"},
+      {"name": "parentId", "type": ["string", "null"]},
+      {"name": "evaluatorDescriptor", "type": [
+        "org.apache.reef.io.watcher.driver.evaluator.AvroEvaluatorDescriptor",
+        "null"
+      ]}
+    ]
+  },
+
+  /**
+   * org.apache.reef.driver.context.ActiveContext
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.context",
+    "type": "record",
+    "name": "AvroActiveContext",
+    "fields": [
+      {"name": "base", "type": "org.apache.reef.io.watcher.driver.context.AvroContextBase"}
+    ]
+  },
+
+  /**
+   * org.apache.reef.driver.context.ClosedContext
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.context",
+    "type": "record",
+    "name": "AvroClosedContext",
+    "fields": [
+      {"name": "base", "type": "org.apache.reef.io.watcher.driver.context.AvroContextBase"},
+      {"name": "parentContext", "type": "org.apache.reef.io.watcher.driver.context.AvroActiveContext"}
+    ]
+  },
+
+  /**
+   * org.apache.reef.driver.context.FailedContext
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.context",
+    "type": "record",
+    "name": "AvroFailedContext",
+    "fields": [
+      {"name": "base", "type": "org.apache.reef.io.watcher.driver.context.AvroContextBase"},
+      {"name": "parentContext", "type": ["org.apache.reef.io.watcher.driver.context.AvroActiveContext", "null"]},
+      {"name": "failure", "type": "org.apache.reef.io.watcher.common.AvroFailure"}
+    ]
+  },
+
+  /**
+   * org.apache.reef.driver.context.ContextMessage
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.context",
+    "type": "record",
+    "name": "AvroContextMessage",
+    "fields": [
+      {"name": "id", "type": "string"},
+      {"name": "messageSourceId", "type": "string"},
+      {"name": "get", "type": ["bytes", "null"]}
+    ]
+  },
+
+  /*********************/
+  /** Task Interfaces **/
+  /*********************/
+  /**
+   * org.apache.reef.driver.task.CompletedTask
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.task",
+    "type": "record",
+    "name": "AvroCompletedTask",
+    "fields": [
+      {"name": "id", "type": "string"},
+      {"name": "activeContext", "type": "org.apache.reef.io.watcher.driver.context.AvroActiveContext"},
+      {"name": "get", "type": ["bytes", "null"]}
+    ]
+  },
+
+  /**
+   * org.apache.reef.driver.task.FailedTask
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.task",
+    "type": "record",
+    "name": "AvroFailedTask",
+    "fields": [
+      {"name": "failure", "type": "org.apache.reef.io.watcher.common.AvroFailure"},
+      {"name": "activeContext", "type": ["org.apache.reef.io.watcher.driver.context.AvroActiveContext", "null"]}
+    ]
+  },
+
+  /**
+   * org.apache.reef.driver.task.RunningTask
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.task",
+    "type": "record",
+    "name": "AvroRunningTask",
+    "fields": [
+      {"name": "id", "type": "string"},
+      {"name": "activeContext", "type": "org.apache.reef.io.watcher.driver.context.AvroActiveContext"}
+    ]
+  },
+
+  /**
+   * org.apache.reef.driver.task.SuspendedTask
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.task",
+    "type": "record",
+    "name": "AvroSuspendedTask",
+    "fields": [
+      {"name": "id", "type": "string"},
+      {"name": "activeContext", "type": "org.apache.reef.io.watcher.driver.context.AvroActiveContext"},
+      {"name": "get", "type": ["bytes", "null"]}
+    ]
+  },
+
+  /**
+   * org.apache.reef.driver.task.TaskMessage
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.task",
+    "type": "record",
+    "name": "AvroTaskMessage",
+    "fields": [
+      {"name": "id", "type": "string"},
+      {"name": "contextId", "type": "string"},
+      {"name": "messageSourceId", "type": "string"},
+      {"name": "get", "type": ["bytes", "null"]}
+    ]
+  },
+
+  /**************************/
+  /** Evaluator Interfaces **/
+  /**************************/
+
+  /**
+   * org.apache.reef.driver.evaluator.AllocatedEvaluator
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.evaluator",
+    "type": "record",
+    "name": "AvroAllocatedEvaluator",
+    "fields": [
+      {"name": "id", "type": "string"},
+      {"name": "evaluatorDescriptor", "type": "org.apache.reef.io.watcher.driver.evaluator.AvroEvaluatorDescriptor"}
+    ]
+  },
+
+  /**
+   * org.apache.reef.driver.evaluator.CompletedEvaluator
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.evaluator",
+    "type": "record",
+    "name": "AvroCompletedEvaluator",
+    "fields": [
+      {"name": "id", "type": "string"}
+    ]
+  },
+
+  /**
+   * org.apache.reef.driver.evaluator.FailedEvaluator
+   */
+  {
+    "namespace": "org.apache.reef.io.watcher.driver.evaluator",
+    "type": "record",
+    "name": "AvroFailedEvaluator",
+    "fields": [
+      {"name": "id", "type": "string"},
+      {"name": "evaluatorException", "type": "string"},
+      {
+        "name": "failedContextList", "type": {
+          "type": "array",
+          "items": "org.apache.reef.io.watcher.driver.context.AvroFailedContext"
+        }
+      },
+      {"name": "failedTask", "type": ["org.apache.reef.io.watcher.driver.task.AvroFailedTask", "null"]}
+    ]
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/EventStream.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/EventStream.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/EventStream.java
new file mode 100644
index 0000000..18c5fff
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/EventStream.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+/**
+ * A sink that writes events to some destination.
+ * {@link LogEventStream} is declared as the default implementation.
+ */
+@Unstable
+@DefaultImplementation(LogEventStream.class)
+public interface EventStream {
+
+  /**
+   * Writes a single event of the given type to this stream's destination.
+   * Implementations should be thread-safe, since multiple event handlers
+   * can call this method concurrently.
+   *
+   * @param type the type of the event
+   * @param jsonEncodedEvent the event, encoded as JSON
+   */
+  void onEvent(EventType type, String jsonEncodedEvent);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/EventType.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/EventType.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/EventType.java
new file mode 100644
index 0000000..d753724
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/EventType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher;
+
+import org.apache.reef.annotations.Unstable;
+
+/**
+ * Types of the events that Watcher subscribes to. The type is handed to
+ * {@link EventStream#onEvent(EventType, String)} together with the
+ * JSON-encoded event.
+ */
+@Unstable
+public enum EventType {
+  // Runtime and driver start/stop events.
+  RuntimeStart,
+  StartTime,
+  StopTime,
+  RuntimeStop,
+  // Context events.
+  ActiveContext,
+  ClosedContext,
+  FailedContext,
+  // Task events.
+  CompletedTask,
+  FailedTask,
+  RunningTask,
+  TaskMessage,
+  SuspendedTask,
+  // Evaluator events.
+  AllocatedEvaluator,
+  FailedEvaluator,
+  CompletedEvaluator
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/FileEventStream.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/FileEventStream.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/FileEventStream.java
new file mode 100644
index 0000000..e50691a
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/FileEventStream.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.io.watcher.util.RunnableExecutingHandler;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.impl.ThreadPoolStage;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Writes events to a file, one line per event in the form
+ * {@code [timestamp] [type] json}.
+ *
+ * The file location is taken from the {@link Path} named parameter
+ * (default {@code watcher_report.txt}); missing parent directories are created.
+ *
+ * {@link #onEvent(EventType, String)} only captures the current time and
+ * enqueues the write on a stage created with a thread count of 1, so the
+ * date format and the writer are used from that stage only.
+ */
+@Unstable
+public final class FileEventStream implements EventStream {
+
+  private final DateFormat dateFormat;
+  private final PrintWriter printWriter;
+  private final EStage<Runnable> singleThreadedExecutor;
+
+  /**
+   * @param path location of the report file
+   * @throws RuntimeException if the report file cannot be created or opened
+   */
+  @Inject
+  private FileEventStream(@Parameter(Path.class) final String path) {
+    // "SSS" is the three-digit millisecond field. A fourth 'S' would only zero-pad
+    // the millisecond value (123 ms -> "0123"), which reads as a wrong fraction.
+    this.dateFormat = new SimpleDateFormat("[yyyy.MM.dd HH:mm:ss.SSS]");
+    this.singleThreadedExecutor = new ThreadPoolStage<>(new RunnableExecutingHandler(), 1);
+
+    try {
+      final OutputStreamWriter writer = new OutputStreamWriter(
+          new FileOutputStream(createFileWithPath(path)), Charset.forName("UTF-8"));
+      this.printWriter = new PrintWriter(writer);
+    } catch (final Exception e) {
+      throw new RuntimeException("Unable to open the watcher report file: " + path, e);
+    }
+  }
+
+  /**
+   * Creates the file at the given path, including missing parent directories.
+   *
+   * @param path location of the file
+   * @return the file at the given path
+   * @throws Exception if the file cannot be created
+   */
+  private File createFileWithPath(final String path) throws Exception {
+    final File file = new File(path);
+    final File parent = file.getParentFile();
+    if (parent != null && !parent.exists()){
+      parent.mkdirs();
+    }
+
+    file.createNewFile();
+    return file;
+  }
+
+  /**
+   * Schedules the event to be appended to the report file. The timestamp is
+   * taken on the calling thread, before the write is enqueued.
+   *
+   * @param type the type of the event
+   * @param jsonEncodedEvent the event, encoded as JSON
+   */
+  @Override
+  public void onEvent(final EventType type, final String jsonEncodedEvent) {
+    final long timestamp = System.currentTimeMillis();
+    singleThreadedExecutor.onNext(new Runnable() {
+      @Override
+      public void run() {
+        final String eventDescription = new StringBuilder()
+            .append(dateFormat.format(new Date(timestamp)))
+            .append(" [")
+            .append(type)
+            .append("] ")
+            .append(jsonEncodedEvent)
+            .toString();
+
+        printWriter.println(eventDescription);
+        // Flush after every event: otherwise buffered events are lost when the
+        // driver terminates without ever delivering RuntimeStop.
+        printWriter.flush();
+
+        if (type == EventType.RuntimeStop) {
+          onRuntimeStop();
+        }
+      }
+    });
+  }
+
+  /**
+   * Flushes and closes the report file once the RuntimeStop event has been written.
+   */
+  private void onRuntimeStop() {
+    printWriter.flush();
+    printWriter.close();
+  }
+
+  /**
+   * Named parameter holding the path of the report file.
+   */
+  @NamedParameter(doc = "The relative path of the reporting file.", default_value = "watcher_report.txt")
+  public static final class Path implements Name<String> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/LogEventStream.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/LogEventStream.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/LogEventStream.java
new file mode 100644
index 0000000..bde190c
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/LogEventStream.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher;
+
+import org.apache.reef.annotations.Unstable;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Reports events through {@link java.util.logging.Logger} at INFO level.
+ * The original note on this class states that the output ends up in
+ * driver.err; that depends on logging configuration not shown here.
+ */
+@Unstable
+public final class LogEventStream implements EventStream {
+
+  private static final Logger LOG = Logger.getLogger(LogEventStream.class.getName());
+
+  @Inject
+  private LogEventStream() {
+  }
+
+  /**
+   * Logs the event as {@code [type] json}.
+   *
+   * @param type the type of the event
+   * @param jsonEncodedEvent the event, encoded as JSON
+   */
+  @Override
+  public void onEvent(final EventType type, final String jsonEncodedEvent) {
+    final Object[] logArguments = {type, jsonEncodedEvent};
+    LOG.log(Level.INFO, "[{0}] {1}", logArguments);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/Watcher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/Watcher.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/Watcher.java
new file mode 100644
index 0000000..4a60d93
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/Watcher.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.io.watcher.param.EventStreams;
+import org.apache.reef.io.watcher.util.WatcherAvroUtil;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import javax.inject.Inject;
+import java.util.Set;
+
+/**
+ * Subscribes to driver-side events, converts each one to its Avro
+ * counterpart via {@link WatcherAvroUtil}, and passes the encoded string to
+ * every configured {@link EventStream}.
+ */
+@Unstable
+@Unit
+public final class Watcher {
+
+  private final Set<EventStream> eventStreamSet;
+
+  @Inject
+  private Watcher(@Parameter(EventStreams.class) final Set<EventStream> eventStreamSet) {
+    this.eventStreamSet = eventStreamSet;
+  }
+
+  /**
+   * Hands the encoded event to each stream in the configured set.
+   *
+   * @param eventType the type of the event
+   * @param jsonEncodedEvent the encoded event
+   */
+  private void broadcast(final EventType eventType, final String jsonEncodedEvent) {
+    for (final EventStream sink : eventStreamSet) {
+      sink.onEvent(eventType, jsonEncodedEvent);
+    }
+  }
+
+  /**
+   * Reports RuntimeStart events.
+   */
+  public final class DriverRuntimeStartHandler implements EventHandler<RuntimeStart> {
+
+    @Override
+    public void onNext(final RuntimeStart runtimeStart) {
+      final String json = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroRuntimeStart(runtimeStart));
+      broadcast(EventType.RuntimeStart, json);
+    }
+  }
+
+  /**
+   * Reports StartTime events.
+   */
+  public final class DriverStartHandler implements EventHandler<StartTime> {
+
+    @Override
+    public void onNext(final StartTime startTime) {
+      final String json = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroStartTime(startTime));
+      broadcast(EventType.StartTime, json);
+    }
+  }
+
+  /**
+   * Reports StopTime events.
+   */
+  public final class DriverStopHandler implements EventHandler<StopTime> {
+
+    @Override
+    public void onNext(final StopTime stopTime) {
+      final String json = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroStopTime(stopTime));
+      broadcast(EventType.StopTime, json);
+    }
+  }
+
+  /**
+   * Reports RuntimeStop events.
+   */
+  public final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> {
+
+    @Override
+    public void onNext(final RuntimeStop runtimeStop) {
+      final String json = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroRuntimeStop(runtimeStop));
+      broadcast(EventType.RuntimeStop, json);
+    }
+  }
+
+  /**
+   * Reports ActiveContext events.
+   */
+  public final class ContextActiveHandler implements EventHandler<ActiveContext> {
+
+    @Override
+    public void onNext(final ActiveContext activeContext) {
+      final String json = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroActiveContext(activeContext));
+      broadcast(EventType.ActiveContext, json);
+    }
+  }
+
+  /**
+   * Reports ClosedContext events.
+   */
+  public final class ContextClosedHandler implements EventHandler<ClosedContext> {
+
+    @Override
+    public void onNext(final ClosedContext closedContext) {
+      final String json = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroClosedContext(closedContext));
+      broadcast(EventType.ClosedContext, json);
+    }
+  }
+
+  /**
+   * Reports FailedContext events.
+   */
+  public final class ContextFailedHandler implements EventHandler<FailedContext> {
+
+    @Override
+    public void onNext(final FailedContext failedContext) {
+      final String json = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroFailedContext(failedContext));
+      broadcast(EventType.FailedContext, json);
+    }
+  }
+
+  /**
+   * Reports CompletedTask events.
+   */
+  public final class TaskCompletedHandler implements EventHandler<CompletedTask> {
+
+    @Override
+    public void onNext(final CompletedTask completedTask) {
+      final String json = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroCompletedTask(completedTask));
+      broadcast(EventType.CompletedTask, json);
+    }
+  }
+
+  /**
+   * Reports FailedTask events.
+   */
+  public final class TaskFailedHandler implements EventHandler<FailedTask> {
+
+    @Override
+    public void onNext(final FailedTask failedTask) {
+      final String json = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroFailedTask(failedTask));
+      broadcast(EventType.FailedTask, json);
+    }
+  }
+
+  /**
+   * Reports RunningTask events.
+   */
+  public final class TaskRunningHandler implements EventHandler<RunningTask> {
+
+    @Override
+    public void onNext(final RunningTask runningTask) {
+      final String json = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroRunningTask(runningTask));
+      broadcast(EventType.RunningTask, json);
+    }
+  }
+
+  /**
+   * Reports TaskMessage events.
+   */
+  public final class TaskMessageHandler implements EventHandler<TaskMessage> {
+
+    @Override
+    public void onNext(final TaskMessage taskMessage) {
+      final String json = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroTaskMessage(taskMessage));
+      broadcast(EventType.TaskMessage, json);
+    }
+  }
+
+  /**
+   * Reports SuspendedTask events.
+   */
+  public final class TaskSuspendedHandler implements EventHandler<SuspendedTask> {
+
+    @Override
+    public void onNext(final SuspendedTask suspendedTask) {
+      final String json = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroSuspendedTask(suspendedTask));
+      broadcast(EventType.SuspendedTask, json);
+    }
+  }
+
+  /**
+   * Reports AllocatedEvaluator events.
+   */
+  public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+      final String json = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroAllocatedEvaluator(allocatedEvaluator));
+      broadcast(EventType.AllocatedEvaluator, json);
+    }
+  }
+
+  /**
+   * Reports FailedEvaluator events.
+   */
+  public final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
+
+    @Override
+    public void onNext(final FailedEvaluator failedEvaluator) {
+      final String json = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroFailedEvaluator(failedEvaluator));
+      broadcast(EventType.FailedEvaluator, json);
+    }
+  }
+
+  /**
+   * Reports CompletedEvaluator events.
+   */
+  public final class EvaluatorCompletedHandler implements EventHandler<CompletedEvaluator> {
+
+    @Override
+    public void onNext(final CompletedEvaluator completedEvaluator) {
+      final String json = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroCompletedEvaluator(completedEvaluator));
+      broadcast(EventType.CompletedEvaluator, json);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/WatcherConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/WatcherConfiguration.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/WatcherConfiguration.java
new file mode 100644
index 0000000..2f2ee02
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/WatcherConfiguration.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.io.watcher.param.EventStreams;
+import org.apache.reef.tang.formats.*;
+import org.apache.reef.wake.time.Clock;
+
+/**
+ * ConfigurationModule for Watcher.
+ * CONF adds the Watcher handler classes as set entries of the Clock runtime-start, start, stop and
+ * runtime-stop handler sets and of the Service*Handlers parameter sets for context, task and
+ * evaluator events, and binds EVENT_STREAMS as a set entry of the EventStreams parameter.
+ */
+@Unstable
+public final class WatcherConfiguration extends ConfigurationModuleBuilder {
+
+  /**
+   * Types of EventStream where events subscribed by Watcher will be reported.
+   * Declared as an optional implementation parameter and bound in CONF as a set entry of EventStreams.
+   * It is declared ahead of CONF because the static initializer of CONF references it.
+   */
+  public static final OptionalImpl<EventStream> EVENT_STREAMS = new OptionalImpl<>();
+
+  public static final ConfigurationModule CONF = new WatcherConfiguration()
+      .bindSetEntry(EventStreams.class, EVENT_STREAMS)
+      .bindSetEntry(Clock.RuntimeStartHandler.class, Watcher.DriverRuntimeStartHandler.class)
+      .bindSetEntry(Clock.StartHandler.class, Watcher.DriverStartHandler.class)
+      .bindSetEntry(Clock.StopHandler.class, Watcher.DriverStopHandler.class)
+      .bindSetEntry(Clock.RuntimeStopHandler.class, Watcher.DriverRuntimeStopHandler.class)
+      .bindSetEntry(ServiceContextActiveHandlers.class, Watcher.ContextActiveHandler.class)
+      .bindSetEntry(ServiceContextClosedHandlers.class, Watcher.ContextClosedHandler.class)
+      .bindSetEntry(ServiceContextFailedHandlers.class, Watcher.ContextFailedHandler.class)
+      .bindSetEntry(ServiceTaskFailedHandlers.class, Watcher.TaskFailedHandler.class)
+      .bindSetEntry(ServiceTaskCompletedHandlers.class, Watcher.TaskCompletedHandler.class)
+      .bindSetEntry(ServiceTaskMessageHandlers.class, Watcher.TaskMessageHandler.class)
+      .bindSetEntry(ServiceTaskRunningHandlers.class, Watcher.TaskRunningHandler.class)
+      .bindSetEntry(ServiceTaskSuspendedHandlers.class, Watcher.TaskSuspendedHandler.class)
+      .bindSetEntry(ServiceEvaluatorAllocatedHandlers.class, Watcher.EvaluatorAllocatedHandler.class)
+      .bindSetEntry(ServiceEvaluatorFailedHandlers.class, Watcher.EvaluatorFailedHandler.class)
+      .bindSetEntry(ServiceEvaluatorCompletedHandlers.class, Watcher.EvaluatorCompletedHandler.class)
+      .build();
+
+  /**
+   * Construct a WatcherConfiguration.
+   * Private: the only instance is the builder created while initializing CONF.
+   */
+  private WatcherConfiguration() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/package-info.java
new file mode 100644
index 0000000..63bea01
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Watcher subscribes to events and writes them to the destination using a certain EventStream.
+ */
+package org.apache.reef.io.watcher;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/param/EventStreams.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/param/EventStreams.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/param/EventStreams.java
new file mode 100644
index 0000000..6160c90
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/param/EventStreams.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher.param;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.io.watcher.EventStream;
+import org.apache.reef.io.watcher.LogEventStream;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.util.Set;
+
+/**
+ * Set of EventStreams.
+ * Named parameter whose value type is a Set of EventStream; the declared default class
+ * is LogEventStream.
+ */
+@Private
+@Unstable
+@NamedParameter(default_classes = {LogEventStream.class})
+public final class EventStreams implements Name<Set<EventStream>> {
+
+  /**
+   * This class should not be instantiated.
+   * It carries no state and serves only as a parameter name.
+   */
+  private EventStreams() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/param/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/param/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/param/package-info.java
new file mode 100644
index 0000000..57106fe
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/param/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Parameters for Watcher.
+ */
+package org.apache.reef.io.watcher.param;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/RunnableExecutingHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/RunnableExecutingHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/RunnableExecutingHandler.java
new file mode 100644
index 0000000..ddb8acd
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/RunnableExecutingHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher.util;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Handler that executes Runnable commands.
+ * Anything thrown by a command is caught and logged, together with its stack trace,
+ * instead of being propagated to the caller of {@code onNext}.
+ */
+@Unstable
+public final class RunnableExecutingHandler implements EventHandler<Runnable> {
+
+  private static final Logger LOG = Logger.getLogger(RunnableExecutingHandler.class.getName());
+
+  /**
+   * Run the given command on the calling thread.
+   *
+   * @param runnable the command to execute
+   */
+  @Override
+  public void onNext(final Runnable runnable) {
+    try {
+      runnable.run();
+    } catch (final Throwable exception) {
+      // Hand the Throwable itself to the logger so its type and stack trace are recorded;
+      // getMessage() alone is frequently null (e.g. for a NullPointerException).
+      LOG.log(Level.WARNING, "An exception occurred while writing event with Watcher.", exception);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java
new file mode 100644
index 0000000..3fd6ffc
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher.util;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.common.Failure;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.RackDescriptor;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextBase;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.*;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.io.watcher.common.AvroFailure;
+import org.apache.reef.io.watcher.driver.catalog.AvroNodeDescriptor;
+import org.apache.reef.io.watcher.driver.catalog.AvroNodeDescriptorInRackDescriptor;
+import org.apache.reef.io.watcher.driver.catalog.AvroRackDescriptor;
+import org.apache.reef.io.watcher.driver.context.AvroActiveContext;
+import org.apache.reef.io.watcher.driver.context.AvroClosedContext;
+import org.apache.reef.io.watcher.driver.context.AvroContextBase;
+import org.apache.reef.io.watcher.driver.context.AvroFailedContext;
+import org.apache.reef.io.watcher.driver.evaluator.*;
+import org.apache.reef.io.watcher.driver.task.*;
+import org.apache.reef.io.watcher.wake.time.event.AvroStartTime;
+import org.apache.reef.io.watcher.wake.time.event.AvroStopTime;
+import org.apache.reef.io.watcher.wake.time.runtime.event.AvroRuntimeStart;
+import org.apache.reef.io.watcher.wake.time.runtime.event.AvroRuntimeStop;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+@Private
+@Unstable
+public final class WatcherAvroUtil {
+
+  public static AvroFailure toAvroFailure(final Failure failure) {
+    final String reason;
+    if (failure.getReason().isPresent()) {
+      reason = convertThrowableToString(failure.getReason().get());
+    } else {
+      reason = null;
+    }
+
+    return AvroFailure.newBuilder()
+        .setAsError(convertThrowableToString(failure.asError()))
+        .setData(unwrapOptionalByteArray(failure.getData()))
+        .setDescription(failure.getDescription().orElse(null))
+        .setId(failure.getId())
+        .setMessage(failure.getMessage())
+        .setReason(reason)
+        .build();
+  }
+
+  public static AvroNodeDescriptorInRackDescriptor toAvroNodeDescriptorInRackDescriptor(
+      final String id, final String name, final InetSocketAddress inetSocketAddress) {
+    return AvroNodeDescriptorInRackDescriptor.newBuilder()
+        .setInetSocketAddress(inetSocketAddress.toString())
+        .setId(id)
+        .setName(name)
+        .build();
+  }
+
+  public static AvroRackDescriptor toAvroRackDescriptor(final RackDescriptor rackDescriptor) {
+    final List<AvroNodeDescriptorInRackDescriptor> nodeDescriptorList = new ArrayList<>();
+    for (final NodeDescriptor nodeDescriptor : rackDescriptor.getNodes()) {
+      nodeDescriptorList.add(
+          toAvroNodeDescriptorInRackDescriptor(
+              nodeDescriptor.getId(), nodeDescriptor.getName(), nodeDescriptor.getInetSocketAddress()
+          )
+      );
+    }
+
+    return AvroRackDescriptor.newBuilder()
+        .setNodes(nodeDescriptorList)
+        .setName(rackDescriptor.getName())
+        .build();
+  }
+
+  public static AvroNodeDescriptor toAvroNodeDescriptor(final NodeDescriptor nodeDescriptor) {
+    return AvroNodeDescriptor.newBuilder()
+        .setId(nodeDescriptor.getId())
+        .setName(nodeDescriptor.getName())
+        .setInetSocketAddress(nodeDescriptor.getInetSocketAddress().toString())
+        .setRackDescriptor(toAvroRackDescriptor(nodeDescriptor.getRackDescriptor()))
+        .build();
+  }
+
+  public static AvroEvaluatorType toAvroEvaluatorType(final EvaluatorType evaluatorType) {
+    switch (evaluatorType) {
+    case JVM: return AvroEvaluatorType.JVM;
+    case CLR: return AvroEvaluatorType.CLR;
+    case UNDECIDED: return AvroEvaluatorType.UNDECIDED;
+    default: throw new RuntimeException(evaluatorType + " is not defined for AvroEvaluatorType.");
+    }
+  }
+
+  public static AvroEvaluatorProcess toAvroEvaluatorProcess(final EvaluatorProcess evaluatorProcess) {
+    final List<CharSequence> commandLines = new ArrayList<>();
+    for (final  String commandLine : evaluatorProcess.getCommandLine()) {
+      commandLines.add(commandLine);
+    }
+
+    return AvroEvaluatorProcess.newBuilder()
+        .setCommandLines(commandLines)
+        .setEvaluatorType(toAvroEvaluatorType(evaluatorProcess.getType()))
+        .setIsOptionSet(evaluatorProcess.isOptionSet())
+        .build();
+  }
+
+  public static AvroEvaluatorDescriptor toAvroEvaluatorDescriptor(final EvaluatorDescriptor evaluatorDescriptor) {
+    return AvroEvaluatorDescriptor.newBuilder()
+        .setMemory(evaluatorDescriptor.getMemory())
+        .setNodeDescriptor(toAvroNodeDescriptor(evaluatorDescriptor.getNodeDescriptor()))
+        .setNumberOfCores(evaluatorDescriptor.getNumberOfCores())
+        .setProcess(toAvroEvaluatorProcess(evaluatorDescriptor.getProcess()))
+        .build();
+  }
+
+  public static AvroRuntimeStart toAvroRuntimeStart(final RuntimeStart runtimeStart) {
+    return AvroRuntimeStart.newBuilder()
+        .setTimestamp(runtimeStart.getTimeStamp())
+        .build();
+  }
+
+  public static AvroStartTime toAvroStartTime(final StartTime startTime) {
+    return AvroStartTime.newBuilder()
+        .setTimestamp(startTime.getTimeStamp())
+        .build();
+  }
+
+  public static AvroStopTime toAvroStopTime(final StopTime stopTime) {
+    return AvroStopTime.newBuilder()
+        .setTimestamp(stopTime.getTimeStamp())
+        .build();
+  }
+
+  public static AvroRuntimeStop toAvroRuntimeStop(final RuntimeStop runtimeStop) {
+    return AvroRuntimeStop.newBuilder()
+        .setException(convertThrowableToString(runtimeStop.getException()))
+        .setTimestamp(runtimeStop.getTimeStamp())
+        .build();
+  }
+
+  public static AvroContextBase toAvroContextBase(final ContextBase contextBase) {
+    return AvroContextBase.newBuilder()
+        .setEvaluatorDescriptor(null)
+        .setEvaluatorId(contextBase.getEvaluatorId())
+        .setId(contextBase.getId())
+        .setParentId(contextBase.getParentId().orElse(null))
+        .build();
+  }
+
+  public static AvroActiveContext toAvroActiveContext(final ActiveContext activeContext) {
+    return AvroActiveContext.newBuilder()
+        .setBase(toAvroContextBase(activeContext))
+        .build();
+  }
+
+  public static AvroClosedContext toAvroClosedContext(final ClosedContext closedContext) {
+    return AvroClosedContext.newBuilder()
+        .setBase(toAvroContextBase(closedContext))
+        .setParentContext(toAvroActiveContext(closedContext.getParentContext()))
+        .build();
+  }
+
+  public static AvroFailedContext toAvroFailedContext(final FailedContext failedContext) {
+    return AvroFailedContext.newBuilder()
+        .setBase(toAvroContextBase(failedContext))
+        .setParentContext(unwrapOptionalActiveContext(failedContext.getParentContext()))
+        .setFailure(toAvroFailure(failedContext))
+        .build();
+  }
+
+  public static AvroCompletedTask toAvroCompletedTask(final CompletedTask completedTask) {
+    return AvroCompletedTask.newBuilder()
+        .setId(completedTask.getId())
+        .setActiveContext(toAvroActiveContext(completedTask.getActiveContext()))
+        .setGet(wrapNullableByteArray(completedTask.get()))
+        .build();
+  }
+
+  public static AvroFailedTask toAvroFailedTask(final FailedTask failedTask) {
+    return AvroFailedTask.newBuilder()
+        .setActiveContext(unwrapOptionalActiveContext(failedTask.getActiveContext()))
+        .setFailure(toAvroFailure(failedTask))
+        .build();
+  }
+
+  public static AvroRunningTask toAvroRunningTask(final RunningTask runningTask) {
+    return AvroRunningTask.newBuilder()
+        .setActiveContext(toAvroActiveContext(runningTask.getActiveContext()))
+        .setId(runningTask.getId())
+        .build();
+  }
+
+  public static AvroTaskMessage toAvroTaskMessage(final TaskMessage taskMessage) {
+    return AvroTaskMessage.newBuilder()
+        .setId(taskMessage.getId())
+        .setContextId(taskMessage.getContextId())
+        .setMessageSourceId(taskMessage.getMessageSourceID())
+        .setGet(wrapNullableByteArray(taskMessage.get()))
+        .build();
+  }
+
+  public static AvroSuspendedTask toAvroSuspendedTask(final SuspendedTask suspendedTask) {
+    return AvroSuspendedTask.newBuilder()
+        .setGet(wrapNullableByteArray(suspendedTask.get()))
+        .setId(suspendedTask.getId())
+        .setActiveContext(toAvroActiveContext(suspendedTask.getActiveContext()))
+        .build();
+  }
+
+  public static AvroAllocatedEvaluator toAvroAllocatedEvaluator(final AllocatedEvaluator allocatedEvaluator) {
+    return AvroAllocatedEvaluator.newBuilder()
+        .setId(allocatedEvaluator.getId())
+        .setEvaluatorDescriptor(toAvroEvaluatorDescriptor(allocatedEvaluator.getEvaluatorDescriptor()))
+        .build();
+  }
+
+  public static AvroFailedEvaluator toAvroFailedEvaluator(final FailedEvaluator failedEvaluator) {
+    final AvroFailedTask avroFailedTask;
+    if (failedEvaluator.getFailedTask().isPresent()) {
+      avroFailedTask = toAvroFailedTask(failedEvaluator.getFailedTask().get());
+    } else {
+      avroFailedTask = null;
+    }
+
+    final List<AvroFailedContext> avroFailedContextList = new ArrayList<>();
+    for (final FailedContext failedContext : failedEvaluator.getFailedContextList()) {
+      avroFailedContextList.add(toAvroFailedContext(failedContext));
+    }
+
+    return AvroFailedEvaluator.newBuilder()
+        .setId(failedEvaluator.getId())
+        .setEvaluatorException(convertThrowableToString(failedEvaluator.getEvaluatorException()))
+        .setFailedContextList(avroFailedContextList)
+        .setFailedTask(avroFailedTask)
+        .build();
+  }
+
+  public static AvroCompletedEvaluator toAvroCompletedEvaluator(final CompletedEvaluator completedEvaluator) {
+    return AvroCompletedEvaluator.newBuilder()
+        .setId(completedEvaluator.getId())
+        .build();
+  }
+
+  public static String toString(final SpecificRecord record) {
+    final String jsonEncodedRecord;
+    try {
+      final Schema schema = record.getSchema();
+      final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      final Encoder encoder = EncoderFactory.get().jsonEncoder(schema, bos);
+      final SpecificDatumWriter datumWriter = new SpecificDatumWriter(record.getClass());
+      datumWriter.write(record, encoder);
+      encoder.flush();
+      jsonEncodedRecord = new String(bos.toByteArray(), Charset.forName("UTF-8"));
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+    return jsonEncodedRecord;
+  }
+
+  private static AvroActiveContext unwrapOptionalActiveContext(final Optional<ActiveContext> optionalActiveContext) {
+    if (optionalActiveContext.isPresent()) {
+      return toAvroActiveContext(optionalActiveContext.get());
+    }
+
+    return null;
+  }
+
+  private static String convertThrowableToString(final Throwable throwable) {
+    if (throwable != null) {
+      return throwable.toString();
+    }
+
+    return null;
+  }
+
+  private static ByteBuffer wrapNullableByteArray(final byte[] data) {
+    if (data != null) {
+      return ByteBuffer.wrap(data);
+    }
+
+    return null;
+  }
+
+  private static ByteBuffer unwrapOptionalByteArray(final Optional<byte[]> optionalByteArray) {
+    if (optionalByteArray.isPresent()) {
+      return ByteBuffer.wrap(optionalByteArray.get());
+    }
+
+    return null;
+  }
+
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private WatcherAvroUtil() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/package-info.java
new file mode 100644
index 0000000..e8df30f
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Utility classes for Watcher.
+ */
+package org.apache.reef.io.watcher.util;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/pom.xml b/lang/java/reef-tests/pom.xml
index 1c46186..832c345 100644
--- a/lang/java/reef-tests/pom.xml
+++ b/lang/java/reef-tests/pom.xml
@@ -66,6 +66,11 @@ under the License.
         </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
+            <artifactId>reef-io</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
             <artifactId>vortex</artifactId>
             <version>${project.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/FailedContextHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/FailedContextHandler.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/FailedContextHandler.java
new file mode 100644
index 0000000..808a7d8
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/FailedContextHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.watcher;
+
+import org.apache.reef.evaluator.context.events.ContextStart;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * ContextStart handler whose injectable constructor always throws, so it can never be instantiated.
+ * NOTE(review): presumably bound by the Watcher tests to provoke a failed context - confirm
+ * against the test driver.
+ */
+public final class FailedContextHandler implements EventHandler<ContextStart> {
+
+  /**
+   * Always fails; the message marks the failure as deliberate for anyone reading the logs.
+   *
+   * @throws RuntimeException on every invocation
+   */
+  @Inject
+  private FailedContextHandler() {
+    throw new RuntimeException("Intentional failure: FailedContextHandler always fails on construction.");
+  }
+
+  @Override
+  public void onNext(final ContextStart value) {
+    // Never reached: no instance can be constructed.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/FailedTaskStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/FailedTaskStartHandler.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/FailedTaskStartHandler.java
new file mode 100644
index 0000000..633a76b
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/FailedTaskStartHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.watcher;
+
+import org.apache.reef.task.events.TaskStart;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * TaskStart handler whose injectable constructor always throws, so it can never be instantiated.
+ * NOTE(review): presumably bound by the Watcher tests to provoke a failed task - confirm
+ * against the test driver.
+ */
+public final class FailedTaskStartHandler implements EventHandler<TaskStart> {
+
+  /**
+   * Always fails; the message marks the failure as deliberate for anyone reading the logs.
+   *
+   * @throws RuntimeException on every invocation
+   */
+  @Inject
+  private FailedTaskStartHandler() {
+    throw new RuntimeException("Intentional failure: FailedTaskStartHandler always fails on construction.");
+  }
+
+  @Override
+  public void onNext(final TaskStart value) {
+    // Never reached: no instance can be constructed.
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/IsTaskSuspended.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/IsTaskSuspended.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/IsTaskSuspended.java
new file mode 100644
index 0000000..a40efb7
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/IsTaskSuspended.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.watcher;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(default_value = "false")
+public final class IsTaskSuspended implements Name<Boolean> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/TestEventStream.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/TestEventStream.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/TestEventStream.java
new file mode 100644
index 0000000..8917e4b
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/TestEventStream.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.watcher;
+
+import org.apache.reef.io.watcher.EventStream;
+import org.apache.reef.io.watcher.EventType;
+
+import javax.inject.Inject;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link EventStream} used by the Watcher test. It counts the events it receives per
+ * {@link EventType} so that {@link #validate()} can compare the counts with the expected ones.
+ */
+public final class TestEventStream implements EventStream {
+
+  /**
+   * Number of received events per type. Every type is registered in the constructor and the map
+   * is never modified afterwards; only the counters change.
+   */
+  private final Map<EventType, AtomicInteger> eventCounter;
+
+  @Inject
+  private TestEventStream() {
+    this.eventCounter = new HashMap<>();
+    for (final EventType type : EventType.values()) {
+      eventCounter.put(type, new AtomicInteger());
+    }
+  }
+
+  /**
+   * Counts the received event. The payload itself is ignored.
+   *
+   * @param type the type of the received event
+   * @param jsonEncodedEvent the JSON-encoded event, unused
+   */
+  @Override
+  public void onEvent(final EventType type, final String jsonEncodedEvent) {
+    eventCounter.get(type).incrementAndGet();
+  }
+
+  /**
+   * Fails unless exactly {@code expectedNum} events of the given type were received.
+   *
+   * @throws RuntimeException if the actual count differs from {@code expectedNum}
+   */
+  private void checkEqualTo(final EventType type, final int expectedNum) {
+    final int actualNum = eventCounter.get(type).get();
+    if (actualNum != expectedNum) {
+      throw new RuntimeException("The expected number of " + type + " is "
+          + expectedNum + " but " + actualNum + " times occurred");
+    }
+  }
+
+  /**
+   * Fails unless strictly more than {@code num} events of the given type were received.
+   *
+   * @throws RuntimeException if the actual count is less than or equal to {@code num}
+   */
+  private void checkGreaterThan(final EventType type, final int num) {
+    final int actualNum = eventCounter.get(type).get();
+    // "<=" rather than "<": a count equal to num is not greater than num and must fail too.
+    if (actualNum <= num) {
+      throw new RuntimeException("The number of event " + type + " should be greater than " + num
+          + " but " + actualNum + " times occurred");
+    }
+  }
+
+  /**
+   * Checks the received event counts against the scenario driven by WatcherTestDriver.
+   * This validation is called in WatcherTestDriver#RuntimeStopHandler, so the RuntimeStop event is not
+   * guaranteed to have reached this stream yet and its count is deliberately not checked.
+   *
+   * @throws RuntimeException if any of the counts does not match the expectation
+   */
+  public void validate() {
+    checkEqualTo(EventType.RuntimeStart, 1);
+    checkEqualTo(EventType.StartTime, 1);
+    checkEqualTo(EventType.AllocatedEvaluator, 2);
+    checkEqualTo(EventType.FailedEvaluator, 1);
+    checkEqualTo(EventType.ActiveContext, 2);
+    checkEqualTo(EventType.FailedContext, 1);
+    checkEqualTo(EventType.FailedTask, 1);
+    checkEqualTo(EventType.RunningTask, 2);
+    checkEqualTo(EventType.SuspendedTask, 1);
+    checkGreaterThan(EventType.TaskMessage, 0);
+    checkEqualTo(EventType.CompletedTask, 1);
+    checkEqualTo(EventType.ClosedContext, 1);
+    checkEqualTo(EventType.CompletedEvaluator, 1);
+    checkEqualTo(EventType.StopTime, 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/WatcherTestDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/WatcherTestDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/WatcherTestDriver.java
new file mode 100644
index 0000000..70059f7
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/WatcherTestDriver.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.watcher;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.driver.task.SuspendedTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Driver of the Watcher test. Its handlers run a fixed scenario of evaluator, context and task
+ * submissions, and {@link RuntimeStopHandler} finally asks {@link TestEventStream} to validate
+ * the events that were observed.
+ */
+@Unit
+public final class WatcherTestDriver {
+
+  private static final String ROOT_CONTEXT_ID = "ROOT_CONTEXT";
+  private static final String FIRST_CONTEXT_ID = "FIRST_CONTEXT";
+
+  private final EvaluatorRequestor evaluatorRequestor;
+  private final TestEventStream testEventStream;
+
+  /**
+   * The first evaluator will be failed to generate FailedEvaluator.
+   */
+  private final AtomicBoolean isFirstEvaluator;
+
+  /**
+   * The first task will be suspended to generate SuspendedTask.
+   */
+  private final AtomicBoolean isFirstTask;
+
+  @Inject
+  private WatcherTestDriver(final EvaluatorRequestor evaluatorRequestor,
+                            final TestEventStream testEventStream) {
+    this.evaluatorRequestor = evaluatorRequestor;
+    this.testEventStream = testEventStream;
+    this.isFirstEvaluator = new AtomicBoolean(true);
+    this.isFirstTask = new AtomicBoolean(true);
+  }
+
+  /**
+   * Requests two evaluators, each with one core and memory set to 64, when the driver starts.
+   */
+  public final class DriverStartedHandler implements EventHandler<StartTime> {
+
+    @Override
+    public void onNext(final StartTime value) {
+      evaluatorRequestor.submit(EvaluatorRequest
+          .newBuilder()
+          .setMemory(64)
+          .setNumberOfCores(1)
+          .setNumber(2)
+          .build());
+    }
+  }
+
+  /**
+   * Submits the failing context configuration to the first allocated evaluator and a context
+   * named ROOT_CONTEXT to every later one.
+   */
+  public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+      if (isFirstEvaluator.compareAndSet(true, false)) {
+        allocatedEvaluator.submitContext(getFailedContextConfiguration());
+      } else {
+        allocatedEvaluator.submitContext(ContextConfiguration.CONF
+            .set(ContextConfiguration.IDENTIFIER, ROOT_CONTEXT_ID)
+            .build());
+      }
+    }
+  }
+
+  /**
+   * Takes no action on a failed evaluator.
+   */
+  public final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
+
+    @Override
+    public void onNext(final FailedEvaluator failedEvaluator) {
+      // no-op
+    }
+  }
+
+  /**
+   * Submits FIRST_CONTEXT on top of ROOT_CONTEXT, and the failing context configuration on top of
+   * FIRST_CONTEXT. Active contexts with any other id are left untouched.
+   */
+  public final class ContextActivatedHandler implements EventHandler<ActiveContext> {
+
+    @Override
+    public void onNext(final ActiveContext activeContext) {
+      if (activeContext.getId().equals(ROOT_CONTEXT_ID)) {
+        activeContext.submitContext(ContextConfiguration.CONF
+            .set(ContextConfiguration.IDENTIFIER, FIRST_CONTEXT_ID)
+            .build());
+      } else if (activeContext.getId().equals(FIRST_CONTEXT_ID)) {
+        activeContext.submitContext(getFailedContextConfiguration());
+      }
+    }
+  }
+
+  /**
+   * Submits the failing task configuration to the parent of the failed context.
+   * The parent is assumed to be present: the Optional is unwrapped without a check.
+   */
+  public final class ContextFailedHandler implements EventHandler<FailedContext> {
+
+    @Override
+    public void onNext(final FailedContext failedContext) {
+      failedContext.getParentContext().get().submitTask(getFailedTaskConfiguration());
+    }
+  }
+
+  /**
+   * Suspends the first task that is reported as running; later running tasks are left alone.
+   */
+  public final class TaskRunningHandler implements EventHandler<RunningTask> {
+
+    @Override
+    public void onNext(final RunningTask runningTask) {
+      if (isFirstTask.compareAndSet(true, false)) {
+        runningTask.suspend();
+      }
+    }
+  }
+
+  /**
+   * Submits a WatcherTestTask with IsTaskSuspended set to true to the context of the failed task.
+   * The active context is assumed to be present: the Optional is unwrapped without a check.
+   */
+  public final class TaskFailedHandler implements EventHandler<FailedTask> {
+
+    @Override
+    public void onNext(final FailedTask failedTask) {
+      failedTask.getActiveContext().get().submitTask(getTaskConfiguration(true));
+    }
+  }
+
+  /**
+   * Submits a WatcherTestTask with IsTaskSuspended set to false to the context of the suspended task.
+   */
+  public final class TaskSuspendedHandler implements EventHandler<SuspendedTask> {
+
+    @Override
+    public void onNext(final SuspendedTask value) {
+      value.getActiveContext().submitTask(getTaskConfiguration(false));
+    }
+  }
+
+  /**
+   * Validates the event counts recorded by the injected TestEventStream on RuntimeStop.
+   */
+  public final class RuntimeStopHandler implements EventHandler<RuntimeStop> {
+
+    @Override
+    public void onNext(final RuntimeStop runtimeStop) {
+      testEventStream.validate();
+    }
+  }
+
+  /**
+   * Builds the configuration of a WatcherTestTask with id "TASK", registered as its own message
+   * source and with WatcherTestTask.TaskSuspendedHandler as suspend handler.
+   *
+   * @param isTaskSuspended value bound to the IsTaskSuspended named parameter
+   * @return the task configuration
+   */
+  private Configuration getTaskConfiguration(final boolean isTaskSuspended) {
+    final Configuration taskConf = TaskConfiguration.CONF
+        .set(TaskConfiguration.TASK, WatcherTestTask.class)
+        .set(TaskConfiguration.IDENTIFIER, "TASK")
+        .set(TaskConfiguration.ON_SEND_MESSAGE, WatcherTestTask.class)
+        .set(TaskConfiguration.ON_SUSPEND, WatcherTestTask.TaskSuspendedHandler.class)
+        .build();
+
+    return Tang.Factory.getTang().newConfigurationBuilder(taskConf)
+        .bindNamedParameter(IsTaskSuspended.class, String.valueOf(isTaskSuspended))
+        .build();
+  }
+
+  /**
+   * Builds the configuration of a WatcherTestTask with id "FAILED_TASK" whose task-start handler is
+   * FailedTaskStartHandler (declared elsewhere; presumably it makes the task fail - TODO confirm).
+   *
+   * @return the task configuration
+   */
+  private Configuration getFailedTaskConfiguration() {
+    return TaskConfiguration.CONF
+        .set(TaskConfiguration.TASK, WatcherTestTask.class)
+        .set(TaskConfiguration.IDENTIFIER, "FAILED_TASK")
+        .set(TaskConfiguration.ON_TASK_STARTED, FailedTaskStartHandler.class)
+        .build();
+  }
+
+  /**
+   * Builds the configuration of a context with id "FAILED_CONTEXT" whose context-start handler is
+   * FailedContextHandler (declared elsewhere; presumably it makes the context fail - TODO confirm).
+   *
+   * @return the context configuration
+   */
+  private Configuration getFailedContextConfiguration() {
+    return ContextConfiguration.CONF
+        .set(ContextConfiguration.IDENTIFIER, "FAILED_CONTEXT")
+        .set(ContextConfiguration.ON_CONTEXT_STARTED, FailedContextHandler.class)
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/WatcherTestTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/WatcherTestTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/WatcherTestTask.java
new file mode 100644
index 0000000..d581749
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/WatcherTestTask.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.watcher;
+
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.task.HeartBeatTriggerManager;
+import org.apache.reef.task.Task;
+import org.apache.reef.task.TaskMessage;
+import org.apache.reef.task.TaskMessageSource;
+import org.apache.reef.task.events.SuspendEvent;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Task used by the Watcher test. Depending on {@link IsTaskSuspended} it either blocks until a
+ * suspend event arrives or triggers a heartbeat and returns. It is also a
+ * {@link TaskMessageSource} that always offers the same fixed message.
+ */
+@Unit
+public final class WatcherTestTask implements Task, TaskMessageSource {
+
+  private final TaskMessage taskMessage;
+  private final HeartBeatTriggerManager heartBeatTriggerManager;
+  private final boolean isTaskSuspended;
+
+  /**
+   * Cleared by {@link TaskSuspendedHandler}. After construction it is accessed only under this task's monitor.
+   */
+  private boolean isRunning;
+
+  @Inject
+  private WatcherTestTask(final HeartBeatTriggerManager heartBeatTriggerManager,
+                          @Parameter(IsTaskSuspended.class) final boolean isTaskSuspended) {
+    this.taskMessage = TaskMessage.from("MESSAGE_SOURCE", "MESSAGE".getBytes(StandardCharsets.UTF_8));
+    this.heartBeatTriggerManager = heartBeatTriggerManager;
+    this.isTaskSuspended = isTaskSuspended;
+    this.isRunning = true;
+  }
+
+  /**
+   * Blocks until the suspend handler has run when {@code isTaskSuspended} is set; otherwise
+   * triggers a heartbeat.
+   *
+   * @param memento ignored
+   * @return always {@code null}
+   * @throws InterruptedException if the thread is interrupted while waiting for the suspend event
+   */
+  @Override
+  public byte[] call(final byte[] memento) throws Exception {
+    if (isTaskSuspended) {
+      synchronized (this) {
+        // Re-check the flag: covers spurious wake-ups and a suspend event that arrived before the wait.
+        while (isRunning) {
+          wait();
+        }
+      }
+    } else {
+      heartBeatTriggerManager.triggerHeartBeat();
+    }
+
+    return null;
+  }
+
+  /**
+   * Returns the fixed test message.
+   *
+   * @return the message, always present
+   */
+  @Override
+  public Optional<TaskMessage> getMessage() {
+    return Optional.of(taskMessage);
+  }
+
+  /**
+   * Wakes up {@link #call(byte[])} when a suspend event is delivered to the task.
+   */
+  public final class TaskSuspendedHandler implements EventHandler<SuspendEvent> {
+
+    @Override
+    public void onNext(final SuspendEvent value) {
+      synchronized (WatcherTestTask.this) {
+        isRunning = false;
+        // notifyAll() so that the wake-up does not depend on there being a single waiter.
+        WatcherTestTask.this.notifyAll();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/package-info.java
new file mode 100644
index 0000000..e0663c7
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test reef.io.watcher.
+ */
+package org.apache.reef.tests.watcher;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/test/java/org/apache/reef/tests/watcher/WatcherTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/watcher/WatcherTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/watcher/WatcherTest.java
new file mode 100644
index 0000000..5cd79f2
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/watcher/WatcherTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.watcher;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.io.watcher.LogEventStream;
+import org.apache.reef.io.watcher.WatcherConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tests.TestEnvironment;
+import org.apache.reef.tests.TestEnvironmentFactory;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.wake.time.Clock;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Runs WatcherTestDriver with the Watcher service enabled and expects the job to succeed.
+ */
+public final class WatcherTest {
+  private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment();
+
+  /**
+   * Prepares the test environment before each test.
+   */
+  @Before
+  public void setUp() throws Exception {
+    testEnvironment.setUp();
+  }
+
+  /**
+   * Releases the test environment after each test.
+   */
+  @After
+  public void tearDown() throws Exception {
+    testEnvironment.tearDown();
+  }
+
+  /**
+   * Assembles the driver configuration: the WatcherTestDriver handlers, its RuntimeStop handler and
+   * a Watcher that reports to both LogEventStream and TestEventStream.
+   *
+   * @return the merged driver configuration
+   */
+  private Configuration getDriverConfiguration() {
+    final Configuration watcherConf = WatcherConfiguration.CONF
+        .set(WatcherConfiguration.EVENT_STREAMS, LogEventStream.class)
+        .set(WatcherConfiguration.EVENT_STREAMS, TestEventStream.class)
+        .build();
+
+    final Configuration runtimeStopConf = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindSetEntry(Clock.RuntimeStopHandler.class, WatcherTestDriver.RuntimeStopHandler.class)
+        .build();
+
+    final Configuration handlerConf = DriverConfiguration.CONF
+        .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(this.getClass()))
+        .set(DriverConfiguration.DRIVER_IDENTIFIER, "WatcherTest")
+        .set(DriverConfiguration.ON_DRIVER_STARTED, WatcherTestDriver.DriverStartedHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, WatcherTestDriver.EvaluatorAllocatedHandler.class)
+        .set(DriverConfiguration.ON_EVALUATOR_FAILED, WatcherTestDriver.EvaluatorFailedHandler.class)
+        .set(DriverConfiguration.ON_CONTEXT_ACTIVE, WatcherTestDriver.ContextActivatedHandler.class)
+        .set(DriverConfiguration.ON_CONTEXT_FAILED, WatcherTestDriver.ContextFailedHandler.class)
+        .set(DriverConfiguration.ON_TASK_FAILED, WatcherTestDriver.TaskFailedHandler.class)
+        .set(DriverConfiguration.ON_TASK_RUNNING, WatcherTestDriver.TaskRunningHandler.class)
+        .set(DriverConfiguration.ON_TASK_SUSPENDED, WatcherTestDriver.TaskSuspendedHandler.class)
+        .build();
+
+    return Configurations.merge(handlerConf, runtimeStopConf, watcherConf);
+  }
+
+  /**
+   * Launches the job and asserts that it finishes successfully.
+   */
+  @Test
+  public void testEventStream() {
+    final LauncherStatus jobStatus = testEnvironment.run(getDriverConfiguration());
+    Assert.assertTrue("Job state after execution: " + jobStatus, jobStatus.isSuccess());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/test/java/org/apache/reef/tests/watcher/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/watcher/package-info.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/watcher/package-info.java
new file mode 100644
index 0000000..e0663c7
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/watcher/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test reef.io.watcher.
+ */
+package org.apache.reef.tests.watcher;