You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by rb...@apache.org on 2017/02/23 22:30:28 UTC

hive git commit: HIVE-16020: LLAP : Reduce IPC connection misses (Rajesh Balamohan, Siddharth Seth, reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master 657236ee0 -> b8d7192f5


HIVE-16020: LLAP : Reduce IPC connection misses (Rajesh Balamohan, Siddharth Seth, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b8d7192f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b8d7192f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b8d7192f

Branch: refs/heads/master
Commit: b8d7192f5f28dbc832d4de3e4afc763523a4bf12
Parents: 657236e
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Fri Feb 24 04:00:20 2017 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Fri Feb 24 04:00:20 2017 +0530

----------------------------------------------------------------------
 .../hive/llap/daemon/impl/AMReporter.java       |  5 ++--
 .../llap/daemon/impl/ContainerRunnerImpl.java   | 16 ++++++----
 .../hive/llap/daemon/impl/LlapDaemon.java       |  8 +++--
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 29 ++++++++++++++++++
 .../hive/llap/daemon/impl/QueryTracker.java     |  4 ++-
 .../llap/daemon/impl/TaskRunnerCallable.java    | 31 +++++++++++---------
 .../daemon/impl/TaskExecutorTestHelpers.java    |  4 ++-
 7 files changed, 72 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b8d7192f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 93237e6..a30f8b9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -119,7 +119,8 @@ public class AMReporter extends AbstractService {
   private final DaemonId daemonId;
 
   public AMReporter(int numExecutors, int maxThreads, AtomicReference<InetSocketAddress>
-      localAddress, QueryFailedHandler queryFailedHandler, Configuration conf, DaemonId daemonId) {
+      localAddress, QueryFailedHandler queryFailedHandler, Configuration conf, DaemonId daemonId,
+      SocketFactory socketFactory) {
     super(AMReporter.class.getName());
     this.localAddress = localAddress;
     this.queryFailedHandler = queryFailedHandler;
@@ -151,7 +152,7 @@ public class AMReporter extends AbstractService {
         .retryUpToMaximumTimeWithFixedSleep(retryTimeout, retrySleep,
             TimeUnit.MILLISECONDS);
 
-    this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+    this.socketFactory = socketFactory;
 
     LOG.info("Setting up AMReporter with " +
         "heartbeatInterval(ms)=" + heartbeatInterval +

http://git-wip-us.apache.org/repos/asf/hive/blob/b8d7192f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 6908138..cc4eff0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -83,6 +83,8 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 
+import javax.net.SocketFactory;
+
 public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler, QueryFailedHandler {
 
   // TODO Setup a set of threads to process incoming requests.
@@ -107,12 +109,14 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
   private final String clusterId;
   private final DaemonId daemonId;
   private final UgiFactory fsUgiFactory;
+  private final SocketFactory socketFactory;
 
   public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize,
       boolean enablePreemption, String[] localDirsBase, AtomicReference<Integer> localShufflePort,
       AtomicReference<InetSocketAddress> localAddress,
       long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics,
-      AMReporter amReporter, ClassLoader classLoader, DaemonId daemonId, UgiFactory fsUgiFactory) {
+      AMReporter amReporter, ClassLoader classLoader, DaemonId daemonId, UgiFactory fsUgiFactory,
+      SocketFactory socketFactory) {
     super("ContainerRunnerImpl");
     Preconditions.checkState(numExecutors > 0,
         "Invalid number of executors: " + numExecutors + ". Must be > 0");
@@ -122,6 +126,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     this.signer = UserGroupInformation.isSecurityEnabled()
         ? new LlapSignerImpl(conf, daemonId.getClusterString()) : null;
     this.fsUgiFactory = fsUgiFactory;
+    this.socketFactory = socketFactory;
 
     this.clusterId = daemonId.getClusterString();
     this.daemonId = daemonId;
@@ -239,7 +244,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
           queryIdentifier, qIdProto.getApplicationIdString(), dagId,
           vertex.getDagName(), vertex.getHiveQueryId(), dagIdentifier,
           vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(),
-          vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo);
+          vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, request.getAmHost(),
+          request.getAmPort());
 
       String[] localDirs = fragmentInfo.getLocalDirs();
       Preconditions.checkNotNull(localDirs);
@@ -250,12 +256,12 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
       // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream)
 
       Configuration callableConf = new Configuration(getConfig());
-      UserGroupInformation taskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi();
+      UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi();
       TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf,
           new ExecutionContextImpl(localAddress.get().getHostName()), env,
           credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
-          this, tezHadoopShim, attemptId, vertex, initialEvent, taskUgi,
-          completionListener);
+          this, tezHadoopShim, attemptId, vertex, initialEvent, fsTaskUgi,
+          completionListener, socketFactory);
       submissionState = executorService.schedule(callable);
 
       if (LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/b8d7192f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index fc9f530..eb05f4c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.management.ObjectName;
+import javax.net.SocketFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.JvmPauseMonitor;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecker;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ExitUtil;
@@ -105,6 +107,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
   private final long maxJvmMemory;
   private final String[] localDirs;
   private final DaemonId daemonId;
+  private final SocketFactory socketFactory;
 
   // TODO Not the best way to share the address
   private final AtomicReference<InetSocketAddress> srvAddress = new AtomicReference<>(),
@@ -255,8 +258,9 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
         " sessionId: " + sessionId);
 
     int maxAmReporterThreads = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_AM_REPORTER_MAX_THREADS);
+    this.socketFactory = NetUtils.getDefaultSocketFactory(daemonConf);
     this.amReporter = new AMReporter(numExecutors, maxAmReporterThreads, srvAddress,
-        new QueryFailedHandlerProxy(), daemonConf, daemonId);
+        new QueryFailedHandlerProxy(), daemonConf, daemonId, socketFactory);
 
     SecretManager sm = null;
     if (UserGroupInformation.isSecurityEnabled()) {
@@ -274,7 +278,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     }
     this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, waitQueueSize,
         enablePreemption, localDirs, this.shufflePort, srvAddress, executorMemoryPerInstance, metrics,
-        amReporter, executorClassLoader, daemonId, fsUgiFactory);
+        amReporter, executorClassLoader, daemonId, fsUgiFactory, socketFactory);
     addIfService(containerRunner);
 
     // Not adding the registry as a service, since we need to control when it is initialized - conf used to pickup properties.

http://git-wip-us.apache.org/repos/asf/hive/blob/b8d7192f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 1080d3e..eaa3e7e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -25,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.base.Preconditions;
@@ -36,6 +38,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
 
 public class QueryInfo {
   private final QueryIdentifier queryIdentifier;
@@ -57,6 +64,7 @@ public class QueryInfo {
 
   private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker();
   private final String tokenUserName, appId;
+  private final AtomicReference<UserGroupInformation> umbilicalUgi;
 
   public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
                    String dagName, String hiveQueryIdString,
@@ -76,6 +84,7 @@ public class QueryInfo {
     this.localFs = localFs;
     this.tokenUserName = tokenUserName;
     this.appId = tokenAppId;
+    this.umbilicalUgi = new AtomicReference<>();
   }
 
   public QueryIdentifier getQueryIdentifier() {
@@ -297,4 +306,24 @@ public class QueryInfo {
   public String getTokenAppId() {
     return appId;
   }
+
+  public void setupUmbilicalUgi(String umbilicalUser, Token<JobTokenIdentifier> appToken, String amHost, int amPort) {
+    synchronized (umbilicalUgi) { // lock on the holder so the null check and the set below run as one step
+      if (umbilicalUgi.get() == null) { // only the first call builds a UGI; later calls leave the stored one untouched
+        UserGroupInformation taskOwner =
+            UserGroupInformation.createRemoteUser(umbilicalUser);
+        final InetSocketAddress address =
+            NetUtils.createSocketAddrForHost(amHost, amPort);
+        SecurityUtil.setTokenService(appToken, address); // sets the service on the caller's appToken from the AM host/port
+        taskOwner.addToken(appToken);
+        umbilicalUgi.set(taskOwner); // publish the UGI (carrying appToken) for getUmbilicalUgi() callers
+      }
+    }
+  }
+
+  public UserGroupInformation getUmbilicalUgi() {
+    synchronized (umbilicalUgi) {
+      return umbilicalUgi.get(); // null until setupUmbilicalUgi() has stored a UGI (the reference starts out empty)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/b8d7192f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 9eaddd2..5cf3a38 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -139,7 +139,7 @@ public class QueryTracker extends AbstractService {
   QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagIdString,
       String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber,
       String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken,
-      String fragmentIdString, LlapTokenInfo tokenInfo) throws IOException {
+      String fragmentIdString, LlapTokenInfo tokenInfo, String amHost, int amPort) throws IOException {
 
     ReadWriteLock dagLock = getDagLock(queryIdentifier);
     // Note: This is a readLock to prevent a race with queryComplete. Operations
@@ -174,6 +174,8 @@ public class QueryTracker extends AbstractService {
         if (old != null) {
           queryInfo = old;
         } else {
+          // Ensure the UGI is setup once.
+          queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amHost, amPort);
           isExistingQueryInfo = false;
         }
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/b8d7192f/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 4b677aa..8fce546 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.log4j.MDC;
@@ -65,6 +64,7 @@ import org.apache.tez.runtime.task.TezTaskRunner2;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.SocketFactory;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
@@ -116,7 +116,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
   private final SignableVertexSpec vertex;
   private final TezEvent initialEvent;
   private final SchedulerFragmentCompletingListener completionListener;
-  private UserGroupInformation taskUgi;
+  private UserGroupInformation fsTaskUgi;
+  private final SocketFactory socketFactory;
 
   @VisibleForTesting
   public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo,
@@ -125,7 +126,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
                             LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler,
                             FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim,
                             TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent,
-                            UserGroupInformation taskUgi, SchedulerFragmentCompletingListener completionListener) {
+                            UserGroupInformation fsTaskUgi, SchedulerFragmentCompletingListener completionListener,
+                            SocketFactory socketFactory) {
     this.request = request;
     this.fragmentInfo = fragmentInfo;
     this.conf = conf;
@@ -153,8 +155,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     this.fragmentCompletionHanler = fragmentCompleteHandler;
     this.tezHadoopShim = tezHadoopShim;
     this.initialEvent = initialEvent;
-    this.taskUgi = taskUgi;
+    this.fsTaskUgi = fsTaskUgi;
     this.completionListener = completionListener;
+    this.socketFactory = socketFactory;
   }
 
   public long getStartTime() {
@@ -196,27 +199,27 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
 
       // TODO Consolidate this code with TezChild.
       runtimeWatch.start();
-      if (taskUgi == null) {
-        taskUgi = UserGroupInformation.createRemoteUser(vertex.getUser());
+      if (fsTaskUgi == null) {
+        fsTaskUgi = UserGroupInformation.createRemoteUser(vertex.getUser());
       }
-      taskUgi.addCredentials(credentials);
+      fsTaskUgi.addCredentials(credentials);
 
       Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
       serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
           TezCommonUtils.convertJobTokenToBytes(jobToken));
       Multimap<String, String> startedInputsMap = createStartedInputMap(vertex);
 
-      UserGroupInformation taskOwner =
-          UserGroupInformation.createRemoteUser(vertex.getTokenIdentifier());
+      final UserGroupInformation taskOwner = fragmentInfo.getQueryInfo().getUmbilicalUgi();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("taskOwner hashCode:" + taskOwner.hashCode());
+      }
       final InetSocketAddress address =
           NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort());
-      SecurityUtil.setTokenService(jobToken, address);
-      taskOwner.addToken(jobToken);
       umbilical = taskOwner.doAs(new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {
         @Override
         public LlapTaskUmbilicalProtocol run() throws Exception {
           return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
-              LlapTaskUmbilicalProtocol.versionID, address, conf);
+              LlapTaskUmbilicalProtocol.versionID, address, taskOwner, conf, socketFactory);
         }
       });
 
@@ -238,7 +241,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
       try {
         synchronized (this) {
           if (shouldRunTask) {
-            taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(),
+            taskRunner = new TezTaskRunner2(conf, fsTaskUgi, fragmentInfo.getLocalDirs(),
                 taskSpec,
                 vertex.getQueryIdentifier().getAppAttemptNumber(),
                 serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor,
@@ -260,7 +263,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
           isCompleted.set(true);
           return result;
         } finally {
-          FileSystem.closeAllForUGI(taskUgi);
+          FileSystem.closeAllForUGI(fsTaskUgi);
           LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
                   runtimeWatch.stop().elapsedMillis());
           if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/b8d7192f/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 5dc1be5..ae3328a 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -44,6 +44,8 @@ import org.apache.tez.runtime.task.TaskRunner2Result;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.SocketFactory;
+
 public class TaskExecutorTestHelpers {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecutorService.class);
@@ -184,7 +186,7 @@ public class TaskExecutorTestHelpers {
           mock(KilledTaskHandler.class), mock(
               FragmentCompletionHandler.class), new DefaultHadoopShim(), null,
               requestProto.getWorkSpec().getVertex(), initialEvent, null, mock(
-              SchedulerFragmentCompletingListener.class));
+              SchedulerFragmentCompletingListener.class), mock(SocketFactory.class));
       this.workTime = workTime;
       this.canFinish = canFinish;
     }