You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by af...@apache.org on 2015/11/13 05:07:49 UTC
incubator-reef git commit: [REEF-714] Service to report REEF events
Repository: incubator-reef
Updated Branches:
refs/heads/master 0a78fcac5 -> 53584db94
[REEF-714] Service to report REEF events
This addresses the issue by
* Creating an Avro schema that wraps all driver-side event types of the REEF control flow
* Creating a Watcher that subscribes to control flow events and transfers them to an EventStream
* Creating LogEventStream and FileEventStream
JIRA:
[REEF-714](https://issues.apache.org/jira/browse/REEF-714)
Pull Request:
Closes #522
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/53584db9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/53584db9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/53584db9
Branch: refs/heads/master
Commit: 53584db94aa3884093584d092400fac30d79d571
Parents: 0a78fca
Author: Kim_Geon_Woo <gw...@gmail.com>
Authored: Tue Sep 8 00:52:42 2015 +0900
Committer: Andrew Chung <af...@gmail.com>
Committed: Thu Nov 12 20:07:03 2015 -0800
----------------------------------------------------------------------
.../main/avro/watcher/REEFDriverSideEvents.avsc | 354 +++++++++++++++++++
.../org/apache/reef/io/watcher/EventStream.java | 40 +++
.../org/apache/reef/io/watcher/EventType.java | 43 +++
.../apache/reef/io/watcher/FileEventStream.java | 105 ++++++
.../apache/reef/io/watcher/LogEventStream.java | 43 +++
.../org/apache/reef/io/watcher/Watcher.java | 184 ++++++++++
.../reef/io/watcher/WatcherConfiguration.java | 62 ++++
.../apache/reef/io/watcher/package-info.java | 23 ++
.../reef/io/watcher/param/EventStreams.java | 43 +++
.../reef/io/watcher/param/package-info.java | 23 ++
.../watcher/util/RunnableExecutingHandler.java | 43 +++
.../reef/io/watcher/util/WatcherAvroUtil.java | 333 +++++++++++++++++
.../reef/io/watcher/util/package-info.java | 23 ++
lang/java/reef-tests/pom.xml | 5 +
.../tests/watcher/FailedContextHandler.java | 36 ++
.../tests/watcher/FailedTaskStartHandler.java | 36 ++
.../reef/tests/watcher/IsTaskSuspended.java | 26 ++
.../reef/tests/watcher/TestEventStream.java | 83 +++++
.../reef/tests/watcher/WatcherTestDriver.java | 188 ++++++++++
.../reef/tests/watcher/WatcherTestTask.java | 81 +++++
.../apache/reef/tests/watcher/package-info.java | 23 ++
.../apache/reef/tests/watcher/WatcherTest.java | 85 +++++
.../apache/reef/tests/watcher/package-info.java | 23 ++
23 files changed, 1905 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/avro/watcher/REEFDriverSideEvents.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/avro/watcher/REEFDriverSideEvents.avsc b/lang/java/reef-io/src/main/avro/watcher/REEFDriverSideEvents.avsc
new file mode 100644
index 0000000..37ee7d0
--- /dev/null
+++ b/lang/java/reef-io/src/main/avro/watcher/REEFDriverSideEvents.avsc
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+[
+
+ /***********************/
+ /** Common Interfaces **/
+ /***********************/
+
+ /**
+ * org.apache.reef.common.Failure
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.common",
+ "type": "record",
+ "name": "AvroFailure",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "message", "type": "string"},
+ {"name": "description", "type": ["string", "null"]},
+ {"name": "reason", "type": ["string", "null"]},
+ {"name": "data", "type": ["bytes", "null"]},
+ {"name": "asError", "type": "string"}
+ ]
+ },
+
+ /**
+ * org.apache.reef.driver.catalog.RackDescriptor
+ *
+ * The nodes of the rack are stored as the inline record
+ * AvroNodeDescriptorInRackDescriptor, which carries only id, name and
+ * inetSocketAddress (it has no rackDescriptor field, unlike AvroNodeDescriptor).
+ *
+ * NOTE(review): the "name" attribute on the array type of "nodes" is not part of
+ * an Avro array definition (arrays declare "items") and looks unused - confirm
+ * before relying on or removing it.
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.catalog",
+ "type": "record",
+ "name": "AvroRackDescriptor",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {
+ "name": "nodes",
+ "type": {
+ "type": "array",
+ "name": "AvroNodeDescriptor",
+ "items": {
+ "type": "record",
+ "name": "AvroNodeDescriptorInRackDescriptor",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "name", "type": "string"},
+ {"name": "inetSocketAddress", "type": "string"}
+ ]
+ }
+ }
+ }
+ ]
+ },
+
+ /**
+ * org.apache.reef.driver.catalog.NodeDescriptor
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.catalog",
+ "type": "record",
+ "name": "AvroNodeDescriptor",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "name", "type": "string"},
+ {"name": "inetSocketAddress", "type": "string"},
+ {"name": "rackDescriptor", "type": "org.apache.reef.io.watcher.driver.catalog.AvroRackDescriptor"}
+ ]
+ },
+
+ /**
+ * org.apache.reef.driver.evaluator.EvaluatorType
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.evaluator",
+ "type": "enum",
+ "name": "AvroEvaluatorType",
+ "symbols": ["JVM", "CLR", "UNDECIDED"]
+ },
+
+ /**
+ * org.apache.reef.driver.evaluator.EvaluatorProcess
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.evaluator",
+ "type": "record",
+ "name": "AvroEvaluatorProcess",
+ "fields": [
+ {
+ "name": "commandLines",
+ "type": {
+ "type": "array",
+ "items": ["string", "null"]
+ }
+ },
+ {"name": "evaluatorType", "type": "org.apache.reef.io.watcher.driver.evaluator.AvroEvaluatorType"},
+ {"name": "isOptionSet", "type": "boolean"}
+ ]
+ },
+
+ /**
+ * org.apache.reef.driver.evaluator.EvaluatorDescriptor
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.evaluator",
+ "type": "record",
+ "name": "AvroEvaluatorDescriptor",
+ "fields": [
+ {"name": "nodeDescriptor", "type": "org.apache.reef.io.watcher.driver.catalog.AvroNodeDescriptor"},
+ {"name": "process", "type": "org.apache.reef.io.watcher.driver.evaluator.AvroEvaluatorProcess"},
+ {"name": "memory", "type": "int"},
+ {"name": "numberOfCores", "type": "int"}
+ ]
+ },
+
+ /*********************/
+ /** Time Interfaces **/
+ /*********************/
+
+ /**
+ * org.apache.reef.wake.time.runtime.event.RuntimeStart
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.wake.time.runtime.event",
+ "type": "record",
+ "name": "AvroRuntimeStart",
+ "fields": [
+ {"name": "timestamp", "type": "long"}
+ ]
+ },
+
+ /**
+ * org.apache.reef.wake.time.runtime.event.RuntimeStop
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.wake.time.runtime.event",
+ "type": "record",
+ "name": "AvroRuntimeStop",
+ "fields": [
+ {"name": "timestamp", "type": "long"},
+ {"name": "exception", "type": ["string", "null"]}
+ ]
+ },
+
+ /**
+ * org.apache.reef.wake.time.event.StartTime
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.wake.time.event",
+ "type": "record",
+ "name": "AvroStartTime",
+ "fields": [
+ {"name": "timestamp", "type": "long"}
+ ]
+ },
+
+ /**
+ * org.apache.reef.wake.time.event.StopTime
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.wake.time.event",
+ "type": "record",
+ "name": "AvroStopTime",
+ "fields": [
+ {"name": "timestamp", "type": "long"}
+ ]
+ },
+
+ /************************/
+ /** Context Interfaces **/
+ /************************/
+
+ /**
+ * org.apache.reef.driver.context.ContextBase
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.context",
+ "type": "record",
+ "name": "AvroContextBase",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "evaluatorId", "type": "string"},
+ {"name": "parentId", "type": ["string", "null"]},
+ {"name": "evaluatorDescriptor", "type": [
+ "org.apache.reef.io.watcher.driver.evaluator.AvroEvaluatorDescriptor",
+ "null"
+ ]}
+ ]
+ },
+
+ /**
+ * org.apache.reef.driver.context.ActiveContext
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.context",
+ "type": "record",
+ "name": "AvroActiveContext",
+ "fields": [
+ {"name": "base", "type": "org.apache.reef.io.watcher.driver.context.AvroContextBase"}
+ ]
+ },
+
+ /**
+ * org.apache.reef.driver.context.ClosedContext
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.context",
+ "type": "record",
+ "name": "AvroClosedContext",
+ "fields": [
+ {"name": "base", "type": "org.apache.reef.io.watcher.driver.context.AvroContextBase"},
+ {"name": "parentContext", "type": "org.apache.reef.io.watcher.driver.context.AvroActiveContext"}
+ ]
+ },
+
+ /**
+ * org.apache.reef.driver.context.FailedContext
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.context",
+ "type": "record",
+ "name": "AvroFailedContext",
+ "fields": [
+ {"name": "base", "type": "org.apache.reef.io.watcher.driver.context.AvroContextBase"},
+ {"name": "parentContext", "type": ["org.apache.reef.io.watcher.driver.context.AvroActiveContext", "null"]},
+ {"name": "failure", "type": "org.apache.reef.io.watcher.common.AvroFailure"}
+ ]
+ },
+
+ /**
+ * org.apache.reef.driver.context.ContextMessage
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.context",
+ "type": "record",
+ "name": "AvroContextMessage",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "messageSourceId", "type": "string"},
+ {"name": "get", "type": ["bytes", "null"]}
+ ]
+ },
+
+ /*********************/
+ /** Task Interfaces **/
+ /*********************/
+ /**
+ * org.apache.reef.driver.task.CompletedTask
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.task",
+ "type": "record",
+ "name": "AvroCompletedTask",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "activeContext", "type": "org.apache.reef.io.watcher.driver.context.AvroActiveContext"},
+ {"name": "get", "type": ["bytes", "null"]}
+ ]
+ },
+
+ /**
+ * org.apache.reef.driver.task.FailedTask
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.task",
+ "type": "record",
+ "name": "AvroFailedTask",
+ "fields": [
+ {"name": "failure", "type": "org.apache.reef.io.watcher.common.AvroFailure"},
+ {"name": "activeContext", "type": ["org.apache.reef.io.watcher.driver.context.AvroActiveContext", "null"]}
+ ]
+ },
+
+ /**
+ * org.apache.reef.driver.task.RunningTask
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.task",
+ "type": "record",
+ "name": "AvroRunningTask",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "activeContext", "type": "org.apache.reef.io.watcher.driver.context.AvroActiveContext"}
+ ]
+ },
+
+ /**
+ * org.apache.reef.driver.task.SuspendedTask
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.task",
+ "type": "record",
+ "name": "AvroSuspendedTask",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "activeContext", "type": "org.apache.reef.io.watcher.driver.context.AvroActiveContext"},
+ {"name": "get", "type": ["bytes", "null"]}
+ ]
+ },
+
+ /**
+ * org.apache.reef.driver.task.TaskMessage
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.task",
+ "type": "record",
+ "name": "AvroTaskMessage",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "contextId", "type": "string"},
+ {"name": "messageSourceId", "type": "string"},
+ {"name": "get", "type": ["bytes", "null"]}
+ ]
+ },
+
+ /**************************/
+ /** Evaluator Interfaces **/
+ /**************************/
+
+ /**
+ * org.apache.reef.driver.evaluator.AllocatedEvaluator
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.evaluator",
+ "type": "record",
+ "name": "AvroAllocatedEvaluator",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "evaluatorDescriptor", "type": "org.apache.reef.io.watcher.driver.evaluator.AvroEvaluatorDescriptor"}
+ ]
+ },
+
+ /**
+ * org.apache.reef.driver.evaluator.CompletedEvaluator
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.evaluator",
+ "type": "record",
+ "name": "AvroCompletedEvaluator",
+ "fields": [
+ {"name": "id", "type": "string"}
+ ]
+ },
+
+ /**
+ * org.apache.reef.driver.evaluator.FailedEvaluator
+ */
+ {
+ "namespace": "org.apache.reef.io.watcher.driver.evaluator",
+ "type": "record",
+ "name": "AvroFailedEvaluator",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "evaluatorException", "type": "string"},
+ {
+ "name": "failedContextList", "type": {
+ "type": "array",
+ "items": "org.apache.reef.io.watcher.driver.context.AvroFailedContext"
+ }
+ },
+ {"name": "failedTask", "type": ["org.apache.reef.io.watcher.driver.task.AvroFailedTask", "null"]}
+ ]
+ }
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/EventStream.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/EventStream.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/EventStream.java
new file mode 100644
index 0000000..18c5fff
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/EventStream.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+/**
+ * A sink that writes reported events to a certain destination.
+ * {@link LogEventStream} is declared as the default implementation.
+ */
+@Unstable
+@DefaultImplementation(LogEventStream.class)
+public interface EventStream {
+
+ /**
+ * Write an event of the given type. Implementations should be thread-safe
+ * since multiple event handlers can concurrently call the method.
+ *
+ * @param type the type of the event
+ * @param jsonEncodedEvent the event encoded as JSON
+ */
+ void onEvent(EventType type, String jsonEncodedEvent);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/EventType.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/EventType.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/EventType.java
new file mode 100644
index 0000000..d753724
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/EventType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher;
+
+import org.apache.reef.annotations.Unstable;
+
+/**
+ * Event types that Watcher subscribes to. The constant is passed to
+ * {@link EventStream#onEvent(EventType, String)} together with the encoded event
+ * so that a stream can tell which kind of event it received.
+ */
+@Unstable
+public enum EventType {
+ // Clock events
+ RuntimeStart,
+ StartTime,
+ StopTime,
+ RuntimeStop,
+ // Context events
+ ActiveContext,
+ ClosedContext,
+ FailedContext,
+ // Task events
+ CompletedTask,
+ FailedTask,
+ RunningTask,
+ TaskMessage,
+ SuspendedTask,
+ // Evaluator events
+ AllocatedEvaluator,
+ FailedEvaluator,
+ CompletedEvaluator
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/FileEventStream.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/FileEventStream.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/FileEventStream.java
new file mode 100644
index 0000000..e50691a
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/FileEventStream.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.io.watcher.util.RunnableExecutingHandler;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EStage;
+import org.apache.reef.wake.impl.ThreadPoolStage;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Write events to a file in the root directory of the driver.
+ *
+ * Every event becomes one line of the form
+ * {@code [yyyy.MM.dd HH:mm:ss.SSS] [EventType] jsonEncodedEvent}, where the timestamp
+ * is taken when {@link #onEvent(EventType, String)} is called. The actual formatting and
+ * writing is handed to a stage created with a single thread, so callers never block on
+ * file I/O. The file is closed once a {@link EventType#RuntimeStop} event has been written.
+ */
+@Unstable
+public final class FileEventStream implements EventStream {
+
+  // dateFormat (SimpleDateFormat is not thread-safe) and printWriter are only used from the
+  // Runnables handed to singleThreadedExecutor.
+  // NOTE(review): this relies on the one-thread stage running them one at a time - confirm.
+  private final DateFormat dateFormat;
+  private final PrintWriter printWriter;
+  private final EStage<Runnable> singleThreadedExecutor;
+
+  /**
+   * @param path the path of the report file; missing parent directories are created
+   * @throws RuntimeException if the file cannot be created or opened
+   */
+  @Inject
+  private FileEventStream(@Parameter(Path.class) final String path) {
+    // "SSS" prints the milliseconds as three digits. The former "SSSS" zero-padded the
+    // value to four digits (123 ms was printed as ".0123"), which reads like a wrong fraction.
+    this.dateFormat = new SimpleDateFormat("[yyyy.MM.dd HH:mm:ss.SSS]");
+    this.singleThreadedExecutor = new ThreadPoolStage<>(new RunnableExecutingHandler(), 1);
+
+    try {
+      final OutputStreamWriter writer = new OutputStreamWriter(
+          new FileOutputStream(createFileWithPath(path)), Charset.forName("UTF-8"));
+      this.printWriter = new PrintWriter(writer);
+    } catch (final Exception e) {
+      throw new RuntimeException("Unable to open the event report file: " + path, e);
+    }
+  }
+
+  /**
+   * Create the file at the given path together with its missing parent directories.
+   *
+   * @param path the path of the file to create
+   * @return the file, which exists when this method returns normally
+   * @throws Exception if a parent directory or the file itself cannot be created
+   */
+  private File createFileWithPath(final String path) throws Exception {
+    final File file = new File(path);
+    final File parent = file.getParentFile();
+    // mkdirs() also returns false when somebody else created the directory in the meantime,
+    // hence the final isDirectory() check before giving up.
+    if (parent != null && !parent.exists() && !parent.mkdirs() && !parent.isDirectory()) {
+      throw new java.io.IOException("Unable to create the directory: " + parent);
+    }
+
+    // The result is deliberately ignored: false only means that the file already exists.
+    file.createNewFile();
+    return file;
+  }
+
+  @Override
+  public void onEvent(final EventType type, final String jsonEncodedEvent) {
+    // Capture the time of the call, not the time the line is eventually written.
+    final long timestamp = System.currentTimeMillis();
+    singleThreadedExecutor.onNext(new Runnable() {
+      @Override
+      public void run() {
+        final String eventDescription = new StringBuilder()
+            .append(dateFormat.format(new Date(timestamp)))
+            .append(" [")
+            .append(type)
+            .append("] ")
+            .append(jsonEncodedEvent)
+            .toString();
+
+        printWriter.println(eventDescription);
+        // Flush per event so that the report survives a driver that dies before RuntimeStop.
+        printWriter.flush();
+
+        if (type == EventType.RuntimeStop) {
+          onRuntimeStop();
+        }
+      }
+    });
+  }
+
+  /**
+   * Flush and close the report file. PrintWriter does not throw on writes after close,
+   * so events arriving after RuntimeStop are silently dropped.
+   */
+  private void onRuntimeStop() {
+    printWriter.flush();
+    printWriter.close();
+  }
+
+  @NamedParameter(doc = "The relative path of the reporting file.", default_value = "watcher_report.txt")
+  public static final class Path implements Name<String> {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/LogEventStream.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/LogEventStream.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/LogEventStream.java
new file mode 100644
index 0000000..bde190c
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/LogEventStream.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher;
+
+import org.apache.reef.annotations.Unstable;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Write events to driver.err using Logger.
+ * Every event is logged at INFO level as the event type in brackets followed by the encoded event.
+ */
+@Unstable
+public final class LogEventStream implements EventStream {
+
+  private static final Logger LOG = Logger.getLogger(LogEventStream.class.getName());
+
+  @Inject
+  private LogEventStream() {
+  }
+
+  /**
+   * Log the given event.
+   *
+   * @param type the type of the event
+   * @param jsonEncodedEvent the event encoded as JSON
+   */
+  @Override
+  public void onEvent(final EventType type, final String jsonEncodedEvent) {
+    final Object[] messageArguments = {type, jsonEncodedEvent};
+    LOG.log(Level.INFO, "[{0}] {1}", messageArguments);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/Watcher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/Watcher.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/Watcher.java
new file mode 100644
index 0000000..4a60d93
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/Watcher.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.io.watcher.param.EventStreams;
+import org.apache.reef.io.watcher.util.WatcherAvroUtil;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import javax.inject.Inject;
+import java.util.Set;
+
+/**
+ * Subscribe events and transfer them as wrapping with corresponding avro classes.
+ *
+ * Each nested handler converts the event it receives with the matching
+ * {@code WatcherAvroUtil.toAvro...} method, turns the result into a string with
+ * {@code WatcherAvroUtil.toString} and passes that string to every configured EventStream.
+ */
+@Unstable
+@Unit
+public final class Watcher {
+
+  private final Set<EventStream> eventStreamSet;
+
+  @Inject
+  private Watcher(@Parameter(EventStreams.class) final Set<EventStream> eventStreamSet) {
+    this.eventStreamSet = eventStreamSet;
+  }
+
+  /**
+   * Hand an already encoded event to every EventStream of this Watcher.
+   *
+   * @param eventType the type of the event
+   * @param encodedEvent the string produced by WatcherAvroUtil.toString for the event
+   */
+  private void broadcast(final EventType eventType, final String encodedEvent) {
+    for (final EventStream stream : eventStreamSet) {
+      stream.onEvent(eventType, encodedEvent);
+    }
+  }
+
+  /**
+   * Reports RuntimeStart events.
+   */
+  public final class DriverRuntimeStartHandler implements EventHandler<RuntimeStart> {
+
+    @Override
+    public void onNext(final RuntimeStart runtimeStart) {
+      final String encoded = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroRuntimeStart(runtimeStart));
+      broadcast(EventType.RuntimeStart, encoded);
+    }
+  }
+
+  /**
+   * Reports StartTime events.
+   */
+  public final class DriverStartHandler implements EventHandler<StartTime> {
+
+    @Override
+    public void onNext(final StartTime startTime) {
+      final String encoded = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroStartTime(startTime));
+      broadcast(EventType.StartTime, encoded);
+    }
+  }
+
+  /**
+   * Reports StopTime events.
+   */
+  public final class DriverStopHandler implements EventHandler<StopTime> {
+
+    @Override
+    public void onNext(final StopTime stopTime) {
+      final String encoded = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroStopTime(stopTime));
+      broadcast(EventType.StopTime, encoded);
+    }
+  }
+
+  /**
+   * Reports RuntimeStop events.
+   */
+  public final class DriverRuntimeStopHandler implements EventHandler<RuntimeStop> {
+
+    @Override
+    public void onNext(final RuntimeStop runtimeStop) {
+      final String encoded = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroRuntimeStop(runtimeStop));
+      broadcast(EventType.RuntimeStop, encoded);
+    }
+  }
+
+  /**
+   * Reports ActiveContext events.
+   */
+  public final class ContextActiveHandler implements EventHandler<ActiveContext> {
+
+    @Override
+    public void onNext(final ActiveContext activeContext) {
+      final String encoded = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroActiveContext(activeContext));
+      broadcast(EventType.ActiveContext, encoded);
+    }
+  }
+
+  /**
+   * Reports ClosedContext events.
+   */
+  public final class ContextClosedHandler implements EventHandler<ClosedContext> {
+
+    @Override
+    public void onNext(final ClosedContext closedContext) {
+      final String encoded = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroClosedContext(closedContext));
+      broadcast(EventType.ClosedContext, encoded);
+    }
+  }
+
+  /**
+   * Reports FailedContext events.
+   */
+  public final class ContextFailedHandler implements EventHandler<FailedContext> {
+
+    @Override
+    public void onNext(final FailedContext failedContext) {
+      final String encoded = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroFailedContext(failedContext));
+      broadcast(EventType.FailedContext, encoded);
+    }
+  }
+
+  /**
+   * Reports CompletedTask events.
+   */
+  public final class TaskCompletedHandler implements EventHandler<CompletedTask> {
+
+    @Override
+    public void onNext(final CompletedTask completedTask) {
+      final String encoded = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroCompletedTask(completedTask));
+      broadcast(EventType.CompletedTask, encoded);
+    }
+  }
+
+  /**
+   * Reports FailedTask events.
+   */
+  public final class TaskFailedHandler implements EventHandler<FailedTask> {
+
+    @Override
+    public void onNext(final FailedTask failedTask) {
+      final String encoded = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroFailedTask(failedTask));
+      broadcast(EventType.FailedTask, encoded);
+    }
+  }
+
+  /**
+   * Reports RunningTask events.
+   */
+  public final class TaskRunningHandler implements EventHandler<RunningTask> {
+
+    @Override
+    public void onNext(final RunningTask runningTask) {
+      final String encoded = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroRunningTask(runningTask));
+      broadcast(EventType.RunningTask, encoded);
+    }
+  }
+
+  /**
+   * Reports TaskMessage events.
+   */
+  public final class TaskMessageHandler implements EventHandler<TaskMessage> {
+
+    @Override
+    public void onNext(final TaskMessage taskMessage) {
+      final String encoded = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroTaskMessage(taskMessage));
+      broadcast(EventType.TaskMessage, encoded);
+    }
+  }
+
+  /**
+   * Reports SuspendedTask events.
+   */
+  public final class TaskSuspendedHandler implements EventHandler<SuspendedTask> {
+
+    @Override
+    public void onNext(final SuspendedTask suspendedTask) {
+      final String encoded = WatcherAvroUtil.toString(WatcherAvroUtil.toAvroSuspendedTask(suspendedTask));
+      broadcast(EventType.SuspendedTask, encoded);
+    }
+  }
+
+  /**
+   * Reports AllocatedEvaluator events.
+   */
+  public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+
+    @Override
+    public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+      final String encoded =
+          WatcherAvroUtil.toString(WatcherAvroUtil.toAvroAllocatedEvaluator(allocatedEvaluator));
+      broadcast(EventType.AllocatedEvaluator, encoded);
+    }
+  }
+
+  /**
+   * Reports FailedEvaluator events.
+   */
+  public final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
+
+    @Override
+    public void onNext(final FailedEvaluator failedEvaluator) {
+      final String encoded =
+          WatcherAvroUtil.toString(WatcherAvroUtil.toAvroFailedEvaluator(failedEvaluator));
+      broadcast(EventType.FailedEvaluator, encoded);
+    }
+  }
+
+  /**
+   * Reports CompletedEvaluator events.
+   */
+  public final class EvaluatorCompletedHandler implements EventHandler<CompletedEvaluator> {
+
+    @Override
+    public void onNext(final CompletedEvaluator completedEvaluator) {
+      final String encoded =
+          WatcherAvroUtil.toString(WatcherAvroUtil.toAvroCompletedEvaluator(completedEvaluator));
+      broadcast(EventType.CompletedEvaluator, encoded);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/WatcherConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/WatcherConfiguration.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/WatcherConfiguration.java
new file mode 100644
index 0000000..2f2ee02
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/WatcherConfiguration.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.io.watcher.param.EventStreams;
+import org.apache.reef.tang.formats.*;
+import org.apache.reef.wake.time.Clock;
+
+/**
+ * ConfigurationModule for Watcher.
+ *
+ * {@link #CONF} adds each handler nested in {@link Watcher} to the matching handler set
+ * (Clock, context, task and evaluator handlers) and adds the implementations given for
+ * {@link #EVENT_STREAMS} to the EventStreams set that Watcher is injected with.
+ */
+@Unstable
+public final class WatcherConfiguration extends ConfigurationModuleBuilder {
+
+ /**
+ * Types of EventStream where events subscribed by Watcher will be reported.
+ * NOTE(review): this is optional; what Watcher receives when it is left unset
+ * (an empty set or the default EventStream implementation) should be confirmed.
+ */
+ public static final OptionalImpl<EventStream> EVENT_STREAMS = new OptionalImpl<>();
+
+ // Declared after EVENT_STREAMS because the builder chain below reads that field.
+ public static final ConfigurationModule CONF = new WatcherConfiguration()
+ .bindSetEntry(EventStreams.class, EVENT_STREAMS)
+ // Clock handlers
+ .bindSetEntry(Clock.RuntimeStartHandler.class, Watcher.DriverRuntimeStartHandler.class)
+ .bindSetEntry(Clock.StartHandler.class, Watcher.DriverStartHandler.class)
+ .bindSetEntry(Clock.StopHandler.class, Watcher.DriverStopHandler.class)
+ .bindSetEntry(Clock.RuntimeStopHandler.class, Watcher.DriverRuntimeStopHandler.class)
+ // Context handlers
+ .bindSetEntry(ServiceContextActiveHandlers.class, Watcher.ContextActiveHandler.class)
+ .bindSetEntry(ServiceContextClosedHandlers.class, Watcher.ContextClosedHandler.class)
+ .bindSetEntry(ServiceContextFailedHandlers.class, Watcher.ContextFailedHandler.class)
+ // Task handlers
+ .bindSetEntry(ServiceTaskFailedHandlers.class, Watcher.TaskFailedHandler.class)
+ .bindSetEntry(ServiceTaskCompletedHandlers.class, Watcher.TaskCompletedHandler.class)
+ .bindSetEntry(ServiceTaskMessageHandlers.class, Watcher.TaskMessageHandler.class)
+ .bindSetEntry(ServiceTaskRunningHandlers.class, Watcher.TaskRunningHandler.class)
+ .bindSetEntry(ServiceTaskSuspendedHandlers.class, Watcher.TaskSuspendedHandler.class)
+ // Evaluator handlers
+ .bindSetEntry(ServiceEvaluatorAllocatedHandlers.class, Watcher.EvaluatorAllocatedHandler.class)
+ .bindSetEntry(ServiceEvaluatorFailedHandlers.class, Watcher.EvaluatorFailedHandler.class)
+ .bindSetEntry(ServiceEvaluatorCompletedHandlers.class, Watcher.EvaluatorCompletedHandler.class)
+ .build();
+
+ /**
+ * Construct a WatcherConfiguration. Private: instances are only needed to build {@link #CONF}.
+ */
+ private WatcherConfiguration() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/package-info.java
new file mode 100644
index 0000000..63bea01
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Watcher subscribes to events and writes them to the destination using a certain EventStream.
+ */
+package org.apache.reef.io.watcher;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/param/EventStreams.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/param/EventStreams.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/param/EventStreams.java
new file mode 100644
index 0000000..6160c90
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/param/EventStreams.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher.param;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.io.watcher.EventStream;
+import org.apache.reef.io.watcher.LogEventStream;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+import java.util.Set;
+
+/**
+ * Tang named parameter holding the set of EventStream implementations.
+ * The declared default for this parameter is LogEventStream.
+ */
+@Private
+@Unstable
+@NamedParameter(default_classes = {LogEventStream.class})
+public final class EventStreams implements Name<Set<EventStream>> {
+
+ /**
+ * This class should not be instantiated.
+ */
+ private EventStreams() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/param/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/param/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/param/package-info.java
new file mode 100644
index 0000000..57106fe
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/param/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Parameters for Watcher.
+ */
+package org.apache.reef.io.watcher.param;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/RunnableExecutingHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/RunnableExecutingHandler.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/RunnableExecutingHandler.java
new file mode 100644
index 0000000..ddb8acd
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/RunnableExecutingHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher.util;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.wake.EventHandler;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Handler that executes Runnable commands.
+ * A command that fails is logged and otherwise ignored, so the failure never reaches the caller
+ * of {@link #onNext(Runnable)}.
+ */
+@Unstable
+public final class RunnableExecutingHandler implements EventHandler<Runnable> {
+
+  private static final Logger LOG = Logger.getLogger(RunnableExecutingHandler.class.getName());
+
+  /**
+   * Runs the given command on the calling thread.
+   *
+   * @param runnable the command to execute
+   */
+  @Override
+  public void onNext(final Runnable runnable) {
+    try {
+      runnable.run();
+    } catch (final Throwable exception) {
+      // Hand the throwable itself to the logger: getMessage() alone loses the exception type and
+      // the stack trace, and is null for exceptions created without a message.
+      LOG.log(Level.WARNING, "An exception occurred while writing event with Watcher.", exception);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java
new file mode 100644
index 0000000..3fd6ffc
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.io.watcher.util;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.common.Failure;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.RackDescriptor;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextBase;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.*;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.io.watcher.common.AvroFailure;
+import org.apache.reef.io.watcher.driver.catalog.AvroNodeDescriptor;
+import org.apache.reef.io.watcher.driver.catalog.AvroNodeDescriptorInRackDescriptor;
+import org.apache.reef.io.watcher.driver.catalog.AvroRackDescriptor;
+import org.apache.reef.io.watcher.driver.context.AvroActiveContext;
+import org.apache.reef.io.watcher.driver.context.AvroClosedContext;
+import org.apache.reef.io.watcher.driver.context.AvroContextBase;
+import org.apache.reef.io.watcher.driver.context.AvroFailedContext;
+import org.apache.reef.io.watcher.driver.evaluator.*;
+import org.apache.reef.io.watcher.driver.task.*;
+import org.apache.reef.io.watcher.wake.time.event.AvroStartTime;
+import org.apache.reef.io.watcher.wake.time.event.AvroStopTime;
+import org.apache.reef.io.watcher.wake.time.runtime.event.AvroRuntimeStart;
+import org.apache.reef.io.watcher.wake.time.runtime.event.AvroRuntimeStop;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.event.StopTime;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Utility methods that convert REEF driver-side event objects into their generated Avro
+ * counterparts, and that encode Avro records as JSON strings.
+ */
+@Private
+@Unstable
+public final class WatcherAvroUtil {
+
+  /**
+   * Converts a Failure into an AvroFailure.
+   * An absent reason, description or data payload is stored as null.
+   *
+   * @param failure the failure to convert
+   * @return the Avro representation of the failure
+   */
+  public static AvroFailure toAvroFailure(final Failure failure) {
+    final String reason;
+    if (failure.getReason().isPresent()) {
+      reason = convertThrowableToString(failure.getReason().get());
+    } else {
+      reason = null;
+    }
+
+    return AvroFailure.newBuilder()
+        .setAsError(convertThrowableToString(failure.asError()))
+        .setData(unwrapOptionalByteArray(failure.getData()))
+        .setDescription(failure.getDescription().orElse(null))
+        .setId(failure.getId())
+        .setMessage(failure.getMessage())
+        .setReason(reason)
+        .build();
+  }
+
+  /**
+   * Builds the Avro record used for a node listed inside a rack descriptor.
+   *
+   * @param id the node id
+   * @param name the node name
+   * @param inetSocketAddress the node address; stored through its toString() form
+   * @return the Avro representation of the node
+   */
+  public static AvroNodeDescriptorInRackDescriptor toAvroNodeDescriptorInRackDescriptor(
+      final String id, final String name, final InetSocketAddress inetSocketAddress) {
+    return AvroNodeDescriptorInRackDescriptor.newBuilder()
+        .setInetSocketAddress(inetSocketAddress.toString())
+        .setId(id)
+        .setName(name)
+        .build();
+  }
+
+  /**
+   * Converts a RackDescriptor, including every node it lists, into an AvroRackDescriptor.
+   *
+   * @param rackDescriptor the rack descriptor to convert
+   * @return the Avro representation of the rack
+   */
+  public static AvroRackDescriptor toAvroRackDescriptor(final RackDescriptor rackDescriptor) {
+    final List<AvroNodeDescriptorInRackDescriptor> nodeDescriptorList = new ArrayList<>();
+    for (final NodeDescriptor nodeDescriptor : rackDescriptor.getNodes()) {
+      nodeDescriptorList.add(
+          toAvroNodeDescriptorInRackDescriptor(
+              nodeDescriptor.getId(), nodeDescriptor.getName(), nodeDescriptor.getInetSocketAddress()
+          )
+      );
+    }
+
+    return AvroRackDescriptor.newBuilder()
+        .setNodes(nodeDescriptorList)
+        .setName(rackDescriptor.getName())
+        .build();
+  }
+
+  /**
+   * Converts a NodeDescriptor, together with its rack descriptor, into an AvroNodeDescriptor.
+   *
+   * @param nodeDescriptor the node descriptor to convert
+   * @return the Avro representation of the node
+   */
+  public static AvroNodeDescriptor toAvroNodeDescriptor(final NodeDescriptor nodeDescriptor) {
+    return AvroNodeDescriptor.newBuilder()
+        .setId(nodeDescriptor.getId())
+        .setName(nodeDescriptor.getName())
+        .setInetSocketAddress(nodeDescriptor.getInetSocketAddress().toString())
+        .setRackDescriptor(toAvroRackDescriptor(nodeDescriptor.getRackDescriptor()))
+        .build();
+  }
+
+  /**
+   * Maps an EvaluatorType onto the AvroEvaluatorType constant of the same name.
+   *
+   * @param evaluatorType the evaluator type to convert
+   * @return the matching Avro enum constant
+   * @throws RuntimeException if the given type has no Avro counterpart
+   */
+  public static AvroEvaluatorType toAvroEvaluatorType(final EvaluatorType evaluatorType) {
+    switch (evaluatorType) {
+    case JVM: return AvroEvaluatorType.JVM;
+    case CLR: return AvroEvaluatorType.CLR;
+    case UNDECIDED: return AvroEvaluatorType.UNDECIDED;
+    default: throw new RuntimeException(evaluatorType + " is not defined for AvroEvaluatorType.");
+    }
+  }
+
+  /**
+   * Converts an EvaluatorProcess (command line, type and option flag) into an AvroEvaluatorProcess.
+   *
+   * @param evaluatorProcess the evaluator process to convert
+   * @return the Avro representation of the process
+   */
+  public static AvroEvaluatorProcess toAvroEvaluatorProcess(final EvaluatorProcess evaluatorProcess) {
+    // The builder takes a List<CharSequence>, so the command line strings are copied over.
+    final List<CharSequence> commandLines = new ArrayList<>();
+    for (final String commandLine : evaluatorProcess.getCommandLine()) {
+      commandLines.add(commandLine);
+    }
+
+    return AvroEvaluatorProcess.newBuilder()
+        .setCommandLines(commandLines)
+        .setEvaluatorType(toAvroEvaluatorType(evaluatorProcess.getType()))
+        .setIsOptionSet(evaluatorProcess.isOptionSet())
+        .build();
+  }
+
+  /**
+   * Converts an EvaluatorDescriptor (memory, cores, node and process) into an AvroEvaluatorDescriptor.
+   *
+   * @param evaluatorDescriptor the evaluator descriptor to convert
+   * @return the Avro representation of the descriptor
+   */
+  public static AvroEvaluatorDescriptor toAvroEvaluatorDescriptor(final EvaluatorDescriptor evaluatorDescriptor) {
+    return AvroEvaluatorDescriptor.newBuilder()
+        .setMemory(evaluatorDescriptor.getMemory())
+        .setNodeDescriptor(toAvroNodeDescriptor(evaluatorDescriptor.getNodeDescriptor()))
+        .setNumberOfCores(evaluatorDescriptor.getNumberOfCores())
+        .setProcess(toAvroEvaluatorProcess(evaluatorDescriptor.getProcess()))
+        .build();
+  }
+
+  /**
+   * Converts a RuntimeStart event into an AvroRuntimeStart carrying its timestamp.
+   */
+  public static AvroRuntimeStart toAvroRuntimeStart(final RuntimeStart runtimeStart) {
+    return AvroRuntimeStart.newBuilder()
+        .setTimestamp(runtimeStart.getTimeStamp())
+        .build();
+  }
+
+  /**
+   * Converts a StartTime event into an AvroStartTime carrying its timestamp.
+   */
+  public static AvroStartTime toAvroStartTime(final StartTime startTime) {
+    return AvroStartTime.newBuilder()
+        .setTimestamp(startTime.getTimeStamp())
+        .build();
+  }
+
+  /**
+   * Converts a StopTime event into an AvroStopTime carrying its timestamp.
+   */
+  public static AvroStopTime toAvroStopTime(final StopTime stopTime) {
+    return AvroStopTime.newBuilder()
+        .setTimestamp(stopTime.getTimeStamp())
+        .build();
+  }
+
+  /**
+   * Converts a RuntimeStop event into an AvroRuntimeStop.
+   * The exception is stored through its toString() form, or as null when there is none.
+   */
+  public static AvroRuntimeStop toAvroRuntimeStop(final RuntimeStop runtimeStop) {
+    return AvroRuntimeStop.newBuilder()
+        .setException(convertThrowableToString(runtimeStop.getException()))
+        .setTimestamp(runtimeStop.getTimeStamp())
+        .build();
+  }
+
+  /**
+   * Converts the properties shared by all context events into an AvroContextBase.
+   * An absent parent id is stored as null.
+   *
+   * @param contextBase the context to convert
+   * @return the Avro representation of the common context fields
+   */
+  public static AvroContextBase toAvroContextBase(final ContextBase contextBase) {
+    return AvroContextBase.newBuilder()
+        // The evaluator descriptor field is always written as null here.
+        // NOTE(review): confirm that leaving it out is intentional.
+        .setEvaluatorDescriptor(null)
+        .setEvaluatorId(contextBase.getEvaluatorId())
+        .setId(contextBase.getId())
+        .setParentId(contextBase.getParentId().orElse(null))
+        .build();
+  }
+
+  /**
+   * Converts an ActiveContext into an AvroActiveContext.
+   */
+  public static AvroActiveContext toAvroActiveContext(final ActiveContext activeContext) {
+    return AvroActiveContext.newBuilder()
+        .setBase(toAvroContextBase(activeContext))
+        .build();
+  }
+
+  /**
+   * Converts a ClosedContext, including its parent context, into an AvroClosedContext.
+   */
+  public static AvroClosedContext toAvroClosedContext(final ClosedContext closedContext) {
+    return AvroClosedContext.newBuilder()
+        .setBase(toAvroContextBase(closedContext))
+        .setParentContext(toAvroActiveContext(closedContext.getParentContext()))
+        .build();
+  }
+
+  /**
+   * Converts a FailedContext into an AvroFailedContext.
+   * An absent parent context is stored as null.
+   */
+  public static AvroFailedContext toAvroFailedContext(final FailedContext failedContext) {
+    return AvroFailedContext.newBuilder()
+        .setBase(toAvroContextBase(failedContext))
+        .setParentContext(unwrapOptionalActiveContext(failedContext.getParentContext()))
+        .setFailure(toAvroFailure(failedContext))
+        .build();
+  }
+
+  /**
+   * Converts a CompletedTask into an AvroCompletedTask.
+   * A null result from get() is stored as null.
+   */
+  public static AvroCompletedTask toAvroCompletedTask(final CompletedTask completedTask) {
+    return AvroCompletedTask.newBuilder()
+        .setId(completedTask.getId())
+        .setActiveContext(toAvroActiveContext(completedTask.getActiveContext()))
+        .setGet(wrapNullableByteArray(completedTask.get()))
+        .build();
+  }
+
+  /**
+   * Converts a FailedTask into an AvroFailedTask.
+   * An absent active context is stored as null.
+   */
+  public static AvroFailedTask toAvroFailedTask(final FailedTask failedTask) {
+    return AvroFailedTask.newBuilder()
+        .setActiveContext(unwrapOptionalActiveContext(failedTask.getActiveContext()))
+        .setFailure(toAvroFailure(failedTask))
+        .build();
+  }
+
+  /**
+   * Converts a RunningTask into an AvroRunningTask.
+   */
+  public static AvroRunningTask toAvroRunningTask(final RunningTask runningTask) {
+    return AvroRunningTask.newBuilder()
+        .setActiveContext(toAvroActiveContext(runningTask.getActiveContext()))
+        .setId(runningTask.getId())
+        .build();
+  }
+
+  /**
+   * Converts a TaskMessage into an AvroTaskMessage.
+   * A null payload from get() is stored as null.
+   */
+  public static AvroTaskMessage toAvroTaskMessage(final TaskMessage taskMessage) {
+    return AvroTaskMessage.newBuilder()
+        .setId(taskMessage.getId())
+        .setContextId(taskMessage.getContextId())
+        .setMessageSourceId(taskMessage.getMessageSourceID())
+        .setGet(wrapNullableByteArray(taskMessage.get()))
+        .build();
+  }
+
+  /**
+   * Converts a SuspendedTask into an AvroSuspendedTask.
+   * A null payload from get() is stored as null.
+   */
+  public static AvroSuspendedTask toAvroSuspendedTask(final SuspendedTask suspendedTask) {
+    return AvroSuspendedTask.newBuilder()
+        .setGet(wrapNullableByteArray(suspendedTask.get()))
+        .setId(suspendedTask.getId())
+        .setActiveContext(toAvroActiveContext(suspendedTask.getActiveContext()))
+        .build();
+  }
+
+  /**
+   * Converts an AllocatedEvaluator, including its descriptor, into an AvroAllocatedEvaluator.
+   */
+  public static AvroAllocatedEvaluator toAvroAllocatedEvaluator(final AllocatedEvaluator allocatedEvaluator) {
+    return AvroAllocatedEvaluator.newBuilder()
+        .setId(allocatedEvaluator.getId())
+        .setEvaluatorDescriptor(toAvroEvaluatorDescriptor(allocatedEvaluator.getEvaluatorDescriptor()))
+        .build();
+  }
+
+  /**
+   * Converts a FailedEvaluator into an AvroFailedEvaluator.
+   * Every failed context is converted; an absent failed task is stored as null.
+   */
+  public static AvroFailedEvaluator toAvroFailedEvaluator(final FailedEvaluator failedEvaluator) {
+    final AvroFailedTask avroFailedTask;
+    if (failedEvaluator.getFailedTask().isPresent()) {
+      avroFailedTask = toAvroFailedTask(failedEvaluator.getFailedTask().get());
+    } else {
+      avroFailedTask = null;
+    }
+
+    final List<AvroFailedContext> avroFailedContextList = new ArrayList<>();
+    for (final FailedContext failedContext : failedEvaluator.getFailedContextList()) {
+      avroFailedContextList.add(toAvroFailedContext(failedContext));
+    }
+
+    return AvroFailedEvaluator.newBuilder()
+        .setId(failedEvaluator.getId())
+        .setEvaluatorException(convertThrowableToString(failedEvaluator.getEvaluatorException()))
+        .setFailedContextList(avroFailedContextList)
+        .setFailedTask(avroFailedTask)
+        .build();
+  }
+
+  /**
+   * Converts a CompletedEvaluator into an AvroCompletedEvaluator carrying its id.
+   */
+  public static AvroCompletedEvaluator toAvroCompletedEvaluator(final CompletedEvaluator completedEvaluator) {
+    return AvroCompletedEvaluator.newBuilder()
+        .setId(completedEvaluator.getId())
+        .build();
+  }
+
+  /**
+   * Encodes an Avro record as a JSON string, using the record's own schema.
+   *
+   * @param record the record to encode
+   * @return the JSON encoding of the record, decoded as UTF-8
+   * @throws RuntimeException wrapping the IOException if encoding fails
+   */
+  public static String toString(final SpecificRecord record) {
+    try {
+      final Schema schema = record.getSchema();
+      final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      final Encoder encoder = EncoderFactory.get().jsonEncoder(schema, bos);
+      // Build the writer from the schema so that it is properly parameterised (no raw type).
+      final SpecificDatumWriter<SpecificRecord> datumWriter = new SpecificDatumWriter<>(schema);
+      datumWriter.write(record, encoder);
+      // The encoder buffers its output; flush before reading the bytes back.
+      encoder.flush();
+      return new String(bos.toByteArray(), Charset.forName("UTF-8"));
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns the Avro form of the contained ActiveContext, or null if the Optional is empty.
+   */
+  private static AvroActiveContext unwrapOptionalActiveContext(final Optional<ActiveContext> optionalActiveContext) {
+    if (optionalActiveContext.isPresent()) {
+      return toAvroActiveContext(optionalActiveContext.get());
+    }
+
+    return null;
+  }
+
+  /**
+   * Returns throwable.toString(), or null if the throwable is null.
+   */
+  private static String convertThrowableToString(final Throwable throwable) {
+    if (throwable != null) {
+      return throwable.toString();
+    }
+
+    return null;
+  }
+
+  /**
+   * Wraps the array in a ByteBuffer, or returns null if the array is null.
+   */
+  private static ByteBuffer wrapNullableByteArray(final byte[] data) {
+    if (data != null) {
+      return ByteBuffer.wrap(data);
+    }
+
+    return null;
+  }
+
+  /**
+   * Wraps the contained array in a ByteBuffer, or returns null if the Optional is empty.
+   */
+  private static ByteBuffer unwrapOptionalByteArray(final Optional<byte[]> optionalByteArray) {
+    if (optionalByteArray.isPresent()) {
+      return ByteBuffer.wrap(optionalByteArray.get());
+    }
+
+    return null;
+  }
+
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private WatcherAvroUtil() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/package-info.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/package-info.java
new file mode 100644
index 0000000..e8df30f
--- /dev/null
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Utility classes for Watcher.
+ */
+package org.apache.reef.io.watcher.util;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/pom.xml b/lang/java/reef-tests/pom.xml
index 1c46186..832c345 100644
--- a/lang/java/reef-tests/pom.xml
+++ b/lang/java/reef-tests/pom.xml
@@ -66,6 +66,11 @@ under the License.
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
+ <artifactId>reef-io</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
<artifactId>vortex</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/FailedContextHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/FailedContextHandler.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/FailedContextHandler.java
new file mode 100644
index 0000000..808a7d8
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/FailedContextHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.watcher;
+
+import org.apache.reef.evaluator.context.events.ContextStart;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * ContextStart handler whose injectable constructor always throws a RuntimeException, so it can
+ * never be instantiated successfully.
+ * NOTE(review): presumably bound by the Watcher test to provoke a FailedContext event - confirm
+ * against WatcherTestDriver.
+ */
+public final class FailedContextHandler implements EventHandler<ContextStart> {
+
+ @Inject
+ private FailedContextHandler() {
+ // Unconditional throw: construction of this handler always fails.
+ throw new RuntimeException();
+ }
+
+ @Override
+ public void onNext(final ContextStart value) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/FailedTaskStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/FailedTaskStartHandler.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/FailedTaskStartHandler.java
new file mode 100644
index 0000000..633a76b
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/FailedTaskStartHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.watcher;
+
+import org.apache.reef.task.events.TaskStart;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * TaskStart handler whose injectable constructor always throws a RuntimeException, so it can
+ * never be instantiated successfully.
+ * NOTE(review): presumably bound by the Watcher test to provoke a FailedTask event - confirm
+ * against WatcherTestDriver.
+ */
+public final class FailedTaskStartHandler implements EventHandler<TaskStart> {
+
+ @Inject
+ private FailedTaskStartHandler() {
+ // Unconditional throw: construction of this handler always fails.
+ throw new RuntimeException();
+ }
+
+ @Override
+ public void onNext(final TaskStart value) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/IsTaskSuspended.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/IsTaskSuspended.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/IsTaskSuspended.java
new file mode 100644
index 0000000..a40efb7
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/IsTaskSuspended.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.watcher;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Tang named parameter carrying a Boolean flag; its declared default value is "false".
+ * NOTE(review): presumably tells the test task whether it was suspended before - confirm against its users.
+ */
+@NamedParameter(default_value = "false")
+public final class IsTaskSuspended implements Name<Boolean> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/TestEventStream.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/TestEventStream.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/TestEventStream.java
new file mode 100644
index 0000000..8917e4b
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/TestEventStream.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.watcher;
+
+import org.apache.reef.io.watcher.EventStream;
+import org.apache.reef.io.watcher.EventType;
+
+import javax.inject.Inject;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * EventStream for tests. It counts how many events of each EventType were reported and
+ * checks the counts against the numbers the Watcher test expects.
+ */
+public final class TestEventStream implements EventStream {
+
+  // Filled once in the constructor with a counter per EventType; afterwards only the counters change.
+  private final Map<EventType, AtomicInteger> eventCounter;
+
+  @Inject
+  private TestEventStream() {
+    this.eventCounter = new HashMap<>();
+    for (final EventType type : EventType.values()) {
+      eventCounter.put(type, new AtomicInteger());
+    }
+  }
+
+  /**
+   * Records one occurrence of the given event type. The JSON payload is not inspected.
+   *
+   * @param type the type of the reported event
+   * @param jsonEncodedEvent the JSON-encoded event (ignored)
+   */
+  @Override
+  public void onEvent(final EventType type, final String jsonEncodedEvent) {
+    eventCounter.get(type).incrementAndGet();
+  }
+
+  /**
+   * Fails with a RuntimeException unless exactly expectedNum events of the given type were counted.
+   */
+  private void checkEqualTo(final EventType type, final int expectedNum) {
+    final int actualNum = eventCounter.get(type).get();
+    if (actualNum != expectedNum) {
+      throw new RuntimeException("The expected number of " + type + " is "
+          + expectedNum + " but " + actualNum + " times occurred");
+    }
+  }
+
+  /**
+   * Fails with a RuntimeException unless strictly more than num events of the given type were counted.
+   */
+  private void checkGreaterThan(final EventType type, final int num) {
+    final int actualNum = eventCounter.get(type).get();
+    // Strictly greater: a count equal to num must fail too, otherwise a bound of 0 can never fail.
+    if (actualNum <= num) {
+      throw new RuntimeException("The number of event " + type + " should be greater than " + num
+          + " but " + actualNum + " times occurred");
+    }
+  }
+
+  /**
+   * Verifies the number of events counted per type.
+   * This validation is called in WatcherTestDriver#RuntimeStopHandler, so the RuntimeStop event is not
+   * guaranteed to have been reported before this runs and is therefore not checked.
+   *
+   * @throws RuntimeException if any count differs from the expected one
+   */
+  public void validate() {
+    checkEqualTo(EventType.RuntimeStart, 1);
+    checkEqualTo(EventType.StartTime, 1);
+    checkEqualTo(EventType.AllocatedEvaluator, 2);
+    checkEqualTo(EventType.FailedEvaluator, 1);
+    checkEqualTo(EventType.ActiveContext, 2);
+    checkEqualTo(EventType.FailedContext, 1);
+    checkEqualTo(EventType.FailedTask, 1);
+    checkEqualTo(EventType.RunningTask, 2);
+    checkEqualTo(EventType.SuspendedTask, 1);
+    checkGreaterThan(EventType.TaskMessage, 0);
+    checkEqualTo(EventType.CompletedTask, 1);
+    checkEqualTo(EventType.ClosedContext, 1);
+    checkEqualTo(EventType.CompletedEvaluator, 1);
+    checkEqualTo(EventType.StopTime, 1);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/WatcherTestDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/WatcherTestDriver.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/WatcherTestDriver.java
new file mode 100644
index 0000000..70059f7
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/WatcherTestDriver.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.watcher;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorRequest;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.driver.task.SuspendedTask;
+import org.apache.reef.driver.task.TaskConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+ /**
+  * Driver of the Watcher test.
+  * Its handlers push evaluators, contexts and tasks through failure and suspension paths, and the
+  * event counts collected by TestEventStream are validated when the runtime stops.
+  */
+ @Unit
+ public final class WatcherTestDriver {
+
+   private static final String ROOT_CONTEXT_ID = "ROOT_CONTEXT";
+   private static final String FIRST_CONTEXT_ID = "FIRST_CONTEXT";
+
+   private final EvaluatorRequestor evaluatorRequestor;
+   private final TestEventStream testEventStream;
+
+   /**
+    * True until the first evaluator is allocated. That evaluator receives a failing context
+    * so that a FailedEvaluator is generated.
+    */
+   private final AtomicBoolean isFirstEvaluator = new AtomicBoolean(true);
+
+   /**
+    * True until the first task is reported as running. That task is suspended
+    * so that a SuspendedTask is generated.
+    */
+   private final AtomicBoolean isFirstTask = new AtomicBoolean(true);
+
+   @Inject
+   private WatcherTestDriver(final EvaluatorRequestor evaluatorRequestor,
+                             final TestEventStream testEventStream) {
+     this.evaluatorRequestor = evaluatorRequestor;
+     this.testEventStream = testEventStream;
+   }
+
+   /**
+    * Requests two evaluators when the driver starts.
+    */
+   public final class DriverStartedHandler implements EventHandler<StartTime> {
+
+     @Override
+     public void onNext(final StartTime value) {
+       final EvaluatorRequest request = EvaluatorRequest.newBuilder()
+           .setMemory(64)
+           .setNumberOfCores(1)
+           .setNumber(2)
+           .build();
+       evaluatorRequestor.submit(request);
+     }
+   }
+
+   /**
+    * Submits the failing context to the first allocated evaluator and the root context to any other.
+    */
+   public final class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
+
+     @Override
+     public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+       // compareAndSet guarantees that exactly one evaluator takes the failing branch.
+       final Configuration contextConf = isFirstEvaluator.compareAndSet(true, false)
+           ? getFailedContextConfiguration()
+           : newContextConfiguration(ROOT_CONTEXT_ID);
+       allocatedEvaluator.submitContext(contextConf);
+     }
+   }
+
+   /**
+    * Intentionally ignores the failed evaluator; the failure is expected by the test.
+    */
+   public final class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
+
+     @Override
+     public void onNext(final FailedEvaluator failedEvaluator) {
+       // no-op
+     }
+   }
+
+   /**
+    * Stacks FIRST_CONTEXT on top of the root context, then a failing context on top of FIRST_CONTEXT.
+    * Contexts with any other identifier are left untouched.
+    */
+   public final class ContextActivatedHandler implements EventHandler<ActiveContext> {
+
+     @Override
+     public void onNext(final ActiveContext activeContext) {
+       final String contextId = activeContext.getId();
+       if (contextId.equals(ROOT_CONTEXT_ID)) {
+         activeContext.submitContext(newContextConfiguration(FIRST_CONTEXT_ID));
+         return;
+       }
+       if (contextId.equals(FIRST_CONTEXT_ID)) {
+         activeContext.submitContext(getFailedContextConfiguration());
+       }
+     }
+   }
+
+   /**
+    * Submits the failing task to the parent of the failed context.
+    */
+   public final class ContextFailedHandler implements EventHandler<FailedContext> {
+
+     @Override
+     public void onNext(final FailedContext failedContext) {
+       final ActiveContext parentContext = failedContext.getParentContext().get();
+       parentContext.submitTask(getFailedTaskConfiguration());
+     }
+   }
+
+   /**
+    * Suspends the first task that is reported as running; later running tasks are left alone.
+    */
+   public final class TaskRunningHandler implements EventHandler<RunningTask> {
+
+     @Override
+     public void onNext(final RunningTask runningTask) {
+       if (!isFirstTask.compareAndSet(true, false)) {
+         return;
+       }
+       runningTask.suspend();
+     }
+   }
+
+   /**
+    * After the failing task has failed, submits the task that waits to be suspended.
+    */
+   public final class TaskFailedHandler implements EventHandler<FailedTask> {
+
+     @Override
+     public void onNext(final FailedTask failedTask) {
+       final ActiveContext context = failedTask.getActiveContext().get();
+       context.submitTask(getTaskConfiguration(true));
+     }
+   }
+
+   /**
+    * After a task has been suspended, submits the task that does not wait for a suspend event.
+    */
+   public final class TaskSuspendedHandler implements EventHandler<SuspendedTask> {
+
+     @Override
+     public void onNext(final SuspendedTask value) {
+       final ActiveContext context = value.getActiveContext();
+       context.submitTask(getTaskConfiguration(false));
+     }
+   }
+
+   /**
+    * Validates the recorded event counts when the runtime stops.
+    */
+   public final class RuntimeStopHandler implements EventHandler<RuntimeStop> {
+
+     @Override
+     public void onNext(final RuntimeStop runtimeStop) {
+       testEventStream.validate();
+     }
+   }
+
+   /**
+    * Builds a context configuration that carries only the given identifier.
+    *
+    * @param contextId the identifier of the context
+    * @return the context configuration
+    */
+   private Configuration newContextConfiguration(final String contextId) {
+     return ContextConfiguration.CONF
+         .set(ContextConfiguration.IDENTIFIER, contextId)
+         .build();
+   }
+
+   /**
+    * Builds the configuration of WatcherTestTask.
+    *
+    * @param isTaskSuspended the value bound to the IsTaskSuspended named parameter of the task
+    * @return the task configuration
+    */
+   private Configuration getTaskConfiguration(final boolean isTaskSuspended) {
+     final Configuration baseTaskConf = TaskConfiguration.CONF
+         .set(TaskConfiguration.TASK, WatcherTestTask.class)
+         .set(TaskConfiguration.IDENTIFIER, "TASK")
+         .set(TaskConfiguration.ON_SEND_MESSAGE, WatcherTestTask.class)
+         .set(TaskConfiguration.ON_SUSPEND, WatcherTestTask.TaskSuspendedHandler.class)
+         .build();
+     final String suspendedFlag = String.valueOf(isTaskSuspended);
+
+     return Tang.Factory.getTang()
+         .newConfigurationBuilder(baseTaskConf)
+         .bindNamedParameter(IsTaskSuspended.class, suspendedFlag)
+         .build();
+   }
+
+   /**
+    * Builds the configuration of the task whose start handler is FailedTaskStartHandler.
+    *
+    * @return the task configuration
+    */
+   private Configuration getFailedTaskConfiguration() {
+     return TaskConfiguration.CONF
+         .set(TaskConfiguration.TASK, WatcherTestTask.class)
+         .set(TaskConfiguration.IDENTIFIER, "FAILED_TASK")
+         .set(TaskConfiguration.ON_TASK_STARTED, FailedTaskStartHandler.class)
+         .build();
+   }
+
+   /**
+    * Builds the configuration of the context whose start handler is FailedContextHandler.
+    *
+    * @return the context configuration
+    */
+   private Configuration getFailedContextConfiguration() {
+     return ContextConfiguration.CONF
+         .set(ContextConfiguration.IDENTIFIER, "FAILED_CONTEXT")
+         .set(ContextConfiguration.ON_CONTEXT_STARTED, FailedContextHandler.class)
+         .build();
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/WatcherTestTask.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/WatcherTestTask.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/WatcherTestTask.java
new file mode 100644
index 0000000..d581749
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/WatcherTestTask.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.watcher;
+
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.task.HeartBeatTriggerManager;
+import org.apache.reef.task.Task;
+import org.apache.reef.task.TaskMessage;
+import org.apache.reef.task.TaskMessageSource;
+import org.apache.reef.task.events.SuspendEvent;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.nio.charset.Charset;
+
+ /**
+  * Task used by the Watcher test.
+  * Depending on the IsTaskSuspended parameter it either blocks until a suspend event arrives
+  * or triggers a heartbeat and returns. It always offers the same task message.
+  */
+ @Unit
+ public final class WatcherTestTask implements Task, TaskMessageSource {
+
+   private final TaskMessage taskMessage;
+   private final HeartBeatTriggerManager heartBeatTriggerManager;
+   private final boolean isTaskSuspended;
+
+   /**
+    * Cleared by TaskSuspendedHandler to release call(); read and written under this task's monitor.
+    */
+   private boolean isRunning = true;
+
+   @Inject
+   private WatcherTestTask(final HeartBeatTriggerManager heartBeatTriggerManager,
+                           @Parameter(IsTaskSuspended.class) final boolean isTaskSuspended) {
+     final byte[] payload = "MESSAGE".getBytes(Charset.forName("UTF-8"));
+     this.taskMessage = TaskMessage.from("MESSAGE_SOURCE", payload);
+     this.heartBeatTriggerManager = heartBeatTriggerManager;
+     this.isTaskSuspended = isTaskSuspended;
+   }
+
+   /**
+    * Runs the task body.
+    *
+    * @param memento ignored
+    * @return always null
+    * @throws Exception if the wait for the suspend event is interrupted
+    */
+   @Override
+   public byte[] call(final byte[] memento) throws Exception {
+     if (!isTaskSuspended) {
+       heartBeatTriggerManager.triggerHeartBeat();
+       return null;
+     }
+
+     synchronized (this) {
+       // Loop on the flag so that a wake-up without a suspend event does not end the task.
+       while (isRunning) {
+         wait();
+       }
+     }
+     return null;
+   }
+
+   /**
+    * Supplies the fixed task message.
+    *
+    * @return the task message, always present
+    */
+   @Override
+   public Optional<TaskMessage> getMessage() {
+     return Optional.of(taskMessage);
+   }
+
+   /**
+    * Releases a call() that is blocked waiting for the suspend event.
+    */
+   public final class TaskSuspendedHandler implements EventHandler<SuspendEvent> {
+
+     @Override
+     public void onNext(final SuspendEvent value) {
+       final WatcherTestTask task = WatcherTestTask.this;
+       synchronized (task) {
+         isRunning = false;
+         task.notify();
+       }
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/package-info.java
new file mode 100644
index 0000000..e0663c7
--- /dev/null
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/watcher/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test reef.io.watcher.
+ */
+package org.apache.reef.tests.watcher;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/test/java/org/apache/reef/tests/watcher/WatcherTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/watcher/WatcherTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/watcher/WatcherTest.java
new file mode 100644
index 0000000..5cd79f2
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/watcher/WatcherTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.watcher;
+
+import org.apache.reef.client.DriverConfiguration;
+import org.apache.reef.client.LauncherStatus;
+import org.apache.reef.io.watcher.LogEventStream;
+import org.apache.reef.io.watcher.WatcherConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Configurations;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tests.TestEnvironment;
+import org.apache.reef.tests.TestEnvironmentFactory;
+import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.wake.time.Clock;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+ /**
+  * Runs WatcherTestDriver with the Watcher service enabled and checks that the job succeeds.
+  * The event counts themselves are validated inside the driver by WatcherTestDriver.RuntimeStopHandler;
+  * presumably a validation failure there surfaces as an unsuccessful job status (to be confirmed).
+  */
+ public final class WatcherTest {
+   private final TestEnvironment testEnvironment = TestEnvironmentFactory.getNewTestEnvironment();
+
+   /**
+    * Set up the test environment.
+    */
+   @Before
+   public void setUp() throws Exception {
+     this.testEnvironment.setUp();
+   }
+
+   /**
+    * Tear down the test environment.
+    */
+   @After
+   public void tearDown() throws Exception {
+     this.testEnvironment.tearDown();
+   }
+
+   /**
+    * Assembles the driver configuration: the driver event handlers, the runtime-stop handler that
+    * triggers validation, and the Watcher with LogEventStream and TestEventStream as event streams.
+    *
+    * @return the merged driver configuration
+    */
+   private Configuration getDriverConfiguration() {
+     final Configuration handlerConf = DriverConfiguration.CONF
+         .set(DriverConfiguration.GLOBAL_LIBRARIES, EnvironmentUtils.getClassLocation(this.getClass()))
+         .set(DriverConfiguration.DRIVER_IDENTIFIER, "WatcherTest")
+         .set(DriverConfiguration.ON_DRIVER_STARTED, WatcherTestDriver.DriverStartedHandler.class)
+         .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, WatcherTestDriver.EvaluatorAllocatedHandler.class)
+         .set(DriverConfiguration.ON_EVALUATOR_FAILED, WatcherTestDriver.EvaluatorFailedHandler.class)
+         .set(DriverConfiguration.ON_CONTEXT_ACTIVE, WatcherTestDriver.ContextActivatedHandler.class)
+         .set(DriverConfiguration.ON_CONTEXT_FAILED, WatcherTestDriver.ContextFailedHandler.class)
+         .set(DriverConfiguration.ON_TASK_FAILED, WatcherTestDriver.TaskFailedHandler.class)
+         .set(DriverConfiguration.ON_TASK_RUNNING, WatcherTestDriver.TaskRunningHandler.class)
+         .set(DriverConfiguration.ON_TASK_SUSPENDED, WatcherTestDriver.TaskSuspendedHandler.class)
+         .build();
+
+     final Configuration stopHandlerConf = Tang.Factory.getTang()
+         .newConfigurationBuilder()
+         .bindSetEntry(Clock.RuntimeStopHandler.class, WatcherTestDriver.RuntimeStopHandler.class)
+         .build();
+
+     final Configuration watcherConf = WatcherConfiguration.CONF
+         .set(WatcherConfiguration.EVENT_STREAMS, LogEventStream.class)
+         .set(WatcherConfiguration.EVENT_STREAMS, TestEventStream.class)
+         .build();
+
+     return Configurations.merge(handlerConf, stopHandlerConf, watcherConf);
+   }
+
+   /**
+    * Runs the job and asserts that it finished successfully.
+    */
+   @Test
+   public void testEventStream() {
+     final Configuration driverConf = getDriverConfiguration();
+     final LauncherStatus status = this.testEnvironment.run(driverConf);
+     Assert.assertTrue("Job state after execution: " + status, status.isSuccess());
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53584db9/lang/java/reef-tests/src/test/java/org/apache/reef/tests/watcher/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/watcher/package-info.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/watcher/package-info.java
new file mode 100644
index 0000000..e0663c7
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/watcher/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Test reef.io.watcher.
+ */
+package org.apache.reef.tests.watcher;