You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ju...@apache.org on 2015/05/13 20:05:45 UTC

incubator-reef git commit: [REEF-310] Improve memory efficiency of the Driver bridge.

Repository: incubator-reef
Updated Branches:
  refs/heads/master 16f4e0737 -> c1b7b0528


[REEF-310] Improve memory efficiency of the Driver bridge.

This change introduces the `ActiveContextBridgeFactory` to `ActiveContextBridge` instances:

  * Removed unused instances of `ActiveContextBridge`.
  * Replaced the remaining constructor calls with calls to
    `ActiveContextBridgeFactory.getActiveContextBridge`.

JIRA:
  [REEF-310](https://issues.apache.org/jira/browse/REEF-310)

  This closes #177


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/c1b7b052
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/c1b7b052
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/c1b7b052

Branch: refs/heads/master
Commit: c1b7b0528cb00ce3c1a9b020efdf884a90f10d0d
Parents: 16f4e07
Author: Markus Weimer <we...@apache.org>
Authored: Wed May 6 12:34:32 2015 -0700
Committer: Julia Wang <jw...@yahoo.com>
Committed: Wed May 13 11:03:04 2015 -0700

----------------------------------------------------------------------
 .../reef/javabridge/ActiveContextBridge.java    | 22 +++----
 .../javabridge/ActiveContextBridgeFactory.java  | 61 ++++++++++++++++++++
 .../reef/javabridge/ClosedContextBridge.java    |  6 +-
 .../reef/javabridge/CompletedTaskBridge.java    | 13 +++--
 .../reef/javabridge/FailedContextBridge.java    | 25 ++++----
 .../reef/javabridge/FailedTaskBridge.java       | 17 +++---
 .../reef/javabridge/RunningTaskBridge.java      | 12 ++--
 .../reef/javabridge/SuspendedTaskBridge.java    |  8 +--
 .../reef/javabridge/generic/JobDriver.java      | 32 +++++-----
 9 files changed, 133 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
index b374e73..47df761 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -27,7 +27,7 @@ import org.apache.reef.tang.formats.AvroConfigurationSerializer;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-public class ActiveContextBridge extends NativeBridge implements Identifiable {
+public final class ActiveContextBridge extends NativeBridge implements Identifiable {
   private static final Logger LOG = Logger.getLogger(ActiveContextBridge.class.getName());
 
   private final ActiveContext jactiveContext;
@@ -36,12 +36,14 @@ public class ActiveContextBridge extends NativeBridge implements Identifiable {
   private final String evaluatorId;
   private final ClassHierarchy clrClassHierarchy;
 
-  public ActiveContextBridge(ActiveContext activeContext) {
-    jactiveContext = activeContext;
-    serializer = new AvroConfigurationSerializer();
-    contextId = activeContext.getId();
-    evaluatorId = activeContext.getEvaluatorId();
-    clrClassHierarchy = Utilities.loadClassHierarchy(NativeInterop.CLASS_HIERARCHY_FILENAME);
+  ActiveContextBridge(final ActiveContext activeContext,
+                      final ClassHierarchy clrClassHierarchy,
+                      final AvroConfigurationSerializer serializer) {
+    this.jactiveContext = activeContext;
+    this.clrClassHierarchy = clrClassHierarchy;
+    this.serializer = serializer;
+    this.contextId = activeContext.getId();
+    this.evaluatorId = activeContext.getEvaluatorId();
   }
 
   public void submitTaskString(final String taskConfigurationString) {
@@ -49,7 +51,7 @@ public class ActiveContextBridge extends NativeBridge implements Identifiable {
       throw new RuntimeException("empty taskConfigurationString provided.");
     }
 
-    Configuration taskConfiguration;
+    final Configuration taskConfiguration;
     try {
       taskConfiguration = serializer.fromString(taskConfigurationString, clrClassHierarchy);
     } catch (final Exception e) {
@@ -73,6 +75,6 @@ public class ActiveContextBridge extends NativeBridge implements Identifiable {
 
   @Override
   public String getId() {
-    return contextId;
+    return jactiveContext.getId();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridgeFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridgeFactory.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridgeFactory.java
new file mode 100644
index 0000000..bce91cc
--- /dev/null
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridgeFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.javabridge;
+
+import net.jcip.annotations.ThreadSafe;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.tang.ClassHierarchy;
+import org.apache.reef.tang.formats.AvroConfigurationSerializer;
+
+import javax.inject.Inject;
+import java.util.logging.Logger;
+
+/**
+ * Factory for ActiveContextBridge instances.
+ */
+@DriverSide
+@ThreadSafe
+@Private
+public final class ActiveContextBridgeFactory {
+  private final ClassHierarchy clrClassHierarchy;
+  private final AvroConfigurationSerializer configurationSerializer;
+
+  /**
+   * This is always instantiated via Tang.
+   *
+   * @param configurationSerializer passed to the ActiveContextBridge instances for configuration serialization.
+   */
+  @Inject
+  private ActiveContextBridgeFactory(final AvroConfigurationSerializer configurationSerializer) {
+    this.configurationSerializer = configurationSerializer;
+    this.clrClassHierarchy = Utilities.loadClassHierarchy(NativeInterop.CLASS_HIERARCHY_FILENAME);
+  }
+
+  /**
+   * Instantiates a new ActiveContextBridge.
+   *
+   * @param context the context for which to return an ActiveContextBridge instance.
+   * @return a new ActiveContextBridge.
+   */
+  public ActiveContextBridge getActiveContextBridge(final ActiveContext context) {
+    return new ActiveContextBridge(context, this.clrClassHierarchy, this.configurationSerializer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java
index 62f9ce7..65a7c8a 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ClosedContextBridge.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -36,9 +36,9 @@ public class ClosedContextBridge extends NativeBridge implements ClosedContext {
   private final String evaluatorId;
   private final EvaluatorDescriptor evaluatorDescriptor;
 
-  public ClosedContextBridge(final ClosedContext closedContext) {
+  public ClosedContextBridge(final ClosedContext closedContext, final ActiveContextBridgeFactory activeContextBridgeFactory) {
     jcloseContext = closedContext;
-    parentContext = new ActiveContextBridge(closedContext.getParentContext());
+    parentContext = activeContextBridgeFactory.getActiveContextBridge(closedContext.getParentContext());
     contextId = closedContext.getId();
     evaluatorId = closedContext.getEvaluatorId();
     evaluatorDescriptor = closedContext.getEvaluatorDescriptor();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java
index c95ca14..0c993a9 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/CompletedTaskBridge.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,16 +22,17 @@ import org.apache.reef.driver.task.CompletedTask;
 
 public class CompletedTaskBridge extends NativeBridge {
 
-  private CompletedTask jcompletedTask;
+  private final CompletedTask jcompletedTask;
 
-  private String taskId;
+  private final String taskId;
 
-  private ActiveContextBridge jactiveContext;
+  // used by the C++ code
+  private final ActiveContextBridge jactiveContext;
 
-  public CompletedTaskBridge(CompletedTask completedTask) {
+  public CompletedTaskBridge(final CompletedTask completedTask, final ActiveContextBridgeFactory factory) {
     jcompletedTask = completedTask;
     taskId = completedTask.getId();
-    jactiveContext = new ActiveContextBridge(completedTask.getActiveContext());
+    jactiveContext = factory.getActiveContextBridge(completedTask.getActiveContext());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java
index dfed7f7..331040a 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedContextBridge.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,6 +18,7 @@
  */
 package org.apache.reef.javabridge;
 
+import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.driver.context.ContextBase;
 import org.apache.reef.driver.context.FailedContext;
 import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
@@ -26,7 +27,7 @@ import org.apache.reef.util.Optional;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-public class FailedContextBridge extends NativeBridge implements ContextBase {
+public final class FailedContextBridge extends NativeBridge implements ContextBase {
 
   private static final Logger LOG = Logger.getLogger(FailedContextBridge.class.getName());
 
@@ -37,14 +38,20 @@ public class FailedContextBridge extends NativeBridge implements ContextBase {
   private final String parentContextId;
   private final FailedContext jfailedContext;
 
-  public FailedContextBridge(final FailedContext failedContext) {
+  public FailedContextBridge(final FailedContext failedContext, final ActiveContextBridgeFactory factory) {
     jfailedContext = failedContext;
     evaluatorDescriptor = failedContext.getEvaluatorDescriptor();
     evaluatorId = failedContext.getEvaluatorId();
     contextId = failedContext.getId();
-    parentContext = failedContext.getParentContext().isPresent() ?
-        new ActiveContextBridge(failedContext.getParentContext().get()) : null;
-    parentContextId = parentContext != null ? parentContext.getId() : null;
+    if (failedContext.getParentContext().isPresent()) {
+      final ActiveContext parent = failedContext.getParentContext().get();
+      this.parentContextId = parent.getId();
+      this.parentContext = factory.getActiveContextBridge(parent);
+    } else {
+      this.parentContextId = null;
+      this.parentContext = null;
+    }
+
   }
 
   @Override
@@ -63,11 +70,7 @@ public class FailedContextBridge extends NativeBridge implements ContextBase {
 
   @Override
   public Optional<String> getParentId() {
-    if (parentContextId != null) {
-      return Optional.of(parentContextId);
-    } else {
-      return Optional.empty();
-    }
+    return Optional.ofNullable(this.parentContextId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
index 30383ca..4d58b06 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/FailedTaskBridge.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,23 +18,24 @@
  */
 package org.apache.reef.javabridge;
 
-import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.driver.task.FailedTask;
-import org.apache.reef.util.Optional;
 
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-public class FailedTaskBridge extends NativeBridge {
+public final class FailedTaskBridge extends NativeBridge {
   private static final Logger LOG = Logger.getLogger(FailedTaskBridge.class.getName());
 
   private FailedTask jfailedTask;
   private ActiveContextBridge jactiveContext;
 
-  public FailedTaskBridge(FailedTask failedTask) {
-    jfailedTask = failedTask;
-    Optional<ActiveContext> activeContext = failedTask.getActiveContext();
-    jactiveContext = activeContext.isPresent() ? new ActiveContextBridge(activeContext.get()) : null;
+  public FailedTaskBridge(final FailedTask failedTask, final ActiveContextBridgeFactory factory) {
+    this.jfailedTask = failedTask;
+    if (failedTask.getActiveContext().isPresent()) {
+      this.jactiveContext = factory.getActiveContextBridge(failedTask.getActiveContext().get());
+    } else {
+      this.jactiveContext = null;
+    }
   }
 
   public String getFailedTaskString() {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
index 301c4fc..25b2a78 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/RunningTaskBridge.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,21 +18,19 @@
  */
 package org.apache.reef.javabridge;
 
-import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.driver.task.RunningTask;
 
 import java.util.logging.Logger;
 
-public class RunningTaskBridge extends NativeBridge {
+public final class RunningTaskBridge extends NativeBridge {
   private static final Logger LOG = Logger.getLogger(RunningTaskBridge.class.getName());
 
   final private RunningTask jrunningTask;
   final private ActiveContextBridge jactiveContext;
 
-  public RunningTaskBridge(RunningTask runningTask) {
-    jrunningTask = runningTask;
-    final ActiveContext activeContext = runningTask.getActiveContext();
-    jactiveContext = new ActiveContextBridge(activeContext);
+  public RunningTaskBridge(final RunningTask runningTask, final ActiveContextBridgeFactory factory) {
+    this.jrunningTask = runningTask;
+    this.jactiveContext = factory.getActiveContextBridge(runningTask.getActiveContext());
   }
 
   public final String getId() {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java
index 16fa3d3..27b4a11 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/SuspendedTaskBridge.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,16 +22,16 @@ import org.apache.reef.driver.task.SuspendedTask;
 import org.apache.reef.io.Message;
 import org.apache.reef.io.naming.Identifiable;
 
-public class SuspendedTaskBridge extends NativeBridge implements Identifiable, Message {
+public final class SuspendedTaskBridge extends NativeBridge implements Identifiable, Message {
 
   private final SuspendedTask jsuspendedTask;
   private final String taskId;
   private final ActiveContextBridge jactiveContext;
 
-  public SuspendedTaskBridge(SuspendedTask suspendedTask) {
+  public SuspendedTaskBridge(final SuspendedTask suspendedTask, final ActiveContextBridgeFactory activeContextBridgeFactory) {
     jsuspendedTask = suspendedTask;
     taskId = suspendedTask.getId();
-    jactiveContext = new ActiveContextBridge(jsuspendedTask.getActiveContext());
+    jactiveContext = activeContextBridgeFactory.getActiveContextBridge(jsuspendedTask.getActiveContext());
   }
 
   public ActiveContextBridge getActiveContext() {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b7b052/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
index 44077f0..69d8071 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java
@@ -71,6 +71,7 @@ public final class JobDriver {
   private final NameServer nameServer;
   private final String nameServerInfo;
   private final HttpServer httpServer;
+  private final ActiveContextBridgeFactory activeContextBridgeFactory;
   /**
    * Wake clock is used to schedule periodical job check-ups.
    */
@@ -92,7 +93,7 @@ public final class JobDriver {
   private final DriverStatusManager driverStatusManager;
 
   /**
-   *  NativeInterop has function to load libs when driver starts
+   * NativeInterop has function to load libs when driver starts
    */
   private final LibLoader libLoader;
 
@@ -134,9 +135,10 @@ public final class JobDriver {
    * Job driver constructor.
    * All parameters are injected from TANG automatically.
    *
-   * @param clock              Wake clock to schedule and check up running jobs.
-   * @param jobMessageObserver is used to send messages back to the client.
-   * @param evaluatorRequestor is used to request Evaluators.
+   * @param clock                      Wake clock to schedule and check up running jobs.
+   * @param jobMessageObserver         is used to send messages back to the client.
+   * @param evaluatorRequestor         is used to request Evaluators.
+   * @param activeContextBridgeFactory
    */
   @Inject
   JobDriver(final Clock clock,
@@ -147,13 +149,15 @@ public final class JobDriver {
             final DriverStatusManager driverStatusManager,
             final LoggingScopeFactory loggingScopeFactory,
             final LibLoader libLoader,
-            final LocalAddressProvider localAddressProvider) {
+            final LocalAddressProvider localAddressProvider,
+            final ActiveContextBridgeFactory activeContextBridgeFactory) {
     this.clock = clock;
     this.httpServer = httpServer;
     this.jobMessageObserver = jobMessageObserver;
     this.evaluatorRequestor = evaluatorRequestor;
     this.nameServer = nameServer;
     this.driverStatusManager = driverStatusManager;
+    this.activeContextBridgeFactory = activeContextBridgeFactory;
     this.nameServerInfo = localAddressProvider.getLocalAddress() + ":" + this.nameServer.getPort();
     this.loggingScopeFactory = loggingScopeFactory;
     this.libLoader = libLoader;
@@ -257,7 +261,7 @@ public final class JobDriver {
       if (JobDriver.this.activeContextHandler == 0) {
         throw new RuntimeException("Active Context Handler not initialized by CLR.");
       }
-      ActiveContextBridge activeContextBridge = new ActiveContextBridge(context);
+      ActiveContextBridge activeContextBridge = activeContextBridgeFactory.getActiveContextBridge(context);
       NativeInterop.ClrSystemActiveContextHandlerOnNext(JobDriver.this.activeContextHandler, activeContextBridge, JobDriver.this.interopLogger);
     } catch (final Exception ex) {
       LOG.log(Level.SEVERE, "Fail to submit task to active context");
@@ -319,7 +323,7 @@ public final class JobDriver {
           LOG.log(Level.INFO, "No CLR handler bound to handle completed task.");
         } else {
           LOG.log(Level.INFO, "CLR CompletedTaskHandler handler set, handling things with CLR handler.");
-          CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task);
+          CompletedTaskBridge completedTaskBridge = new CompletedTaskBridge(task, activeContextBridgeFactory);
           NativeInterop.ClrSystemCompletedTaskHandlerOnNext(JobDriver.this.completedTaskHandler, completedTaskBridge, JobDriver.this.interopLogger);
         }
       }
@@ -438,7 +442,7 @@ public final class JobDriver {
         throw new RuntimeException("Failed Task Handler not initialized by CLR.");
       }
       try {
-        FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task);
+        FailedTaskBridge failedTaskBridge = new FailedTaskBridge(task, activeContextBridgeFactory);
         NativeInterop.ClrSystemFailedTaskHandlerOnNext(JobDriver.this.failedTaskHandler, failedTaskBridge, JobDriver.this.interopLogger);
       } catch (final Exception ex) {
         LOG.log(Level.SEVERE, "Fail to invoke CLR failed task handler");
@@ -459,7 +463,7 @@ public final class JobDriver {
         } else {
           LOG.log(Level.INFO, "RunningTask will be handled by CLR handler. Task Id: {0}", task.getId());
           try {
-            final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task);
+            final RunningTaskBridge runningTaskBridge = new RunningTaskBridge(task, activeContextBridgeFactory);
             NativeInterop.ClrSystemRunningTaskHandlerOnNext(JobDriver.this.runningTaskHandler, runningTaskBridge, JobDriver.this.interopLogger);
           } catch (final Exception ex) {
             LOG.log(Level.WARNING, "Fail to invoke CLR running task handler");
@@ -483,7 +487,7 @@ public final class JobDriver {
             if (JobDriver.this.clrBridgeSetup) {
               if (JobDriver.this.driverRestartRunningTaskHandler != 0) {
                 LOG.log(Level.INFO, "CLR driver restart RunningTask handler implemented, now handle it in CLR.");
-                NativeInterop.ClrSystemDriverRestartRunningTaskHandlerOnNext(JobDriver.this.driverRestartRunningTaskHandler, new RunningTaskBridge(task));
+                NativeInterop.ClrSystemDriverRestartRunningTaskHandlerOnNext(JobDriver.this.driverRestartRunningTaskHandler, new RunningTaskBridge(task, activeContextBridgeFactory));
               } else {
                 LOG.log(Level.WARNING, "No CLR driver restart RunningTask handler implemented, done with DriverRestartRunningTaskHandler.");
               }
@@ -512,7 +516,7 @@ public final class JobDriver {
             if (JobDriver.this.clrBridgeSetup) {
               if (JobDriver.this.driverRestartActiveContextHandler != 0) {
                 LOG.log(Level.INFO, "CLR driver restart ActiveContext handler implemented, now handle it in CLR.");
-                NativeInterop.ClrSystemDriverRestartActiveContextHandlerOnNext(JobDriver.this.driverRestartActiveContextHandler, new ActiveContextBridge(context));
+                NativeInterop.ClrSystemDriverRestartActiveContextHandlerOnNext(JobDriver.this.driverRestartActiveContextHandler, activeContextBridgeFactory.getActiveContextBridge(context));
               } else {
                 LOG.log(Level.WARNING, "No CLR driver restart ActiveContext handler implemented, done with DriverRestartActiveContextHandler.");
               }
@@ -630,7 +634,7 @@ public final class JobDriver {
       LOG.log(Level.INFO, message);
       try (final LoggingScope ls = loggingScopeFactory.taskSuspended(task.getId())) {
         if (JobDriver.this.suspendedTaskHandler != 0) {
-          SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task);
+          SuspendedTaskBridge suspendedTaskBridge = new SuspendedTaskBridge(task, activeContextBridgeFactory);
           // if CLR implements the suspended task handler, handle it in CLR
           LOG.log(Level.INFO, "Handling the event of suspended task in CLR bridge.");
           NativeInterop.ClrSystemSupendedTaskHandlerOnNext(JobDriver.this.suspendedTaskHandler, suspendedTaskBridge);
@@ -669,7 +673,7 @@ public final class JobDriver {
       LOG.log(Level.INFO, "Completed Context: {0}", context.getId());
       try (final LoggingScope ls = loggingScopeFactory.closedContext(context.getId())) {
         if (JobDriver.this.closedContextHandler != 0) {
-          ClosedContextBridge closedContextBridge = new ClosedContextBridge(context);
+          ClosedContextBridge closedContextBridge = new ClosedContextBridge(context, activeContextBridgeFactory);
           // if CLR implements the closed context handler, handle it in CLR
           LOG.log(Level.INFO, "Handling the event of closed context in CLR bridge.");
           NativeInterop.ClrSystemClosedContextHandlerOnNext(JobDriver.this.closedContextHandler, closedContextBridge);
@@ -692,7 +696,7 @@ public final class JobDriver {
       LOG.log(Level.SEVERE, "FailedContext", context);
       try (final LoggingScope ls = loggingScopeFactory.evaluatorFailed(context.getId())) {
         if (JobDriver.this.failedContextHandler != 0) {
-          FailedContextBridge failedContextBridge = new FailedContextBridge(context);
+          FailedContextBridge failedContextBridge = new FailedContextBridge(context, activeContextBridgeFactory);
           // if CLR implements the failed context handler, handle it in CLR
           LOG.log(Level.INFO, "Handling the event of failed context in CLR bridge.");
           NativeInterop.ClrSystemFailedContextHandlerOnNext(JobDriver.this.failedContextHandler, failedContextBridge);