You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2014/11/13 03:30:57 UTC
[2/2] incubator-reef git commit: Add timings
Add timings
This addresses [REEF-28]: Timings are added to the various important event
handlers using the recently added `LogScope` classes.
The code was written by Julia Wang <Qi...@microsoft.com>
Closes #12
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8026c8ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8026c8ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8026c8ad
Branch: refs/heads/master
Commit: 8026c8ad6466702e9bd19c9587ac1a4011d229fe
Parents: 141e4a3
Author: Markus Weimer <we...@apache.org>
Authored: Wed Nov 12 18:16:40 2014 -0800
Committer: Markus Weimer <we...@apache.org>
Committed: Wed Nov 12 18:16:40 2014 -0800
----------------------------------------------------------------------
.../javabridge/AllocatedEvaluatorBridge.java | 10 +-
.../javabridge/EvaluatorRequestorBridge.java | 17 +-
.../reef/javabridge/FailedEvaluatorBridge.java | 5 +-
.../reef/javabridge/generic/JobClient.java | 91 ++--
.../reef/javabridge/generic/JobDriver.java | 470 ++++++++++---------
.../apache/reef/javabridge/generic/Launch.java | 57 ++-
.../common/client/REEFImplementation.java | 37 +-
.../common/driver/EvaluatorRequestorImpl.java | 39 +-
.../evaluator/AllocatedEvaluatorImpl.java | 131 +++---
.../driver/evaluator/EvaluatorManager.java | 8 +-
.../reef/util/logging/LoggingScopeFactory.java | 12 +-
.../driver/EvaluatorRequestorImplTest.java | 18 +-
.../local/client/LocalJobSubmissionHandler.java | 115 ++---
.../runtime/local/driver/ResourceManager.java | 77 +--
14 files changed, 611 insertions(+), 476 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8026c8ad/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java
----------------------------------------------------------------------
diff --git a/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java b/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java
index 71fb581..5d88355 100644
--- a/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java
+++ b/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java
@@ -30,11 +30,11 @@ public class AllocatedEvaluatorBridge extends NativeBridge {
private static final Logger LOG = Logger.getLogger(AllocatedEvaluatorBridge.class.getName());
- private AllocatedEvaluator jallocatedEvaluator;
- private AvroConfigurationSerializer serializer;
- private ClassHierarchy clrClassHierarchy;
- private String evaluatorId;
- private String nameServerInfo;
+ private final AllocatedEvaluator jallocatedEvaluator;
+ private final AvroConfigurationSerializer serializer;
+ private final ClassHierarchy clrClassHierarchy;
+ private final String evaluatorId;
+ private final String nameServerInfo;
public AllocatedEvaluatorBridge(final AllocatedEvaluator allocatedEvaluator, final String serverInfo) {
jallocatedEvaluator = allocatedEvaluator;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8026c8ad/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
----------------------------------------------------------------------
diff --git a/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java b/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
index cfabfde..a712fc4 100644
--- a/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
+++ b/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
@@ -20,6 +20,8 @@ package org.apache.reef.javabridge;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -28,14 +30,17 @@ public final class EvaluatorRequestorBridge extends NativeBridge {
private static final Logger LOG = Logger.getLogger(EvaluatorRequestorBridge.class.getName());
private final boolean isBlocked;
private final EvaluatorRequestor jevaluatorRequestor;
+ private final LoggingScopeFactory loggingScopeFactory;
+
// accumulate how many evaluators have been submitted through this instance
// of EvaluatorRequestorBridge
private int clrEvaluatorsNumber;
- public EvaluatorRequestorBridge(final EvaluatorRequestor evaluatorRequestor, final boolean isBlocked) {
+ public EvaluatorRequestorBridge(final EvaluatorRequestor evaluatorRequestor, final boolean isBlocked, final LoggingScopeFactory loggingScopeFactory) {
this.jevaluatorRequestor = evaluatorRequestor;
this.clrEvaluatorsNumber = 0;
this.isBlocked = isBlocked;
+ this.loggingScopeFactory = loggingScopeFactory;
}
public void submit(final int evaluatorsNumber, final int memory, final int virtualCore, final String rack) {
@@ -47,16 +52,18 @@ public final class EvaluatorRequestorBridge extends NativeBridge {
LOG.log(Level.WARNING, "Ignoring rack preference.");
}
- clrEvaluatorsNumber += evaluatorsNumber;
+ try (final LoggingScope ls = loggingScopeFactory.evaluatorRequestSubmitToJavaDriver(evaluatorsNumber)) {
+ clrEvaluatorsNumber += evaluatorsNumber;
- final EvaluatorRequest request = EvaluatorRequest.newBuilder()
+ final EvaluatorRequest request = EvaluatorRequest.newBuilder()
.setNumber(evaluatorsNumber)
.setMemory(memory)
.setNumberOfCores(virtualCore)
.build();
- LOG.log(Level.FINE, "submitting evaluator request {0}", request);
- jevaluatorRequestor.submit(request);
+ LOG.log(Level.FINE, "submitting evaluator request {0}", request);
+ jevaluatorRequestor.submit(request);
+ }
}
public int getEvaluatorNumber() {
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8026c8ad/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java
----------------------------------------------------------------------
diff --git a/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java b/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java
index e4eab7d..bae4946 100644
--- a/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java
+++ b/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedEvaluatorBridge.java
@@ -20,6 +20,7 @@ package org.apache.reef.javabridge;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.util.logging.LoggingScopeFactory;
import java.util.logging.Logger;
@@ -29,10 +30,10 @@ public class FailedEvaluatorBridge extends NativeBridge {
private EvaluatorRequestorBridge evaluatorRequestorBridge;
private String evaluatorId;
- public FailedEvaluatorBridge(FailedEvaluator failedEvaluator, EvaluatorRequestor evaluatorRequestor, boolean blockedForAdditionalEvaluator) {
+ public FailedEvaluatorBridge(FailedEvaluator failedEvaluator, EvaluatorRequestor evaluatorRequestor, boolean blockedForAdditionalEvaluator, final LoggingScopeFactory loggingScopeFactory) {
jfailedEvaluator = failedEvaluator;
evaluatorId = failedEvaluator.getId();
- evaluatorRequestorBridge = new EvaluatorRequestorBridge(evaluatorRequestor, blockedForAdditionalEvaluator);
+ evaluatorRequestorBridge = new EvaluatorRequestorBridge(evaluatorRequestor, blockedForAdditionalEvaluator, loggingScopeFactory);
}
public int getNewlyRequestedEvaluatorNumber() {
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8026c8ad/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobClient.java
----------------------------------------------------------------------
diff --git a/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobClient.java b/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobClient.java
index 403a489..62bfac1 100644
--- a/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobClient.java
+++ b/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobClient.java
@@ -28,6 +28,8 @@ import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.formats.AvroConfigurationSerializer;
import org.apache.reef.tang.formats.ConfigurationModule;
import org.apache.reef.util.EnvironmentUtils;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.webserver.HttpHandlerConfiguration;
import org.apache.reef.webserver.HttpServerReefEventHandler;
@@ -82,13 +84,18 @@ public class JobClient {
private String jobSubmissionDirectory = "reefTmp/job_" + System.currentTimeMillis();
/**
+ * A factory that provides LoggingScope
+ */
+ private final LoggingScopeFactory loggingScopeFactory;
+ /**
* Clr Bridge client.
* Parameters are injected automatically by TANG.
*
* @param reef Reference to the REEF framework.
*/
@Inject
- JobClient(final REEF reef) throws BindException {
+ JobClient(final REEF reef, final LoggingScopeFactory loggingScopeFactory) throws BindException {
+ this.loggingScopeFactory = loggingScopeFactory;
this.reef = reef;
this.driverConfigModule = getDriverConfiguration();
}
@@ -141,38 +148,40 @@ public class JobClient {
}
public void addCLRFiles(final File folder) throws BindException {
- ConfigurationModule result = this.driverConfigModule;
- for (final File f : folder.listFiles()) {
- if (f.canRead() && f.exists() && f.isFile()) {
- result = result.set(DriverConfiguration.GLOBAL_FILES, f.getAbsolutePath());
+ try (final LoggingScope ls = this.loggingScopeFactory.getNewLoggingScope("JobClient::addCLRFiles")) {
+ ConfigurationModule result = this.driverConfigModule;
+ for (final File f : folder.listFiles()) {
+ if (f.canRead() && f.exists() && f.isFile()) {
+ result = result.set(DriverConfiguration.GLOBAL_FILES, f.getAbsolutePath());
+ }
}
- }
-
- // set the driver memory, id and job submission directory
- this.driverConfigModule = result
- .set(DriverConfiguration.DRIVER_MEMORY, this.driverMemory)
- .set(DriverConfiguration.DRIVER_IDENTIFIER, this.driverId)
- .set(DriverConfiguration.DRIVER_JOB_SUBMISSION_DIRECTORY, this.jobSubmissionDirectory);
+ // set the driver memory, id and job submission directory
+ this.driverConfigModule = result
+ .set(DriverConfiguration.DRIVER_MEMORY, this.driverMemory)
+ .set(DriverConfiguration.DRIVER_IDENTIFIER, this.driverId)
+ .set(DriverConfiguration.DRIVER_JOB_SUBMISSION_DIRECTORY, this.jobSubmissionDirectory);
+
+
+ Path globalLibFile = Paths.get(NativeInterop.GLOBAL_LIBRARIES_FILENAME);
+ if (!Files.exists(globalLibFile)) {
+ LOG.log(Level.FINE, "Cannot find global classpath file at: {0}, assume there is none.", globalLibFile.toAbsolutePath());
+ } else {
+ String globalLibString = "";
+ try {
+ globalLibString = new String(Files.readAllBytes(globalLibFile));
+ } catch (final Exception e) {
+ LOG.log(Level.WARNING, "Cannot read from {0}, global libraries not added " + globalLibFile.toAbsolutePath());
+ }
- Path globalLibFile = Paths.get(NativeInterop.GLOBAL_LIBRARIES_FILENAME);
- if (!Files.exists(globalLibFile)) {
- LOG.log(Level.FINE, "Cannot find global classpath file at: {0}, assume there is none.", globalLibFile.toAbsolutePath());
- } else {
- String globalLibString = "";
- try {
- globalLibString = new String(Files.readAllBytes(globalLibFile));
- } catch (final Exception e) {
- LOG.log(Level.WARNING, "Cannot read from {0}, global libraries not added " + globalLibFile.toAbsolutePath());
+ for (final String s : globalLibString.split(",")) {
+ File f = new File(s);
+ this.driverConfigModule = this.driverConfigModule.set(DriverConfiguration.GLOBAL_LIBRARIES, f.getPath());
+ }
}
- for (final String s : globalLibString.split(",")) {
- File f = new File(s);
- this.driverConfigModule = this.driverConfigModule.set(DriverConfiguration.GLOBAL_LIBRARIES, f.getPath());
- }
+ this.driverConfiguration = Configurations.merge(this.driverConfigModule.build(), getHTTPConfiguration(), getNameServerConfiguration());
}
-
- this.driverConfiguration = Configurations.merge(this.driverConfigModule.build(), getHTTPConfiguration(), getNameServerConfiguration());
}
/**
@@ -181,20 +190,22 @@ public class JobClient {
* @throws org.apache.reef.tang.exceptions.BindException configuration error.
*/
public void submit(final File clrFolder, final boolean submitDriver, final Configuration clientConfig) {
- try {
- addCLRFiles(clrFolder);
- } catch (final BindException e) {
- LOG.log(Level.FINE, "Failed to bind", e);
- }
- if (submitDriver) {
- this.reef.submit(this.driverConfiguration);
- } else {
- File driverConfig = new File(System.getProperty("user.dir") + "/driver.config");
+ try (final LoggingScope ls = this.loggingScopeFactory.driverSubmit(submitDriver)) {
try {
- new AvroConfigurationSerializer().toFile(Configurations.merge(this.driverConfiguration, clientConfig), driverConfig);
- LOG.log(Level.INFO, "Driver configuration file created at " + driverConfig.getAbsolutePath());
- } catch (final IOException e) {
- throw new RuntimeException("Cannot create driver configuration file at " + driverConfig.getAbsolutePath());
+ addCLRFiles(clrFolder);
+ } catch (final BindException e) {
+ LOG.log(Level.FINE, "Failed to bind", e);
+ }
+ if (submitDriver) {
+ this.reef.submit(this.driverConfiguration);
+ } else {
+ File driverConfig = new File(System.getProperty("user.dir") + "/driver.config");
+ try {
+ new AvroConfigurationSerializer().toFile(Configurations.merge(this.driverConfiguration, clientConfig), driverConfig);
+ LOG.log(Level.INFO, "Driver configuration file created at " + driverConfig.getAbsolutePath());
+ } catch (final IOException e) {
+ throw new RuntimeException("Cannot create driver configuration file at " + driverConfig.getAbsolutePath());
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8026c8ad/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
----------------------------------------------------------------------
diff --git a/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
index abdea4f..16c6469 100644
--- a/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
+++ b/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
@@ -32,6 +32,8 @@ import org.apache.reef.runtime.common.driver.DriverStatusManager;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.util.Optional;
import org.apache.reef.util.logging.CLRBufferedLogHandler;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.NetUtils;
import org.apache.reef.wake.remote.impl.ObjectSerializableCodec;
@@ -92,6 +94,12 @@ public final class JobDriver {
* Map from context ID to running evaluator context.
*/
private final Map<String, ActiveContext> contexts = new HashMap<>();
+
+ /**
+ * Logging scope factory that provides LoggingScope
+ */
+ private final LoggingScopeFactory loggingScopeFactory;
+
private long evaluatorRequestorHandler = 0;
private long allocatedEvaluatorHandler = 0;
private long activeContextHandler = 0;
@@ -127,7 +135,8 @@ public final class JobDriver {
final NameServer nameServer,
final JobMessageObserver jobMessageObserver,
final EvaluatorRequestor evaluatorRequestor,
- final DriverStatusManager driverStatusManager) {
+ final DriverStatusManager driverStatusManager,
+ final LoggingScopeFactory loggingScopeFactory) {
this.clock = clock;
this.httpServer = httpServer;
this.jobMessageObserver = jobMessageObserver;
@@ -135,61 +144,66 @@ public final class JobDriver {
this.nameServer = nameServer;
this.driverStatusManager = driverStatusManager;
this.nameServerInfo = NetUtils.getLocalAddress() + ":" + this.nameServer.getPort();
+ this.loggingScopeFactory = loggingScopeFactory;
}
private void setupBridge(final StartTime startTime) {
// Signal to the clr buffered log handler that the driver has started and that
// we can begin logging
LOG.log(Level.INFO, "Initializing CLRBufferedLogHandler...");
- final CLRBufferedLogHandler handler = getCLRBufferedLogHandler();
- if (handler == null) {
- LOG.log(Level.WARNING, "CLRBufferedLogHandler could not be initialized");
- } else {
- handler.setDriverInitialized();
- LOG.log(Level.INFO, "CLRBufferedLogHandler init complete.");
- }
+ try (final LoggingScope lb = this.loggingScopeFactory.setupBridge()) {
+ final CLRBufferedLogHandler handler = getCLRBufferedLogHandler();
+ if (handler == null) {
+ LOG.log(Level.WARNING, "CLRBufferedLogHandler could not be initialized");
+ } else {
+ handler.setDriverInitialized();
+ LOG.log(Level.INFO, "CLRBufferedLogHandler init complete.");
+ }
- LOG.log(Level.INFO, "StartTime: {0}", new Object[]{startTime});
- long[] handlers = NativeInterop.CallClrSystemOnStartHandler(startTime.toString());
- if (handlers != null) {
- if (handlers.length != NativeInterop.nHandlers) {
- throw new RuntimeException(
- String.format("%s handlers initialized in CLR while native bridge is expecting %s handlers",
- String.valueOf(handlers.length),
- String.valueOf(NativeInterop.nHandlers)));
+ LOG.log(Level.INFO, "StartTime: {0}", new Object[]{startTime});
+ long[] handlers = NativeInterop.CallClrSystemOnStartHandler(startTime.toString());
+ if (handlers != null) {
+ if (handlers.length != NativeInterop.nHandlers) {
+ throw new RuntimeException(
+ String.format("%s handlers initialized in CLR while native bridge is expecting %s handlers",
+ String.valueOf(handlers.length),
+ String.valueOf(NativeInterop.nHandlers)));
+ }
+ this.evaluatorRequestorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.EvaluatorRequestorKey)];
+ this.allocatedEvaluatorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.AllocatedEvaluatorKey)];
+ this.activeContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.ActiveContextKey)];
+ this.taskMessageHandler = handlers[NativeInterop.Handlers.get(NativeInterop.TaskMessageKey)];
+ this.failedTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.FailedTaskKey)];
+ this.failedEvaluatorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.FailedEvaluatorKey)];
+ this.httpServerEventHandler = handlers[NativeInterop.Handlers.get(NativeInterop.HttpServerKey)];
+ this.completedTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.CompletedTaskKey)];
+ this.runningTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.RunningTaskKey)];
+ this.suspendedTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.SuspendedTaskKey)];
+ this.completedEvaluatorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.CompletedEvaluatorKey)];
+ this.closedContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.ClosedContextKey)];
+ this.failedContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.FailedContextKey)];
+ this.contextMessageHandler = handlers[NativeInterop.Handlers.get(NativeInterop.ContextMessageKey)];
+ this.driverRestartHandler = handlers[NativeInterop.Handlers.get(NativeInterop.DriverRestartKey)];
+ this.driverRestartActiveContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.DriverRestartActiveContextKey)];
+ this.driverRestartRunningTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.DriverRestartRunningTaskKey)];
}
- this.evaluatorRequestorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.EvaluatorRequestorKey)];
- this.allocatedEvaluatorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.AllocatedEvaluatorKey)];
- this.activeContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.ActiveContextKey)];
- this.taskMessageHandler = handlers[NativeInterop.Handlers.get(NativeInterop.TaskMessageKey)];
- this.failedTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.FailedTaskKey)];
- this.failedEvaluatorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.FailedEvaluatorKey)];
- this.httpServerEventHandler = handlers[NativeInterop.Handlers.get(NativeInterop.HttpServerKey)];
- this.completedTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.CompletedTaskKey)];
- this.runningTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.RunningTaskKey)];
- this.suspendedTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.SuspendedTaskKey)];
- this.completedEvaluatorHandler = handlers[NativeInterop.Handlers.get(NativeInterop.CompletedEvaluatorKey)];
- this.closedContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.ClosedContextKey)];
- this.failedContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.FailedContextKey)];
- this.contextMessageHandler = handlers[NativeInterop.Handlers.get(NativeInterop.ContextMessageKey)];
- this.driverRestartHandler = handlers[NativeInterop.Handlers.get(NativeInterop.DriverRestartKey)];
- this.driverRestartActiveContextHandler = handlers[NativeInterop.Handlers.get(NativeInterop.DriverRestartActiveContextKey)];
- this.driverRestartRunningTaskHandler = handlers[NativeInterop.Handlers.get(NativeInterop.DriverRestartRunningTaskKey)];
- }
- final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge("SPEC");
- NativeInterop.ClrSystemHttpServerHandlerOnNext(this.httpServerEventHandler, httpServerEventBridge, this.interopLogger);
- final String specList = httpServerEventBridge.getUriSpecification();
- LOG.log(Level.INFO, "Starting http server, getUriSpecification: {0}", specList);
- if (specList != null) {
- final String[] specs = specList.split(":");
- for (final String s : specs) {
- final HttpHandler h = new HttpServerBridgeEventHandler();
- h.setUriSpecification(s);
- this.httpServer.addHttpHandler(h);
+ try (final LoggingScope lp = this.loggingScopeFactory.getNewLoggingScope("setupBridge::ClrSystemHttpServerHandlerOnNext")) {
+ final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge("SPEC");
+ NativeInterop.ClrSystemHttpServerHandlerOnNext(this.httpServerEventHandler, httpServerEventBridge, this.interopLogger);
+ final String specList = httpServerEventBridge.getUriSpecification();
+ LOG.log(Level.INFO, "Starting http server, getUriSpecification: {0}", specList);
+ if (specList != null) {
+ final String[] specs = specList.split(":");
+ for (final String s : specs) {
+ final HttpHandler h = new HttpServerBridgeEventHandler();
+ h.setUriSpecification(s);
+ this.httpServer.addHttpHandler(h);
+ }
+ }
}
+ this.clrBridgeSetup = true;
}
- this.clrBridgeSetup = true;
LOG.log(Level.INFO, "CLR Bridge setup.");
}
@@ -238,11 +252,13 @@ public final class JobDriver {
final class AllocatedEvaluatorHandler implements EventHandler<AllocatedEvaluator> {
@Override
public void onNext(final AllocatedEvaluator allocatedEvaluator) {
- synchronized (JobDriver.this) {
- LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext");
- if (JobDriver.this.nCLREvaluators > 0) {
- JobDriver.this.submitEvaluator(allocatedEvaluator, EvaluatorType.CLR);
- JobDriver.this.nCLREvaluators--;
+ try (final LoggingScope ls = loggingScopeFactory.evaluatorAllocated(allocatedEvaluator.getId())) {
+ synchronized (JobDriver.this) {
+ LOG.log(Level.INFO, "AllocatedEvaluatorHandler.OnNext");
+ if (JobDriver.this.nCLREvaluators > 0) {
+ JobDriver.this.submitEvaluator(allocatedEvaluator, EvaluatorType.CLR);
+ JobDriver.this.nCLREvaluators--;
+ }
}
}
}
@@ -254,11 +270,13 @@ public final class JobDriver {
final class ActiveContextHandler implements EventHandler<ActiveContext> {
@Override
public void onNext(final ActiveContext context) {
- synchronized (JobDriver.this) {
- LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0} expect {1}",
- new Object[]{context.getId(), JobDriver.this.nCLREvaluators});
- JobDriver.this.contexts.put(context.getId(), context);
- JobDriver.this.submit(context);
+ try (final LoggingScope ls = loggingScopeFactory.activeContextReceived(context.getId())) {
+ synchronized (JobDriver.this) {
+ LOG.log(Level.INFO, "ActiveContextHandler: Context available: {0} expect {1}",
+ new Object[]{context.getId(), JobDriver.this.nCLREvaluators});
+ JobDriver.this.contexts.put(context.getId(), context);
+ JobDriver.this.submit(context);
+ }
}
}
}
@@ -270,21 +288,23 @@ public final class JobDriver {
@Override
public void onNext(final CompletedTask task) {
LOG.log(Level.INFO, "Completed task: {0}", task.getId());
- // Take the message returned by the task and add it to the running result.
- String result = "default result";
- try {
- result = new String(task.get());
- } catch (final Exception e) {
- LOG.log(Level.WARNING, "failed to decode task outcome");
- }
- LOG.log(Level.INFO, "Return results to the client:\n{0}", result);
- JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(result));
- if (JobDriver.this.completedTaskHandler == 0) {
- LOG.log(Level.INFO, "No CLR handler bound to handle completed task.");
- } else {
- LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler.");
- CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task);
- NativeInterop.ClrSystemCompletedTaskHandlerOnNext(JobDriver.this.completedTaskHandler, completedTaskBridge, JobDriver.this.interopLogger);
+ try (final LoggingScope ls = loggingScopeFactory.taskCompleted(task.getId())) {
+ // Take the message returned by the task and add it to the running result.
+ String result = "default result";
+ try {
+ result = new String(task.get());
+ } catch (final Exception e) {
+ LOG.log(Level.WARNING, "failed to decode task outcome");
+ }
+ LOG.log(Level.INFO, "Return results to the client:\n{0}", result);
+ JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(result));
+ if (JobDriver.this.completedTaskHandler == 0) {
+ LOG.log(Level.INFO, "No CLR handler bound to handle completed task.");
+ } else {
+ LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler.");
+ CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task);
+ NativeInterop.ClrSystemCompletedTaskHandlerOnNext(JobDriver.this.completedTaskHandler, completedTaskBridge, JobDriver.this.interopLogger);
+ }
}
}
}
@@ -295,38 +315,40 @@ public final class JobDriver {
final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> {
@Override
public void onNext(final FailedEvaluator eval) {
- synchronized (JobDriver.this) {
- LOG.log(Level.SEVERE, "FailedEvaluator", eval);
- for (final FailedContext failedContext : eval.getFailedContextList()) {
- String failedContextId = failedContext.getId();
- LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts.");
- JobDriver.this.contexts.remove(failedContextId);
- }
- String message = "Evaluator " + eval.getId() + " failed with message: "
- + eval.getEvaluatorException().getMessage();
- JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
-
- if (failedEvaluatorHandler == 0) {
- if (JobDriver.this.clrBridgeSetup) {
- message = "No CLR FailedEvaluator handler was set, exiting now";
- LOG.log(Level.WARNING, message);
- JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
- return;
- } else {
- clock.scheduleAlarm(0, new EventHandler<Alarm>() {
- @Override
- public void onNext(final Alarm time) {
- if (JobDriver.this.clrBridgeSetup) {
- handleFailedEvaluatorInCLR(eval);
- } else {
- LOG.log(Level.INFO, "Waiting for CLR bridge to be set up");
- clock.scheduleAlarm(5000, this);
+ try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(eval.getId())) {
+ synchronized (JobDriver.this) {
+ LOG.log(Level.SEVERE, "FailedEvaluator", eval);
+ for (final FailedContext failedContext : eval.getFailedContextList()) {
+ String failedContextId = failedContext.getId();
+ LOG.log(Level.INFO, "removing context " + failedContextId + " from job driver contexts.");
+ JobDriver.this.contexts.remove(failedContextId);
+ }
+ String message = "Evaluator " + eval.getId() + " failed with message: "
+ + eval.getEvaluatorException().getMessage();
+ JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
+
+ if (failedEvaluatorHandler == 0) {
+ if (JobDriver.this.clrBridgeSetup) {
+ message = "No CLR FailedEvaluator handler was set, exiting now";
+ LOG.log(Level.WARNING, message);
+ JobDriver.this.jobMessageObserver.sendMessageToClient(message.getBytes());
+ return;
+ } else {
+ clock.scheduleAlarm(0, new EventHandler<Alarm>() {
+ @Override
+ public void onNext(final Alarm time) {
+ if (JobDriver.this.clrBridgeSetup) {
+ handleFailedEvaluatorInCLR(eval);
+ } else {
+ LOG.log(Level.INFO, "Waiting for CLR bridge to be set up");
+ clock.scheduleAlarm(5000, this);
+ }
}
- }
- });
+ });
+ }
+ } else {
+ handleFailedEvaluatorInCLR(eval);
}
- } else {
- handleFailedEvaluatorInCLR(eval);
}
}
}
@@ -334,7 +356,7 @@ public final class JobDriver {
private void handleFailedEvaluatorInCLR(final FailedEvaluator eval) {
final String message = "CLR FailedEvaluator handler set, handling things with CLR handler.";
LOG.log(Level.INFO, message);
- FailedEvaluatorBridge failedEvaluatorBridge = new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor, JobDriver.this.isRestarted);
+ FailedEvaluatorBridge failedEvaluatorBridge = new FailedEvaluatorBridge(eval, JobDriver.this.evaluatorRequestor, JobDriver.this.isRestarted, loggingScopeFactory);
NativeInterop.ClrSystemFailedEvaluatorHandlerOnNext(JobDriver.this.failedEvaluatorHandler, failedEvaluatorBridge, JobDriver.this.interopLogger);
int additionalRequestedEvaluatorNumber = failedEvaluatorBridge.getNewlyRequestedEvaluatorNumber();
if (additionalRequestedEvaluatorNumber > 0) {
@@ -366,19 +388,21 @@ public final class JobDriver {
@Override
public void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response) throws IOException, ServletException {
LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest: {0}", parsedHttpRequest.getRequestUri());
- final AvroHttpSerializer httpSerializer = new AvroHttpSerializer();
- final AvroHttpRequest avroHttpRequest = httpSerializer.toAvro(parsedHttpRequest);
- final byte[] requestBytes = httpSerializer.toBytes(avroHttpRequest);
+ try (final LoggingScope ls = loggingScopeFactory.httpRequest(parsedHttpRequest.getRequestUri())) {
+ final AvroHttpSerializer httpSerializer = new AvroHttpSerializer();
+ final AvroHttpRequest avroHttpRequest = httpSerializer.toAvro(parsedHttpRequest);
+ final byte[] requestBytes = httpSerializer.toBytes(avroHttpRequest);
- try {
- final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge(requestBytes);
- NativeInterop.ClrSystemHttpServerHandlerOnNext(JobDriver.this.httpServerEventHandler, httpServerEventBridge, JobDriver.this.interopLogger);
- final String responseBody = new String(httpServerEventBridge.getQueryResponseData(), "UTF-8");
- response.getWriter().println(responseBody);
- LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest received response: {0}", responseBody);
- } catch (final Exception ex) {
- LOG.log(Level.SEVERE, "Fail to invoke CLR Http Server handler", ex);
- throw new RuntimeException(ex);
+ try {
+ final HttpServerEventBridge httpServerEventBridge = new HttpServerEventBridge(requestBytes);
+ NativeInterop.ClrSystemHttpServerHandlerOnNext(JobDriver.this.httpServerEventHandler, httpServerEventBridge, JobDriver.this.interopLogger);
+ final String responseBody = new String(httpServerEventBridge.getQueryResponseData(), "UTF-8");
+ response.getWriter().println(responseBody);
+ LOG.log(Level.INFO, "HttpServerBridgeEventHandler onHttpRequest received response: {0}", responseBody);
+ } catch (final Exception ex) {
+ LOG.log(Level.SEVERE, "Fail to invoke CLR Http Server handler", ex);
+ throw new RuntimeException(ex);
+ }
}
}
}
@@ -410,16 +434,18 @@ public final class JobDriver {
final class RunningTaskHandler implements EventHandler<RunningTask> {
@Override
public void onNext(final RunningTask task) {
- if (JobDriver.this.runningTaskHandler == 0) {
- LOG.log(Level.INFO, "RunningTask event received but no CLR handler was bound. Exiting handler.");
- } else {
- LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId());
- try {
- final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task);
- NativeInterop.ClrSystemRunningTaskHandlerOnNext(JobDriver.this.runningTaskHandler, runningTaskBridge, JobDriver.this.interopLogger);
- } catch (final Exception ex) {
- LOG.log(Level.WARNING, "Fail to invoke CLR running task handler");
- throw new RuntimeException(ex);
+ try (final LoggingScope ls = loggingScopeFactory.taskRunning(task.getId())) {
+ if (JobDriver.this.runningTaskHandler == 0) {
+ LOG.log(Level.INFO, "RunningTask event received but no CLR handler was bound. Exiting handler.");
+ } else {
+ LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId());
+ try {
+ final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task);
+ NativeInterop.ClrSystemRunningTaskHandlerOnNext(JobDriver.this.runningTaskHandler, runningTaskBridge, JobDriver.this.interopLogger);
+ } catch (final Exception ex) {
+ LOG.log(Level.WARNING, "Fail to invoke CLR running task handler");
+ throw new RuntimeException(ex);
+ }
}
}
}
@@ -431,23 +457,24 @@ public final class JobDriver {
final class DriverRestartRunningTaskHandler implements EventHandler<RunningTask> {
@Override
public void onNext(final RunningTask task) {
- LOG.log(Level.INFO, "DriverRestartRunningTask event received: " + task.getId());
- clock.scheduleAlarm(0, new EventHandler<Alarm>() {
- @Override
- public void onNext(final Alarm time) {
- if (JobDriver.this.clrBridgeSetup) {
- if (JobDriver.this.driverRestartRunningTaskHandler != 0) {
- LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR.");
- NativeInterop.ClrSystemDriverRestartRunningTaskHandlerOnNext(JobDriver.this.driverRestartRunningTaskHandler, new RunningTaskBridge(task));
+ try (final LoggingScope ls = loggingScopeFactory.driverRestartRunningTask(task.getId())) {
+ clock.scheduleAlarm(0, new EventHandler<Alarm>() {
+ @Override
+ public void onNext(final Alarm time) {
+ if (JobDriver.this.clrBridgeSetup) {
+ if (JobDriver.this.driverRestartRunningTaskHandler != 0) {
+ LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR.");
+ NativeInterop.ClrSystemDriverRestartRunningTaskHandlerOnNext(JobDriver.this.driverRestartRunningTaskHandler, new RunningTaskBridge(task));
+ } else {
+ LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, done with DriverRestartRunningTaskHandler.");
+ }
} else {
- LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, done with DriverRestartRunningTaskHandler.");
+ LOG.log(Level.INFO, "Waiting for driver to complete restart process before checking out CLR driver restart RunningTaskHandler...");
+ clock.scheduleAlarm(2000, this);
}
- } else {
- LOG.log(Level.INFO, "Waiting for driver to complete restart process before checking out CLR driver restart RunningTaskHandler...");
- clock.scheduleAlarm(2000, this);
}
- }
- });
+ });
+ }
}
}
@@ -457,24 +484,26 @@ public final class JobDriver {
final class DriverRestartActiveContextHandler implements EventHandler<ActiveContext> {
@Override
public void onNext(final ActiveContext context) {
- JobDriver.this.contexts.put(context.getId(), context);
+ try (final LoggingScope ls = loggingScopeFactory.driverRestartActiveContextReceived(context.getId())) {
+ JobDriver.this.contexts.put(context.getId(), context);
LOG.log(Level.INFO, "DriverRestartActiveContextHandler event received: " + context.getId());
- clock.scheduleAlarm(0, new EventHandler<Alarm>() {
- @Override
- public void onNext(final Alarm time) {
- if (JobDriver.this.clrBridgeSetup) {
- if (JobDriver.this.driverRestartActiveContextHandler != 0) {
- LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR.");
- NativeInterop.ClrSystemDriverRestartActiveContextHandlerOnNext(JobDriver.this.driverRestartActiveContextHandler, new ActiveContextBridge(context));
+ clock.scheduleAlarm(0, new EventHandler<Alarm>() {
+ @Override
+ public void onNext(final Alarm time) {
+ if (JobDriver.this.clrBridgeSetup) {
+ if (JobDriver.this.driverRestartActiveContextHandler != 0) {
+ LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR.");
+ NativeInterop.ClrSystemDriverRestartActiveContextHandlerOnNext(JobDriver.this.driverRestartActiveContextHandler, new ActiveContextBridge(context));
+ } else {
+ LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, done with DriverRestartActiveContextHandler.");
+ }
} else {
- LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, done with DriverRestartActiveContextHandler.");
+ LOG.log(Level.INFO, "Waiting for driver to complete restart process before checking out CLR driver restart DriverRestartActiveContextHandler...");
+ clock.scheduleAlarm(2000, this);
}
- } else {
- LOG.log(Level.INFO, "Waiting for driver to complete restart process before checking out CLR driver restart DriverRestartActiveContextHandler...");
- clock.scheduleAlarm(2000, this);
}
- }
- });
+ });
+ }
}
}
@@ -484,20 +513,22 @@ public final class JobDriver {
final class StartHandler implements EventHandler<StartTime> {
@Override
public void onNext(final StartTime startTime) {
- synchronized (JobDriver.this) {
+ try (final LoggingScope ls = loggingScopeFactory.driverStart(startTime)) {
+ synchronized (JobDriver.this) {
- setupBridge(startTime);
+ setupBridge(startTime);
- LOG.log(Level.INFO, "Driver Started");
+ LOG.log(Level.INFO, "Driver Started");
- if (JobDriver.this.evaluatorRequestorHandler == 0) {
- throw new RuntimeException("Evaluator Requestor Handler not initialized by CLR.");
+ if (JobDriver.this.evaluatorRequestorHandler == 0) {
+ throw new RuntimeException("Evaluator Requestor Handler not initialized by CLR.");
+ }
+ EvaluatorRequestorBridge evaluatorRequestorBridge = new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false, loggingScopeFactory);
+ NativeInterop.ClrSystemEvaluatorRequstorHandlerOnNext(JobDriver.this.evaluatorRequestorHandler, evaluatorRequestorBridge, JobDriver.this.interopLogger);
+ // get the evaluator numbers set by CLR handler
+ nCLREvaluators += evaluatorRequestorBridge.getEvaluatorNumber();
+ LOG.log(Level.INFO, "evaluator requested: " + nCLREvaluators);
}
- EvaluatorRequestorBridge evaluatorRequestorBridge = new EvaluatorRequestorBridge(JobDriver.this.evaluatorRequestor, false);
- NativeInterop.ClrSystemEvaluatorRequstorHandlerOnNext(JobDriver.this.evaluatorRequestorHandler, evaluatorRequestorBridge, JobDriver.this.interopLogger);
- // get the evaluator numbers set by CLR handler
- nCLREvaluators += evaluatorRequestorBridge.getEvaluatorNumber();
- LOG.log(Level.INFO, "evaluator requested: " + nCLREvaluators);
}
}
}
@@ -509,13 +540,15 @@ public final class JobDriver {
final class RestartHandler implements EventHandler<StartTime> {
@Override
public void onNext(final StartTime startTime) {
- synchronized (JobDriver.this) {
+ try (final LoggingScope ls = loggingScopeFactory.driverRestart(startTime)) {
+ synchronized (JobDriver.this) {
- setupBridge(startTime);
+ setupBridge(startTime);
- JobDriver.this.isRestarted = true;
+ JobDriver.this.isRestarted = true;
- LOG.log(Level.INFO, "Driver Restarted and CLR bridge set up.");
+ LOG.log(Level.INFO, "Driver Restarted and CLR bridge set up.");
+ }
}
}
}
@@ -527,11 +560,14 @@ public final class JobDriver {
@Override
public void onNext(final DriverRestartCompleted driverRestartCompleted) {
LOG.log(Level.INFO, "Java DriverRestartCompleted event received at time [{0}]. ", driverRestartCompleted.getTimeStamp());
- if (JobDriver.this.driverRestartHandler != 0) {
- LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR.");
- NativeInterop.ClrSystemDriverRestartHandlerOnNext(JobDriver.this.driverRestartHandler);
- } else {
- LOG.log(Level.WARNING, "No CLR driver restart handler implemented, done with DriverRestartCompletedHandler.");
+ try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted(driverRestartCompleted.getTimeStamp())) {
+ if (JobDriver.this.driverRestartHandler != 0) {
+ LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR.");
+ NativeInterop.ClrSystemDriverRestartHandlerOnNext(JobDriver.this.driverRestartHandler);
+ } else {
+ LOG.log(Level.WARNING, "No CLR driver restart handler implemented, done with DriverRestartCompletedHandler.");
+
+ }
}
}
}
@@ -543,8 +579,10 @@ public final class JobDriver {
@Override
public void onNext(final StopTime time) {
LOG.log(Level.INFO, " StopTime: {0}", new Object[]{time});
- for (final ActiveContext context : contexts.values()) {
- context.close();
+ try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimeStamp())) {
+ for (final ActiveContext context : contexts.values()) {
+ context.close();
+ }
}
}
}
@@ -552,12 +590,15 @@ public final class JobDriver {
final class TaskMessageHandler implements EventHandler<TaskMessage> {
@Override
public void onNext(final TaskMessage taskMessage) {
- LOG.log(Level.INFO, "Received TaskMessage: {0} from CLR", new String(taskMessage.get()));
+ String msg = new String(taskMessage.get());
+ LOG.log(Level.INFO, "Received TaskMessage: {0} from CLR", msg);
+ //try (LoggingScope ls = loggingScopeFactory.taskMessageReceived(new String(msg))) {
if (JobDriver.this.taskMessageHandler != 0) {
TaskMessageBridge taskMessageBridge = new TaskMessageBridge(taskMessage);
// if CLR implements the task message handler, handle the bytes in CLR handler
NativeInterop.ClrSystemTaskMessageHandlerOnNext(JobDriver.this.taskMessageHandler, taskMessage.get(), taskMessageBridge, JobDriver.this.interopLogger);
}
+ //}
}
}
@@ -569,13 +610,15 @@ public final class JobDriver {
public final void onNext(final SuspendedTask task) {
final String message = "Received notification that task [" + task.getId() + "] has been suspended.";
LOG.log(Level.INFO, message);
- if (JobDriver.this.suspendedTaskHandler != 0) {
- SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task);
- // if CLR implements the suspended task handler, handle it in CLR
- LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge.");
- NativeInterop.ClrSystemSupendedTaskHandlerOnNext(JobDriver.this.suspendedTaskHandler, suspendedTaskBridge);
+ try (final LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) {
+ if (JobDriver.this.suspendedTaskHandler != 0) {
+ SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task);
+ // if CLR implements the suspended task handler, handle it in CLR
+ LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge.");
+ NativeInterop.ClrSystemSupendedTaskHandlerOnNext(JobDriver.this.suspendedTaskHandler, suspendedTaskBridge);
+ }
+ JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(message));
}
- JobDriver.this.jobMessageObserver.sendMessageToClient(JVM_CODEC.encode(message));
}
}
@@ -586,11 +629,13 @@ public final class JobDriver {
@Override
public void onNext(final CompletedEvaluator evaluator) {
LOG.log(Level.INFO, " Completed Evaluator {0}", evaluator.getId());
- if (JobDriver.this.completedEvaluatorHandler != 0) {
- CompletedEvaluatorBridge completedEvaluatorBridge = new CompletedEvaluatorBridge(evaluator);
- // if CLR implements the completed evaluator handler, handle it in CLR
- LOG.log(Level.INFO, "Handling the event of completed evaluator in CLR bridge.");
- NativeInterop.ClrSystemCompletdEvaluatorHandlerOnNext(completedEvaluatorHandler, completedEvaluatorBridge);
+ try (final LoggingScope ls = loggingScopeFactory.evaluatorCompleted(evaluator.getId())) {
+ if (JobDriver.this.completedEvaluatorHandler != 0) {
+ CompletedEvaluatorBridge completedEvaluatorBridge = new CompletedEvaluatorBridge(evaluator);
+ // if CLR implements the completed evaluator handler, handle it in CLR
+ LOG.log(Level.INFO, "Handling the event of completed evaluator in CLR bridge.");
+ NativeInterop.ClrSystemCompletdEvaluatorHandlerOnNext(completedEvaluatorHandler, completedEvaluatorBridge);
+ }
}
}
}
@@ -604,14 +649,16 @@ public final class JobDriver {
@Override
public void onNext(final ClosedContext context) {
LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
- if (JobDriver.this.closedContextHandler != 0) {
- ClosedContextBridge closedContextBridge = new ClosedContextBridge(context);
- // if CLR implements the closed context handler, handle it in CLR
- LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge.");
- NativeInterop.ClrSystemClosedContextHandlerOnNext(JobDriver.this.closedContextHandler, closedContextBridge);
- }
- synchronized (JobDriver.this) {
- JobDriver.this.contexts.remove(context.getId());
+ try (final LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) {
+ if (JobDriver.this.closedContextHandler != 0) {
+ ClosedContextBridge closedContextBridge = new ClosedContextBridge(context);
+ // if CLR implements the closed context handler, handle it in CLR
+ LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge.");
+ NativeInterop.ClrSystemClosedContextHandlerOnNext(JobDriver.this.closedContextHandler, closedContextBridge);
+ }
+ synchronized (JobDriver.this) {
+ JobDriver.this.contexts.remove(context.getId());
+ }
}
}
}
@@ -625,18 +672,20 @@ public final class JobDriver {
@Override
public void onNext(final FailedContext context) {
LOG.log(Level.SEVERE, "FailedContext", context);
- if (JobDriver.this.failedContextHandler != 0) {
- FailedContextBridge failedContextBridge = new FailedContextBridge(context);
- // if CLR implements the failed context handler, handle it in CLR
- LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge.");
- NativeInterop.ClrSystemFailedContextHandlerOnNext(JobDriver.this.failedContextHandler, failedContextBridge);
- }
- synchronized (JobDriver.this) {
- JobDriver.this.contexts.remove(context.getId());
- }
- Optional<byte[]> err = context.getData();
- if (err.isPresent()) {
- JobDriver.this.jobMessageObserver.sendMessageToClient(err.get());
+ try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) {
+ if (JobDriver.this.failedContextHandler != 0) {
+ FailedContextBridge failedContextBridge = new FailedContextBridge(context);
+ // if CLR implements the failed context handler, handle it in CLR
+ LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge.");
+ NativeInterop.ClrSystemFailedContextHandlerOnNext(JobDriver.this.failedContextHandler, failedContextBridge);
+ }
+ synchronized (JobDriver.this) {
+ JobDriver.this.contexts.remove(context.getId());
+ }
+ Optional<byte[]> err = context.getData();
+ if (err.isPresent()) {
+ JobDriver.this.jobMessageObserver.sendMessageToClient(err.get());
+ }
}
}
}
@@ -649,13 +698,14 @@ public final class JobDriver {
@Override
public void onNext(final ContextMessage message) {
LOG.log(Level.SEVERE, "Received ContextMessage:", message.get());
- if (JobDriver.this.contextMessageHandler != 0) {
- ContextMessageBridge contextMessageBridge = new ContextMessageBridge(message);
- // if CLR implements the context message handler, handle it in CLR
- LOG.log(Level.INFO, "Handling the event of context message in CLR bridge.");
- NativeInterop.ClrSystemContextMessageHandlerOnNext(JobDriver.this.contextMessageHandler, contextMessageBridge);
+ try (final LoggingScope ls = loggingScopeFactory.contextMessageReceived(message.get().toString())) {
+ if (JobDriver.this.contextMessageHandler != 0) {
+ ContextMessageBridge contextMessageBridge = new ContextMessageBridge(message);
+ // if CLR implements the context message handler, handle it in CLR
+ LOG.log(Level.INFO, "Handling the event of context message in CLR bridge.");
+ NativeInterop.ClrSystemContextMessageHandlerOnNext(JobDriver.this.contextMessageHandler, contextMessageBridge);
+ }
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8026c8ad/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java
----------------------------------------------------------------------
diff --git a/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java b/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java
index d5cd353..b1473ee 100644
--- a/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java
+++ b/reef-bridge-project/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/Launch.java
@@ -30,10 +30,14 @@ import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.tang.formats.CommandLine;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+import org.apache.reef.util.logging.LoggingScopeImpl;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Date;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -101,33 +105,35 @@ public final class Launch {
private static Configuration getClientConfiguration(final String[] args)
throws BindException, InjectionException, IOException {
- final Configuration commandLineConf = parseCommandLine(args);
-
- final Configuration clientConfiguration = ClientConfiguration.CONF
- .set(ClientConfiguration.ON_JOB_COMPLETED, JobClient.CompletedJobHandler.class)
- .set(ClientConfiguration.ON_JOB_FAILED, JobClient.FailedJobHandler.class)
- .set(ClientConfiguration.ON_RUNTIME_ERROR, JobClient.RuntimeErrorHandler.class)
- //.set(ClientConfiguration.ON_WAKE_ERROR, JobClient.WakeErrorHandler.class )
- .build();
-
- // TODO: Remove the injector, have stuff injected.
- final Injector commandLineInjector = Tang.Factory.getTang().newInjector(commandLineConf);
- final boolean isLocal = commandLineInjector.getNamedInstance(Local.class);
- final Configuration runtimeConfiguration;
- if (isLocal) {
- LOG.log(Level.INFO, "Running on the local runtime");
- runtimeConfiguration = LocalRuntimeConfiguration.CONF
- .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
+ try (final LoggingScope ls = LoggingScopeFactory.getNewLoggingScope(Level.INFO, "Launch::getClientConfiguration")) {
+ final Configuration commandLineConf = parseCommandLine(args);
+
+ final Configuration clientConfiguration = ClientConfiguration.CONF
+ .set(ClientConfiguration.ON_JOB_COMPLETED, JobClient.CompletedJobHandler.class)
+ .set(ClientConfiguration.ON_JOB_FAILED, JobClient.FailedJobHandler.class)
+ .set(ClientConfiguration.ON_RUNTIME_ERROR, JobClient.RuntimeErrorHandler.class)
+ //.set(ClientConfiguration.ON_WAKE_ERROR, JobClient.WakeErrorHandler.class )
.build();
- } else {
- LOG.log(Level.INFO, "Running on YARN");
- runtimeConfiguration = YarnClientConfiguration.CONF.build();
- }
- return Tang.Factory.getTang()
- .newConfigurationBuilder(runtimeConfiguration, clientConfiguration,
- cloneCommandLineConfiguration(commandLineConf))
- .build();
+ // TODO: Remove the injector, have stuff injected.
+ final Injector commandLineInjector = Tang.Factory.getTang().newInjector(commandLineConf);
+ final boolean isLocal = commandLineInjector.getNamedInstance(Local.class);
+ final Configuration runtimeConfiguration;
+ if (isLocal) {
+ LOG.log(Level.INFO, "Running on the local runtime");
+ runtimeConfiguration = LocalRuntimeConfiguration.CONF
+ .set(LocalRuntimeConfiguration.NUMBER_OF_THREADS, NUM_LOCAL_THREADS)
+ .build();
+ } else {
+ LOG.log(Level.INFO, "Running on YARN");
+ runtimeConfiguration = YarnClientConfiguration.CONF.build();
+ }
+
+ return Tang.Factory.getTang()
+ .newConfigurationBuilder(runtimeConfiguration, clientConfiguration,
+ cloneCommandLineConfiguration(commandLineConf))
+ .build();
+ }
}
/**
@@ -136,6 +142,7 @@ public final class Launch {
* @param args command line parameters.
*/
public static void main(final String[] args) {
+ LOG.log(Level.INFO, "Entering Launch at :::" + new Date());
try {
if (args == null || args.length == 0) {
throw new IllegalArgumentException("No arguments provided, at least a clrFolder should be supplied.");
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8026c8ad/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java b/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java
index d74f2db..a44441d 100644
--- a/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/client/REEFImplementation.java
@@ -28,6 +28,8 @@ import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
import javax.inject.Inject;
import java.util.logging.Logger;
@@ -43,6 +45,7 @@ public final class REEFImplementation implements REEF {
private final RunningJobs runningJobs;
private final JobSubmissionHelper jobSubmissionHelper;
private final ClientWireUp clientWireUp;
+ private final LoggingScopeFactory loggingScopeFactory;
/**
* @param jobSubmissionHandler
@@ -56,12 +59,14 @@ public final class REEFImplementation implements REEF {
final RunningJobs runningJobs,
final JobSubmissionHelper jobSubmissionHelper,
final JobStatusMessageHandler jobStatusMessageHandler,
- final ClientWireUp clientWireUp) {
+ final ClientWireUp clientWireUp,
+ final LoggingScopeFactory loggingScopeFactory) {
this.jobSubmissionHandler = jobSubmissionHandler;
this.runningJobs = runningJobs;
this.jobSubmissionHelper = jobSubmissionHelper;
this.clientWireUp = clientWireUp;
clientWireUp.performWireUp();
+ this.loggingScopeFactory = loggingScopeFactory;
}
@Override
@@ -72,22 +77,24 @@ public final class REEFImplementation implements REEF {
@Override
public void submit(final Configuration driverConf) {
- final JobSubmissionProto submissionMessage;
- try {
- if (this.clientWireUp.isClientPresent()) {
- submissionMessage = this.jobSubmissionHelper.getJobsubmissionProto(driverConf)
- .setRemoteId(this.clientWireUp.getRemoteManagerIdentifier())
- .build();
- } else {
- submissionMessage = this.jobSubmissionHelper.getJobsubmissionProto(driverConf)
- .setRemoteId(ErrorHandlerRID.NONE)
- .build();
+ try (LoggingScope ls = this.loggingScopeFactory.reefSubmit()) {
+ final JobSubmissionProto submissionMessage;
+ try {
+ if (this.clientWireUp.isClientPresent()) {
+ submissionMessage = this.jobSubmissionHelper.getJobsubmissionProto(driverConf)
+ .setRemoteId(this.clientWireUp.getRemoteManagerIdentifier())
+ .build();
+ } else {
+ submissionMessage = this.jobSubmissionHelper.getJobsubmissionProto(driverConf)
+ .setRemoteId(ErrorHandlerRID.NONE)
+ .build();
+ }
+ } catch (final Exception e) {
+ throw new RuntimeException("Exception while processing driver configuration.", e);
}
- } catch (final Exception e) {
- throw new RuntimeException("Exception while processing driver configuration.", e);
- }
- this.jobSubmissionHandler.onNext(submissionMessage);
+ this.jobSubmissionHandler.onNext(submissionMessage);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8026c8ad/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
index 6e8f876..940de3c 100644
--- a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
@@ -25,6 +25,8 @@ import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.proto.DriverRuntimeProtocol;
import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
import javax.inject.Inject;
import java.util.logging.Level;
@@ -40,6 +42,7 @@ public final class EvaluatorRequestorImpl implements EvaluatorRequestor {
private final ResourceCatalog resourceCatalog;
private final ResourceRequestHandler resourceRequestHandler;
+ private final LoggingScopeFactory loggingScopeFactory;
/**
* @param resourceCatalog
@@ -47,9 +50,11 @@ public final class EvaluatorRequestorImpl implements EvaluatorRequestor {
*/
@Inject
public EvaluatorRequestorImpl(final ResourceCatalog resourceCatalog,
- final ResourceRequestHandler resourceRequestHandler) {
+ final ResourceRequestHandler resourceRequestHandler,
+ final LoggingScopeFactory loggingScopeFactory) {
this.resourceCatalog = resourceCatalog;
this.resourceRequestHandler = resourceRequestHandler;
+ this.loggingScopeFactory = loggingScopeFactory;
}
@Override
@@ -66,23 +71,25 @@ public final class EvaluatorRequestorImpl implements EvaluatorRequestor {
throw new IllegalArgumentException("Given an unsupported number of evaluators: " + req.getNumber());
}
- final DriverRuntimeProtocol.ResourceRequestProto.Builder request = DriverRuntimeProtocol.ResourceRequestProto
- .newBuilder()
- .setResourceCount(req.getNumber())
- .setVirtualCores(req.getNumberOfCores())
- .setMemorySize(req.getMegaBytes());
+ try (LoggingScope ls = loggingScopeFactory.evaluatorSubmit(req.getNumber())) {
+ final DriverRuntimeProtocol.ResourceRequestProto.Builder request = DriverRuntimeProtocol.ResourceRequestProto
+ .newBuilder()
+ .setResourceCount(req.getNumber())
+ .setVirtualCores(req.getNumberOfCores())
+ .setMemorySize(req.getMegaBytes());
- final ResourceCatalog.Descriptor descriptor = req.getDescriptor();
- if (descriptor != null) {
- if (descriptor instanceof RackDescriptor) {
- request.addRackName(descriptor.getName());
- } else if (descriptor instanceof NodeDescriptor) {
- request.addNodeName(descriptor.getName());
- } else {
- throw new IllegalArgumentException("Unable to operate on descriptors of type " + descriptor.getClass().getName());
+ final ResourceCatalog.Descriptor descriptor = req.getDescriptor();
+ if (descriptor != null) {
+ if (descriptor instanceof RackDescriptor) {
+ request.addRackName(descriptor.getName());
+ } else if (descriptor instanceof NodeDescriptor) {
+ request.addNodeName(descriptor.getName());
+ } else {
+ throw new IllegalArgumentException("Unable to operate on descriptors of type " + descriptor.getClass().getName());
+ }
}
- }
- this.resourceRequestHandler.onNext(request.build());
+ this.resourceRequestHandler.onNext(request.build());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8026c8ad/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java
index 4e9b84a..462d6c9 100644
--- a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java
@@ -32,6 +32,8 @@ import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.formats.ConfigurationModule;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.Optional;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
import java.io.File;
import java.util.Collection;
@@ -52,6 +54,7 @@ final class AllocatedEvaluatorImpl implements AllocatedEvaluator {
private final String remoteID;
private final ConfigurationSerializer configurationSerializer;
private final String jobIdentifier;
+ private final LoggingScopeFactory loggingScopeFactory;
/**
* The set of files to be places on the Evaluator.
@@ -65,11 +68,13 @@ final class AllocatedEvaluatorImpl implements AllocatedEvaluator {
AllocatedEvaluatorImpl(final EvaluatorManager evaluatorManager,
final String remoteID,
final ConfigurationSerializer configurationSerializer,
- final String jobIdentifier) {
+ final String jobIdentifier,
+ final LoggingScopeFactory loggingScopeFactory) {
this.evaluatorManager = evaluatorManager;
this.remoteID = remoteID;
this.configurationSerializer = configurationSerializer;
this.jobIdentifier = jobIdentifier;
+ this.loggingScopeFactory = loggingScopeFactory;
}
@Override
@@ -139,74 +144,76 @@ final class AllocatedEvaluatorImpl implements AllocatedEvaluator {
private final void launch(final Configuration contextConfiguration,
final Optional<Configuration> serviceConfiguration,
final Optional<Configuration> taskConfiguration) {
- try {
- final ConfigurationModule evaluatorConfigurationModule = EvaluatorConfiguration.CONF
- .set(EvaluatorConfiguration.APPLICATION_IDENTIFIER, this.jobIdentifier)
- .set(EvaluatorConfiguration.DRIVER_REMOTE_IDENTIFIER, this.remoteID)
- .set(EvaluatorConfiguration.EVALUATOR_IDENTIFIER, this.getId());
-
- final String encodedContextConfigurationString = this.configurationSerializer.toString(contextConfiguration);
- // Add the (optional) service configuration
- final ConfigurationModule contextConfigurationModule;
- if (serviceConfiguration.isPresent()) {
- // With service configuration
- final String encodedServiceConfigurationString = this.configurationSerializer.toString(serviceConfiguration.get());
- contextConfigurationModule = evaluatorConfigurationModule
- .set(EvaluatorConfiguration.ROOT_SERVICE_CONFIGURATION, encodedServiceConfigurationString)
- .set(EvaluatorConfiguration.ROOT_CONTEXT_CONFIGURATION, encodedContextConfigurationString);
- } else {
- // No service configuration
- contextConfigurationModule = evaluatorConfigurationModule
- .set(EvaluatorConfiguration.ROOT_CONTEXT_CONFIGURATION, encodedContextConfigurationString);
- }
+ try (final LoggingScope lb = loggingScopeFactory.evaluatorLaunch(this.getId())) {
+ try {
+ final ConfigurationModule evaluatorConfigurationModule = EvaluatorConfiguration.CONF
+ .set(EvaluatorConfiguration.APPLICATION_IDENTIFIER, this.jobIdentifier)
+ .set(EvaluatorConfiguration.DRIVER_REMOTE_IDENTIFIER, this.remoteID)
+ .set(EvaluatorConfiguration.EVALUATOR_IDENTIFIER, this.getId());
+
+ final String encodedContextConfigurationString = this.configurationSerializer.toString(contextConfiguration);
+ // Add the (optional) service configuration
+ final ConfigurationModule contextConfigurationModule;
+ if (serviceConfiguration.isPresent()) {
+ // With service configuration
+ final String encodedServiceConfigurationString = this.configurationSerializer.toString(serviceConfiguration.get());
+ contextConfigurationModule = evaluatorConfigurationModule
+ .set(EvaluatorConfiguration.ROOT_SERVICE_CONFIGURATION, encodedServiceConfigurationString)
+ .set(EvaluatorConfiguration.ROOT_CONTEXT_CONFIGURATION, encodedContextConfigurationString);
+ } else {
+ // No service configuration
+ contextConfigurationModule = evaluatorConfigurationModule
+ .set(EvaluatorConfiguration.ROOT_CONTEXT_CONFIGURATION, encodedContextConfigurationString);
+ }
- // Add the (optional) task configuration
- final Configuration evaluatorConfiguration;
- if (taskConfiguration.isPresent()) {
- final String encodedTaskConfigurationString = this.configurationSerializer.toString(taskConfiguration.get());
- evaluatorConfiguration = contextConfigurationModule
- .set(EvaluatorConfiguration.TASK_CONFIGURATION, encodedTaskConfigurationString).build();
- } else {
- evaluatorConfiguration = contextConfigurationModule.build();
- }
+ // Add the (optional) task configuration
+ final Configuration evaluatorConfiguration;
+ if (taskConfiguration.isPresent()) {
+ final String encodedTaskConfigurationString = this.configurationSerializer.toString(taskConfiguration.get());
+ evaluatorConfiguration = contextConfigurationModule
+ .set(EvaluatorConfiguration.TASK_CONFIGURATION, encodedTaskConfigurationString).build();
+ } else {
+ evaluatorConfiguration = contextConfigurationModule.build();
+ }
- final DriverRuntimeProtocol.ResourceLaunchProto.Builder rbuilder =
- DriverRuntimeProtocol.ResourceLaunchProto.newBuilder()
- .setIdentifier(this.evaluatorManager.getId())
- .setRemoteId(this.remoteID)
- .setEvaluatorConf(configurationSerializer.toString(evaluatorConfiguration));
-
- for (final File file : this.files) {
- rbuilder.addFile(ReefServiceProtos.FileResourceProto.newBuilder()
- .setName(file.getName())
- .setPath(file.getPath())
- .setType(ReefServiceProtos.FileType.PLAIN)
- .build());
- }
+ final DriverRuntimeProtocol.ResourceLaunchProto.Builder rbuilder =
+ DriverRuntimeProtocol.ResourceLaunchProto.newBuilder()
+ .setIdentifier(this.evaluatorManager.getId())
+ .setRemoteId(this.remoteID)
+ .setEvaluatorConf(configurationSerializer.toString(evaluatorConfiguration));
+
+ for (final File file : this.files) {
+ rbuilder.addFile(ReefServiceProtos.FileResourceProto.newBuilder()
+ .setName(file.getName())
+ .setPath(file.getPath())
+ .setType(ReefServiceProtos.FileType.PLAIN)
+ .build());
+ }
- for (final File lib : this.libraries) {
- rbuilder.addFile(ReefServiceProtos.FileResourceProto.newBuilder()
- .setName(lib.getName())
- .setPath(lib.getPath().toString())
- .setType(ReefServiceProtos.FileType.LIB)
- .build());
- }
+ for (final File lib : this.libraries) {
+ rbuilder.addFile(ReefServiceProtos.FileResourceProto.newBuilder()
+ .setName(lib.getName())
+ .setPath(lib.getPath().toString())
+ .setType(ReefServiceProtos.FileType.LIB)
+ .build());
+ }
- { // Set the type
- switch (this.evaluatorManager.getEvaluatorDescriptor().getType()) {
- case CLR:
- rbuilder.setType(ReefServiceProtos.ProcessType.CLR);
- break;
- default:
- rbuilder.setType(ReefServiceProtos.ProcessType.JVM);
+ { // Set the type
+ switch (this.evaluatorManager.getEvaluatorDescriptor().getType()) {
+ case CLR:
+ rbuilder.setType(ReefServiceProtos.ProcessType.CLR);
+ break;
+ default:
+ rbuilder.setType(ReefServiceProtos.ProcessType.JVM);
+ }
}
- }
- this.evaluatorManager.onResourceLaunch(rbuilder.build());
+ this.evaluatorManager.onResourceLaunch(rbuilder.build());
- } catch (final BindException ex) {
- LOG.log(Level.SEVERE, "Bad Evaluator configuration", ex);
- throw new RuntimeException("Bad Evaluator configuration", ex);
+ } catch (final BindException ex) {
+ LOG.log(Level.SEVERE, "Bad Evaluator configuration", ex);
+ throw new RuntimeException("Bad Evaluator configuration", ex);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8026c8ad/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index 10eb1bc..3938e06 100644
--- a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -47,6 +47,7 @@ import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.Optional;
+import org.apache.reef.util.logging.LoggingScopeFactory;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.RemoteMessage;
import org.apache.reef.wake.time.Clock;
@@ -90,6 +91,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
private final ExceptionCodec exceptionCodec;
private final DriverStatusManager driverStatusManager;
private final EventHandlerIdlenessSource idlenessSource;
+ private final LoggingScopeFactory loggingScopeFactory;
// Mutable fields
@@ -112,7 +114,8 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
final EvaluatorStatusManager stateManager,
final DriverStatusManager driverStatusManager,
final ExceptionCodec exceptionCodec,
- final EventHandlerIdlenessSource idlenessSource) {
+ final EventHandlerIdlenessSource idlenessSource,
+ final LoggingScopeFactory loggingScopeFactory) {
this.contextRepresenters = contextRepresenters;
this.idlenessSource = idlenessSource;
LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId);
@@ -128,9 +131,10 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
this.stateManager = stateManager;
this.driverStatusManager = driverStatusManager;
this.exceptionCodec = exceptionCodec;
+ this.loggingScopeFactory = loggingScopeFactory;
final AllocatedEvaluator allocatedEvaluator =
- new AllocatedEvaluatorImpl(this, remoteManager.getMyIdentifier(), configurationSerializer, getJobIdentifier());
+ new AllocatedEvaluatorImpl(this, remoteManager.getMyIdentifier(), configurationSerializer, getJobIdentifier(), loggingScopeFactory);
LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", evaluatorId);
this.messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId());
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8026c8ad/reef-common/src/main/java/org/apache/reef/util/logging/LoggingScopeFactory.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/util/logging/LoggingScopeFactory.java b/reef-common/src/main/java/org/apache/reef/util/logging/LoggingScopeFactory.java
index e64d158..31210bc 100644
--- a/reef-common/src/main/java/org/apache/reef/util/logging/LoggingScopeFactory.java
+++ b/reef-common/src/main/java/org/apache/reef/util/logging/LoggingScopeFactory.java
@@ -75,7 +75,17 @@ public class LoggingScopeFactory {
}
/**
- * Get a new instance of LoggingScope with msg through new
+ * Get a new instance of LoggingScope with specified log level
+ * @param logLevel
+ * @param msg
+ * @return
+ */
+ public static LoggingScope getNewLoggingScope(final Level logLevel, final String msg) {
+ return new LoggingScopeImpl(LOG, logLevel, msg);
+ }
+
+ /**
+ * Get a new instance of LoggingScope with injected LoggingScopeFactory instance
* @param msg
* @return
*/
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8026c8ad/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java
----------------------------------------------------------------------
diff --git a/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java b/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java
index 043b792..ddbd83b 100644
--- a/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java
+++ b/reef-common/src/test/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImplTest.java
@@ -23,7 +23,11 @@ import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.proto.DriverRuntimeProtocol;
import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.logging.LoggingScopeFactory;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import static org.mockito.Mockito.mock;
@@ -33,6 +37,12 @@ import static org.mockito.Mockito.mock;
*/
public class EvaluatorRequestorImplTest {
private final ResourceCatalog resourceCatalog = mock(ResourceCatalog.class);
+ private LoggingScopeFactory loggingScopeFactory;
+
+ @Before
+ public void setUp() throws InjectionException {
+ loggingScopeFactory = Tang.Factory.getTang().newInjector().getInstance(LoggingScopeFactory.class);
+ }
/**
* If only memory, no count is given, 1 evaluator should be requested.
@@ -41,7 +51,7 @@ public class EvaluatorRequestorImplTest {
public void testMemoryOnly() {
final int memory = 777;
final DummyRequestHandler requestHandler = new DummyRequestHandler();
- final EvaluatorRequestor evaluatorRequestor = new EvaluatorRequestorImpl(resourceCatalog, requestHandler);
+ final EvaluatorRequestor evaluatorRequestor = new EvaluatorRequestorImpl(resourceCatalog, requestHandler, loggingScopeFactory);
evaluatorRequestor.submit(EvaluatorRequest.newBuilder().setMemory(memory).build());
Assert.assertEquals("Memory request did not make it", requestHandler.get().getMemorySize(), memory);
Assert.assertEquals("Number of requests did not make it", requestHandler.get().getResourceCount(), 1);
@@ -55,7 +65,7 @@ public class EvaluatorRequestorImplTest {
final int memory = 777;
final int count = 9;
final DummyRequestHandler requestHandler = new DummyRequestHandler();
- final EvaluatorRequestor evaluatorRequestor = new EvaluatorRequestorImpl(resourceCatalog, requestHandler);
+ final EvaluatorRequestor evaluatorRequestor = new EvaluatorRequestorImpl(resourceCatalog, requestHandler, loggingScopeFactory);
evaluatorRequestor.submit(EvaluatorRequest.newBuilder().setMemory(memory).setNumber(count).build());
Assert.assertEquals("Memory request did not make it", requestHandler.get().getMemorySize(), memory);
Assert.assertEquals("Number of requests did not make it", requestHandler.get().getResourceCount(), count);
@@ -69,7 +79,7 @@ public class EvaluatorRequestorImplTest {
final int memory = 0;
final int count = 1;
final DummyRequestHandler requestHandler = new DummyRequestHandler();
- final EvaluatorRequestor evaluatorRequestor = new EvaluatorRequestorImpl(resourceCatalog, requestHandler);
+ final EvaluatorRequestor evaluatorRequestor = new EvaluatorRequestorImpl(resourceCatalog, requestHandler, loggingScopeFactory);
evaluatorRequestor.submit(EvaluatorRequest.newBuilder().setMemory(memory).setNumberOfCores(1).setNumber(count).build());
}
@@ -81,7 +91,7 @@ public class EvaluatorRequestorImplTest {
final int memory = 128;
final int count = 0;
final DummyRequestHandler requestHandler = new DummyRequestHandler();
- final EvaluatorRequestor evaluatorRequestor = new EvaluatorRequestorImpl(resourceCatalog, requestHandler);
+ final EvaluatorRequestor evaluatorRequestor = new EvaluatorRequestorImpl(resourceCatalog, requestHandler, loggingScopeFactory);
evaluatorRequestor.submit(EvaluatorRequest.newBuilder().setMemory(memory).setNumberOfCores(1).setNumber(count).build());
}