You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2014/10/23 02:04:17 UTC
[46/51] [abbrv] [partial] Initial merge of Wake,
Tang and REEF into one repository and project
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-checkpoint/src/main/java/com/microsoft/reef/io/checkpoint/fs/FSCheckpointService.java
----------------------------------------------------------------------
diff --git a/reef-checkpoint/src/main/java/com/microsoft/reef/io/checkpoint/fs/FSCheckpointService.java b/reef-checkpoint/src/main/java/com/microsoft/reef/io/checkpoint/fs/FSCheckpointService.java
deleted file mode 100644
index 4bb2704..0000000
--- a/reef-checkpoint/src/main/java/com/microsoft/reef/io/checkpoint/fs/FSCheckpointService.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.io.checkpoint.fs;
-
-import com.microsoft.reef.io.checkpoint.CheckpointID;
-import com.microsoft.reef.io.checkpoint.CheckpointNamingService;
-import com.microsoft.reef.io.checkpoint.CheckpointService;
-import com.microsoft.tang.annotations.Name;
-import com.microsoft.tang.annotations.NamedParameter;
-import com.microsoft.tang.annotations.Parameter;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import javax.inject.Inject;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-
-/**
- * A FileSystem based CheckpointService.
- */
-public class FSCheckpointService implements CheckpointService {
-
- @NamedParameter(doc = "The path to be used to store the checkpoints.")
- static class PATH implements Name<String> {
- }
-
- @NamedParameter(doc = "The replication factor to be used for the stored checkpoints", default_value = "3")
- static class REPLICATION_FACTOR implements Name<Short> {
- }
-
- private final Path base;
- private final FileSystem fs;
- private final CheckpointNamingService namingPolicy;
- private final short replication;
-
- @Inject
- FSCheckpointService(final FileSystem fs,
- final @Parameter(PATH.class) String basePath,
- final CheckpointNamingService namingPolicy,
- final @Parameter(REPLICATION_FACTOR.class) short replication) {
- this.fs = fs;
- this.base = new Path(basePath);
- this.namingPolicy = namingPolicy;
- this.replication = replication;
- }
-
- public FSCheckpointService(final FileSystem fs,
- final Path base,
- final CheckpointNamingService namingPolicy,
- final short replication) {
- this.fs = fs;
- this.base = base;
- this.namingPolicy = namingPolicy;
- this.replication = replication;
- }
-
- public CheckpointWriteChannel create()
- throws IOException {
-
- final String name = namingPolicy.getNewName();
-
- final Path p = new Path(name);
- if (p.isUriPathAbsolute()) {
- throw new IOException("Checkpoint cannot be an absolute path");
- }
- return createInternal(new Path(base, p));
- }
-
- CheckpointWriteChannel createInternal(Path name) throws IOException {
-
- //create a temp file, fail if file exists
- return new FSCheckpointWriteChannel(name, fs.create(tmpfile(name), replication));
- }
-
- private static class FSCheckpointWriteChannel
- implements CheckpointWriteChannel {
- private boolean isOpen = true;
- private final Path finalDst;
- private final WritableByteChannel out;
-
- FSCheckpointWriteChannel(final Path finalDst, final FSDataOutputStream out) {
- this.finalDst = finalDst;
- this.out = Channels.newChannel(out);
- }
-
- public int write(final ByteBuffer b) throws IOException {
- return out.write(b);
- }
-
- public Path getDestination() {
- return finalDst;
- }
-
- @Override
- public void close() throws IOException {
- isOpen = false;
- out.close();
- }
-
- @Override
- public boolean isOpen() {
- return isOpen;
- }
-
- }
-
- @Override
- public CheckpointReadChannel open(final CheckpointID id)
- throws IOException, InterruptedException {
- if (!(id instanceof FSCheckpointID)) {
- throw new IllegalArgumentException(
- "Mismatched checkpoint type: " + id.getClass());
- }
- return new FSCheckpointReadChannel(
- fs.open(((FSCheckpointID) id).getPath()));
- }
-
- private static class FSCheckpointReadChannel
- implements CheckpointReadChannel {
-
- private boolean isOpen = true;
- private final ReadableByteChannel in;
-
- FSCheckpointReadChannel(final FSDataInputStream in) {
- this.in = Channels.newChannel(in);
- }
-
- @Override
- public int read(final ByteBuffer bb) throws IOException {
- return in.read(bb);
- }
-
- @Override
- public void close() throws IOException {
- isOpen = false;
- in.close();
- }
-
- @Override
- public boolean isOpen() {
- return isOpen;
- }
-
- }
-
- @Override
- public CheckpointID commit(final CheckpointWriteChannel ch) throws IOException,
- InterruptedException {
- if (ch.isOpen()) {
- ch.close();
- }
- final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch;
- final Path dst = hch.getDestination();
- if (!fs.rename(tmpfile(dst), dst)) {
- // attempt to clean up
- abort(ch);
- throw new IOException("Failed to promote checkpoint" +
- tmpfile(dst) + " -> " + dst);
- }
- return new FSCheckpointID(hch.getDestination());
- }
-
- @Override
- public void abort(final CheckpointWriteChannel ch) throws IOException {
- if (ch.isOpen()) {
- ch.close();
- }
- final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch;
- final Path tmp = tmpfile(hch.getDestination());
- try {
- if (!fs.delete(tmp, false)) {
- throw new IOException("Failed to delete checkpoint during abort");
- }
- } catch (FileNotFoundException e) {
- // IGNORE
- }
- }
-
- @Override
- public boolean delete(final CheckpointID id) throws IOException,
- InterruptedException {
- if (!(id instanceof FSCheckpointID)) {
- throw new IllegalArgumentException(
- "Mismatched checkpoint type: " + id.getClass());
- }
- Path tmp = ((FSCheckpointID) id).getPath();
- try {
- return fs.delete(tmp, false);
- } catch (FileNotFoundException e) {
- // IGNORE
- }
- return true;
- }
-
- static final Path tmpfile(final Path p) {
- return new Path(p.getParent(), p.getName() + ".tmp");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
----------------------------------------------------------------------
diff --git a/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java b/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
new file mode 100644
index 0000000..bb9241b
--- /dev/null
+++ b/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.io.checkpoint;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This interface represents the identifier (memento) for a checkpoint. It is allowed
+ * to contain a small amount of metadata about a checkpoint and must provide sufficient
+ * information to the corresponding CheckpointService to locate and retrieve the
+ * data contained in the checkpoint.
+ */
+public interface CheckpointID extends Writable {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointNamingService.java
----------------------------------------------------------------------
diff --git a/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointNamingService.java b/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointNamingService.java
new file mode 100644
index 0000000..4843ccd
--- /dev/null
+++ b/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointNamingService.java
@@ -0,0 +1,30 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.io.checkpoint;
+
+/**
+ * This interface represents a naming service for checkpoints.
+ */
+public interface CheckpointNamingService {
+
+ /**
+ * Generate a new checkpoint Name
+ *
+ * @return the checkpoint name
+ */
+ public String getNewName();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
----------------------------------------------------------------------
diff --git a/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java b/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
new file mode 100644
index 0000000..4984884
--- /dev/null
+++ b/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java
@@ -0,0 +1,97 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.io.checkpoint;
+
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * The CheckpointService provides a simple API to store and retrieve the state of a task.
+ * <p/>
+ * Checkpoints are atomic, single-writer, write-once, multiple-readers, read-many type of objects.
+ * This is provided by releasing the CheckpointID for a checkpoint only upon commit of the checkpoint,
+ * and by preventing a checkpoint from being re-opened for writes.
+ * <p/>
+ * Non-functional properties such as durability, availability, compression, garbage collection,
+ * quotas are left to the implementation.
+ * <p/>
+ * This API is envisioned as the basic building block for a checkpoint service, on top of which richer
+ * interfaces can be layered (e.g., frameworks providing object-serialization, checkpoint metadata and
+ * provenance, etc.)
+ */
+public interface CheckpointService {
+
+ /**
+ * This method creates a checkpoint and provides a channel to write to it.
+ * The name/location of the checkpoint are unknown to the user as of this time, in fact,
+ * the CheckpointID is not released to the user until commit is called. This makes enforcing
+ * atomicity of writes easy.
+ *
+ * @return a CheckpointWriteChannel that can be used to write to the checkpoint
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ CheckpointWriteChannel create() throws IOException, InterruptedException;
+
+ /**
+ * Used to finalize an existing checkpoint. It returns the CheckpointID that can be later
+ * used to access (read-only) this checkpoint. This guarantees atomicity of the checkpoint.
+ *
+ * @param channel the CheckpointWriteChannel to commit
+ * @return a CheckpointID
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ CheckpointID commit(CheckpointWriteChannel channel) throws IOException, InterruptedException;
+
+ /**
+ * Dual to commit, it aborts the current checkpoint. Garbage collection choices are
+ * left to the implementation. The CheckpointID is not generated nor released to the
+ * user so the checkpoint is not accessible.
+ *
+ * @param channel the CheckpointWriteChannel to abort
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void abort(CheckpointWriteChannel channel) throws IOException, InterruptedException;
+
+ /**
+ * Given a CheckpointID returns a reading channel.
+ *
+ * @param checkpointId CheckpointID for the checkpoint to be opened
+ * @return a CheckpointReadChannel
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ CheckpointReadChannel open(CheckpointID checkpointId) throws IOException, InterruptedException;
+
+ /**
+ * It discards an existing checkpoint identified by its CheckpointID.
+ *
+ * @param checkpointId CheckpointID for the checkpoint to be deleted
+ * @return a boolean confirming success of the deletion
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ boolean delete(CheckpointID checkpointId) throws IOException, InterruptedException;
+
+ interface CheckpointWriteChannel extends WritableByteChannel {
+ }
+
+ interface CheckpointReadChannel extends ReadableByteChannel {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
----------------------------------------------------------------------
diff --git a/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java b/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
new file mode 100644
index 0000000..aa971c4
--- /dev/null
+++ b/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.io.checkpoint;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * Simple naming service that generates a random checkpoint name.
+ */
+public class RandomNameCNS implements CheckpointNamingService {
+
+ private final String prefix;
+
+ @Inject
+ public RandomNameCNS(@Parameter(PREFIX.class) final String prefix) {
+ this.prefix = prefix;
+ }
+
+ @Override
+ public String getNewName() {
+ return this.prefix + RandomStringUtils.randomAlphanumeric(8);
+ }
+
+ @NamedParameter(doc = "The prefix used for the random names returned.", default_value = "checkpoint_")
+ public static class PREFIX implements Name<String> {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
----------------------------------------------------------------------
diff --git a/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java b/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
new file mode 100644
index 0000000..051d575
--- /dev/null
+++ b/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java
@@ -0,0 +1,52 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.io.checkpoint;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * A naming service that simply returns the name it has been initialized with.
+ */
+public class SimpleNamingService implements CheckpointNamingService {
+
+ private final String name;
+
+ @Inject
+ public SimpleNamingService(@Parameter(CheckpointName.class) final String name) {
+ this.name = "checkpoint_" + name;
+ }
+
+ /**
+ * Generate a new checkpoint Name.
+ *
+ * @return the checkpoint name
+ */
+ @Override
+ public String getNewName() {
+ return this.name;
+ }
+
+ /**
+ * Prefix for checkpoints.
+ */
+ @NamedParameter(doc = "Checkpoint prefix.", short_name = "checkpoint_prefix", default_value = "reef")
+ public static final class CheckpointName implements Name<String> {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java b/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java
new file mode 100644
index 0000000..ecfa642
--- /dev/null
+++ b/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java
@@ -0,0 +1,107 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.io.checkpoint.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.io.checkpoint.CheckpointID;
+import org.apache.reef.io.checkpoint.CheckpointNamingService;
+import org.apache.reef.io.checkpoint.CheckpointService;
+import org.apache.reef.io.checkpoint.RandomNameCNS;
+import org.apache.reef.tang.ExternalConstructor;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.RequiredParameter;
+
+import javax.inject.Inject;
+import java.io.IOException;
+
+/**
+ * ConfigurationModule for the FSCheckPointService.
+ * This can be used to create Evaluator-side configurations of the checkpointing service.
+ */
+@DriverSide
+@Public
+public class FSCheckPointServiceConfiguration extends ConfigurationModuleBuilder {
+
+ /**
+ * Use local file system if true; otherwise, use HDFS.
+ */
+ public static final RequiredParameter<Boolean> IS_LOCAL = new RequiredParameter<>();
+
+ /**
+ * Path to be used to store the checkpoints on file system.
+ */
+ public static final RequiredParameter<String> PATH = new RequiredParameter<>();
+
+ /**
+ * Replication factor to be used for the checkpoints.
+ */
+ public static final OptionalParameter<Short> REPLICATION_FACTOR = new OptionalParameter<>();
+
+ /**
+ * Prefix for checkpoint files (optional).
+ */
+ public static final OptionalParameter<String> PREFIX = new OptionalParameter<>();
+ public static final ConfigurationModule CONF = new FSCheckPointServiceConfiguration()
+ .bindImplementation(CheckpointService.class, FSCheckpointService.class) // Use the HDFS based checkpoints
+ .bindImplementation(CheckpointNamingService.class, RandomNameCNS.class) // Use Random Names for the checkpoints
+ .bindImplementation(CheckpointID.class, FSCheckpointID.class)
+ .bindConstructor(FileSystem.class, FileSystemConstructor.class)
+ .bindNamedParameter(FileSystemConstructor.IS_LOCAL.class, IS_LOCAL)
+ .bindNamedParameter(FSCheckpointService.PATH.class, PATH)
+ .bindNamedParameter(FSCheckpointService.REPLICATION_FACTOR.class, REPLICATION_FACTOR)
+ .bindNamedParameter(RandomNameCNS.PREFIX.class, PREFIX)
+ .build();
+
+ /**
+ * Constructor for Hadoop FileSystem instances.
+ * This assumes that Hadoop Configuration is in the CLASSPATH.
+ */
+ public static class FileSystemConstructor implements ExternalConstructor<FileSystem> {
+
+ /**
+ * If false, use default values for Hadoop configuration; otherwise, load from config file.
+ * Set to false when REEF is running in local mode.
+ */
+ private final boolean loadConfig;
+
+ @Inject
+ public FileSystemConstructor(final @Parameter(IS_LOCAL.class) boolean isLocal) {
+ this.loadConfig = !isLocal;
+ }
+
+ @Override
+ public FileSystem newInstance() {
+ try {
+ return FileSystem.get(new Configuration(this.loadConfig));
+ } catch (final IOException ex) {
+ throw new RuntimeException("Unable to create a FileSystem instance." +
+ " Probably Hadoop configuration is not in the CLASSPATH", ex);
+ }
+ }
+
+ @NamedParameter(doc = "Use local file system if true; otherwise, use HDFS.")
+ static class IS_LOCAL implements Name<Boolean> {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java
----------------------------------------------------------------------
diff --git a/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java b/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java
new file mode 100644
index 0000000..46b523f
--- /dev/null
+++ b/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java
@@ -0,0 +1,71 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.io.checkpoint.fs;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.reef.io.checkpoint.CheckpointID;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A FileSystem based checkpoint ID contains a reference to the Path
+ * where the checkpoint has been saved.
+ */
+public class FSCheckpointID implements CheckpointID {
+
+ private Path path;
+
+ public FSCheckpointID() {
+ }
+
+ public FSCheckpointID(Path path) {
+ this.path = path;
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ @Override
+ public String toString() {
+ return path.toString();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, path.toString());
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.path = new Path(Text.readString(in));
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return other instanceof FSCheckpointID
+ && path.equals(((FSCheckpointID) other).path);
+ }
+
+ @Override
+ public int hashCode() {
+ return path.hashCode();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java
----------------------------------------------------------------------
diff --git a/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java b/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java
new file mode 100644
index 0000000..9925401
--- /dev/null
+++ b/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java
@@ -0,0 +1,218 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.io.checkpoint.fs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.reef.io.checkpoint.CheckpointID;
+import org.apache.reef.io.checkpoint.CheckpointNamingService;
+import org.apache.reef.io.checkpoint.CheckpointService;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A FileSystem based CheckpointService.
+ */
+public class FSCheckpointService implements CheckpointService {
+
+ private final Path base;
+ private final FileSystem fs;
+ private final CheckpointNamingService namingPolicy;
+ private final short replication;
+
+ @Inject
+ FSCheckpointService(final FileSystem fs,
+ final @Parameter(PATH.class) String basePath,
+ final CheckpointNamingService namingPolicy,
+ final @Parameter(REPLICATION_FACTOR.class) short replication) {
+ this.fs = fs;
+ this.base = new Path(basePath);
+ this.namingPolicy = namingPolicy;
+ this.replication = replication;
+ }
+
+ public FSCheckpointService(final FileSystem fs,
+ final Path base,
+ final CheckpointNamingService namingPolicy,
+ final short replication) {
+ this.fs = fs;
+ this.base = base;
+ this.namingPolicy = namingPolicy;
+ this.replication = replication;
+ }
+
+ static final Path tmpfile(final Path p) {
+ return new Path(p.getParent(), p.getName() + ".tmp");
+ }
+
+ public CheckpointWriteChannel create()
+ throws IOException {
+
+ final String name = namingPolicy.getNewName();
+
+ final Path p = new Path(name);
+ if (p.isUriPathAbsolute()) {
+ throw new IOException("Checkpoint cannot be an absolute path");
+ }
+ return createInternal(new Path(base, p));
+ }
+
+ CheckpointWriteChannel createInternal(Path name) throws IOException {
+
+ //create a temp file, fail if file exists
+ return new FSCheckpointWriteChannel(name, fs.create(tmpfile(name), replication));
+ }
+
+ @Override
+ public CheckpointReadChannel open(final CheckpointID id)
+ throws IOException, InterruptedException {
+ if (!(id instanceof FSCheckpointID)) {
+ throw new IllegalArgumentException(
+ "Mismatched checkpoint type: " + id.getClass());
+ }
+ return new FSCheckpointReadChannel(
+ fs.open(((FSCheckpointID) id).getPath()));
+ }
+
+ @Override
+ public CheckpointID commit(final CheckpointWriteChannel ch) throws IOException,
+ InterruptedException {
+ if (ch.isOpen()) {
+ ch.close();
+ }
+ final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch;
+ final Path dst = hch.getDestination();
+ if (!fs.rename(tmpfile(dst), dst)) {
+ // attempt to clean up
+ abort(ch);
+ throw new IOException("Failed to promote checkpoint" +
+ tmpfile(dst) + " -> " + dst);
+ }
+ return new FSCheckpointID(hch.getDestination());
+ }
+
+ @Override
+ public void abort(final CheckpointWriteChannel ch) throws IOException {
+ if (ch.isOpen()) {
+ ch.close();
+ }
+ final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch;
+ final Path tmp = tmpfile(hch.getDestination());
+ try {
+ if (!fs.delete(tmp, false)) {
+ throw new IOException("Failed to delete checkpoint during abort");
+ }
+ } catch (FileNotFoundException e) {
+ // IGNORE
+ }
+ }
+
+ @Override
+ public boolean delete(final CheckpointID id) throws IOException,
+ InterruptedException {
+ if (!(id instanceof FSCheckpointID)) {
+ throw new IllegalArgumentException(
+ "Mismatched checkpoint type: " + id.getClass());
+ }
+ Path tmp = ((FSCheckpointID) id).getPath();
+ try {
+ return fs.delete(tmp, false);
+ } catch (FileNotFoundException e) {
+ // IGNORE
+ }
+ return true;
+ }
+
+ @NamedParameter(doc = "The path to be used to store the checkpoints.")
+ static class PATH implements Name<String> {
+ }
+
+ @NamedParameter(doc = "The replication factor to be used for the stored checkpoints", default_value = "3")
+ static class REPLICATION_FACTOR implements Name<Short> {
+ }
+
+ private static class FSCheckpointWriteChannel
+ implements CheckpointWriteChannel {
+ private final Path finalDst;
+ private final WritableByteChannel out;
+ private boolean isOpen = true;
+
+ FSCheckpointWriteChannel(final Path finalDst, final FSDataOutputStream out) {
+ this.finalDst = finalDst;
+ this.out = Channels.newChannel(out);
+ }
+
+ public int write(final ByteBuffer b) throws IOException {
+ return out.write(b);
+ }
+
+ public Path getDestination() {
+ return finalDst;
+ }
+
+ @Override
+ public void close() throws IOException {
+ isOpen = false;
+ out.close();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return isOpen;
+ }
+
+ }
+
+ private static class FSCheckpointReadChannel
+ implements CheckpointReadChannel {
+
+ private final ReadableByteChannel in;
+ private boolean isOpen = true;
+
+ FSCheckpointReadChannel(final FSDataInputStream in) {
+ this.in = Channels.newChannel(in);
+ }
+
+ @Override
+ public int read(final ByteBuffer bb) throws IOException {
+ return in.read(bb);
+ }
+
+ @Override
+ public void close() throws IOException {
+ isOpen = false;
+ in.close();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return isOpen;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/pom.xml
----------------------------------------------------------------------
diff --git a/reef-common/pom.xml b/reef-common/pom.xml
index 75ae7fa..a67349a 100644
--- a/reef-common/pom.xml
+++ b/reef-common/pom.xml
@@ -1,8 +1,9 @@
<?xml version="1.0"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <groupId>com.microsoft.reef</groupId>
+ <groupId>org.apache.reef</groupId>
<artifactId>reef-project</artifactId>
<version>0.10-SNAPSHOT</version>
</parent>
@@ -21,15 +22,15 @@
<phase>generate-sources</phase>
<configuration>
<tasks>
- <mkdir dir="target/generated-sources/proto" />
+ <mkdir dir="target/generated-sources/proto"/>
<exec executable="protoc">
- <arg value="--proto_path=src/main/proto/" />
- <arg value="--java_out=target/generated-sources/proto" />
- <arg value="src/main/proto/reef_service_protos.proto" />
- <arg value="src/main/proto/evaluator_runtime.proto" />
- <arg value="src/main/proto/client_runtime.proto" />
- <arg value="src/main/proto/driver_runtime.proto" />
- <arg value="src/main/proto/reef_protocol.proto" />
+ <arg value="--proto_path=src/main/proto/"/>
+ <arg value="--java_out=target/generated-sources/proto"/>
+ <arg value="src/main/proto/reef_service_protos.proto"/>
+ <arg value="src/main/proto/evaluator_runtime.proto"/>
+ <arg value="src/main/proto/client_runtime.proto"/>
+ <arg value="src/main/proto/driver_runtime.proto"/>
+ <arg value="src/main/proto/reef_protocol.proto"/>
</exec>
</tasks>
<sourceRoot>target/generated-sources/proto</sourceRoot>
@@ -88,7 +89,7 @@
</goals>
</pluginExecutionFilter>
<action>
- <ignore />
+ <ignore/>
</action>
</pluginExecution>
<pluginExecution>
@@ -107,7 +108,7 @@
</goals>
</pluginExecutionFilter>
<action>
- <ignore />
+ <ignore/>
</action>
</pluginExecution>
</pluginExecutions>
@@ -129,11 +130,11 @@
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
- <groupId>com.microsoft.wake</groupId>
+ <groupId>${project.groupId}</groupId>
<artifactId>wake</artifactId>
</dependency>
<dependency>
- <groupId>com.microsoft.tang</groupId>
+ <groupId>${project.groupId}</groupId>
<artifactId>tang</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/com/microsoft/reef/client/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/com/microsoft/reef/client/ClientConfiguration.java b/reef-common/src/main/java/com/microsoft/reef/client/ClientConfiguration.java
deleted file mode 100644
index 040d219..0000000
--- a/reef-common/src/main/java/com/microsoft/reef/client/ClientConfiguration.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.client;
-
-import com.microsoft.reef.client.parameters.*;
-import com.microsoft.reef.runtime.common.client.parameters.ClientPresent;
-import com.microsoft.tang.formats.ConfigurationModule;
-import com.microsoft.tang.formats.ConfigurationModuleBuilder;
-import com.microsoft.tang.formats.OptionalImpl;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.remote.RemoteConfiguration;
-
-/**
- * A ConfigurationModule to fill out for the client configuration.
- */
-public final class ClientConfiguration extends ConfigurationModuleBuilder {
-
- /**
- * Event handler for messages from the running job.
- * Default implementation just writes message to the log.
- * A message contains a status and a client-defined message payload.
- */
- public static final OptionalImpl<EventHandler<JobMessage>> ON_JOB_MESSAGE = new OptionalImpl<>();
-
- /**
- * Handler for the event when a submitted REEF Job is running.
- * Default implementation just writes to the log.
- */
- public static final OptionalImpl<EventHandler<RunningJob>> ON_JOB_RUNNING = new OptionalImpl<>();
-
- /**
- * Handler for the event when a submitted REEF Job is completed.
- * Default implementation just writes to the log.
- */
- public static final OptionalImpl<EventHandler<CompletedJob>> ON_JOB_COMPLETED = new OptionalImpl<>();
-
- /**
- * Handler for the event when a submitted REEF Job has failed.
- * Default implementation logs an error and rethrows the exception in the client JVM.
- */
- public static final OptionalImpl<EventHandler<FailedJob>> ON_JOB_FAILED = new OptionalImpl<>();
-
- /**
- * Receives fatal resourcemanager errors. The presence of this error means that the
- * underlying REEF instance is no longer able to execute REEF jobs. The
- * actual Jobs may or may not still be running.
- * Default implementation logs an error and rethrows the exception in the client JVM.
- */
- public static final OptionalImpl<EventHandler<FailedRuntime>> ON_RUNTIME_ERROR = new OptionalImpl<>();
-
- /**
- * Error handler for events on Wake-spawned threads.
- * Exceptions that are thrown on wake-spawned threads (e.g. in EventHandlers) will be caught by Wake and delivered to
- * this handler. Default behavior is to log the exceptions and rethrow them as RuntimeExceptions.
- */
- public static final OptionalImpl<EventHandler<Throwable>> ON_WAKE_ERROR = new OptionalImpl<>();
-
- public static final ConfigurationModule CONF = new ClientConfiguration()
- .bind(JobMessageHandler.class, ON_JOB_MESSAGE)
- .bind(JobRunningHandler.class, ON_JOB_RUNNING)
- .bind(JobCompletedHandler.class, ON_JOB_COMPLETED)
- .bind(JobFailedHandler.class, ON_JOB_FAILED)
- .bind(ResourceManagerErrorHandler.class, ON_RUNTIME_ERROR)
- .bindNamedParameter(ClientPresent.class, ClientPresent.YES)
- .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, ON_WAKE_ERROR)
- .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_CLIENT")
- .build();
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/com/microsoft/reef/client/CompletedJob.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/com/microsoft/reef/client/CompletedJob.java b/reef-common/src/main/java/com/microsoft/reef/client/CompletedJob.java
deleted file mode 100644
index 837e913..0000000
--- a/reef-common/src/main/java/com/microsoft/reef/client/CompletedJob.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.client;
-
-import com.microsoft.reef.annotations.Provided;
-import com.microsoft.reef.annotations.audience.ClientSide;
-import com.microsoft.reef.annotations.audience.Public;
-import com.microsoft.reef.io.naming.Identifiable;
-
-/**
- * Represents a completed REEF job.
- */
-@Public
-@ClientSide
-@Provided
-public interface CompletedJob extends Identifiable {
-
- /**
- * @return the ID of the completed job.
- */
- @Override
- public String getId();
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/com/microsoft/reef/client/DriverConfiguration.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/com/microsoft/reef/client/DriverConfiguration.java b/reef-common/src/main/java/com/microsoft/reef/client/DriverConfiguration.java
deleted file mode 100644
index 2483d75..0000000
--- a/reef-common/src/main/java/com/microsoft/reef/client/DriverConfiguration.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.client;
-
-import com.microsoft.reef.annotations.Provided;
-import com.microsoft.reef.annotations.audience.ClientSide;
-import com.microsoft.reef.annotations.audience.Public;
-import com.microsoft.reef.driver.context.ActiveContext;
-import com.microsoft.reef.driver.context.ClosedContext;
-import com.microsoft.reef.driver.context.ContextMessage;
-import com.microsoft.reef.driver.context.FailedContext;
-import com.microsoft.reef.driver.evaluator.AllocatedEvaluator;
-import com.microsoft.reef.driver.evaluator.CompletedEvaluator;
-import com.microsoft.reef.driver.evaluator.FailedEvaluator;
-import com.microsoft.reef.driver.parameters.*;
-import com.microsoft.reef.driver.task.*;
-import com.microsoft.reef.runtime.common.DriverRestartCompleted;
-import com.microsoft.reef.runtime.common.driver.DriverRuntimeConfiguration;
-import com.microsoft.tang.formats.*;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.time.Clock;
-import com.microsoft.wake.time.event.StartTime;
-import com.microsoft.wake.time.event.StopTime;
-
-/**
- * A ConfigurationModule for Drivers.
- */
-@ClientSide
-@Public
-@Provided
-public final class DriverConfiguration extends ConfigurationModuleBuilder {
-
- /**
- * Identifies the driver and therefore the JOB. Expect to see this e.g. on YARN's dashboard.
- */
- public static final RequiredParameter<String> DRIVER_IDENTIFIER = new RequiredParameter<>();
-
- /**
- * The amount of memory to be allocated for the Driver. This is the size of the AM container in YARN.
- */
- public static final OptionalParameter<Integer> DRIVER_MEMORY = new OptionalParameter<>();
-
- /**
- * Files to be made available on the Driver and all Evaluators.
- */
- public static final OptionalParameter<String> GLOBAL_FILES = new OptionalParameter<>();
-
- /**
- * Libraries to be made available on the Driver and all Evaluators.
- */
- public static final OptionalParameter<String> GLOBAL_LIBRARIES = new OptionalParameter<>();
-
- /**
- * Files to be made available on the Driver only.
- */
- public static final OptionalParameter<String> LOCAL_FILES = new OptionalParameter<>();
-
- /**
- * Libraries to be made available on the Driver only.
- */
- public static final OptionalParameter<String> LOCAL_LIBRARIES = new OptionalParameter<>();
-
- /**
- * Job submission directory to be used by driver. This is the folder on the DFS used to stage the files
- * for the Driver and subsequently for the Evaluators. It will be created if it doesn't exist yet.
- * If this is set by the user, the user must make sure it is unique across different jobs.
- */
- public static final OptionalParameter<String> DRIVER_JOB_SUBMISSION_DIRECTORY = new OptionalParameter<>();
-
- /**
- * The event handler invoked right after the driver boots up.
- */
- public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED = new RequiredImpl<>();
-
- /**
- * This event is fired in place of the ON_DRIVER_STARTED when the Driver is in fact restarted after failure.
- */
- public static final OptionalImpl<EventHandler<StartTime>> ON_DRIVER_RESTARTED = new OptionalImpl<>();
-
- /**
- * The event handler invoked right before the driver shuts down. Defaults to ignore.
- */
- public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP = new OptionalImpl<>();
-
- // ***** EVALUATOR HANDLER BINDINGS:
-
- /**
- * Event handler for allocated evaluators. Defaults to returning the evaluator if not bound.
- */
- public static final OptionalImpl<EventHandler<AllocatedEvaluator>> ON_EVALUATOR_ALLOCATED = new OptionalImpl<>();
-
- /**
- * Event handler for completed evaluators. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<CompletedEvaluator>> ON_EVALUATOR_COMPLETED = new OptionalImpl<>();
-
- /**
- * Event handler for failed evaluators. Defaults to job failure if not bound.
- */
- public static final OptionalImpl<EventHandler<FailedEvaluator>> ON_EVALUATOR_FAILED = new OptionalImpl<>();
-
- // ***** TASK HANDLER BINDINGS:
-
- /**
- * Event handler for task messages. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<TaskMessage>> ON_TASK_MESSAGE = new OptionalImpl<>();
-
- /**
- * Event handler for completed tasks. Defaults to closing the context the task ran on if not bound.
- */
- public static final OptionalImpl<EventHandler<CompletedTask>> ON_TASK_COMPLETED = new OptionalImpl<>();
-
- /**
- * Event handler for failed tasks. Defaults to job failure if not bound.
- */
- public static final OptionalImpl<EventHandler<FailedTask>> ON_TASK_FAILED = new OptionalImpl<>();
-
- /**
- * Event handler for running tasks. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<RunningTask>> ON_TASK_RUNNING = new OptionalImpl<>();
-
- /**
- * Event handler for running tasks in previous evaluator, when driver restarted. Defaults to crash if not bound.
- */
- public static final OptionalImpl<EventHandler<RunningTask>> ON_DRIVER_RESTART_TASK_RUNNING = new OptionalImpl<>();
-
- /**
- * Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support
- * task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then.
- */
- public static final OptionalImpl<EventHandler<SuspendedTask>> ON_TASK_SUSPENDED = new OptionalImpl<>();
-
- // ***** CLIENT HANDLER BINDINGS:
-
- /**
- * Event handler for client messages. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_MESSAGE = new OptionalImpl<>();
-
- /**
- * Event handler for close messages sent by the client. Defaults to job failure if not bound.
- */
- public static final OptionalImpl<EventHandler<Void>> ON_CLIENT_CLOSED = new OptionalImpl<>();
-
- /**
- * Event handler for close messages that carry a message payload, sent by the client. Defaults to job failure if not bound.
- */
- public static final OptionalImpl<EventHandler<byte[]>> ON_CLIENT_CLOSED_MESSAGE = new OptionalImpl<>();
-
- // ***** CONTEXT HANDLER BINDINGS:
-
- /**
- * Event handler for active context. Defaults to closing the context if not bound.
- */
- public static final OptionalImpl<EventHandler<ActiveContext>> ON_CONTEXT_ACTIVE = new OptionalImpl<>();
-
- /**
- * Event handler for active context when the driver restarts. Defaults to closing the context if not bound.
- */
- public static final OptionalImpl<EventHandler<ActiveContext>> ON_DRIVER_RESTART_CONTEXT_ACTIVE = new OptionalImpl<>();
-
- /**
- * Event handler for closed context. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<ClosedContext>> ON_CONTEXT_CLOSED = new OptionalImpl<>();
-
- /**
- * Event handler for failed context. Defaults to job failure if not bound.
- */
- public static final OptionalImpl<EventHandler<FailedContext>> ON_CONTEXT_FAILED = new OptionalImpl<>();
-
- /**
- * Event handler for context messages. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<ContextMessage>> ON_CONTEXT_MESSAGE = new OptionalImpl<>();
-
- /**
- * Number of threads allocated per evaluator to dispatch events from this Evaluator.
- */
- public static final OptionalParameter<Integer> EVALUATOR_DISPATCHER_THREADS = new OptionalParameter<>();
-
- /**
- * Event handler for the event of driver restart completion. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<DriverRestartCompleted>> ON_DRIVER_RESTART_COMPLETED = new OptionalImpl<>();
-
- /**
- * ConfigurationModule to fill out to get a legal Driver Configuration.
- */
- public static final ConfigurationModule CONF = new DriverConfiguration().merge(DriverRuntimeConfiguration.CONF)
-
- .bindNamedParameter(DriverIdentifier.class, DRIVER_IDENTIFIER)
- .bindNamedParameter(DriverMemory.class, DRIVER_MEMORY)
- .bindNamedParameter(DriverJobSubmissionDirectory.class, DRIVER_JOB_SUBMISSION_DIRECTORY)
- .bindSetEntry(JobGlobalFiles.class, GLOBAL_FILES)
- .bindSetEntry(JobGlobalLibraries.class, GLOBAL_LIBRARIES)
- .bindSetEntry(DriverLocalFiles.class, LOCAL_FILES)
- .bindSetEntry(DriverLocalLibraries.class, LOCAL_LIBRARIES)
-
- // Driver start/stop handlers
- .bindSetEntry(DriverStartHandler.class, ON_DRIVER_STARTED)
- .bindNamedParameter(DriverRestartHandler.class, ON_DRIVER_RESTARTED)
- .bindSetEntry(Clock.StartHandler.class, com.microsoft.reef.runtime.common.driver.DriverStartHandler.class)
- .bindSetEntry(Clock.StopHandler.class, ON_DRIVER_STOP)
-
- // Evaluator handlers
- .bindSetEntry(EvaluatorAllocatedHandlers.class, ON_EVALUATOR_ALLOCATED)
- .bindSetEntry(EvaluatorCompletedHandlers.class, ON_EVALUATOR_COMPLETED)
- .bindSetEntry(EvaluatorFailedHandlers.class, ON_EVALUATOR_FAILED)
-
- // Task handlers
- .bindSetEntry(TaskRunningHandlers.class, ON_TASK_RUNNING)
- .bindSetEntry(DriverRestartTaskRunningHandlers.class, ON_DRIVER_RESTART_TASK_RUNNING)
- .bindSetEntry(TaskFailedHandlers.class, ON_TASK_FAILED)
- .bindSetEntry(TaskMessageHandlers.class, ON_TASK_MESSAGE)
- .bindSetEntry(TaskCompletedHandlers.class, ON_TASK_COMPLETED)
- .bindSetEntry(TaskSuspendedHandlers.class, ON_TASK_SUSPENDED)
-
- // Context handlers
- .bindSetEntry(ContextActiveHandlers.class, ON_CONTEXT_ACTIVE)
- .bindSetEntry(DriverRestartContextActiveHandlers.class, ON_DRIVER_RESTART_CONTEXT_ACTIVE)
- .bindSetEntry(ContextClosedHandlers.class, ON_CONTEXT_CLOSED)
- .bindSetEntry(ContextMessageHandlers.class, ON_CONTEXT_MESSAGE)
- .bindSetEntry(ContextFailedHandlers.class, ON_CONTEXT_FAILED)
-
- // Client handlers
- .bindSetEntry(ClientMessageHandlers.class, ON_CLIENT_MESSAGE)
- .bindSetEntry(ClientCloseHandlers.class, ON_CLIENT_CLOSED)
- .bindSetEntry(ClientCloseWithMessageHandlers.class, ON_CLIENT_CLOSED_MESSAGE)
-
- // Various parameters
- .bindNamedParameter(EvaluatorDispatcherThreads.class, EVALUATOR_DISPATCHER_THREADS)
- .bindSetEntry(DriverRestartCompletedHandlers.class, ON_DRIVER_RESTART_COMPLETED)
- .build();
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/com/microsoft/reef/client/DriverLauncher.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/com/microsoft/reef/client/DriverLauncher.java b/reef-common/src/main/java/com/microsoft/reef/client/DriverLauncher.java
deleted file mode 100644
index 6b4b80c..0000000
--- a/reef-common/src/main/java/com/microsoft/reef/client/DriverLauncher.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.client;
-
-import com.microsoft.reef.annotations.Provided;
-import com.microsoft.reef.annotations.audience.ClientSide;
-import com.microsoft.reef.annotations.audience.Public;
-import com.microsoft.reef.util.Optional;
-import com.microsoft.tang.Configuration;
-import com.microsoft.tang.Tang;
-import com.microsoft.tang.annotations.Unit;
-import com.microsoft.tang.exceptions.BindException;
-import com.microsoft.tang.exceptions.InjectionException;
-import com.microsoft.wake.EventHandler;
-
-import javax.inject.Inject;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * A launcher for REEF Drivers.
- * <p/>
- * It can be instantiated using a configuration that can create a REEF instance.
- * For example, the local resourcemanager and the YARN resourcemanager can do this.
- * <p/>
- * See {@code com.microsoft.reef.examples.hello.HelloREEF} for a demo use case.
- */
-@Public
-@Provided
-@ClientSide
-@Unit
-public final class DriverLauncher {
-
- private static final Logger LOG = Logger.getLogger(DriverLauncher.class.getName());
-
- private LauncherStatus status = LauncherStatus.INIT;
-
- private RunningJob theJob = null;
-
- private final REEF reef;
-
- @Inject
- private DriverLauncher(final REEF reef) {
- this.reef = reef;
- }
-
- /**
- * Job driver notifies us that the job is running.
- */
- public final class RunningJobHandler implements EventHandler<RunningJob> {
- @Override
- public void onNext(final RunningJob job) {
- LOG.log(Level.INFO, "The Job {0} is running.", job.getId());
- theJob = job;
- setStatusAndNotify(LauncherStatus.RUNNING);
- }
- }
-
- /**
- * Job driver notifies us that the job had failed.
- */
- public final class FailedJobHandler implements EventHandler<FailedJob> {
- @Override
- public void onNext(final FailedJob job) {
- final Optional<Throwable> ex = job.getReason();
- LOG.log(Level.SEVERE, "Received an error for job " + job.getId(), ex);
- theJob = null;
- setStatusAndNotify(LauncherStatus.FAILED(ex));
- }
- }
-
- /**
- * Job driver notifies us that the job had completed successfully.
- */
- public final class CompletedJobHandler implements EventHandler<CompletedJob> {
- @Override
- public void onNext(final CompletedJob job) {
- LOG.log(Level.INFO, "The Job {0} is done.", job.getId());
- theJob = null;
- setStatusAndNotify(LauncherStatus.COMPLETED);
- }
- }
-
- /**
- * Handles an error in the job driver.
- */
- public final class RuntimeErrorHandler implements EventHandler<FailedRuntime> {
- @Override
- public void onNext(final FailedRuntime error) {
- LOG.log(Level.SEVERE, "Received a resourcemanager error", error.getReason());
- theJob = null;
- setStatusAndNotify(LauncherStatus.FAILED(error.getReason()));
- }
- }
-
- /**
- * Kills the running job.
- */
- public synchronized void close() {
- if (this.status.isRunning()) {
- this.status = LauncherStatus.FORCE_CLOSED;
- }
- if (null != this.theJob) {
- this.theJob.close();
- }
- this.notify();
- }
-
- /**
- * Run a job. Waits indefinitely for the job to complete.
- *
- * @param driverConfig the configuration for the driver. See DriverConfiguration for details.
- * @return the state of the job after execution.
- */
- public LauncherStatus run(final Configuration driverConfig) {
- this.reef.submit(driverConfig);
- synchronized (this) {
- while (!this.status.isDone()) {
- try {
- LOG.log(Level.FINE, "Wait indefinitely");
- this.wait();
- } catch (final InterruptedException ex) {
- LOG.log(Level.FINE, "Interrupted: {0}", ex);
- }
- }
- }
- this.reef.close();
- return this.status;
- }
-
- /**
- * Run a job with a waiting timeout after which it will be killed, if it did not complete yet.
- *
- * @param driverConfig the configuration for the driver. See DriverConfiguration for details.
- * @param timeOut timeout on the job.
- * @return the state of the job after execution.
- */
- public LauncherStatus run(final Configuration driverConfig, final long timeOut) {
- final long endTime = System.currentTimeMillis() + timeOut;
- this.reef.submit(driverConfig);
- synchronized (this) {
- while (!this.status.isDone()) {
- try {
- final long waitTime = endTime - System.currentTimeMillis();
- if (waitTime <= 0) {
- break;
- }
- LOG.log(Level.FINE, "Wait for {0} milliSeconds", waitTime);
- this.wait(waitTime);
- } catch (final InterruptedException ex) {
- LOG.log(Level.FINE, "Interrupted: {0}", ex);
- }
- }
- if (System.currentTimeMillis() >= endTime) {
- LOG.log(Level.WARNING, "The Job timed out.");
- this.status = LauncherStatus.FORCE_CLOSED;
- }
- }
-
- this.reef.close();
- return this.status;
- }
-
- /**
- * Instantiate a launcher for the given Configuration.
- *
- * @param runtimeConfiguration the resourcemanager configuration to be used
- * @return a DriverLauncher based on the given resourcemanager configuration
- * @throws BindException on configuration errors
- * @throws InjectionException on configuration errors
- */
- public static DriverLauncher getLauncher(
- final Configuration runtimeConfiguration) throws BindException, InjectionException {
-
- final Configuration clientConfiguration = ClientConfiguration.CONF
- .set(ClientConfiguration.ON_JOB_RUNNING, RunningJobHandler.class)
- .set(ClientConfiguration.ON_JOB_COMPLETED, CompletedJobHandler.class)
- .set(ClientConfiguration.ON_JOB_FAILED, FailedJobHandler.class)
- .set(ClientConfiguration.ON_RUNTIME_ERROR, RuntimeErrorHandler.class)
- .build();
-
- return Tang.Factory.getTang()
- .newInjector(runtimeConfiguration, clientConfiguration)
- .getInstance(DriverLauncher.class);
- }
-
- /**
- * @return the current status of the job.
- */
- public LauncherStatus getStatus() {
- return this.status;
- }
-
- /**
- * Update job status and notify the waiting thread.
- */
- public synchronized void setStatusAndNotify(final LauncherStatus status) {
- LOG.log(Level.FINEST, "Set status: {0} -> {1}", new Object[]{this.status, status});
- this.status = status;
- this.notify();
- }
-
- @Override
- public String toString() {
- return this.status.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/com/microsoft/reef/client/DriverServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/com/microsoft/reef/client/DriverServiceConfiguration.java b/reef-common/src/main/java/com/microsoft/reef/client/DriverServiceConfiguration.java
deleted file mode 100644
index d04a930..0000000
--- a/reef-common/src/main/java/com/microsoft/reef/client/DriverServiceConfiguration.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.microsoft.reef.client;
-
-import com.microsoft.reef.annotations.Provided;
-import com.microsoft.reef.annotations.audience.ClientSide;
-import com.microsoft.reef.annotations.audience.Public;
-import com.microsoft.reef.driver.context.ActiveContext;
-import com.microsoft.reef.driver.context.ClosedContext;
-import com.microsoft.reef.driver.context.ContextMessage;
-import com.microsoft.reef.driver.context.FailedContext;
-import com.microsoft.reef.driver.evaluator.AllocatedEvaluator;
-import com.microsoft.reef.driver.evaluator.CompletedEvaluator;
-import com.microsoft.reef.driver.evaluator.FailedEvaluator;
-import com.microsoft.reef.driver.parameters.*;
-import com.microsoft.reef.driver.task.*;
-import com.microsoft.tang.formats.*;
-import com.microsoft.wake.EventHandler;
-import com.microsoft.wake.time.Clock;
-import com.microsoft.wake.time.event.StartTime;
-import com.microsoft.wake.time.event.StopTime;
-
-/**
- * Use this ConfigurationModule to configure Services to be run in the Driver.
- * <p/>
- * A service is a set of event handlers that are informed of events in addition to the event handlers defined in
- * DriverConfiguration. However, most services will treat the events as read-only. Doing differently should be
- * documented clearly in the Service documentation.
- */
-@ClientSide
-@Public
-@Provided
-public final class DriverServiceConfiguration extends ConfigurationModuleBuilder {
-
-
- /**
- * Files to be made available on the Driver and all Evaluators.
- */
- public static final OptionalParameter<String> GLOBAL_FILES = new OptionalParameter<>();
-
- /**
- * Libraries to be made available on the Driver and all Evaluators.
- */
- public static final OptionalParameter<String> GLOBAL_LIBRARIES = new OptionalParameter<>();
-
- /**
- * Files to be made available on the Driver only.
- */
- public static final OptionalParameter<String> LOCAL_FILES = new OptionalParameter<>();
-
- /**
- * Libraries to be made available on the Driver only.
- */
- public static final OptionalParameter<String> LOCAL_LIBRARIES = new OptionalParameter<>();
-
- /**
- * The event handler invoked right after the driver boots up.
- */
- public static final RequiredImpl<EventHandler<StartTime>> ON_DRIVER_STARTED = new RequiredImpl<>();
-
- /**
- * The event handler invoked right before the driver shuts down. Defaults to ignore.
- */
- public static final OptionalImpl<EventHandler<StopTime>> ON_DRIVER_STOP = new OptionalImpl<>();
-
- // ***** EVALUATOR HANDLER BINDINGS:
-
- /**
- * Event handler for allocated evaluators. Defaults to returning the evaluator if not bound.
- */
- public static final OptionalImpl<EventHandler<AllocatedEvaluator>> ON_EVALUATOR_ALLOCATED = new OptionalImpl<>();
-
- /**
- * Event handler for completed evaluators. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<CompletedEvaluator>> ON_EVALUATOR_COMPLETED = new OptionalImpl<>();
-
- /**
- * Event handler for failed evaluators. Defaults to job failure if not bound.
- */
- public static final OptionalImpl<EventHandler<FailedEvaluator>> ON_EVALUATOR_FAILED = new OptionalImpl<>();
-
- // ***** TASK HANDLER BINDINGS:
-
- /**
- * Event handler for task messages. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<TaskMessage>> ON_TASK_MESSAGE = new OptionalImpl<>();
-
- /**
- * Event handler for completed tasks. Defaults to closing the context the task ran on if not bound.
- */
- public static final OptionalImpl<EventHandler<CompletedTask>> ON_TASK_COMPLETED = new OptionalImpl<>();
-
- /**
- * Event handler for failed tasks. Defaults to job failure if not bound.
- */
- public static final OptionalImpl<EventHandler<FailedTask>> ON_TASK_FAILED = new OptionalImpl<>();
-
- /**
- * Event handler for running tasks. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<RunningTask>> ON_TASK_RUNNING = new OptionalImpl<>();
-
- /**
- * Event handler for running tasks in previous evaluator, when driver restarted. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<RunningTask>> ON_DRIVER_RESTART_TASK_RUNNING = new OptionalImpl<>();
-
- /**
- * Event handler for suspended tasks. Defaults to job failure if not bound. Rationale: many jobs don't support
- * task suspension. Hence, this parameter should be optional. The only sane default is to crash the job, then.
- */
- public static final OptionalImpl<EventHandler<SuspendedTask>> ON_TASK_SUSPENDED = new OptionalImpl<>();
-
-
- // ***** CONTEXT HANDLER BINDINGS:
-
- /**
- * Event handler for active context. Defaults to closing the context if not bound.
- */
- public static final OptionalImpl<EventHandler<ActiveContext>> ON_CONTEXT_ACTIVE = new OptionalImpl<>();
-
- /**
- * Event handler for active context when the driver restarts. Defaults to closing the context if not bound.
- */
- public static final OptionalImpl<EventHandler<ActiveContext>> ON_DRIVER_RESTART_CONTEXT_ACTIVE = new OptionalImpl<>();
-
- /**
- * Event handler for closed context. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<ClosedContext>> ON_CONTEXT_CLOSED = new OptionalImpl<>();
-
- /**
- * Event handler for failed context. Defaults to job failure if not bound.
- */
- public static final OptionalImpl<EventHandler<FailedContext>> ON_CONTEXT_FAILED = new OptionalImpl<>();
-
- /**
- * Event handler for context messages. Defaults to logging if not bound.
- */
- public static final OptionalImpl<EventHandler<ContextMessage>> ON_CONTEXT_MESSAGE = new OptionalImpl<>();
-
-
- /**
- * ConfigurationModule to fill out to get a legal Driver Configuration.
- */
- public static final ConfigurationModule CONF = new DriverServiceConfiguration()
- // Files use the very same named parameters as the DriverConfiguration
- .bindSetEntry(JobGlobalFiles.class, GLOBAL_FILES)
- .bindSetEntry(JobGlobalLibraries.class, GLOBAL_LIBRARIES)
- .bindSetEntry(DriverLocalFiles.class, LOCAL_FILES)
- .bindSetEntry(DriverLocalLibraries.class, LOCAL_LIBRARIES)
-
- // Start and stop events are the same handlers for applications and services.
- .bindSetEntry(Clock.StartHandler.class, ON_DRIVER_STARTED)
- .bindSetEntry(Clock.StopHandler.class, ON_DRIVER_STOP)
-
- // Evaluator handlers
- .bindSetEntry(ServiceEvaluatorAllocatedHandlers.class, ON_EVALUATOR_ALLOCATED)
- .bindSetEntry(ServiceEvaluatorCompletedHandlers.class, ON_EVALUATOR_COMPLETED)
- .bindSetEntry(ServiceEvaluatorFailedHandlers.class, ON_EVALUATOR_FAILED)
-
- // Task handlers
- .bindSetEntry(ServiceTaskRunningHandlers.class, ON_TASK_RUNNING)
- .bindSetEntry(DriverRestartTaskRunningHandlers.class, ON_DRIVER_RESTART_TASK_RUNNING)
- .bindSetEntry(ServiceTaskFailedHandlers.class, ON_TASK_FAILED)
- .bindSetEntry(ServiceTaskMessageHandlers.class, ON_TASK_MESSAGE)
- .bindSetEntry(ServiceTaskCompletedHandlers.class, ON_TASK_COMPLETED)
- .bindSetEntry(ServiceTaskSuspendedHandlers.class, ON_TASK_SUSPENDED)
-
- // Context handlers
- .bindSetEntry(ServiceContextActiveHandlers.class, ON_CONTEXT_ACTIVE)
- .bindSetEntry(DriverRestartContextActiveHandlers.class, ON_DRIVER_RESTART_CONTEXT_ACTIVE)
- .bindSetEntry(ServiceContextClosedHandlers.class, ON_CONTEXT_CLOSED)
- .bindSetEntry(ServiceContextMessageHandlers.class, ON_CONTEXT_MESSAGE)
- .bindSetEntry(ServiceContextFailedHandlers.class, ON_CONTEXT_FAILED)
-
- .build();
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/com/microsoft/reef/client/FailedJob.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/com/microsoft/reef/client/FailedJob.java b/reef-common/src/main/java/com/microsoft/reef/client/FailedJob.java
deleted file mode 100644
index ffcb640..0000000
--- a/reef-common/src/main/java/com/microsoft/reef/client/FailedJob.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.client;
-
-import com.microsoft.reef.annotations.Provided;
-import com.microsoft.reef.annotations.audience.ClientSide;
-import com.microsoft.reef.annotations.audience.Public;
-import com.microsoft.reef.common.AbstractFailure;
-import com.microsoft.reef.util.Optional;
-
-/**
- * An error message that REEF Client receives when there is a user error in REEF job.
- */
-@Public
-@ClientSide
-@Provided
-public final class FailedJob extends AbstractFailure {
- /**
- * @param id Identifier of the Job that produced the error.
- * @param message One-line error message.
- * @param description Long error description.
- * @param cause Java Exception that caused the error.
- * @param data byte array that contains serialized version of the error.
- */
- public FailedJob(final String id,
- final String message,
- final Optional<String> description,
- final Optional<Throwable> cause,
- final Optional<byte[]> data) {
- super(id, message, description, cause, data);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/com/microsoft/reef/client/FailedRuntime.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/com/microsoft/reef/client/FailedRuntime.java b/reef-common/src/main/java/com/microsoft/reef/client/FailedRuntime.java
deleted file mode 100644
index bc48b1b..0000000
--- a/reef-common/src/main/java/com/microsoft/reef/client/FailedRuntime.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.client;
-
-import com.microsoft.reef.common.AbstractFailure;
-import com.microsoft.reef.proto.ReefServiceProtos.RuntimeErrorProto;
-import com.microsoft.reef.util.Optional;
-import com.microsoft.wake.remote.impl.ObjectSerializableCodec;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Error message that REEF Client gets when there is an error in REEF resourcemanager.
- */
-public final class FailedRuntime extends AbstractFailure {
-
- /**
- * Standard java logger.
- */
- private static final Logger LOG = Logger.getLogger(AbstractFailure.class.getName());
-
- /**
- * Codec to decode serialized exception from a byte array. It is used in getThrowable().
- */
- private static final ObjectSerializableCodec<Exception> CODEC = new ObjectSerializableCodec<>();
-
- /**
- * Create a new Failure object out of protobuf data.
- *
- * @param error Error message as a protocol buffers object.
- */
- public FailedRuntime(final RuntimeErrorProto error) {
- super(error.getIdentifier(), error.getMessage(), Optional.<String>empty(), Optional.of(getThrowable(error)), Optional.<byte[]>empty());
- }
-
- /**
- * Retrieve Java exception from protobuf object, if possible. Otherwise, return null.
- * This is a utility method used in the FailedRuntime constructor.
- *
- * @param error protobuf error message structure.
- * @return Java exception or null if exception is missing or cannot be decoded.
- */
- private static Throwable getThrowable(final RuntimeErrorProto error) {
- final byte[] data = getData(error);
- if (data != null) {
- try {
- return CODEC.decode(data);
- } catch (final Throwable ex) {
- LOG.log(Level.FINE, "Could not decode exception {0}: {1}", new Object[]{error, ex});
- }
- }
- return null;
- }
-
- /**
- * Get binary data for the exception, if it exists. Otherwise, return null.
- * This is a utility method used in the FailedRuntime constructor and getThrowable() method.
- *
- * @param error protobuf error message structure.
- * @return byte array of the exception or null if exception is missing.
- */
- private static byte[] getData(final RuntimeErrorProto error) {
- return error.hasException() ? error.getException().toByteArray() : null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/com/microsoft/reef/client/JobMessage.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/com/microsoft/reef/client/JobMessage.java b/reef-common/src/main/java/com/microsoft/reef/client/JobMessage.java
deleted file mode 100644
index 945f345..0000000
--- a/reef-common/src/main/java/com/microsoft/reef/client/JobMessage.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.client;
-
-import com.microsoft.reef.annotations.Provided;
-import com.microsoft.reef.annotations.audience.ClientSide;
-import com.microsoft.reef.annotations.audience.Public;
-import com.microsoft.reef.io.Message;
-import com.microsoft.reef.io.naming.Identifiable;
-
-/**
- * A message received by the client from the driver.
- */
-@Public
-@ClientSide
-@Provided
-public final class JobMessage implements Message, Identifiable {
-
- private final String id;
- private final byte[] value;
-
- /**
- * @param id the identifier of the sending Job
- * @param value the message
- */
- public JobMessage(final String id, final byte[] value) {
- this.id = id;
- this.value = value;
- }
-
- /**
- * Get the message sent by the Job.
- *
- * @return the message sent by the Job.
- */
- @Override
- public final byte[] get() {
- return this.value;
- }
-
- /**
- * Get the Identifier of the sending Job.
- *
- * @return the Identifier of the sending Job.
- */
- @Override
- public final String getId() {
- return this.id;
- }
-
- @Override
- public String toString() {
- return "JobMessage{" +
- "id='" + id + '\'' +
- ", value.length=" + value.length +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/com/microsoft/reef/client/LauncherStatus.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/com/microsoft/reef/client/LauncherStatus.java b/reef-common/src/main/java/com/microsoft/reef/client/LauncherStatus.java
deleted file mode 100644
index 0244b5b..0000000
--- a/reef-common/src/main/java/com/microsoft/reef/client/LauncherStatus.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.client;
-
-import com.microsoft.reef.util.Optional;
-
-/**
- * The status of a reef job spawned using the DriverLauncher class.
- */
-public final class LauncherStatus {
-
- public static final LauncherStatus INIT = new LauncherStatus(State.INIT);
- public static final LauncherStatus RUNNING = new LauncherStatus(State.RUNNING);
- public static final LauncherStatus COMPLETED = new LauncherStatus(State.COMPLETED);
- public static final LauncherStatus FORCE_CLOSED = new LauncherStatus(State.FORCE_CLOSED);
- public static final LauncherStatus FAILED = new LauncherStatus(State.FAILED);
-
- public static final LauncherStatus FAILED(final Throwable ex) {
- return new LauncherStatus(State.FAILED, ex);
- }
-
- public static final LauncherStatus FAILED(final Optional<Throwable> ex) {
- return new LauncherStatus(State.FAILED, ex.orElse(null));
- }
-
- /**
- * The state the computation could be in.
- */
- private enum State {
- INIT,
- RUNNING,
- COMPLETED,
- FAILED,
- FORCE_CLOSED
- }
-
-
- private final State state;
- private final Optional<Throwable> error;
-
- private LauncherStatus(final State state) {
- this(state, null);
- }
-
- private LauncherStatus(final State state, final Throwable ex) {
- this.state = state;
- this.error = Optional.ofNullable(ex);
- }
-
- public Optional<Throwable> getError() {
- return this.error;
- }
-
- /**
- * Compare the <b>State</b> of two LauncherStatus objects.
- * Note that it does NOT compare the exceptions - just the states.
- *
- * @return True if both LauncherStatus objects are in the same state.
- */
- @Override
- public boolean equals(final Object other) {
- return this == other ||
- (other instanceof LauncherStatus && ((LauncherStatus) other).state == this.state);
- }
-
- /**
- * Has the job completed?
- *
- * @return True if the job has been completed, false otherwise.
- */
- public final boolean isDone() {
- switch (this.state) {
- case FAILED:
- case COMPLETED:
- case FORCE_CLOSED:
- return true;
- default:
- return false;
- }
- }
-
- /**
- * Has the job completed successfully?
- *
- * @return True if the job has been completed successfully, false otherwise.
- */
- public final boolean isSuccess() {
- return this.state == State.COMPLETED;
- }
-
- /**
- * Is the job still running?
- *
- * @return True if the job is still running, false otherwise.
- */
- public final boolean isRunning() {
- return this.state == State.RUNNING;
- }
-
- @Override
- public String toString() {
- if (this.error.isPresent()) {
- return this.state + "(" + this.error.get() + ")";
- } else {
- return this.state.toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/com/microsoft/reef/client/REEF.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/com/microsoft/reef/client/REEF.java b/reef-common/src/main/java/com/microsoft/reef/client/REEF.java
deleted file mode 100644
index 2e0f19d..0000000
--- a/reef-common/src/main/java/com/microsoft/reef/client/REEF.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.client;
-
-import com.microsoft.reef.annotations.audience.ClientSide;
-import com.microsoft.reef.annotations.audience.Public;
-import com.microsoft.reef.runtime.common.client.REEFImplementation;
-import com.microsoft.tang.Configuration;
-import com.microsoft.tang.annotations.DefaultImplementation;
-
-/**
- * The main entry point into the REEF resourcemanager.
- * <p/>
- * Every REEF resourcemanager provides an implementation of this interface. That
- * instance is used to submitTask the Driver class for execution to REEF. As with
- * all submissions in REEF, this is done in the form of a TANG Configuration
- * object.
- */
-@Public
-@ClientSide
-@DefaultImplementation(REEFImplementation.class)
-public interface REEF extends AutoCloseable {
-
- static final String REEF_VERSION = "0.6-SNAPSHOT";
-
- /**
- * Close the resourcemanager connection.
- */
- @Override
- public void close();
-
- /**
- * Submits the Driver set up in the given Configuration for execution.
- * <p/>
- * The Configuration needs to bind the Driver interface to an actual
- * implementation of that interface for the job at hand.
- *
- * @param driverConf The driver configuration: including everything it needs to execute. @see DriverConfiguration
- */
- public void submit(final Configuration driverConf);
-
- /**
- * @return the version of REEF running.
- */
- public String getVersion();
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/com/microsoft/reef/client/RunningJob.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/com/microsoft/reef/client/RunningJob.java b/reef-common/src/main/java/com/microsoft/reef/client/RunningJob.java
deleted file mode 100644
index d07860c..0000000
--- a/reef-common/src/main/java/com/microsoft/reef/client/RunningJob.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Copyright (C) 2014 Microsoft Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.microsoft.reef.client;
-
-import com.microsoft.reef.annotations.Provided;
-import com.microsoft.reef.annotations.audience.ClientSide;
-import com.microsoft.reef.annotations.audience.Public;
-import com.microsoft.reef.io.naming.Identifiable;
-import com.microsoft.reef.runtime.common.client.RunningJobImpl;
-import com.microsoft.tang.annotations.DefaultImplementation;
-
-/**
- * Represents a running REEF job.
- */
-@Public
-@ClientSide
-@Provided
-@DefaultImplementation(RunningJobImpl.class)
-public interface RunningJob extends Identifiable, AutoCloseable {
-
- /**
- * Cancels the running Job.
- */
- @Override
- public void close();
-
- /**
- * Cancels the running Job.
- *
- * @param message delivered along with cancel request.
- */
- public void close(final byte[] message);
-
- /**
- * @return the ID of the running job.
- */
- @Override
- public String getId();
-
- /**
- * Send a message to the Driver.
- *
- * @param message to send to the running driver
- */
- public void send(final byte[] message);
-}