You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2015/05/13 20:05:45 UTC
incubator-reef git commit: [REEF-310] Improve memory efficiency of
the Driver bridge.
Repository: incubator-reef
Updated Branches:
refs/heads/master 16f4e0737 -> c1b7b0528
[REEF-310] Improve memory efficiency of the Driver bridge.
This change introduces the `ActiveContextBridgeFactory` to create `ActiveContextBridge` instances:
* Removed unused instances of `ActiveContextBridge`.
* Replaced the remaining constructor calls with calls to
`ActiveContextBridgeFactory.getActiveContextBridge`.
JIRA:
[REEF-310](https://issues.apache.org/jira/browse/REEF-310)
This closes #177
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/c1b7b052
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/c1b7b052
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/c1b7b052
Branch: refs/heads/master
Commit: c1b7b0528cb00ce3c1a9b020efdf884a90f10d0d
Parents: 16f4e07
Author: Markus Weimer <we...@apache.org>
Authored: Wed May 6 12:34:32 2015 -0700
Committer: Julia Wang <jw...@yahoo.com>
Committed: Wed May 13 11:03:04 2015 -0700
----------------------------------------------------------------------
.../reef/javabridge/ActiveContextBridge.java | 22 +++----
.../javabridge/ActiveContextBridgeFactory.java | 61 ++++++++++++++++++++
.../reef/javabridge/ClosedContextBridge.java | 6 +-
.../reef/javabridge/CompletedTaskBridge.java | 13 +++--
.../reef/javabridge/FailedContextBridge.java | 25 ++++----
.../reef/javabridge/FailedTaskBridge.java | 17 +++---
.../reef/javabridge/RunningTaskBridge.java | 12 ++--
.../reef/javabridge/SuspendedTaskBridge.java | 8 +--
.../reef/javabridge/generic/JobDriver.java | 32 +++++-----
9 files changed, 133 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
index b374e73..47df761 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -27,7 +27,7 @@ import org.apache.reef.tang.formats.AvroConfigurationSerializer;
import java.util.logging.Level;
import java.util.logging.Logger;
-public class ActiveContextBridge extends NativeBridge implements Identifiable {
+public final class ActiveContextBridge extends NativeBridge implements Identifiable {
private static final Logger LOG = Logger.getLogger(ActiveContextBridge.class.getName());
private final ActiveContext jactiveContext;
@@ -36,12 +36,14 @@ public class ActiveContextBridge extends NativeBridge implements Identifiable {
private final String evaluatorId;
private final ClassHierarchy clrClassHierarchy;
- public ActiveContextBridge(ActiveContext activeContext) {
- jactiveContext = activeContext;
- serializer = new AvroConfigurationSerializer();
- contextId = activeContext.getId();
- evaluatorId = activeContext.getEvaluatorId();
- clrClassHierarchy = Utilities.loadClassHierarchy(NativeInterop.CLASS_HIERARCHY_FILENAME);
+ ActiveContextBridge(final ActiveContext activeContext,
+ final ClassHierarchy clrClassHierarchy,
+ final AvroConfigurationSerializer serializer) {
+ this.jactiveContext = activeContext;
+ this.clrClassHierarchy = clrClassHierarchy;
+ this.serializer = serializer;
+ this.contextId = activeContext.getId();
+ this.evaluatorId = activeContext.getEvaluatorId();
}
public void submitTaskString(final String taskConfigurationString) {
@@ -49,7 +51,7 @@ public class ActiveContextBridge extends NativeBridge implements Identifiable {
throw new RuntimeException("empty taskConfigurationString provided.");
}
- Configuration taskConfiguration;
+ final Configuration taskConfiguration;
try {
taskConfiguration = serializer.fromString(taskConfigurationString, clrClassHierarchy);
} catch (final Exception e) {
@@ -73,6 +75,6 @@ public class ActiveContextBridge extends NativeBridge implements Identifiable {
@Override
public String getId() {
- return contextId;
+ return jactiveContext.getId();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridgeFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridgeFactory.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridgeFactory.java
new file mode 100644
index 0000000..bce91cc
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridgeFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import net.jcip.annotations.ThreadSafe;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.tang.ClassHierarchy;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+
+import javax.inject.Inject;
+import java.util.logging.Logger;
+
+/**
+ * Factory for ActiveContextBridge instances.
+ */
+@DriverSide
+@ThreadSafe
+@Private
+public final class ActiveContextBridgeFactory {
+ private final ClassHierarchy clrClassHierarchy;
+ private final AvroConfigurationSerializer configurationSerializer;
+
+ /**
+ * This is always instantiated via Tang.
+ *
+ * @param configurationSerializer passed to the ActiveContextBridge instances for configuration serialization.
+ */
+ @Inject
+ private ActiveContextBridgeFactory(final AvroConfigurationSerializer configurationSerializer) {
+ this.configurationSerializer = configurationSerializer;
+ this.clrClassHierarchy = Utilities.loadClassHierarchy(NativeInterop.CLASS_HIERARCHY_FILENAME);
+ }
+
+ /**
+ * Instantiates a new ActiveContextBridge.
+ *
+ * @param context the context for which to return an ActiveContextBridge instance.
+ * @return a new ActiveContextBridge.
+ */
+ public ActiveContextBridge getActiveContextBridge(final ActiveContext context) {
+ return new ActiveContextBridge(context, this.clrClassHierarchy, this.configurationSerializer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java
index 62f9ce7..65a7c8a 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -36,9 +36,9 @@ public class ClosedContextBridge extends NativeBridge implements ClosedContext {
private final String evaluatorId;
private final EvaluatorDescriptor evaluatorDescriptor;
- public ClosedContextBridge(final ClosedContext closedContext) {
+ public ClosedContextBridge(final ClosedContext closedContext, final ActiveContextBridgeFactory activeContextBridgeFactory) {
jcloseContext = closedContext;
- parentContext = new ActiveContextBridge(closedContext.getParentContext());
+ parentContext = activeContextBridgeFactory.getActiveContextBridge(closedContext.getParentContext());
contextId = closedContext.getId();
evaluatorId = closedContext.getEvaluatorId();
evaluatorDescriptor = closedContext.getEvaluatorDescriptor();
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java
index c95ca14..0c993a9 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,16 +22,17 @@ import org.apache.reef.driver.task.CompletedTask;
public class CompletedTaskBridge extends NativeBridge {
- private CompletedTask jcompletedTask;
+ private final CompletedTask jcompletedTask;
- private String taskId;
+ private final String taskId;
- private ActiveContextBridge jactiveContext;
+ // used by the C++ code
+ private final ActiveContextBridge jactiveContext;
- public CompletedTaskBridge(CompletedTask completedTask) {
+ public CompletedTaskBridge(final CompletedTask completedTask, final ActiveContextBridgeFactory factory) {
jcompletedTask = completedTask;
taskId = completedTask.getId();
- jactiveContext = new ActiveContextBridge(completedTask.getActiveContext());
+ jactiveContext = factory.getActiveContextBridge(completedTask.getActiveContext());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java
index dfed7f7..331040a 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,6 +18,7 @@
*/
package org.apache.reef.javabridge;
+import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ContextBase;
import org.apache.reef.driver.context.FailedContext;
import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
@@ -26,7 +27,7 @@ import org.apache.reef.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
-public class FailedContextBridge extends NativeBridge implements ContextBase {
+public final class FailedContextBridge extends NativeBridge implements ContextBase {
private static final Logger LOG = Logger.getLogger(FailedContextBridge.class.getName());
@@ -37,14 +38,20 @@ public class FailedContextBridge extends NativeBridge implements ContextBase {
private final String parentContextId;
private final FailedContext jfailedContext;
- public FailedContextBridge(final FailedContext failedContext) {
+ public FailedContextBridge(final FailedContext failedContext, final ActiveContextBridgeFactory factory) {
jfailedContext = failedContext;
evaluatorDescriptor = failedContext.getEvaluatorDescriptor();
evaluatorId = failedContext.getEvaluatorId();
contextId = failedContext.getId();
- parentContext = failedContext.getParentContext().isPresent() ?
- new ActiveContextBridge(failedContext.getParentContext().get()) : null;
- parentContextId = parentContext != null ? parentContext.getId() : null;
+ if (failedContext.getParentContext().isPresent()) {
+ final ActiveContext parent = failedContext.getParentContext().get();
+ this.parentContextId = parent.getId();
+ this.parentContext = factory.getActiveContextBridge(parent);
+ } else {
+ this.parentContextId = null;
+ this.parentContext = null;
+ }
+
}
@Override
@@ -63,11 +70,7 @@ public class FailedContextBridge extends NativeBridge implements ContextBase {
@Override
public Optional<String> getParentId() {
- if (parentContextId != null) {
- return Optional.of(parentContextId);
- } else {
- return Optional.empty();
- }
+ return Optional.ofNullable(this.parentContextId);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
index 30383ca..4d58b06 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,23 +18,24 @@
*/
package org.apache.reef.javabridge;
-import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.task.FailedTask;
-import org.apache.reef.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
-public class FailedTaskBridge extends NativeBridge {
+public final class FailedTaskBridge extends NativeBridge {
private static final Logger LOG = Logger.getLogger(FailedTaskBridge.class.getName());
private FailedTask jfailedTask;
private ActiveContextBridge jactiveContext;
- public FailedTaskBridge(FailedTask failedTask) {
- jfailedTask = failedTask;
- Optional<ActiveContext> activeContext = failedTask.getActiveContext();
- jactiveContext = activeContext.isPresent() ? new ActiveContextBridge(activeContext.get()) : null;
+ public FailedTaskBridge(final FailedTask failedTask, final ActiveContextBridgeFactory factory) {
+ this.jfailedTask = failedTask;
+ if (failedTask.getActiveContext().isPresent()) {
+ this.jactiveContext = factory.getActiveContextBridge(failedTask.getActiveContext().get());
+ } else {
+ this.jactiveContext = null;
+ }
}
public String getFailedTaskString() {
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
index 301c4fc..25b2a78 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,21 +18,19 @@
*/
package org.apache.reef.javabridge;
-import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.task.RunningTask;
import java.util.logging.Logger;
-public class RunningTaskBridge extends NativeBridge {
+public final class RunningTaskBridge extends NativeBridge {
private static final Logger LOG = Logger.getLogger(RunningTaskBridge.class.getName());
final private RunningTask jrunningTask;
final private ActiveContextBridge jactiveContext;
- public RunningTaskBridge(RunningTask runningTask) {
- jrunningTask = runningTask;
- final ActiveContext activeContext = runningTask.getActiveContext();
- jactiveContext = new ActiveContextBridge(activeContext);
+ public RunningTaskBridge(final RunningTask runningTask, final ActiveContextBridgeFactory factory) {
+ this.jrunningTask = runningTask;
+ this.jactiveContext = factory.getActiveContextBridge(runningTask.getActiveContext());
}
public final String getId() {
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java
index 16fa3d3..27b4a11 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,16 +22,16 @@ import org.apache.reef.driver.task.SuspendedTask;
import org.apache.reef.io.Message;
import org.apache.reef.io.naming.Identifiable;
-public class SuspendedTaskBridge extends NativeBridge implements Identifiable, Message {
+public final class SuspendedTaskBridge extends NativeBridge implements Identifiable, Message {
private final SuspendedTask jsuspendedTask;
private final String taskId;
private final ActiveContextBridge jactiveContext;
- public SuspendedTaskBridge(SuspendedTask suspendedTask) {
+ public SuspendedTaskBridge(final SuspendedTask suspendedTask, final ActiveContextBridgeFactory activeContextBridgeFactory) {
jsuspendedTask = suspendedTask;
taskId = suspendedTask.getId();
- jactiveContext = new ActiveContextBridge(jsuspendedTask.getActiveContext());
+ jactiveContext = activeContextBridgeFactory.getActiveContextBridge(jsuspendedTask.getActiveContext());
}
public ActiveContextBridge getActiveContext() {
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
index 44077f0..69d8071 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
@@ -71,6 +71,7 @@ public final class JobDriver {
private final NameServer nameServer;
private final String nameServerInfo;
private final HttpServer httpServer;
+ private final ActiveContextBridgeFactory activeContextBridgeFactory;
/**
* Wake clock is used to schedule periodical job check-ups.
*/
@@ -92,7 +93,7 @@ public final class JobDriver {
private final DriverStatusManager driverStatusManager;
/**
- * NativeInterop has function to load libs when driver starts
+ * NativeInterop has function to load libs when driver starts
*/
private final LibLoader libLoader;
@@ -134,9 +135,10 @@ public final class JobDriver {
* Job driver constructor.
* All parameters are injected from TANG automatically.
*
- * @param clock Wake clock to schedule and check up running jobs.
- * @param jobMessageObserver is used to send messages back to the client.
- * @param evaluatorRequestor is used to request Evaluators.
+ * @param clock Wake clock to schedule and check up running jobs.
+ * @param jobMessageObserver is used to send messages back to the client.
+ * @param evaluatorRequestor is used to request Evaluators.
+ * @param activeContextBridgeFactory is used to create ActiveContextBridge instances.
*/
@Inject
JobDriver(final Clock clock,
@@ -147,13 +149,15 @@ public final class JobDriver {
final DriverStatusManager driverStatusManager,
final LoggingScopeFactory loggingScopeFactory,
final LibLoader libLoader,
- final LocalAddressProvider localAddressProvider) {
+ final LocalAddressProvider localAddressProvider,
+ final ActiveContextBridgeFactory activeContextBridgeFactory) {
this.clock = clock;
this.httpServer = httpServer;
this.jobMessageObserver = jobMessageObserver;
this.evaluatorRequestor = evaluatorRequestor;
this.nameServer = nameServer;
this.driverStatusManager = driverStatusManager;
+ this.activeContextBridgeFactory = activeContextBridgeFactory;
this.nameServerInfo = localAddressProvider.getLocalAddress() + ":" + this.nameServer.getPort();
this.loggingScopeFactory = loggingScopeFactory;
this.libLoader = libLoader;
@@ -257,7 +261,7 @@ public final class JobDriver {
if (JobDriver.this.activeContextHandler == 0) {
throw new RuntimeException("Active Context Handler not initialized by CLR.");
}
- ActiveContextBridge activeContextBridge = new ActiveContextBridge(context);
+ ActiveContextBridge activeContextBridge = activeContextBridgeFactory.getActiveContextBridge(context);
NativeInterop.ClrSystemActiveContextHandlerOnNext(JobDriver.this.activeContextHandler, activeContextBridge, JobDriver.this.interopLogger);
} catch (final Exception ex) {
LOG.log(Level.SEVERE, "Fail to submit task to active context");
@@ -319,7 +323,7 @@ public final class JobDriver {
LOG.log(Level.INFO, "No CLR handler bound to handle completed task.");
} else {
LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler.");
- CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task);
+ CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task, activeContextBridgeFactory);
NativeInterop.ClrSystemCompletedTaskHandlerOnNext(JobDriver.this.completedTaskHandler, completedTaskBridge, JobDriver.this.interopLogger);
}
}
@@ -438,7 +442,7 @@ public final class JobDriver {
throw new RuntimeException("Failed Task Handler not initialized by CLR.");
}
try {
- FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task);
+ FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task, activeContextBridgeFactory);
NativeInterop.ClrSystemFailedTaskHandlerOnNext(JobDriver.this.failedTaskHandler, failedTaskBridge, JobDriver.this.interopLogger);
} catch (final Exception ex) {
LOG.log(Level.SEVERE, "Fail to invoke CLR failed task handler");
@@ -459,7 +463,7 @@ public final class JobDriver {
} else {
LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId());
try {
- final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task);
+ final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task, activeContextBridgeFactory);
NativeInterop.ClrSystemRunningTaskHandlerOnNext(JobDriver.this.runningTaskHandler, runningTaskBridge, JobDriver.this.interopLogger);
} catch (final Exception ex) {
LOG.log(Level.WARNING, "Fail to invoke CLR running task handler");
@@ -483,7 +487,7 @@ public final class JobDriver {
if (JobDriver.this.clrBridgeSetup) {
if (JobDriver.this.driverRestartRunningTaskHandler != 0) {
LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR.");
- NativeInterop.ClrSystemDriverRestartRunningTaskHandlerOnNext(JobDriver.this.driverRestartRunningTaskHandler, new RunningTaskBridge(task));
+ NativeInterop.ClrSystemDriverRestartRunningTaskHandlerOnNext(JobDriver.this.driverRestartRunningTaskHandler, new RunningTaskBridge(task, activeContextBridgeFactory));
} else {
LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, done with DriverRestartRunningTaskHandler.");
}
@@ -512,7 +516,7 @@ public final class JobDriver {
if (JobDriver.this.clrBridgeSetup) {
if (JobDriver.this.driverRestartActiveContextHandler != 0) {
LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR.");
- NativeInterop.ClrSystemDriverRestartActiveContextHandlerOnNext(JobDriver.this.driverRestartActiveContextHandler, new ActiveContextBridge(context));
+ NativeInterop.ClrSystemDriverRestartActiveContextHandlerOnNext(JobDriver.this.driverRestartActiveContextHandler, activeContextBridgeFactory.getActiveContextBridge(context));
} else {
LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, done with DriverRestartActiveContextHandler.");
}
@@ -630,7 +634,7 @@ public final class JobDriver {
LOG.log(Level.INFO, message);
try (final LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) {
if (JobDriver.this.suspendedTaskHandler != 0) {
- SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task);
+ SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task, activeContextBridgeFactory);
// if CLR implements the suspended task handler, handle it in CLR
LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge.");
NativeInterop.ClrSystemSupendedTaskHandlerOnNext(JobDriver.this.suspendedTaskHandler, suspendedTaskBridge);
@@ -669,7 +673,7 @@ public final class JobDriver {
LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
try (final LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) {
if (JobDriver.this.closedContextHandler != 0) {
- ClosedContextBridge closedContextBridge = new ClosedContextBridge(context);
+ ClosedContextBridge closedContextBridge = new ClosedContextBridge(context, activeContextBridgeFactory);
// if CLR implements the closed context handler, handle it in CLR
LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge.");
NativeInterop.ClrSystemClosedContextHandlerOnNext(JobDriver.this.closedContextHandler, closedContextBridge);
@@ -692,7 +696,7 @@ public final class JobDriver {
LOG.log(Level.SEVERE, "FailedContext", context);
try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) {
if (JobDriver.this.failedContextHandler != 0) {
- FailedContextBridge failedContextBridge = new FailedContextBridge(context);
+ FailedContextBridge failedContextBridge = new FailedContextBridge(context, activeContextBridgeFactory);
// if CLR implements the failed context handler, handle it in CLR
LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge.");
NativeInterop.ClrSystemFailedContextHandlerOnNext(JobDriver.this.failedContextHandler, failedContextBridge);