You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/04/22 07:52:36 UTC

[2/3] incubator-reef git commit: [REEF-128] Replace the protocol buffer use in the runtime API with POJOs

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
index c0db65d..461f1c0 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
@@ -20,7 +20,6 @@ package org.apache.reef.runtime.common.driver.resourcemanager;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
 import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.driver.DriverStatusManager;
 import org.apache.reef.runtime.common.driver.idle.DriverIdleManager;
@@ -38,7 +37,7 @@ import java.util.logging.Logger;
  */
 @DriverSide
 @Private
-public final class ResourceManagerStatus implements EventHandler<DriverRuntimeProtocol.RuntimeStatusProto>,
+public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEvent>,
     DriverIdlenessSource {
   private static final Logger LOG = Logger.getLogger(ResourceManagerStatus.class.getName());
 
@@ -64,22 +63,22 @@ public final class ResourceManagerStatus implements EventHandler<DriverRuntimePr
   }
 
   @Override
-  public synchronized void onNext(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
-    final ReefServiceProtos.State newState = runtimeStatusProto.getState();
-    LOG.log(Level.FINEST, "Runtime status " + runtimeStatusProto);
-    this.outstandingContainerRequests = runtimeStatusProto.getOutstandingContainerRequests();
-    this.containerAllocationCount = runtimeStatusProto.getContainerAllocationCount();
-    this.setState(runtimeStatusProto.getState());
+  public synchronized void onNext(final RuntimeStatusEvent runtimeStatusEvent) {
+    final ReefServiceProtos.State newState = runtimeStatusEvent.getState();
+    LOG.log(Level.FINEST, "Runtime status " + runtimeStatusEvent);
+    this.outstandingContainerRequests = runtimeStatusEvent.getOutstandingContainerRequests().get();
+    this.containerAllocationCount = runtimeStatusEvent.getContainerAllocationList().size();
+    this.setState(runtimeStatusEvent.getState());
 
     switch (newState) {
       case FAILED:
-        this.onRMFailure(runtimeStatusProto);
+        this.onRMFailure(runtimeStatusEvent);
         break;
       case DONE:
-        this.onRMDone(runtimeStatusProto);
+        this.onRMDone(runtimeStatusEvent);
         break;
       case RUNNING:
-        this.onRMRunning(runtimeStatusProto);
+        this.onRMRunning(runtimeStatusEvent);
         break;
     }
   }
@@ -110,19 +109,19 @@ public final class ResourceManagerStatus implements EventHandler<DriverRuntimePr
   }
 
 
-  private synchronized void onRMFailure(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
-    assert (runtimeStatusProto.getState() == ReefServiceProtos.State.FAILED);
-    this.resourceManagerErrorHandler.onNext(runtimeStatusProto.getError());
+  private synchronized void onRMFailure(final RuntimeStatusEvent runtimeStatusEvent) {
+    assert (runtimeStatusEvent.getState() == ReefServiceProtos.State.FAILED);
+    this.resourceManagerErrorHandler.onNext(runtimeStatusEvent.getError().get());
   }
 
-  private synchronized void onRMDone(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
-    assert (runtimeStatusProto.getState() == ReefServiceProtos.State.DONE);
+  private synchronized void onRMDone(final RuntimeStatusEvent runtimeStatusEvent) {
+    assert (runtimeStatusEvent.getState() == ReefServiceProtos.State.DONE);
     LOG.log(Level.INFO, "Resource Manager shutdown happened. Triggering Driver shutdown.");
     this.driverStatusManager.onComplete();
   }
 
-  private synchronized void onRMRunning(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
-    assert (runtimeStatusProto.getState() == ReefServiceProtos.State.RUNNING);
+  private synchronized void onRMRunning(final RuntimeStatusEvent runtimeStatusEvent) {
+    assert (runtimeStatusEvent.getState() == ReefServiceProtos.State.RUNNING);
     if (this.isIdle()) {
       this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE);
     }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java
new file mode 100644
index 0000000..1c17b15
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.util.Optional;
+
+/**
+ * Event from Driver Runtime -> Driver Process
+ * Status of a resource in the cluster
+ */
+@RuntimeAuthor
+@DriverSide
+@DefaultImplementation(ResourceStatusEventImpl.class)
+public interface ResourceStatusEvent {
+  /**
+   * @return Id of the resource
+   */
+  String getIdentifier();
+
+  /**
+   * @return State of the resource
+   */
+  ReefServiceProtos.State getState();
+
+  /**
+   * @return Diagnostics from the resource
+   */
+  Optional<String> getDiagnostics();
+
+  /**
+   * @return Exit code of the resource, if it has exited
+   */
+  Optional<Integer> getExitCode();
+
+  /**
+   * @return If true, this resource is from a previous Driver (the Driver was restarted)
+   */
+  Optional<Boolean> getIsFromPreviousDriver();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java
new file mode 100644
index 0000000..7258c42
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.util.BuilderUtils;
+import org.apache.reef.util.Optional;
+
+/**
+ * Default POJO implementation of ResourceStatusEvent.
+ * Use newBuilder to construct an instance.
+ */
+public final class ResourceStatusEventImpl implements ResourceStatusEvent {
+  private final String identifier;
+  private final ReefServiceProtos.State state;
+  private final Optional<String> diagnostics;
+  private final Optional<Integer> exitCode;
+  private final Optional<Boolean> isFromPreviousDriver;
+
+  private ResourceStatusEventImpl(final Builder builder) {
+    this.identifier = BuilderUtils.notNull(builder.identifier);
+    this.state = BuilderUtils.notNull(builder.state);
+    this.diagnostics = Optional.ofNullable(builder.diagnostics);
+    this.exitCode = Optional.ofNullable(builder.exitCode);
+    this.isFromPreviousDriver = Optional.ofNullable(builder.isFromPreviousDriver);
+  }
+
+  @Override
+  public String getIdentifier() {
+    return identifier;
+  }
+
+  @Override
+  public ReefServiceProtos.State getState() {
+    return state;
+  }
+
+  @Override
+  public Optional<String> getDiagnostics() {
+    return diagnostics;
+  }
+
+  @Override
+  public Optional<Integer> getExitCode() {
+    return exitCode;
+  }
+
+  @Override
+  public Optional<Boolean> getIsFromPreviousDriver() {
+    return isFromPreviousDriver;
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder used to create ResourceStatusEvent instances.
+   */
+  public static final class Builder implements org.apache.reef.util.Builder<ResourceStatusEvent> {
+    private String identifier;
+    private ReefServiceProtos.State state;
+    private String diagnostics;
+    private Integer exitCode;
+    private Boolean isFromPreviousDriver;
+
+    /**
+     * @see ResourceStatusEvent#getIdentifier()
+     */
+    public Builder setIdentifier(final String identifier) {
+      this.identifier = identifier;
+      return this;
+    }
+
+    /**
+     * @see ResourceStatusEvent#getState()
+     */
+    public Builder setState(final ReefServiceProtos.State state) {
+      this.state = state;
+      return this;
+    }
+
+    /**
+     * @see ResourceStatusEvent#getDiagnostics()
+     */
+    public Builder setDiagnostics(final String diagnostics) {
+      this.diagnostics = diagnostics;
+      return this;
+    }
+
+    /**
+     * @see ResourceStatusEvent#getExitCode()
+     */
+    public Builder setExitCode(final int exitCode) {
+      this.exitCode = exitCode;
+      return this;
+    }
+
+    /**
+     * @see ResourceStatusEvent#getIsFromPreviousDriver()
+     */
+    public Builder setIsFromPreviousDriver(final boolean isFromPreviousDriver) {
+      this.isFromPreviousDriver = isFromPreviousDriver;
+      return this;
+    }
+
+    @Override
+    public ResourceStatusEvent build() {
+      return new ResourceStatusEventImpl(this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
index 9abafbd..6d31824 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
@@ -19,7 +19,6 @@
 package org.apache.reef.runtime.common.driver.resourcemanager;
 
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
 import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
 import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManagerFactory;
 import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
@@ -33,7 +32,7 @@ import javax.inject.Inject;
  * about the current state of a given resource. Ideally, we should think the same thing.
  */
 @Private
-public final class ResourceStatusHandler implements EventHandler<DriverRuntimeProtocol.ResourceStatusProto> {
+public final class ResourceStatusHandler implements EventHandler<ResourceStatusEvent> {
 
   private final Evaluators evaluators;
   private final EvaluatorManagerFactory evaluatorManagerFactory;
@@ -49,21 +48,21 @@ public final class ResourceStatusHandler implements EventHandler<DriverRuntimePr
    * about the state of the resource executing an Evaluator; This method simply passes the message
    * off to the referenced EvaluatorManager
    *
-   * @param resourceStatusProto resource status message from the ResourceManager
+   * @param resourceStatusEvent resource status message from the ResourceManager
    */
   @Override
-  public void onNext(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) {
-    final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(resourceStatusProto.getIdentifier());
+  public void onNext(final ResourceStatusEvent resourceStatusEvent) {
+    final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(resourceStatusEvent.getIdentifier());
     if (evaluatorManager.isPresent()) {
-      evaluatorManager.get().onResourceStatusMessage(resourceStatusProto);
+      evaluatorManager.get().onResourceStatusMessage(resourceStatusEvent);
     } else {
-      if (resourceStatusProto.getIsFromPreviousDriver()) {
-        EvaluatorManager previousEvaluatorManager = this.evaluatorManagerFactory.createForEvaluatorFailedDuringDriverRestart(resourceStatusProto);
-        previousEvaluatorManager.onResourceStatusMessage(resourceStatusProto);
+      if (resourceStatusEvent.getIsFromPreviousDriver().get()) {
+        EvaluatorManager previousEvaluatorManager = this.evaluatorManagerFactory.createForEvaluatorFailedDuringDriverRestart(resourceStatusEvent);
+        previousEvaluatorManager.onResourceStatusMessage(resourceStatusEvent);
       } else {
         throw new RuntimeException(
-            "Unknown resource status from evaluator " + resourceStatusProto.getIdentifier() +
-                " with state " + resourceStatusProto.getState()
+            "Unknown resource status from evaluator " + resourceStatusEvent.getIdentifier() +
+                " with state " + resourceStatusEvent.getState()
         );
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java
new file mode 100644
index 0000000..bca7354
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.util.Optional;
+
+import java.util.List;
+
+/**
+ * Event from Driver Runtime -> Driver Process
+ * A status update from the Driver Runtime to the Driver Process
+ */
+@RuntimeAuthor
+@DriverSide
+@DefaultImplementation(RuntimeStatusEventImpl.class)
+public interface RuntimeStatusEvent {
+  /**
+   * @return Name of the Runtime
+   */
+  String getName();
+
+  /**
+   * @return State of the Runtime
+   */
+  ReefServiceProtos.State getState();
+
+  /**
+   * @return List of allocated containers
+   */
+  List<String> getContainerAllocationList();
+
+  /**
+   * @return Error from the Runtime
+   */
+  Optional<ReefServiceProtos.RuntimeErrorProto> getError();
+
+  /**
+   * @return Number of outstanding container requests
+   */
+  Optional<Integer> getOutstandingContainerRequests();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java
new file mode 100644
index 0000000..5d8060d
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.util.BuilderUtils;
+import org.apache.reef.util.Optional;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Default POJO implementation of RuntimeStatusEvent.
+ * Use newBuilder to construct an instance.
+ */
+public final class RuntimeStatusEventImpl implements RuntimeStatusEvent {
+  private final String name;
+  private final ReefServiceProtos.State state;
+  private final List<String> containerAllocationList;
+  private final Optional<ReefServiceProtos.RuntimeErrorProto> error;
+  private final Optional<Integer> outstandingContainerRequests;
+
+  private RuntimeStatusEventImpl(final Builder builder) {
+    this.name = BuilderUtils.notNull(builder.name);
+    this.state = BuilderUtils.notNull(builder.state);
+    this.containerAllocationList = BuilderUtils.notNull(builder.containerAllocationList);
+    this.error = Optional.ofNullable(builder.error);
+    this.outstandingContainerRequests = Optional.ofNullable(builder.outstandingContainerRequests);
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public ReefServiceProtos.State getState() {
+    return state;
+  }
+
+  @Override
+  public List<String> getContainerAllocationList() {
+    return containerAllocationList;
+  }
+
+  @Override
+  public Optional<ReefServiceProtos.RuntimeErrorProto> getError() {
+    return error;
+  }
+
+  @Override
+  public Optional<Integer> getOutstandingContainerRequests() {
+    return outstandingContainerRequests;
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder used to create RuntimeStatusEvent instances.
+   */
+  public static final class Builder implements org.apache.reef.util.Builder<RuntimeStatusEvent> {
+    private String name;
+    private ReefServiceProtos.State state;
+    private List<String> containerAllocationList = new ArrayList<>();
+    private ReefServiceProtos.RuntimeErrorProto error;
+    private Integer outstandingContainerRequests;
+
+    /**
+     * @see RuntimeStatusEvent#getName()
+     */
+    public Builder setName(final String name) {
+      this.name = name;
+      return this;
+    }
+
+    /**
+     * @see RuntimeStatusEvent#getState()
+     */
+    public Builder setState(final ReefServiceProtos.State state) {
+      this.state = state;
+      return this;
+    }
+
+    /**
+     * Add an entry to containerAllocationList
+     * @see RuntimeStatusEvent#getContainerAllocationList()
+     */
+    public Builder addContainerAllocation(final String containerAllocation) {
+      this.containerAllocationList.add(containerAllocation);
+      return this;
+    }
+
+    /**
+     * @see RuntimeStatusEvent#getError()
+     */
+    public Builder setError(final ReefServiceProtos.RuntimeErrorProto error) {
+      this.error = error;
+      return this;
+    }
+
+    /**
+     * @see RuntimeStatusEvent#getOutstandingContainerRequests()
+     */
+    public Builder setOutstandingContainerRequests(final int outstandingContainerRequests) {
+      this.outstandingContainerRequests = outstandingContainerRequests;
+      return this;
+    }
+
+    @Override
+    public RuntimeStatusEvent build() {
+      return new RuntimeStatusEventImpl(this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResource.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResource.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResource.java
new file mode 100644
index 0000000..e088afd
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResource.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.files;
+
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+/**
+ * A File Resource with a FileType for use by Runtimes
+ */
+@RuntimeAuthor
+@DefaultImplementation(FileResourceImpl.class)
+public interface FileResource {
+
+  /**
+   * @return Type of the file
+   */
+  FileType getType();
+
+  /**
+   * @return Name of the file
+   */
+  String getName();
+
+  /**
+   * @return Path of the file
+   */
+  String getPath();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResourceImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResourceImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResourceImpl.java
new file mode 100644
index 0000000..2f617e8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResourceImpl.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.files;
+
+import org.apache.reef.util.BuilderUtils;
+
+/**
+ * Default POJO implementation of FileResource.
+ * Use newBuilder to construct an instance.
+ */
+public final class FileResourceImpl implements FileResource {
+  private final FileType type;
+  private final String name;
+  private final String path;
+
+  private FileResourceImpl(final Builder builder) {
+    this.type = BuilderUtils.notNull(builder.type);
+    this.name = BuilderUtils.notNull(builder.name);
+    this.path = BuilderUtils.notNull(builder.path);
+  }
+
+  @Override
+  public FileType getType() {
+    return type;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public String getPath() {
+    return path;
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder used to create FileResource instances.
+   */
+  public static final class Builder implements org.apache.reef.util.Builder<FileResource> {
+    private FileType type;
+    private String name;
+    private String path;
+
+    /**
+     * @see FileResource#getType()
+     */
+    public Builder setType(final FileType type) {
+      this.type = type;
+      return this;
+    }
+
+    /**
+     * @see FileResource#getName()
+     */
+    public Builder setName(final String name) {
+      this.name = name;
+      return this;
+    }
+
+    /**
+     * @see FileResource#getPath()
+     */
+    public Builder setPath(final String path) {
+      this.path = path;
+      return this;
+    }
+
+    @Override
+    public FileResource build() {
+      return new FileResourceImpl(this);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileType.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileType.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileType.java
new file mode 100644
index 0000000..1b729ad
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileType.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.files;
+
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+
+/**
+ * Type of a File Resource used by Runtimes
+ */
+@RuntimeAuthor
+public enum FileType {
+  PLAIN,
+  LIB,
+  ARCHIVE
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
index 5297158..ecf3aa8 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
@@ -21,8 +21,7 @@ package org.apache.reef.runtime.common.files;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.annotations.audience.RuntimeAuthor;
-import org.apache.reef.proto.ClientRuntimeProtocol;
-import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
 import org.apache.reef.runtime.common.parameters.DeleteTempFiles;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.annotations.Parameter;
@@ -59,13 +58,13 @@ public final class JobJarMaker {
     this.deleteTempFilesOnExit = deleteTempFilesOnExit;
   }
 
-  public static void copy(final Iterable<ReefServiceProtos.FileResourceProto> files, final File destinationFolder) {
+  public static void copy(final Iterable<FileResource> files, final File destinationFolder) {
 
     if (!destinationFolder.exists()) {
       destinationFolder.mkdirs();
     }
 
-    for (final ReefServiceProtos.FileResourceProto fileProto : files) {
+    for (final FileResource fileProto : files) {
       final File sourceFile = toFile(fileProto);
       final File destinationFile = new File(destinationFolder, fileProto.getName());
       if (destinationFile.exists()) {
@@ -89,12 +88,12 @@ public final class JobJarMaker {
     }
   }
 
-  private static File toFile(final ReefServiceProtos.FileResourceProto fileProto) {
+  private static File toFile(final FileResource fileProto) {
     return new File(fileProto.getPath());
   }
 
   public File createJobSubmissionJAR(
-      final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
+      final JobSubmissionEvent jobSubmissionEvent,
       final Configuration driverConfiguration) throws IOException {
 
     // Copy all files to a local job submission folder
@@ -104,8 +103,8 @@ public final class JobJarMaker {
     final File localFolder = new File(jobSubmissionFolder, this.fileNames.getLocalFolderName());
     final File globalFolder = new File(jobSubmissionFolder, this.fileNames.getGlobalFolderName());
 
-    this.copy(jobSubmissionProto.getGlobalFileList(), globalFolder);
-    this.copy(jobSubmissionProto.getLocalFileList(), localFolder);
+    this.copy(jobSubmissionEvent.getGlobalFileSet(), globalFolder);
+    this.copy(jobSubmissionEvent.getLocalFileSet(), localFolder);
 
     // Store the Driver Configuration in the JAR file.
     this.configurationSerializer.toFile(

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProcessType.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProcessType.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProcessType.java
new file mode 100644
index 0000000..286174e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProcessType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+/**
+ * The type of a process to be launched by the Runtime
+ */
+public enum ProcessType {
+    JVM,
+    CLR
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
index 8a81184..39a8f8d 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
@@ -52,9 +52,7 @@ public final class REEFMessageCodec implements Codec<GeneratedMessage> {
   public GeneratedMessage decode(final byte[] bytes) {
     try {
       final REEFProtocol.REEFMessage message = REEFProtocol.REEFMessage.parseFrom(bytes);
-      if (message.hasJobSubmission()) {
-        return message.getJobSubmission();
-      } else if (message.hasJobControl()) {
+      if (message.hasJobControl()) {
         return message.getJobControl();
       } else if (message.hasRuntimeError()) {
         return message.getRuntimeError();
@@ -75,9 +73,7 @@ public final class REEFMessageCodec implements Codec<GeneratedMessage> {
   public byte[] encode(final GeneratedMessage msg) {
     final REEFProtocol.REEFMessage.Builder message = REEFProtocol.REEFMessage.newBuilder();
 
-    if (msg instanceof ClientRuntimeProtocol.JobSubmissionProto) {
-      message.setJobSubmission((ClientRuntimeProtocol.JobSubmissionProto) msg);
-    } else if (msg instanceof ClientRuntimeProtocol.JobControlProto) {
+    if (msg instanceof ClientRuntimeProtocol.JobControlProto) {
       message.setJobControl((ClientRuntimeProtocol.JobControlProto) msg);
     } else if (msg instanceof ReefServiceProtos.RuntimeErrorProto) {
       message.setRuntimeError((ReefServiceProtos.RuntimeErrorProto) msg);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/util/BuilderUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/util/BuilderUtils.java b/lang/java/reef-common/src/main/java/org/apache/reef/util/BuilderUtils.java
new file mode 100644
index 0000000..68cd7ce
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/util/BuilderUtils.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util;
+
+/**
+ * Utilities for creating Builders
+ */
+public final class BuilderUtils {
+  /**
+   * Throws a runtime exception if the parameter is null
+   */
+  public static <T> T notNull(final T parameter) {
+    if (parameter == null) {
+      throw new IllegalArgumentException("required parameter");
+    } else {
+      return parameter;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/proto/client_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/proto/client_runtime.proto b/lang/java/reef-common/src/main/proto/client_runtime.proto
index 36c648b..2846619 100644
--- a/lang/java/reef-common/src/main/proto/client_runtime.proto
+++ b/lang/java/reef-common/src/main/proto/client_runtime.proto
@@ -23,22 +23,6 @@ import "reef_service_protos.proto";
 
 // Messages from REEF Client -> Driver Runtime
 
-message JobSubmissionProto {
-	required string identifier     = 1; // the job identifier
-	required string remote_id      = 2; // the remote identifier
-	required string configuration  = 5; // the runtime configuration
-	required string user_name      = 6; // the user name
-
-  //optional SIZE   driver_size    = 7; // Removed in REEF 0.3 in favor of driver_memory below.
-  optional int32  driver_memory  = 8;
-  optional int32  priority       = 9;
-  optional string queue          = 10;
-
-	repeated FileResourceProto global_file = 11; // files that should be placed on the driver and all subsequent evaluators
-	repeated FileResourceProto local_File  = 12; // files that should be placed on the driver only
-
-}
-
 enum Signal {
 	SIG_TERMINATE = 1;
 	SIG_SUSPEND   = 2;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/proto/driver_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/proto/driver_runtime.proto b/lang/java/reef-common/src/main/proto/driver_runtime.proto
deleted file mode 100644
index 64f5dc1..0000000
--- a/lang/java/reef-common/src/main/proto/driver_runtime.proto
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-option java_package = "org.apache.reef.proto";
-option java_outer_classname = "DriverRuntimeProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-
-import "reef_service_protos.proto";
-
-// Messages from Driver Runtime -> Driver Process
-
-message DriverProcessRegistrationProto {
-	required string remote_identifier = 1;
-}
-
-
-message NodeDescriptorProto {
-	required string identifier = 1;
-	required string host_name  = 2; // e.g., IP address
-	required int32 port        = 3; // e.g., IP port
-	required int32 memory_size = 4;
-	optional string rack_name  = 5; // e.g., /default-rack
-}
-
-message ResourceAllocationProto {
-	required string identifier     = 1; // e.g., the container id, or the thread id
-	required int32 resource_memory = 2; // megabytes
-	required string node_id        = 3;
-	optional int32 virtual_cores   = 4;
-}
-
-message ResourceStatusProto {
-	required string identifier   = 1;
-	required State  state        = 2;
-	optional string diagnostics  = 3;
-	optional int32  exit_code    = 4;
-	optional bool is_from_previous_driver = 5;
-}
-
-message RuntimeStatusProto {
-   required string name  = 1;   // e.g., local, yarn21
-   required State  state = 2;
-   optional RuntimeErrorProto error = 3; // runtime (e.g., YARN) error
-
-   optional int32 outstanding_container_requests = 5;
-   repeated string container_allocation = 6;
-}
-
-//////////////////////////////////////////////////////
-// Messages from Driver Process -> Driver Runtime
-
-message ResourceRequestProto {
-	// optional SIZE resource_size  = 1; // Removed in REEF 0.3 in favor of memory_size.
-    optional int32 memory_size    = 2; // Memory size of the evaluator in MB
-    optional int32 priority       = 3;
-    optional int32 virtual_cores  = 4;
-    required int32 resource_count = 5;
-	  repeated string node_name     = 6; // a list of specific nodes
-	  repeated string rack_name     = 7; // a list of specific racks
-
-    optional bool relax_locality = 10;
-}
-
-message ResourceReleaseProto {
-	required string identifier = 1;
-}
-
-message ResourceLaunchProto {
-	required string identifier      = 1;
-	required string remote_id       = 2;
-	required string evaluator_conf  = 3;
-  required ProcessType type       = 4;
-	repeated FileResourceProto file = 10;
-}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/proto/reef_protocol.proto
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/proto/reef_protocol.proto b/lang/java/reef-common/src/main/proto/reef_protocol.proto
index a8c793f..09c4476 100644
--- a/lang/java/reef-common/src/main/proto/reef_protocol.proto
+++ b/lang/java/reef-common/src/main/proto/reef_protocol.proto
@@ -30,8 +30,9 @@ option java_generate_equals_and_hash = true;
 option java_outer_classname = "REEFProtocol";
 
 message REEFMessage {
+    // Field 1 removed
+
     // Messages defined in client_runtime.proto
-    optional JobSubmissionProto jobSubmission = 1;
     optional JobControlProto jobControl = 2;
     // Messages defined in reef_service_protos.proto
     optional RuntimeErrorProto runtimeError = 3;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/proto/reef_service_protos.proto
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/proto/reef_service_protos.proto b/lang/java/reef-common/src/main/proto/reef_service_protos.proto
index 7494737..38d2b39 100644
--- a/lang/java/reef-common/src/main/proto/reef_service_protos.proto
+++ b/lang/java/reef-common/src/main/proto/reef_service_protos.proto
@@ -32,31 +32,6 @@ enum State {
     KILLED = 5;
 }
 
-enum FileType {
-    PLAIN = 0;
-    LIB = 1;
-    ARCHIVE = 2;
-}
-
-// Removed in REEF 0.3 in favor of explicit memory sizes.
-// enum SIZE {
-//    SMALL = 0;
-//    MEDIUM = 1;
-//    LARGE = 2;
-//    XLARGE = 3;
-//}
-
-enum ProcessType {
-    JVM = 0;
-    CLR = 1;
-}
-
-message FileResourceProto {
-    required FileType type = 1;
-    required string name = 2;
-    required string path = 3;
-}
-
 message RuntimeErrorProto {
     required string name = 1; // e.g., local, yarn21
     required string message = 2;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java b/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java
index ddbd83b..dce5f50 100644
--- a/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java
+++ b/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java
@@ -21,7 +21,7 @@ package org.apache.reef.runtime.common.driver;
 import org.apache.reef.driver.catalog.ResourceCatalog;
 import org.apache.reef.driver.evaluator.EvaluatorRequest;
 import org.apache.reef.driver.evaluator.EvaluatorRequestor;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
@@ -53,8 +53,8 @@ public class EvaluatorRequestorImplTest {
     final DummyRequestHandler requestHandler = new DummyRequestHandler();
     final EvaluatorRequestor evaluatorRequestor = new EvaluatorRequestorImpl(resourceCatalog, requestHandler, loggingScopeFactory);
     evaluatorRequestor.submit(EvaluatorRequest.newBuilder().setMemory(memory).build());
-    Assert.assertEquals("Memory request did not make it", requestHandler.get().getMemorySize(), memory);
-    Assert.assertEquals("Number of requests did not make it", requestHandler.get().getResourceCount(), 1);
+    Assert.assertEquals("Memory request did not make it", memory, requestHandler.get().getMemorySize().get().intValue());
+    Assert.assertEquals("Number of requests did not make it", 1, requestHandler.get().getResourceCount());
   }
 
   /**
@@ -67,8 +67,8 @@ public class EvaluatorRequestorImplTest {
     final DummyRequestHandler requestHandler = new DummyRequestHandler();
     final EvaluatorRequestor evaluatorRequestor = new EvaluatorRequestorImpl(resourceCatalog, requestHandler, loggingScopeFactory);
     evaluatorRequestor.submit(EvaluatorRequest.newBuilder().setMemory(memory).setNumber(count).build());
-    Assert.assertEquals("Memory request did not make it", requestHandler.get().getMemorySize(), memory);
-    Assert.assertEquals("Number of requests did not make it", requestHandler.get().getResourceCount(), count);
+    Assert.assertEquals("Memory request did not make it", memory, requestHandler.get().getMemorySize().get().intValue());
+    Assert.assertEquals("Number of requests did not make it", count, requestHandler.get().getResourceCount());
   }
 
   /**
@@ -96,14 +96,14 @@ public class EvaluatorRequestorImplTest {
   }
 
   private class DummyRequestHandler implements ResourceRequestHandler {
-    private DriverRuntimeProtocol.ResourceRequestProto request;
+    private ResourceRequestEvent request;
 
     @Override
-    public void onNext(DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) {
-      this.request = resourceRequestProto;
+    public void onNext(final ResourceRequestEvent resourceRequestEvent) {
+      this.request = resourceRequestEvent;
     }
 
-    public DriverRuntimeProtocol.ResourceRequestProto get() {
+    public ResourceRequestEvent get() {
       return this.request;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/catalog/CatalogTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/catalog/CatalogTest.java b/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/catalog/CatalogTest.java
index 66203b2..72503d0 100644
--- a/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/catalog/CatalogTest.java
+++ b/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/catalog/CatalogTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.reef.runtime.common.driver.catalog;
 
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -35,7 +35,7 @@ public final class CatalogTest {
     final ResourceCatalogImpl catalog = new ResourceCatalogImpl();
 
     for (int i = 0; i < nodes; i++) {
-      catalog.handle(DriverRuntimeProtocol.NodeDescriptorProto.newBuilder()
+      catalog.handle(NodeDescriptorEventImpl.newBuilder()
           .setRackName("test-rack")
           .setHostName("test-" + i)
           .setPort(0)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
index 7ac314a..dc56abf 100644
--- a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
@@ -22,7 +22,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
 import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.JobJarMaker;
@@ -33,7 +33,6 @@ import org.apache.reef.runtime.hdinsight.client.yarnrest.*;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Configurations;
 import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.tang.formats.ConfigurationSerializer;
 
 import javax.inject.Inject;
 import java.io.File;
@@ -54,7 +53,6 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler
   private final AzureUploader uploader;
   private final JobJarMaker jobJarMaker;
   private final HDInsightInstance hdInsightInstance;
-  private final ConfigurationSerializer configurationSerializer;
   private final REEFFileNames filenames;
   private final ClasspathProvider classpath;
   private final double jvmHeapSlack;
@@ -63,14 +61,12 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler
   HDInsightJobSubmissionHandler(final AzureUploader uploader,
                                 final JobJarMaker jobJarMaker,
                                 final HDInsightInstance hdInsightInstance,
-                                final ConfigurationSerializer configurationSerializer,
                                 final REEFFileNames filenames,
                                 final ClasspathProvider classpath,
                                 final @Parameter(JVMHeapSlack.class) double jvmHeapSlack) {
     this.uploader = uploader;
     this.jobJarMaker = jobJarMaker;
     this.hdInsightInstance = hdInsightInstance;
-    this.configurationSerializer = configurationSerializer;
     this.filenames = filenames;
     this.classpath = classpath;
     this.jvmHeapSlack = jvmHeapSlack;
@@ -82,7 +78,7 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler
   }
 
   @Override
-  public void onNext(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
+  public void onNext(final JobSubmissionEvent jobSubmissionEvent) {
 
     try {
 
@@ -96,25 +92,25 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler
 
       LOG.log(Level.FINE, "Assembling Configuration for the Driver.");
       final Configuration driverConfiguration =
-          makeDriverConfiguration(jobSubmissionProto, applicationID.getId(), jobFolderURL);
+          makeDriverConfiguration(jobSubmissionEvent, applicationID.getId(), jobFolderURL);
 
       LOG.log(Level.FINE, "Making Job JAR.");
       final File jobSubmissionJarFile =
-          this.jobJarMaker.createJobSubmissionJAR(jobSubmissionProto, driverConfiguration);
+          this.jobJarMaker.createJobSubmissionJAR(jobSubmissionEvent, driverConfiguration);
 
       LOG.log(Level.FINE, "Uploading Job JAR to Azure.");
       final FileResource uploadedFile = this.uploader.uploadFile(jobSubmissionJarFile);
 
       LOG.log(Level.FINE, "Assembling application submission.");
-      final String command = getCommandString(jobSubmissionProto);
+      final String command = getCommandString(jobSubmissionEvent);
 
       final ApplicationSubmission applicationSubmission = new ApplicationSubmission()
           .setApplicationId(applicationID.getId())
-          .setApplicationName(jobSubmissionProto.getIdentifier())
-          .setResource(getResource(jobSubmissionProto))
+          .setApplicationName(jobSubmissionEvent.getIdentifier())
+          .setResource(getResource(jobSubmissionEvent))
           .setContainerInfo(new ContainerInfo()
-              .addFileResource(this.filenames.getREEFFolderName(), uploadedFile)
-              .addCommand(command));
+                  .addFileResource(this.filenames.getREEFFolderName(), uploadedFile)
+                  .addCommand(command));
 
       this.hdInsightInstance.submitApplication(applicationSubmission);
       LOG.log(Level.INFO, "Submitted application to HDInsight. The application id is: {0}", applicationID.getId());
@@ -126,13 +122,13 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler
   }
 
   /**
-   * Extracts the resource demands from the jobSubmissionProto.
+   * Extracts the resource demands from the jobSubmissionEvent.
    */
   private final Resource getResource(
-      final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
+      final JobSubmissionEvent jobSubmissionEvent) {
 
     return new Resource()
-        .setMemory(String.valueOf(jobSubmissionProto.getDriverMemory()))
+        .setMemory(String.valueOf(jobSubmissionEvent.getDriverMemory().get()))
         .setvCores("1");
   }
 
@@ -140,30 +136,30 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler
    * Assembles the command to execute the Driver.
    */
   private String getCommandString(
-      final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
-    return StringUtils.join(getCommandList(jobSubmissionProto), ' ');
+      final JobSubmissionEvent jobSubmissionEvent) {
+    return StringUtils.join(getCommandList(jobSubmissionEvent), ' ');
   }
 
   /**
    * Assembles the command to execute the Driver in list form.
    */
   private List<String> getCommandList(
-      final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
+      final JobSubmissionEvent jobSubmissionEvent) {
 
     return new JavaLaunchCommandBuilder()
         .setJavaPath("%JAVA_HOME%/bin/java")
-        .setErrorHandlerRID(jobSubmissionProto.getRemoteId())
-        .setLaunchID(jobSubmissionProto.getIdentifier())
+        .setErrorHandlerRID(jobSubmissionEvent.getRemoteId())
+        .setLaunchID(jobSubmissionEvent.getIdentifier())
         .setConfigurationFileName(this.filenames.getDriverConfigurationPath())
         .setClassPath(this.classpath.getDriverClasspath())
-        .setMemory(jobSubmissionProto.getDriverMemory())
+        .setMemory(jobSubmissionEvent.getDriverMemory().get())
         .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStderrFileName())
         .setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStdoutFileName())
         .build();
   }
 
   private Configuration makeDriverConfiguration(
-      final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
+      final JobSubmissionEvent jobSubmissionEvent,
       final String applicationId,
       final String jobFolderURL) throws IOException {
 
@@ -174,7 +170,7 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler
         .build();
 
     return Configurations.merge(
-        this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()),
+        jobSubmissionEvent.getConfiguration(),
         hdinsightDriverConfiguration);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverFiles.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverFiles.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverFiles.java
index c54f9e0..42ced95 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverFiles.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverFiles.java
@@ -18,8 +18,9 @@
  */
 package org.apache.reef.runtime.local.client;
 
-import org.apache.reef.proto.ClientRuntimeProtocol;
-import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
+import org.apache.reef.runtime.common.files.FileResource;
+import org.apache.reef.runtime.common.files.FileType;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.tang.formats.ConfigurationModule;
 import org.apache.reef.tang.formats.OptionalParameter;
@@ -51,28 +52,28 @@ final class DriverFiles {
   /**
    * Instantiates an instance based on the given JobSubmissionProto.
    *
-   * @param jobSubmissionProto the JobSubmissionProto to parse.
+   * @param jobSubmissionEvent the JobSubmissionProto to parse.
    * @return a DriverFiles instance pre-populated with the information from the given JobSubmissionProto.
    * @throws IOException
    */
   public static DriverFiles fromJobSubmission(
-      final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
+      final JobSubmissionEvent jobSubmissionEvent,
       final REEFFileNames fileNames) throws IOException {
 
     final DriverFiles driverFiles = new DriverFiles(fileNames);
 
-    for (final ReefServiceProtos.FileResourceProto frp : jobSubmissionProto.getGlobalFileList()) {
+    for (final FileResource frp : jobSubmissionEvent.getGlobalFileSet()) {
       final File f = new File(frp.getPath());
-      if (frp.getType() == ReefServiceProtos.FileType.LIB) {
+      if (frp.getType() == FileType.LIB) {
         driverFiles.addGlobalLib(f);
       } else {
         driverFiles.addGlobalFile(f);
       }
     }
 
-    for (final ReefServiceProtos.FileResourceProto frp : jobSubmissionProto.getLocalFileList()) {
+    for (final FileResource frp : jobSubmissionEvent.getLocalFileSet()) {
       final File f = new File(frp.getPath());
-      if (frp.getType() == ReefServiceProtos.FileType.LIB) {
+      if (frp.getType() == FileType.LIB) {
         driverFiles.addLocalLib(f);
       } else {
         driverFiles.addLocalFile(f);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
index d417fce..f4befc1 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
@@ -20,14 +20,11 @@ package org.apache.reef.runtime.local.client;
 
 import org.apache.reef.annotations.audience.ClientSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
 import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
 import org.apache.reef.runtime.local.client.parameters.RootFolder;
-import org.apache.reef.runtime.local.process.LoggingRunnableProcessObserver;
-import org.apache.reef.runtime.local.process.RunnableProcess;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.formats.ConfigurationSerializer;
@@ -53,7 +50,6 @@ final class LocalJobSubmissionHandler implements JobSubmissionHandler {
   private final String rootFolderName;
   private final ConfigurationSerializer configurationSerializer;
   private final REEFFileNames fileNames;
-  private final ClasspathProvider classpath;
   private final PreparedDriverFolderLauncher driverLauncher;
   private final LoggingScopeFactory loggingScopeFactory;
   private final DriverConfigurationProvider driverConfigurationProvider;
@@ -64,7 +60,6 @@ final class LocalJobSubmissionHandler implements JobSubmissionHandler {
       final @Parameter(RootFolder.class) String rootFolderName,
       final ConfigurationSerializer configurationSerializer,
       final REEFFileNames fileNames,
-      final ClasspathProvider classpath,
 
       final PreparedDriverFolderLauncher driverLauncher,
       final LoggingScopeFactory loggingScopeFactory,
@@ -73,7 +68,6 @@ final class LocalJobSubmissionHandler implements JobSubmissionHandler {
     this.executor = executor;
     this.configurationSerializer = configurationSerializer;
     this.fileNames = fileNames;
-    this.classpath = classpath;
 
     this.driverLauncher = driverLauncher;
     this.driverConfigurationProvider = driverConfigurationProvider;
@@ -89,7 +83,7 @@ final class LocalJobSubmissionHandler implements JobSubmissionHandler {
   }
 
   @Override
-  public final void onNext(final ClientRuntimeProtocol.JobSubmissionProto t) {
+  public final void onNext(final JobSubmissionEvent t) {
     try (final LoggingScope lf = loggingScopeFactory.localJobSubmission()) {
       try {
         LOG.log(Level.FINEST, "Starting local job {0}", t.getIdentifier());
@@ -104,8 +98,7 @@ final class LocalJobSubmissionHandler implements JobSubmissionHandler {
         driverFiles.copyTo(driverFolder);
 
         final Configuration driverConfiguration = this.driverConfigurationProvider
-            .getDriverConfiguration(jobFolder, t.getRemoteId(), t.getIdentifier(),
-                configurationSerializer.fromString(t.getConfiguration()));
+            .getDriverConfiguration(jobFolder, t.getRemoteId(), t.getIdentifier(), t.getConfiguration());
 
         this.configurationSerializer.toFile(driverConfiguration,
             new File(driverFolder, this.fileNames.getDriverConfigurationPath()));

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
index 65290c1..9127da9 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
@@ -21,9 +21,10 @@ package org.apache.reef.runtime.local.driver;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.client.FailedRuntime;
-import org.apache.reef.proto.DriverRuntimeProtocol;
 import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent;
+import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.common.utils.RemoteManager;
 import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators;
@@ -68,7 +69,7 @@ final class ContainerManager implements AutoCloseable {
 
   private final String errorHandlerRID;
   private final int capacity;
-  private final EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> nodeDescriptorHandler;
+  private final EventHandler<NodeDescriptorEvent> nodeDescriptorHandler;
   private final File rootFolder;
   private final REEFFileNames fileNames;
   private final ReefRunnableProcessObserver processObserver;
@@ -82,10 +83,9 @@ final class ContainerManager implements AutoCloseable {
       final @Parameter(MaxNumberOfEvaluators.class) int capacity,
       final @Parameter(RootFolder.class) String rootFolderName,
       final @Parameter(RuntimeParameters.NodeDescriptorHandler.class)
-      EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> nodeDescriptorHandler,
+      EventHandler<NodeDescriptorEvent> nodeDescriptorHandler,
       final ReefRunnableProcessObserver processObserver,
       final LocalAddressProvider localAddressProvider) {
-
     this.capacity = capacity;
     this.fileNames = fileNames;
     this.processObserver = processObserver;
@@ -131,7 +131,7 @@ final class ContainerManager implements AutoCloseable {
     for (int i = 0; i < capacity; i++) {
       final String id = idmaker.getNextID();
       this.freeNodeList.add(id);
-      nodeDescriptorHandler.onNext(DriverRuntimeProtocol.NodeDescriptorProto.newBuilder()
+      nodeDescriptorHandler.onNext(NodeDescriptorEventImpl.newBuilder()
           .setIdentifier(id)
           .setRackName("/default-rack")
           .setHostName(this.localAddress)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceLaunchHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceLaunchHandler.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceLaunchHandler.java
index be3ead8..bf4a177 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceLaunchHandler.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceLaunchHandler.java
@@ -20,7 +20,7 @@ package org.apache.reef.runtime.local.driver;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
 
 import javax.inject.Inject;
@@ -40,7 +40,7 @@ final class LocalResourceLaunchHandler implements ResourceLaunchHandler {
   }
 
   @Override
-  public void onNext(final DriverRuntimeProtocol.ResourceLaunchProto t) {
+  public void onNext(final ResourceLaunchEvent t) {
     this.resourceManager.onResourceLaunchRequest(t);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceReleaseHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceReleaseHandler.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceReleaseHandler.java
index 04c2730..514a297 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceReleaseHandler.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceReleaseHandler.java
@@ -20,7 +20,7 @@ package org.apache.reef.runtime.local.driver;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
 
 import javax.inject.Inject;
@@ -41,7 +41,7 @@ public final class LocalResourceReleaseHandler implements ResourceReleaseHandler
   }
 
   @Override
-  public void onNext(final DriverRuntimeProtocol.ResourceReleaseProto t) {
+  public void onNext(final ResourceReleaseEvent t) {
     this.resourceManager.onResourceReleaseRequest(t);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceRequestHandler.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceRequestHandler.java
index a08b05c..99ae748 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceRequestHandler.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceRequestHandler.java
@@ -20,7 +20,7 @@ package org.apache.reef.runtime.local.driver;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
 
 import javax.inject.Inject;
@@ -40,7 +40,7 @@ final class LocalResourceRequestHandler implements ResourceRequestHandler {
   }
 
   @Override
-  public void onNext(final DriverRuntimeProtocol.ResourceRequestProto t) {
+  public void onNext(final ResourceRequestEvent t) {
     this.resourceManager.onResourceRequest(t);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
index e30eba3..7d190cb 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
@@ -20,10 +20,17 @@ package org.apache.reef.runtime.local.driver;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
 import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
 import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl;
+import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent;
+import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
 import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.FileResource;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.common.launch.CLRLaunchCommandBuilder;
 import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
@@ -58,9 +65,9 @@ public final class ResourceManager {
 
   private final ResourceRequestQueue requestQueue = new ResourceRequestQueue();
 
-  private final EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> allocationHandler;
+  private final EventHandler<ResourceAllocationEvent> allocationHandler;
   private final ContainerManager theContainers;
-  private final EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandlerEventHandler;
+  private final EventHandler<RuntimeStatusEvent> runtimeStatusHandlerEventHandler;
   private final int defaultMemorySize;
   private final int defaultNumberOfCores;
   private final ConfigurationSerializer configurationSerializer;
@@ -73,8 +80,8 @@ public final class ResourceManager {
   @Inject
   ResourceManager(
       final ContainerManager containerManager,
-      final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> allocationHandler,
-      final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandlerEventHandler,
+      final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<ResourceAllocationEvent> allocationHandler,
+      final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<RuntimeStatusEvent> runtimeStatusHandlerEventHandler,
       final @Parameter(DefaultMemorySize.class) int defaultMemorySize,
       final @Parameter(DefaultNumberOfCores.class) int defaultNumberOfCores,
       final @Parameter(JVMHeapSlack.class) double jvmHeapSlack,
@@ -105,9 +112,9 @@ public final class ResourceManager {
    * @param launchRequest the ResourceLaunchProto to parse
    * @return a list of files set in the given ResourceLaunchProto
    */
-  private static List<File> getLocalFiles(final DriverRuntimeProtocol.ResourceLaunchProto launchRequest) {
+  private static List<File> getLocalFiles(final ResourceLaunchEvent launchRequest) {
     final List<File> files = new ArrayList<>();  // Libraries local to this evaluator
-    for (final ReefServiceProtos.FileResourceProto frp : launchRequest.getFileList()) {
+    for (final FileResource frp : launchRequest.getFileSet()) {
       files.add(new File(frp.getPath()).getAbsoluteFile());
     }
     return files;
@@ -120,7 +127,7 @@ public final class ResourceManager {
    *
    * @param resourceRequest the resource request to be handled.
    */
-  final void onResourceRequest(final DriverRuntimeProtocol.ResourceRequestProto resourceRequest) {
+  final void onResourceRequest(final ResourceRequestEvent resourceRequest) {
     synchronized (this.theContainers) {
       this.requestQueue.add(new ResourceRequest(resourceRequest));
       this.checkRequestQueue();
@@ -132,7 +139,7 @@ public final class ResourceManager {
    *
    * @param releaseRequest the release request to be processed
    */
-  final void onResourceReleaseRequest(final DriverRuntimeProtocol.ResourceReleaseProto releaseRequest) {
+  final void onResourceReleaseRequest(final ResourceReleaseEvent releaseRequest) {
     synchronized (this.theContainers) {
       LOG.log(Level.FINEST, "Release container: {0}", releaseRequest.getIdentifier());
       this.theContainers.release(releaseRequest.getIdentifier());
@@ -158,7 +165,7 @@ public final class ResourceManager {
    * @param launchRequest the launch request to be processed.
    */
   final void onResourceLaunchRequest(
-      final DriverRuntimeProtocol.ResourceLaunchProto launchRequest) {
+      final ResourceLaunchEvent launchRequest) {
 
     synchronized (this.theContainers) {
 
@@ -173,8 +180,7 @@ public final class ResourceManager {
         final File evaluatorConfigurationFile = new File(c.getFolder(), fileNames.getEvaluatorConfigurationPath());
 
         try {
-          this.configurationSerializer.toFile(this.configurationSerializer.fromString(launchRequest.getEvaluatorConf()),
-              evaluatorConfigurationFile);
+          this.configurationSerializer.toFile(launchRequest.getEvaluatorConf(), evaluatorConfigurationFile);
         } catch (final IOException | BindException e) {
           throw new RuntimeException("Unable to write configuration.", e);
         }
@@ -218,16 +224,16 @@ public final class ResourceManager {
     if (this.theContainers.hasContainerAvailable() && this.requestQueue.hasOutStandingRequests()) {
 
       // Record the satisfaction of one request and get its details.
-      final DriverRuntimeProtocol.ResourceRequestProto requestProto = this.requestQueue.satisfyOne();
+      final ResourceRequestEvent requestEvent = this.requestQueue.satisfyOne();
 
       // Allocate a Container
       final Container container = this.theContainers.allocateOne(
-          requestProto.hasMemorySize() ? requestProto.getMemorySize() : this.defaultMemorySize,
-          requestProto.hasVirtualCores() ? requestProto.getVirtualCores() : this.defaultNumberOfCores);
+              requestEvent.getMemorySize().orElse(this.defaultMemorySize),
+              requestEvent.getVirtualCores().orElse(this.defaultNumberOfCores));
 
       // Tell the receivers about it
-      final DriverRuntimeProtocol.ResourceAllocationProto alloc =
-          DriverRuntimeProtocol.ResourceAllocationProto.newBuilder()
+      final ResourceAllocationEvent alloc =
+          ResourceAllocationEventImpl.newBuilder()
               .setIdentifier(container.getContainerID())
               .setNodeId(container.getNodeID())
               .setResourceMemory(container.getMemory())
@@ -250,16 +256,18 @@ public final class ResourceManager {
 
   private void sendRuntimeStatus() {
 
-    final DriverRuntimeProtocol.RuntimeStatusProto msg =
-        DriverRuntimeProtocol.RuntimeStatusProto.newBuilder()
+    final RuntimeStatusEventImpl.Builder builder =
+        RuntimeStatusEventImpl.newBuilder()
             .setName("LOCAL")
             .setState(ReefServiceProtos.State.RUNNING)
-            .setOutstandingContainerRequests(this.requestQueue.getNumberOfOutstandingRequests())
-            .addAllContainerAllocation(this.theContainers.getAllocatedContainerIDs())
-            .build();
+            .setOutstandingContainerRequests(this.requestQueue.getNumberOfOutstandingRequests());
+    for (final String containerAllocation : this.theContainers.getAllocatedContainerIDs()) {
+      builder.addContainerAllocation(containerAllocation);
+    }
+    final RuntimeStatusEvent msg = builder.build();
 
     LOG.log(Level.INFO, "Allocated: {0}, Outstanding requests: {1}",
-        new Object[]{msg.getContainerAllocationCount(), msg.getOutstandingContainerRequests()});
+        new Object[]{msg.getContainerAllocationList().size(), msg.getOutstandingContainerRequests()});
     this.runtimeStatusHandlerEventHandler.onNext(msg);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java
index bbb20a3..4b275cb 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java
@@ -20,21 +20,21 @@ package org.apache.reef.runtime.local.driver;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
 
 /**
- * Manages a ResourceRequestProto and its satisfaction.
+ * Manages a ResourceRequestEvent and its satisfaction.
  */
 @Private
 @DriverSide
 final class ResourceRequest {
 
-  private final DriverRuntimeProtocol.ResourceRequestProto req;
+  private final ResourceRequestEvent req;
   private int satisfied = 0;
 
-  ResourceRequest(final DriverRuntimeProtocol.ResourceRequestProto req) {
+  ResourceRequest(final ResourceRequestEvent req) {
     if (null == req) {
-      throw new IllegalArgumentException("Can't instantiate a ResourceRequest without a ResourceRequestProto");
+      throw new IllegalArgumentException("Can't instantiate a ResourceRequest without a ResourceRequestEvent");
     }
     this.req = req;
   }
@@ -57,7 +57,7 @@ final class ResourceRequest {
     return this.satisfied == req.getResourceCount();
   }
 
-  final DriverRuntimeProtocol.ResourceRequestProto getRequestProto() {
+  final ResourceRequestEvent getRequestProto() {
     return this.req;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
index c2a87ff..dcebe20 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
@@ -20,7 +20,7 @@ package org.apache.reef.runtime.local.driver;
 
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -54,7 +54,7 @@ final class ResourceRequestQueue {
    * Satisfies one resource for the front-most request. If that satisfies the
    * request, it is removed from the queue.
    */
-  final synchronized DriverRuntimeProtocol.ResourceRequestProto satisfyOne() {
+  final synchronized ResourceRequestEvent satisfyOne() {
     final ResourceRequest req = this.requestQueue.element();
     req.satisfyOne();
     if (req.isSatisfied()) {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
index 36ee916..3d5051a 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
@@ -19,9 +19,10 @@
 package org.apache.reef.runtime.local.process;
 
 import net.jcip.annotations.ThreadSafe;
-import org.apache.reef.proto.DriverRuntimeProtocol;
 import org.apache.reef.proto.ReefServiceProtos;
 import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
 import org.apache.reef.runtime.local.driver.ResourceManager;
 import org.apache.reef.tang.InjectionFuture;
 import org.apache.reef.tang.annotations.Parameter;
@@ -38,7 +39,7 @@ import java.util.logging.Logger;
 public final class ReefRunnableProcessObserver implements RunnableProcessObserver {
   private static final Logger LOG = Logger.getLogger(ReefRunnableProcessObserver.class.getName());
 
-  private final EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler;
+  private final EventHandler<ResourceStatusEvent> resourceStatusHandler;
   private final InjectionFuture<ResourceManager> resourceManager;
 
   /**
@@ -46,7 +47,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve
    */
   @Inject
   public ReefRunnableProcessObserver(final @Parameter(RuntimeParameters.ResourceStatusHandler.class)
-                                     EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler,
+                                     EventHandler<ResourceStatusEvent> resourceStatusHandler,
                                      final InjectionFuture<ResourceManager> resourceManager) {
     this.resourceStatusHandler = resourceStatusHandler;
     this.resourceManager = resourceManager;
@@ -55,7 +56,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve
   @Override
   public void onProcessStarted(final String processId) {
     this.onResourceStatus(
-        DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
+        ResourceStatusEventImpl.newBuilder()
             .setIdentifier(processId)
             .setState(ReefServiceProtos.State.RUNNING)
             .build()
@@ -84,7 +85,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve
    */
   private void onCleanExit(final String processId) {
     this.onResourceStatus(
-        DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
+        ResourceStatusEventImpl.newBuilder()
             .setIdentifier(processId)
             .setState(ReefServiceProtos.State.DONE)
             .setExitCode(0)
@@ -100,7 +101,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve
    */
   private void onUncleanExit(final String processId, final int exitCode) {
     this.onResourceStatus(
-        DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
+        ResourceStatusEventImpl.newBuilder()
             .setIdentifier(processId)
             .setState(ReefServiceProtos.State.FAILED)
             .setExitCode(exitCode)
@@ -108,7 +109,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve
     );
   }
 
-  private void onResourceStatus(final DriverRuntimeProtocol.ResourceStatusProto resourceStatus) {
+  private void onResourceStatus(final ResourceStatusEvent resourceStatus) {
     LOG.log(Level.INFO, "Sending resource status: {0} ", resourceStatus);
 
     // Here, we introduce an arbitrary wait. This is to make sure that at the exit of an Evaluator, the last