You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/04/22 07:52:36 UTC
[2/3] incubator-reef git commit: [REEF-128] Replace the protocol
buffer use in the runtime API with POJOs
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
index c0db65d..461f1c0 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
@@ -20,7 +20,6 @@ package org.apache.reef.runtime.common.driver.resourcemanager;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.driver.DriverStatusManager;
import org.apache.reef.runtime.common.driver.idle.DriverIdleManager;
@@ -38,7 +37,7 @@ import java.util.logging.Logger;
*/
@DriverSide
@Private
-public final class ResourceManagerStatus implements EventHandler<DriverRuntimeProtocol.RuntimeStatusProto>,
+public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEvent>,
DriverIdlenessSource {
private static final Logger LOG = Logger.getLogger(ResourceManagerStatus.class.getName());
@@ -64,22 +63,22 @@ public final class ResourceManagerStatus implements EventHandler<DriverRuntimePr
}
@Override
- public synchronized void onNext(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
- final ReefServiceProtos.State newState = runtimeStatusProto.getState();
- LOG.log(Level.FINEST, "Runtime status " + runtimeStatusProto);
- this.outstandingContainerRequests = runtimeStatusProto.getOutstandingContainerRequests();
- this.containerAllocationCount = runtimeStatusProto.getContainerAllocationCount();
- this.setState(runtimeStatusProto.getState());
+ public synchronized void onNext(final RuntimeStatusEvent runtimeStatusEvent) {
+ final ReefServiceProtos.State newState = runtimeStatusEvent.getState();
+ LOG.log(Level.FINEST, "Runtime status " + runtimeStatusEvent);
+ this.outstandingContainerRequests = runtimeStatusEvent.getOutstandingContainerRequests().get();
+ this.containerAllocationCount = runtimeStatusEvent.getContainerAllocationList().size();
+ this.setState(runtimeStatusEvent.getState());
switch (newState) {
case FAILED:
- this.onRMFailure(runtimeStatusProto);
+ this.onRMFailure(runtimeStatusEvent);
break;
case DONE:
- this.onRMDone(runtimeStatusProto);
+ this.onRMDone(runtimeStatusEvent);
break;
case RUNNING:
- this.onRMRunning(runtimeStatusProto);
+ this.onRMRunning(runtimeStatusEvent);
break;
}
}
@@ -110,19 +109,19 @@ public final class ResourceManagerStatus implements EventHandler<DriverRuntimePr
}
- private synchronized void onRMFailure(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
- assert (runtimeStatusProto.getState() == ReefServiceProtos.State.FAILED);
- this.resourceManagerErrorHandler.onNext(runtimeStatusProto.getError());
+ private synchronized void onRMFailure(final RuntimeStatusEvent runtimeStatusEvent) {
+ assert (runtimeStatusEvent.getState() == ReefServiceProtos.State.FAILED);
+ this.resourceManagerErrorHandler.onNext(runtimeStatusEvent.getError().get());
}
- private synchronized void onRMDone(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
- assert (runtimeStatusProto.getState() == ReefServiceProtos.State.DONE);
+ private synchronized void onRMDone(final RuntimeStatusEvent runtimeStatusEvent) {
+ assert (runtimeStatusEvent.getState() == ReefServiceProtos.State.DONE);
LOG.log(Level.INFO, "Resource Manager shutdown happened. Triggering Driver shutdown.");
this.driverStatusManager.onComplete();
}
- private synchronized void onRMRunning(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
- assert (runtimeStatusProto.getState() == ReefServiceProtos.State.RUNNING);
+ private synchronized void onRMRunning(final RuntimeStatusEvent runtimeStatusEvent) {
+ assert (runtimeStatusEvent.getState() == ReefServiceProtos.State.RUNNING);
if (this.isIdle()) {
this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE);
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java
new file mode 100644
index 0000000..1c17b15
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEvent.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.util.Optional;
+
+/**
+ * Event sent from the Driver Runtime to the Driver Process.
+ * Reports the status of a resource in the cluster.
+ */
+@RuntimeAuthor
+@DriverSide
+@DefaultImplementation(ResourceStatusEventImpl.class)
+public interface ResourceStatusEvent {
+ /**
+ * @return Id of the resource
+ */
+ String getIdentifier();
+
+ /**
+ * @return State of the resource
+ */
+ ReefServiceProtos.State getState();
+
+ /**
+ * @return Diagnostics from the resource
+ */
+ Optional<String> getDiagnostics();
+
+ /**
+ * @return Exit code of the resource, if it has exited
+ */
+ Optional<Integer> getExitCode();
+
+ /**
+ * @return If true, this resource is from a previous Driver (the Driver was restarted)
+ */
+ Optional<Boolean> getIsFromPreviousDriver();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java
new file mode 100644
index 0000000..7258c42
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusEventImpl.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.util.BuilderUtils;
+import org.apache.reef.util.Optional;
+
+/**
+ * Default POJO implementation of ResourceStatusEvent.
+ * Use newBuilder to construct an instance.
+ */
+public final class ResourceStatusEventImpl implements ResourceStatusEvent {
+ private final String identifier;
+ private final ReefServiceProtos.State state;
+ private final Optional<String> diagnostics;
+ private final Optional<Integer> exitCode;
+ private final Optional<Boolean> isFromPreviousDriver;
+
+ private ResourceStatusEventImpl(final Builder builder) {
+ this.identifier = BuilderUtils.notNull(builder.identifier);
+ this.state = BuilderUtils.notNull(builder.state);
+ this.diagnostics = Optional.ofNullable(builder.diagnostics);
+ this.exitCode = Optional.ofNullable(builder.exitCode);
+ this.isFromPreviousDriver = Optional.ofNullable(builder.isFromPreviousDriver);
+ }
+
+ @Override
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public ReefServiceProtos.State getState() {
+ return state;
+ }
+
+ @Override
+ public Optional<String> getDiagnostics() {
+ return diagnostics;
+ }
+
+ @Override
+ public Optional<Integer> getExitCode() {
+ return exitCode;
+ }
+
+ @Override
+ public Optional<Boolean> getIsFromPreviousDriver() {
+ return isFromPreviousDriver;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder used to create ResourceStatusEvent instances.
+ */
+ public static final class Builder implements org.apache.reef.util.Builder<ResourceStatusEvent> {
+ private String identifier;
+ private ReefServiceProtos.State state;
+ private String diagnostics;
+ private Integer exitCode;
+ private Boolean isFromPreviousDriver;
+
+ /**
+ * @see ResourceStatusEvent#getIdentifier()
+ */
+ public Builder setIdentifier(final String identifier) {
+ this.identifier = identifier;
+ return this;
+ }
+
+ /**
+ * @see ResourceStatusEvent#getState()
+ */
+ public Builder setState(final ReefServiceProtos.State state) {
+ this.state = state;
+ return this;
+ }
+
+ /**
+ * @see ResourceStatusEvent#getDiagnostics()
+ */
+ public Builder setDiagnostics(final String diagnostics) {
+ this.diagnostics = diagnostics;
+ return this;
+ }
+
+ /**
+ * @see ResourceStatusEvent#getExitCode()
+ */
+ public Builder setExitCode(final int exitCode) {
+ this.exitCode = exitCode;
+ return this;
+ }
+
+ /**
+ * @see ResourceStatusEvent#getIsFromPreviousDriver()
+ */
+ public Builder setIsFromPreviousDriver(final boolean isFromPreviousDriver) {
+ this.isFromPreviousDriver = isFromPreviousDriver;
+ return this;
+ }
+
+ @Override
+ public ResourceStatusEvent build() {
+ return new ResourceStatusEventImpl(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
index 9abafbd..6d31824 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
@@ -19,7 +19,6 @@
package org.apache.reef.runtime.common.driver.resourcemanager;
import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManagerFactory;
import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
@@ -33,7 +32,7 @@ import javax.inject.Inject;
* about the current state of a given resource. Ideally, we should think the same thing.
*/
@Private
-public final class ResourceStatusHandler implements EventHandler<DriverRuntimeProtocol.ResourceStatusProto> {
+public final class ResourceStatusHandler implements EventHandler<ResourceStatusEvent> {
private final Evaluators evaluators;
private final EvaluatorManagerFactory evaluatorManagerFactory;
@@ -49,21 +48,21 @@ public final class ResourceStatusHandler implements EventHandler<DriverRuntimePr
* about the state of the resource executing an Evaluator; This method simply passes the message
* off to the referenced EvaluatorManager
*
- * @param resourceStatusProto resource status message from the ResourceManager
+ * @param resourceStatusEvent resource status message from the ResourceManager
*/
@Override
- public void onNext(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) {
- final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(resourceStatusProto.getIdentifier());
+ public void onNext(final ResourceStatusEvent resourceStatusEvent) {
+ final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(resourceStatusEvent.getIdentifier());
if (evaluatorManager.isPresent()) {
- evaluatorManager.get().onResourceStatusMessage(resourceStatusProto);
+ evaluatorManager.get().onResourceStatusMessage(resourceStatusEvent);
} else {
- if (resourceStatusProto.getIsFromPreviousDriver()) {
- EvaluatorManager previousEvaluatorManager = this.evaluatorManagerFactory.createForEvaluatorFailedDuringDriverRestart(resourceStatusProto);
- previousEvaluatorManager.onResourceStatusMessage(resourceStatusProto);
+ if (resourceStatusEvent.getIsFromPreviousDriver().get()) {
+ EvaluatorManager previousEvaluatorManager = this.evaluatorManagerFactory.createForEvaluatorFailedDuringDriverRestart(resourceStatusEvent);
+ previousEvaluatorManager.onResourceStatusMessage(resourceStatusEvent);
} else {
throw new RuntimeException(
- "Unknown resource status from evaluator " + resourceStatusProto.getIdentifier() +
- " with state " + resourceStatusProto.getState()
+ "Unknown resource status from evaluator " + resourceStatusEvent.getIdentifier() +
+ " with state " + resourceStatusEvent.getState()
);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java
new file mode 100644
index 0000000..bca7354
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEvent.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.util.Optional;
+
+import java.util.List;
+
+/**
+ * Event sent from the Driver Runtime to the Driver Process.
+ * Carries a status update about the Runtime itself.
+ */
+@RuntimeAuthor
+@DriverSide
+@DefaultImplementation(RuntimeStatusEventImpl.class)
+public interface RuntimeStatusEvent {
+ /**
+ * @return Name of the Runtime
+ */
+ String getName();
+
+ /**
+ * @return State of the Runtime
+ */
+ ReefServiceProtos.State getState();
+
+ /**
+ * @return List of allocated containers
+ */
+ List<String> getContainerAllocationList();
+
+ /**
+ * @return Error from the Runtime
+ */
+ Optional<ReefServiceProtos.RuntimeErrorProto> getError();
+
+ /**
+ * @return Number of outstanding container requests
+ */
+ Optional<Integer> getOutstandingContainerRequests();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java
new file mode 100644
index 0000000..5d8060d
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/RuntimeStatusEventImpl.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.util.BuilderUtils;
+import org.apache.reef.util.Optional;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Default POJO implementation of RuntimeStatusEvent.
+ * Use newBuilder to construct an instance.
+ */
+public final class RuntimeStatusEventImpl implements RuntimeStatusEvent {
+ private final String name;
+ private final ReefServiceProtos.State state;
+ private final List<String> containerAllocationList;
+ private final Optional<ReefServiceProtos.RuntimeErrorProto> error;
+ private final Optional<Integer> outstandingContainerRequests;
+
+ private RuntimeStatusEventImpl(final Builder builder) {
+ this.name = BuilderUtils.notNull(builder.name);
+ this.state = BuilderUtils.notNull(builder.state);
+ this.containerAllocationList = BuilderUtils.notNull(builder.containerAllocationList);
+ this.error = Optional.ofNullable(builder.error);
+ this.outstandingContainerRequests = Optional.ofNullable(builder.outstandingContainerRequests);
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public ReefServiceProtos.State getState() {
+ return state;
+ }
+
+ @Override
+ public List<String> getContainerAllocationList() {
+ return containerAllocationList;
+ }
+
+ @Override
+ public Optional<ReefServiceProtos.RuntimeErrorProto> getError() {
+ return error;
+ }
+
+ @Override
+ public Optional<Integer> getOutstandingContainerRequests() {
+ return outstandingContainerRequests;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder used to create RuntimeStatusEvent instances.
+ */
+ public static final class Builder implements org.apache.reef.util.Builder<RuntimeStatusEvent> {
+ private String name;
+ private ReefServiceProtos.State state;
+ private List<String> containerAllocationList = new ArrayList<>();
+ private ReefServiceProtos.RuntimeErrorProto error;
+ private Integer outstandingContainerRequests;
+
+ /**
+ * @see RuntimeStatusEvent#getName()
+ */
+ public Builder setName(final String name) {
+ this.name = name;
+ return this;
+ }
+
+ /**
+ * @see RuntimeStatusEvent#getState()
+ */
+ public Builder setState(final ReefServiceProtos.State state) {
+ this.state = state;
+ return this;
+ }
+
+ /**
+ * Adds a single entry to the container allocation list.
+ * @see RuntimeStatusEvent#getContainerAllocationList()
+ */
+ public Builder addContainerAllocation(final String containerAllocation) {
+ this.containerAllocationList.add(containerAllocation);
+ return this;
+ }
+
+ /**
+ * @see RuntimeStatusEvent#getError()
+ */
+ public Builder setError(final ReefServiceProtos.RuntimeErrorProto error) {
+ this.error = error;
+ return this;
+ }
+
+ /**
+ * @see RuntimeStatusEvent#getOutstandingContainerRequests()
+ */
+ public Builder setOutstandingContainerRequests(final int outstandingContainerRequests) {
+ this.outstandingContainerRequests = outstandingContainerRequests;
+ return this;
+ }
+
+ @Override
+ public RuntimeStatusEvent build() {
+ return new RuntimeStatusEventImpl(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResource.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResource.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResource.java
new file mode 100644
index 0000000..e088afd
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResource.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.files;
+
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.tang.annotations.DefaultImplementation;
+
+/**
+ * A file resource, tagged with a {@link FileType}, for use by Runtimes.
+ */
+@RuntimeAuthor
+@DefaultImplementation(FileResourceImpl.class)
+public interface FileResource {
+
+ /**
+ * @return Type of the file
+ */
+ FileType getType();
+
+ /**
+ * @return Name of the file
+ */
+ String getName();
+
+ /**
+ * @return Path of the file
+ */
+ String getPath();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResourceImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResourceImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResourceImpl.java
new file mode 100644
index 0000000..2f617e8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileResourceImpl.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.files;
+
+import org.apache.reef.util.BuilderUtils;
+
+/**
+ * Default POJO implementation of FileResource.
+ * Use newBuilder to construct an instance.
+ */
+public final class FileResourceImpl implements FileResource {
+ private final FileType type;
+ private final String name;
+ private final String path;
+
+ private FileResourceImpl(final Builder builder) {
+ this.type = BuilderUtils.notNull(builder.type);
+ this.name = BuilderUtils.notNull(builder.name);
+ this.path = BuilderUtils.notNull(builder.path);
+ }
+
+ @Override
+ public FileType getType() {
+ return type;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String getPath() {
+ return path;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder used to create FileResource instances.
+ */
+ public static final class Builder implements org.apache.reef.util.Builder<FileResource> {
+ private FileType type;
+ private String name;
+ private String path;
+
+ /**
+ * @see FileResource#getType()
+ */
+ public Builder setType(final FileType type) {
+ this.type = type;
+ return this;
+ }
+
+ /**
+ * @see FileResource#getName()
+ */
+ public Builder setName(final String name) {
+ this.name = name;
+ return this;
+ }
+
+ /**
+ * @see FileResource#getPath()
+ */
+ public Builder setPath(final String path) {
+ this.path = path;
+ return this;
+ }
+
+ @Override
+ public FileResource build() {
+ return new FileResourceImpl(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileType.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileType.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileType.java
new file mode 100644
index 0000000..1b729ad
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/FileType.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.files;
+
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+
+/**
+ * Type of a {@link FileResource} used by Runtimes.
+ */
+@RuntimeAuthor
+public enum FileType {
+ PLAIN,
+ LIB,
+ ARCHIVE
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
index 5297158..ecf3aa8 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
@@ -21,8 +21,7 @@ package org.apache.reef.runtime.common.files;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.annotations.audience.RuntimeAuthor;
-import org.apache.reef.proto.ClientRuntimeProtocol;
-import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
import org.apache.reef.runtime.common.parameters.DeleteTempFiles;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.annotations.Parameter;
@@ -59,13 +58,13 @@ public final class JobJarMaker {
this.deleteTempFilesOnExit = deleteTempFilesOnExit;
}
- public static void copy(final Iterable<ReefServiceProtos.FileResourceProto> files, final File destinationFolder) {
+ public static void copy(final Iterable<FileResource> files, final File destinationFolder) {
if (!destinationFolder.exists()) {
destinationFolder.mkdirs();
}
- for (final ReefServiceProtos.FileResourceProto fileProto : files) {
+ for (final FileResource fileProto : files) {
final File sourceFile = toFile(fileProto);
final File destinationFile = new File(destinationFolder, fileProto.getName());
if (destinationFile.exists()) {
@@ -89,12 +88,12 @@ public final class JobJarMaker {
}
}
- private static File toFile(final ReefServiceProtos.FileResourceProto fileProto) {
+ private static File toFile(final FileResource fileProto) {
return new File(fileProto.getPath());
}
public File createJobSubmissionJAR(
- final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
+ final JobSubmissionEvent jobSubmissionEvent,
final Configuration driverConfiguration) throws IOException {
// Copy all files to a local job submission folder
@@ -104,8 +103,8 @@ public final class JobJarMaker {
final File localFolder = new File(jobSubmissionFolder, this.fileNames.getLocalFolderName());
final File globalFolder = new File(jobSubmissionFolder, this.fileNames.getGlobalFolderName());
- this.copy(jobSubmissionProto.getGlobalFileList(), globalFolder);
- this.copy(jobSubmissionProto.getLocalFileList(), localFolder);
+ this.copy(jobSubmissionEvent.getGlobalFileSet(), globalFolder);
+ this.copy(jobSubmissionEvent.getLocalFileSet(), localFolder);
// Store the Driver Configuration in the JAR file.
this.configurationSerializer.toFile(
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProcessType.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProcessType.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProcessType.java
new file mode 100644
index 0000000..286174e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProcessType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+/**
+ * The type of a process to be launched by the Runtime: a JVM process or a CLR process.
+ */
+public enum ProcessType {
+ JVM,
+ CLR
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
index 8a81184..39a8f8d 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
@@ -52,9 +52,7 @@ public final class REEFMessageCodec implements Codec<GeneratedMessage> {
public GeneratedMessage decode(final byte[] bytes) {
try {
final REEFProtocol.REEFMessage message = REEFProtocol.REEFMessage.parseFrom(bytes);
- if (message.hasJobSubmission()) {
- return message.getJobSubmission();
- } else if (message.hasJobControl()) {
+ if (message.hasJobControl()) {
return message.getJobControl();
} else if (message.hasRuntimeError()) {
return message.getRuntimeError();
@@ -75,9 +73,7 @@ public final class REEFMessageCodec implements Codec<GeneratedMessage> {
public byte[] encode(final GeneratedMessage msg) {
final REEFProtocol.REEFMessage.Builder message = REEFProtocol.REEFMessage.newBuilder();
- if (msg instanceof ClientRuntimeProtocol.JobSubmissionProto) {
- message.setJobSubmission((ClientRuntimeProtocol.JobSubmissionProto) msg);
- } else if (msg instanceof ClientRuntimeProtocol.JobControlProto) {
+ if (msg instanceof ClientRuntimeProtocol.JobControlProto) {
message.setJobControl((ClientRuntimeProtocol.JobControlProto) msg);
} else if (msg instanceof ReefServiceProtos.RuntimeErrorProto) {
message.setRuntimeError((ReefServiceProtos.RuntimeErrorProto) msg);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/java/org/apache/reef/util/BuilderUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/util/BuilderUtils.java b/lang/java/reef-common/src/main/java/org/apache/reef/util/BuilderUtils.java
new file mode 100644
index 0000000..68cd7ce
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/util/BuilderUtils.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util;
+
+/**
+ * Utilities for creating Builders.
+ */
+public final class BuilderUtils {
+ /**
+ * Returns {@code parameter} unchanged when it is non-null; throws an IllegalArgumentException when it is null.
+ */
+ public static <T> T notNull(final T parameter) {
+ if (parameter == null) {
+ throw new IllegalArgumentException("required parameter");
+ } else {
+ return parameter;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/proto/client_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/proto/client_runtime.proto b/lang/java/reef-common/src/main/proto/client_runtime.proto
index 36c648b..2846619 100644
--- a/lang/java/reef-common/src/main/proto/client_runtime.proto
+++ b/lang/java/reef-common/src/main/proto/client_runtime.proto
@@ -23,22 +23,6 @@ import "reef_service_protos.proto";
// Messages from REEF Client -> Driver Runtime
-message JobSubmissionProto {
- required string identifier = 1; // the job identifier
- required string remote_id = 2; // the remote identifier
- required string configuration = 5; // the runtime configuration
- required string user_name = 6; // the user name
-
- //optional SIZE driver_size = 7; // Removed in REEF 0.3 in favor of driver_memory below.
- optional int32 driver_memory = 8;
- optional int32 priority = 9;
- optional string queue = 10;
-
- repeated FileResourceProto global_file = 11; // files that should be placed on the driver and all subsequent evaluators
- repeated FileResourceProto local_File = 12; // files that should be placed on the driver only
-
-}
-
enum Signal {
SIG_TERMINATE = 1;
SIG_SUSPEND = 2;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/proto/driver_runtime.proto
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/proto/driver_runtime.proto b/lang/java/reef-common/src/main/proto/driver_runtime.proto
deleted file mode 100644
index 64f5dc1..0000000
--- a/lang/java/reef-common/src/main/proto/driver_runtime.proto
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-option java_package = "org.apache.reef.proto";
-option java_outer_classname = "DriverRuntimeProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-
-import "reef_service_protos.proto";
-
-// Messages from Driver Runtime -> Driver Process
-
-message DriverProcessRegistrationProto {
- required string remote_identifier = 1;
-}
-
-
-message NodeDescriptorProto {
- required string identifier = 1;
- required string host_name = 2; // e.g., IP address
- required int32 port = 3; // e.g., IP port
- required int32 memory_size = 4;
- optional string rack_name = 5; // e.g., /default-rack
-}
-
-message ResourceAllocationProto {
- required string identifier = 1; // e.g., the container id, or the thread id
- required int32 resource_memory = 2; // megabytes
- required string node_id = 3;
- optional int32 virtual_cores = 4;
-}
-
-message ResourceStatusProto {
- required string identifier = 1;
- required State state = 2;
- optional string diagnostics = 3;
- optional int32 exit_code = 4;
- optional bool is_from_previous_driver = 5;
-}
-
-message RuntimeStatusProto {
- required string name = 1; // e.g., local, yarn21
- required State state = 2;
- optional RuntimeErrorProto error = 3; // runtime (e.g., YARN) error
-
- optional int32 outstanding_container_requests = 5;
- repeated string container_allocation = 6;
-}
-
-//////////////////////////////////////////////////////
-// Messages from Driver Process -> Driver Runtime
-
-message ResourceRequestProto {
- // optional SIZE resource_size = 1; // Removed in REEF 0.3 in favor of memory_size.
- optional int32 memory_size = 2; // Memory size of the evaluator in MB
- optional int32 priority = 3;
- optional int32 virtual_cores = 4;
- required int32 resource_count = 5;
- repeated string node_name = 6; // a list of specific nodes
- repeated string rack_name = 7; // a list of specific racks
-
- optional bool relax_locality = 10;
-}
-
-message ResourceReleaseProto {
- required string identifier = 1;
-}
-
-message ResourceLaunchProto {
- required string identifier = 1;
- required string remote_id = 2;
- required string evaluator_conf = 3;
- required ProcessType type = 4;
- repeated FileResourceProto file = 10;
-}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/proto/reef_protocol.proto
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/proto/reef_protocol.proto b/lang/java/reef-common/src/main/proto/reef_protocol.proto
index a8c793f..09c4476 100644
--- a/lang/java/reef-common/src/main/proto/reef_protocol.proto
+++ b/lang/java/reef-common/src/main/proto/reef_protocol.proto
@@ -30,8 +30,9 @@ option java_generate_equals_and_hash = true;
option java_outer_classname = "REEFProtocol";
message REEFMessage {
+ // Field 1 (jobSubmission) removed; do not reuse this field number
+
// Messages defined in client_runtime.proto
- optional JobSubmissionProto jobSubmission = 1;
optional JobControlProto jobControl = 2;
// Messages defined in reef_service_protos.proto
optional RuntimeErrorProto runtimeError = 3;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/main/proto/reef_service_protos.proto
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/proto/reef_service_protos.proto b/lang/java/reef-common/src/main/proto/reef_service_protos.proto
index 7494737..38d2b39 100644
--- a/lang/java/reef-common/src/main/proto/reef_service_protos.proto
+++ b/lang/java/reef-common/src/main/proto/reef_service_protos.proto
@@ -32,31 +32,6 @@ enum State {
KILLED = 5;
}
-enum FileType {
- PLAIN = 0;
- LIB = 1;
- ARCHIVE = 2;
-}
-
-// Removed in REEF 0.3 in favor of explicit memory sizes.
-// enum SIZE {
-// SMALL = 0;
-// MEDIUM = 1;
-// LARGE = 2;
-// XLARGE = 3;
-//}
-
-enum ProcessType {
- JVM = 0;
- CLR = 1;
-}
-
-message FileResourceProto {
- required FileType type = 1;
- required string name = 2;
- required string path = 3;
-}
-
message RuntimeErrorProto {
required string name = 1; // e.g., local, yarn21
required string message = 2;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java b/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java
index ddbd83b..dce5f50 100644
--- a/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java
+++ b/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java
@@ -21,7 +21,7 @@ package org.apache.reef.runtime.common.driver;
import org.apache.reef.driver.catalog.ResourceCatalog;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
@@ -53,8 +53,8 @@ public class EvaluatorRequestorImplTest {
final DummyRequestHandler requestHandler = new DummyRequestHandler();
final EvaluatorRequestor evaluatorRequestor = new EvaluatorRequestorImpl(resourceCatalog, requestHandler, loggingScopeFactory);
evaluatorRequestor.submit(EvaluatorRequest.newBuilder().setMemory(memory).build());
- Assert.assertEquals("Memory request did not make it", requestHandler.get().getMemorySize(), memory);
- Assert.assertEquals("Number of requests did not make it", requestHandler.get().getResourceCount(), 1);
+ Assert.assertEquals("Memory request did not make it", memory, requestHandler.get().getMemorySize().get().intValue());
+ Assert.assertEquals("Number of requests did not make it", 1, requestHandler.get().getResourceCount());
}
/**
@@ -67,8 +67,8 @@ public class EvaluatorRequestorImplTest {
final DummyRequestHandler requestHandler = new DummyRequestHandler();
final EvaluatorRequestor evaluatorRequestor = new EvaluatorRequestorImpl(resourceCatalog, requestHandler, loggingScopeFactory);
evaluatorRequestor.submit(EvaluatorRequest.newBuilder().setMemory(memory).setNumber(count).build());
- Assert.assertEquals("Memory request did not make it", requestHandler.get().getMemorySize(), memory);
- Assert.assertEquals("Number of requests did not make it", requestHandler.get().getResourceCount(), count);
+ Assert.assertEquals("Memory request did not make it", memory, requestHandler.get().getMemorySize().get().intValue());
+ Assert.assertEquals("Number of requests did not make it", count, requestHandler.get().getResourceCount());
}
/**
@@ -96,14 +96,14 @@ public class EvaluatorRequestorImplTest {
}
private class DummyRequestHandler implements ResourceRequestHandler {
- private DriverRuntimeProtocol.ResourceRequestProto request;
+ private ResourceRequestEvent request;
@Override
- public void onNext(DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) {
- this.request = resourceRequestProto;
+ public void onNext(final ResourceRequestEvent resourceRequestEvent) {
+ this.request = resourceRequestEvent;
}
- public DriverRuntimeProtocol.ResourceRequestProto get() {
+ public ResourceRequestEvent get() {
return this.request;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/catalog/CatalogTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/catalog/CatalogTest.java b/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/catalog/CatalogTest.java
index 66203b2..72503d0 100644
--- a/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/catalog/CatalogTest.java
+++ b/lang/java/reef-common/src/test/java/org/apache/reef/runtime/common/driver/catalog/CatalogTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.reef.runtime.common.driver.catalog;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
import org.junit.Assert;
import org.junit.Test;
@@ -35,7 +35,7 @@ public final class CatalogTest {
final ResourceCatalogImpl catalog = new ResourceCatalogImpl();
for (int i = 0; i < nodes; i++) {
- catalog.handle(DriverRuntimeProtocol.NodeDescriptorProto.newBuilder()
+ catalog.handle(NodeDescriptorEventImpl.newBuilder()
.setRackName("test-rack")
.setHostName("test-" + i)
.setPort(0)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
index 7ac314a..dc56abf 100644
--- a/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-hdinsight/src/main/java/org/apache/reef/runtime/hdinsight/client/HDInsightJobSubmissionHandler.java
@@ -22,7 +22,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.JobJarMaker;
@@ -33,7 +33,6 @@ import org.apache.reef.runtime.hdinsight.client.yarnrest.*;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.tang.formats.ConfigurationSerializer;
import javax.inject.Inject;
import java.io.File;
@@ -54,7 +53,6 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler
private final AzureUploader uploader;
private final JobJarMaker jobJarMaker;
private final HDInsightInstance hdInsightInstance;
- private final ConfigurationSerializer configurationSerializer;
private final REEFFileNames filenames;
private final ClasspathProvider classpath;
private final double jvmHeapSlack;
@@ -63,14 +61,12 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler
HDInsightJobSubmissionHandler(final AzureUploader uploader,
final JobJarMaker jobJarMaker,
final HDInsightInstance hdInsightInstance,
- final ConfigurationSerializer configurationSerializer,
final REEFFileNames filenames,
final ClasspathProvider classpath,
final @Parameter(JVMHeapSlack.class) double jvmHeapSlack) {
this.uploader = uploader;
this.jobJarMaker = jobJarMaker;
this.hdInsightInstance = hdInsightInstance;
- this.configurationSerializer = configurationSerializer;
this.filenames = filenames;
this.classpath = classpath;
this.jvmHeapSlack = jvmHeapSlack;
@@ -82,7 +78,7 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler
}
@Override
- public void onNext(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
+ public void onNext(final JobSubmissionEvent jobSubmissionEvent) {
try {
@@ -96,25 +92,25 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler
LOG.log(Level.FINE, "Assembling Configuration for the Driver.");
final Configuration driverConfiguration =
- makeDriverConfiguration(jobSubmissionProto, applicationID.getId(), jobFolderURL);
+ makeDriverConfiguration(jobSubmissionEvent, applicationID.getId(), jobFolderURL);
LOG.log(Level.FINE, "Making Job JAR.");
final File jobSubmissionJarFile =
- this.jobJarMaker.createJobSubmissionJAR(jobSubmissionProto, driverConfiguration);
+ this.jobJarMaker.createJobSubmissionJAR(jobSubmissionEvent, driverConfiguration);
LOG.log(Level.FINE, "Uploading Job JAR to Azure.");
final FileResource uploadedFile = this.uploader.uploadFile(jobSubmissionJarFile);
LOG.log(Level.FINE, "Assembling application submission.");
- final String command = getCommandString(jobSubmissionProto);
+ final String command = getCommandString(jobSubmissionEvent);
final ApplicationSubmission applicationSubmission = new ApplicationSubmission()
.setApplicationId(applicationID.getId())
- .setApplicationName(jobSubmissionProto.getIdentifier())
- .setResource(getResource(jobSubmissionProto))
+ .setApplicationName(jobSubmissionEvent.getIdentifier())
+ .setResource(getResource(jobSubmissionEvent))
.setContainerInfo(new ContainerInfo()
- .addFileResource(this.filenames.getREEFFolderName(), uploadedFile)
- .addCommand(command));
+ .addFileResource(this.filenames.getREEFFolderName(), uploadedFile)
+ .addCommand(command));
this.hdInsightInstance.submitApplication(applicationSubmission);
LOG.log(Level.INFO, "Submitted application to HDInsight. The application id is: {0}", applicationID.getId());
@@ -126,13 +122,13 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler
}
/**
- * Extracts the resource demands from the jobSubmissionProto.
+ * Extracts the resource demands from the jobSubmissionEvent.
*/
private final Resource getResource(
- final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
+ final JobSubmissionEvent jobSubmissionEvent) {
return new Resource()
- .setMemory(String.valueOf(jobSubmissionProto.getDriverMemory()))
+ .setMemory(String.valueOf(jobSubmissionEvent.getDriverMemory().get()))
.setvCores("1");
}
@@ -140,30 +136,30 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler
* Assembles the command to execute the Driver.
*/
private String getCommandString(
- final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
- return StringUtils.join(getCommandList(jobSubmissionProto), ' ');
+ final JobSubmissionEvent jobSubmissionEvent) {
+ return StringUtils.join(getCommandList(jobSubmissionEvent), ' ');
}
/**
* Assembles the command to execute the Driver in list form.
*/
private List<String> getCommandList(
- final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
+ final JobSubmissionEvent jobSubmissionEvent) {
return new JavaLaunchCommandBuilder()
.setJavaPath("%JAVA_HOME%/bin/java")
- .setErrorHandlerRID(jobSubmissionProto.getRemoteId())
- .setLaunchID(jobSubmissionProto.getIdentifier())
+ .setErrorHandlerRID(jobSubmissionEvent.getRemoteId())
+ .setLaunchID(jobSubmissionEvent.getIdentifier())
.setConfigurationFileName(this.filenames.getDriverConfigurationPath())
.setClassPath(this.classpath.getDriverClasspath())
- .setMemory(jobSubmissionProto.getDriverMemory())
+ .setMemory(jobSubmissionEvent.getDriverMemory().get())
.setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStderrFileName())
.setStandardOut(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getDriverStdoutFileName())
.build();
}
private Configuration makeDriverConfiguration(
- final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
+ final JobSubmissionEvent jobSubmissionEvent,
final String applicationId,
final String jobFolderURL) throws IOException {
@@ -174,7 +170,7 @@ public final class HDInsightJobSubmissionHandler implements JobSubmissionHandler
.build();
return Configurations.merge(
- this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()),
+ jobSubmissionEvent.getConfiguration(),
hdinsightDriverConfiguration);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverFiles.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverFiles.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverFiles.java
index c54f9e0..42ced95 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverFiles.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/DriverFiles.java
@@ -18,8 +18,9 @@
*/
package org.apache.reef.runtime.local.client;
-import org.apache.reef.proto.ClientRuntimeProtocol;
-import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
+import org.apache.reef.runtime.common.files.FileResource;
+import org.apache.reef.runtime.common.files.FileType;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.tang.formats.ConfigurationModule;
import org.apache.reef.tang.formats.OptionalParameter;
@@ -51,28 +52,28 @@ final class DriverFiles {
/**
* Instantiates an instance based on the given JobSubmissionProto.
*
- * @param jobSubmissionProto the JobSubmissionProto to parse.
+ * @param jobSubmissionEvent the JobSubmissionEvent to parse.
* @return a DriverFiles instance pre-populated with the information from the given JobSubmissionProto.
* @throws IOException
*/
public static DriverFiles fromJobSubmission(
- final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
+ final JobSubmissionEvent jobSubmissionEvent,
final REEFFileNames fileNames) throws IOException {
final DriverFiles driverFiles = new DriverFiles(fileNames);
- for (final ReefServiceProtos.FileResourceProto frp : jobSubmissionProto.getGlobalFileList()) {
+ for (final FileResource frp : jobSubmissionEvent.getGlobalFileSet()) {
final File f = new File(frp.getPath());
- if (frp.getType() == ReefServiceProtos.FileType.LIB) {
+ if (frp.getType() == FileType.LIB) {
driverFiles.addGlobalLib(f);
} else {
driverFiles.addGlobalFile(f);
}
}
- for (final ReefServiceProtos.FileResourceProto frp : jobSubmissionProto.getLocalFileList()) {
+ for (final FileResource frp : jobSubmissionEvent.getLocalFileSet()) {
final File f = new File(frp.getPath());
- if (frp.getType() == ReefServiceProtos.FileType.LIB) {
+ if (frp.getType() == FileType.LIB) {
driverFiles.addLocalLib(f);
} else {
driverFiles.addLocalFile(f);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
index d417fce..f4befc1 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/client/LocalJobSubmissionHandler.java
@@ -20,14 +20,11 @@ package org.apache.reef.runtime.local.client;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.runtime.common.client.api.JobSubmissionEvent;
import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.REEFFileNames;
-import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
import org.apache.reef.runtime.local.client.parameters.RootFolder;
-import org.apache.reef.runtime.local.process.LoggingRunnableProcessObserver;
-import org.apache.reef.runtime.local.process.RunnableProcess;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.ConfigurationSerializer;
@@ -53,7 +50,6 @@ final class LocalJobSubmissionHandler implements JobSubmissionHandler {
private final String rootFolderName;
private final ConfigurationSerializer configurationSerializer;
private final REEFFileNames fileNames;
- private final ClasspathProvider classpath;
private final PreparedDriverFolderLauncher driverLauncher;
private final LoggingScopeFactory loggingScopeFactory;
private final DriverConfigurationProvider driverConfigurationProvider;
@@ -64,7 +60,6 @@ final class LocalJobSubmissionHandler implements JobSubmissionHandler {
final @Parameter(RootFolder.class) String rootFolderName,
final ConfigurationSerializer configurationSerializer,
final REEFFileNames fileNames,
- final ClasspathProvider classpath,
final PreparedDriverFolderLauncher driverLauncher,
final LoggingScopeFactory loggingScopeFactory,
@@ -73,7 +68,6 @@ final class LocalJobSubmissionHandler implements JobSubmissionHandler {
this.executor = executor;
this.configurationSerializer = configurationSerializer;
this.fileNames = fileNames;
- this.classpath = classpath;
this.driverLauncher = driverLauncher;
this.driverConfigurationProvider = driverConfigurationProvider;
@@ -89,7 +83,7 @@ final class LocalJobSubmissionHandler implements JobSubmissionHandler {
}
@Override
- public final void onNext(final ClientRuntimeProtocol.JobSubmissionProto t) {
+ public final void onNext(final JobSubmissionEvent t) {
try (final LoggingScope lf = loggingScopeFactory.localJobSubmission()) {
try {
LOG.log(Level.FINEST, "Starting local job {0}", t.getIdentifier());
@@ -104,8 +98,7 @@ final class LocalJobSubmissionHandler implements JobSubmissionHandler {
driverFiles.copyTo(driverFolder);
final Configuration driverConfiguration = this.driverConfigurationProvider
- .getDriverConfiguration(jobFolder, t.getRemoteId(), t.getIdentifier(),
- configurationSerializer.fromString(t.getConfiguration()));
+ .getDriverConfiguration(jobFolder, t.getRemoteId(), t.getIdentifier(), t.getConfiguration());
this.configurationSerializer.toFile(driverConfiguration,
new File(driverFolder, this.fileNames.getDriverConfigurationPath()));
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
index 65290c1..9127da9 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
@@ -21,9 +21,10 @@ package org.apache.reef.runtime.local.driver;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.client.FailedRuntime;
-import org.apache.reef.proto.DriverRuntimeProtocol;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent;
+import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators;
@@ -68,7 +69,7 @@ final class ContainerManager implements AutoCloseable {
private final String errorHandlerRID;
private final int capacity;
- private final EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> nodeDescriptorHandler;
+ private final EventHandler<NodeDescriptorEvent> nodeDescriptorHandler;
private final File rootFolder;
private final REEFFileNames fileNames;
private final ReefRunnableProcessObserver processObserver;
@@ -82,10 +83,9 @@ final class ContainerManager implements AutoCloseable {
final @Parameter(MaxNumberOfEvaluators.class) int capacity,
final @Parameter(RootFolder.class) String rootFolderName,
final @Parameter(RuntimeParameters.NodeDescriptorHandler.class)
- EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> nodeDescriptorHandler,
+ EventHandler<NodeDescriptorEvent> nodeDescriptorHandler,
final ReefRunnableProcessObserver processObserver,
final LocalAddressProvider localAddressProvider) {
-
this.capacity = capacity;
this.fileNames = fileNames;
this.processObserver = processObserver;
@@ -131,7 +131,7 @@ final class ContainerManager implements AutoCloseable {
for (int i = 0; i < capacity; i++) {
final String id = idmaker.getNextID();
this.freeNodeList.add(id);
- nodeDescriptorHandler.onNext(DriverRuntimeProtocol.NodeDescriptorProto.newBuilder()
+ nodeDescriptorHandler.onNext(NodeDescriptorEventImpl.newBuilder()
.setIdentifier(id)
.setRackName("/default-rack")
.setHostName(this.localAddress)
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceLaunchHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceLaunchHandler.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceLaunchHandler.java
index be3ead8..bf4a177 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceLaunchHandler.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceLaunchHandler.java
@@ -20,7 +20,7 @@ package org.apache.reef.runtime.local.driver;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
import javax.inject.Inject;
@@ -40,7 +40,7 @@ final class LocalResourceLaunchHandler implements ResourceLaunchHandler {
}
@Override
- public void onNext(final DriverRuntimeProtocol.ResourceLaunchProto t) {
+ public void onNext(final ResourceLaunchEvent t) {
this.resourceManager.onResourceLaunchRequest(t);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceReleaseHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceReleaseHandler.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceReleaseHandler.java
index 04c2730..514a297 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceReleaseHandler.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceReleaseHandler.java
@@ -20,7 +20,7 @@ package org.apache.reef.runtime.local.driver;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
import javax.inject.Inject;
@@ -41,7 +41,7 @@ public final class LocalResourceReleaseHandler implements ResourceReleaseHandler
}
@Override
- public void onNext(final DriverRuntimeProtocol.ResourceReleaseProto t) {
+ public void onNext(final ResourceReleaseEvent t) {
this.resourceManager.onResourceReleaseRequest(t);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceRequestHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceRequestHandler.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceRequestHandler.java
index a08b05c..99ae748 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceRequestHandler.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/LocalResourceRequestHandler.java
@@ -20,7 +20,7 @@ package org.apache.reef.runtime.local.driver;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
import javax.inject.Inject;
@@ -40,7 +40,7 @@ final class LocalResourceRequestHandler implements ResourceRequestHandler {
}
@Override
- public void onNext(final DriverRuntimeProtocol.ResourceRequestProto t) {
+ public void onNext(final ResourceRequestEvent t) {
this.resourceManager.onResourceRequest(t);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
index e30eba3..7d190cb 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
@@ -20,10 +20,17 @@ package org.apache.reef.runtime.local.driver;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl;
+import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent;
+import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
import org.apache.reef.runtime.common.files.ClasspathProvider;
+import org.apache.reef.runtime.common.files.FileResource;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.launch.CLRLaunchCommandBuilder;
import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
@@ -58,9 +65,9 @@ public final class ResourceManager {
private final ResourceRequestQueue requestQueue = new ResourceRequestQueue();
- private final EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> allocationHandler;
+ private final EventHandler<ResourceAllocationEvent> allocationHandler;
private final ContainerManager theContainers;
- private final EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandlerEventHandler;
+ private final EventHandler<RuntimeStatusEvent> runtimeStatusHandlerEventHandler;
private final int defaultMemorySize;
private final int defaultNumberOfCores;
private final ConfigurationSerializer configurationSerializer;
@@ -73,8 +80,8 @@ public final class ResourceManager {
@Inject
ResourceManager(
final ContainerManager containerManager,
- final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> allocationHandler,
- final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandlerEventHandler,
+ final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<ResourceAllocationEvent> allocationHandler,
+ final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<RuntimeStatusEvent> runtimeStatusHandlerEventHandler,
final @Parameter(DefaultMemorySize.class) int defaultMemorySize,
final @Parameter(DefaultNumberOfCores.class) int defaultNumberOfCores,
final @Parameter(JVMHeapSlack.class) double jvmHeapSlack,
@@ -105,9 +112,9 @@ public final class ResourceManager {
* @param launchRequest the ResourceLaunchProto to parse
* @return a list of files set in the given ResourceLaunchProto
*/
- private static List<File> getLocalFiles(final DriverRuntimeProtocol.ResourceLaunchProto launchRequest) {
+ private static List<File> getLocalFiles(final ResourceLaunchEvent launchRequest) {
final List<File> files = new ArrayList<>(); // Libraries local to this evaluator
- for (final ReefServiceProtos.FileResourceProto frp : launchRequest.getFileList()) {
+ for (final FileResource frp : launchRequest.getFileSet()) {
files.add(new File(frp.getPath()).getAbsoluteFile());
}
return files;
@@ -120,7 +127,7 @@ public final class ResourceManager {
*
* @param resourceRequest the resource request to be handled.
*/
- final void onResourceRequest(final DriverRuntimeProtocol.ResourceRequestProto resourceRequest) {
+ final void onResourceRequest(final ResourceRequestEvent resourceRequest) {
synchronized (this.theContainers) {
this.requestQueue.add(new ResourceRequest(resourceRequest));
this.checkRequestQueue();
@@ -132,7 +139,7 @@ public final class ResourceManager {
*
* @param releaseRequest the release request to be processed
*/
- final void onResourceReleaseRequest(final DriverRuntimeProtocol.ResourceReleaseProto releaseRequest) {
+ final void onResourceReleaseRequest(final ResourceReleaseEvent releaseRequest) {
synchronized (this.theContainers) {
LOG.log(Level.FINEST, "Release container: {0}", releaseRequest.getIdentifier());
this.theContainers.release(releaseRequest.getIdentifier());
@@ -158,7 +165,7 @@ public final class ResourceManager {
* @param launchRequest the launch request to be processed.
*/
final void onResourceLaunchRequest(
- final DriverRuntimeProtocol.ResourceLaunchProto launchRequest) {
+ final ResourceLaunchEvent launchRequest) {
synchronized (this.theContainers) {
@@ -173,8 +180,7 @@ public final class ResourceManager {
final File evaluatorConfigurationFile = new File(c.getFolder(), fileNames.getEvaluatorConfigurationPath());
try {
- this.configurationSerializer.toFile(this.configurationSerializer.fromString(launchRequest.getEvaluatorConf()),
- evaluatorConfigurationFile);
+ this.configurationSerializer.toFile(launchRequest.getEvaluatorConf(), evaluatorConfigurationFile);
} catch (final IOException | BindException e) {
throw new RuntimeException("Unable to write configuration.", e);
}
@@ -218,16 +224,16 @@ public final class ResourceManager {
if (this.theContainers.hasContainerAvailable() && this.requestQueue.hasOutStandingRequests()) {
// Record the satisfaction of one request and get its details.
- final DriverRuntimeProtocol.ResourceRequestProto requestProto = this.requestQueue.satisfyOne();
+ final ResourceRequestEvent requestEvent = this.requestQueue.satisfyOne();
// Allocate a Container
final Container container = this.theContainers.allocateOne(
- requestProto.hasMemorySize() ? requestProto.getMemorySize() : this.defaultMemorySize,
- requestProto.hasVirtualCores() ? requestProto.getVirtualCores() : this.defaultNumberOfCores);
+ requestEvent.getMemorySize().orElse(this.defaultMemorySize),
+ requestEvent.getVirtualCores().orElse(this.defaultNumberOfCores));
// Tell the receivers about it
- final DriverRuntimeProtocol.ResourceAllocationProto alloc =
- DriverRuntimeProtocol.ResourceAllocationProto.newBuilder()
+ final ResourceAllocationEvent alloc =
+ ResourceAllocationEventImpl.newBuilder()
.setIdentifier(container.getContainerID())
.setNodeId(container.getNodeID())
.setResourceMemory(container.getMemory())
@@ -250,16 +256,18 @@ public final class ResourceManager {
private void sendRuntimeStatus() {
- final DriverRuntimeProtocol.RuntimeStatusProto msg =
- DriverRuntimeProtocol.RuntimeStatusProto.newBuilder()
+ final RuntimeStatusEventImpl.Builder builder =
+ RuntimeStatusEventImpl.newBuilder()
.setName("LOCAL")
.setState(ReefServiceProtos.State.RUNNING)
- .setOutstandingContainerRequests(this.requestQueue.getNumberOfOutstandingRequests())
- .addAllContainerAllocation(this.theContainers.getAllocatedContainerIDs())
- .build();
+ .setOutstandingContainerRequests(this.requestQueue.getNumberOfOutstandingRequests());
+ for (final String containerAllocation : this.theContainers.getAllocatedContainerIDs()) {
+ builder.addContainerAllocation(containerAllocation);
+ }
+ final RuntimeStatusEvent msg = builder.build();
LOG.log(Level.INFO, "Allocated: {0}, Outstanding requests: {1}",
- new Object[]{msg.getContainerAllocationCount(), msg.getOutstandingContainerRequests()});
+ new Object[]{msg.getContainerAllocationList().size(), msg.getOutstandingContainerRequests()});
this.runtimeStatusHandlerEventHandler.onNext(msg);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java
index bbb20a3..4b275cb 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequest.java
@@ -20,21 +20,21 @@ package org.apache.reef.runtime.local.driver;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
/**
- * Manages a ResourceRequestProto and its satisfaction.
+ * Manages a ResourceRequestEvent and its satisfaction.
*/
@Private
@DriverSide
final class ResourceRequest {
- private final DriverRuntimeProtocol.ResourceRequestProto req;
+ private final ResourceRequestEvent req;
private int satisfied = 0;
- ResourceRequest(final DriverRuntimeProtocol.ResourceRequestProto req) {
+ ResourceRequest(final ResourceRequestEvent req) {
if (null == req) {
- throw new IllegalArgumentException("Can't instantiate a ResourceRequest without a ResourceRequestProto");
+ throw new IllegalArgumentException("Can't instantiate a ResourceRequest without a ResourceRequestEvent");
}
this.req = req;
}
@@ -57,7 +57,7 @@ final class ResourceRequest {
return this.satisfied == req.getResourceCount();
}
- final DriverRuntimeProtocol.ResourceRequestProto getRequestProto() {
+ final ResourceRequestEvent getRequestProto() {
return this.req;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
index c2a87ff..dcebe20 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
@@ -20,7 +20,7 @@ package org.apache.reef.runtime.local.driver;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -54,7 +54,7 @@ final class ResourceRequestQueue {
* Satisfies one resource for the front-most request. If that satisfies the
* request, it is removed from the queue.
*/
- final synchronized DriverRuntimeProtocol.ResourceRequestProto satisfyOne() {
+ final synchronized ResourceRequestEvent satisfyOne() {
final ResourceRequest req = this.requestQueue.element();
req.satisfyOne();
if (req.isSatisfied()) {
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
index 36ee916..3d5051a 100644
--- a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
+++ b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/process/ReefRunnableProcessObserver.java
@@ -19,9 +19,10 @@
package org.apache.reef.runtime.local.process;
import net.jcip.annotations.ThreadSafe;
-import org.apache.reef.proto.DriverRuntimeProtocol;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
+import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
import org.apache.reef.runtime.local.driver.ResourceManager;
import org.apache.reef.tang.InjectionFuture;
import org.apache.reef.tang.annotations.Parameter;
@@ -38,7 +39,7 @@ import java.util.logging.Logger;
public final class ReefRunnableProcessObserver implements RunnableProcessObserver {
private static final Logger LOG = Logger.getLogger(ReefRunnableProcessObserver.class.getName());
- private final EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler;
+ private final EventHandler<ResourceStatusEvent> resourceStatusHandler;
private final InjectionFuture<ResourceManager> resourceManager;
/**
@@ -46,7 +47,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve
*/
@Inject
public ReefRunnableProcessObserver(final @Parameter(RuntimeParameters.ResourceStatusHandler.class)
- EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler,
+ EventHandler<ResourceStatusEvent> resourceStatusHandler,
final InjectionFuture<ResourceManager> resourceManager) {
this.resourceStatusHandler = resourceStatusHandler;
this.resourceManager = resourceManager;
@@ -55,7 +56,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve
@Override
public void onProcessStarted(final String processId) {
this.onResourceStatus(
- DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
+ ResourceStatusEventImpl.newBuilder()
.setIdentifier(processId)
.setState(ReefServiceProtos.State.RUNNING)
.build()
@@ -84,7 +85,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve
*/
private void onCleanExit(final String processId) {
this.onResourceStatus(
- DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
+ ResourceStatusEventImpl.newBuilder()
.setIdentifier(processId)
.setState(ReefServiceProtos.State.DONE)
.setExitCode(0)
@@ -100,7 +101,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve
*/
private void onUncleanExit(final String processId, final int exitCode) {
this.onResourceStatus(
- DriverRuntimeProtocol.ResourceStatusProto.newBuilder()
+ ResourceStatusEventImpl.newBuilder()
.setIdentifier(processId)
.setState(ReefServiceProtos.State.FAILED)
.setExitCode(exitCode)
@@ -108,7 +109,7 @@ public final class ReefRunnableProcessObserver implements RunnableProcessObserve
);
}
- private void onResourceStatus(final DriverRuntimeProtocol.ResourceStatusProto resourceStatus) {
+ private void onResourceStatus(final ResourceStatusEvent resourceStatus) {
LOG.log(Level.INFO, "Sending resource status: {0} ", resourceStatus);
// Here, we introduce an arbitrary wait. This is to make sure that at the exit of an Evaluator, the last