You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/21 04:12:53 UTC
[29/63] [abbrv] Redesign Scheduler from pre-assignment to more
flexible schedule-on-demand model
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index a479bd3..2d30cf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.taskmanager;
import java.io.File;
@@ -55,8 +54,8 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.configuration.ConfigConstants;
@@ -75,6 +74,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.ChannelManager;
import org.apache.flink.runtime.io.network.InsufficientResourcesException;
@@ -95,7 +95,6 @@ import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
import org.apache.flink.runtime.protocols.JobManagerProtocol;
import org.apache.flink.runtime.protocols.TaskOperationProtocol;
-import org.apache.flink.runtime.taskmanager.transferenvelope.RegisterTaskManagerResult;
import org.apache.flink.runtime.types.IntegerRecord;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.SerializableArrayList;
@@ -110,12 +109,19 @@ import org.apache.flink.util.StringUtils;
*/
public class TaskManager implements TaskOperationProtocol {
- private static final Logger LOG = LoggerFactory.getLogger(TaskManager.class);
+ private static final Log LOG = LogFactory.getLog(TaskManager.class);
- private final static int FAILURE_RETURN_CODE = -1;
+ private static final int STARTUP_FAILURE_RETURN_CODE = 1;
+
+ private static final int CRITICAL_ERROR_RETURN_CODE = 2;
private static final int IPC_HANDLER_COUNT = 1;
+ private static final int MAX_LOST_HEART_BEATS = 3;
+
+ private static final int DELAY_AFTER_LOST_CONNECTION = 10000;
+
+
public final static String ARG_CONF_DIR = "tempDir";
// --------------------------------------------------------------------------------------------
@@ -126,7 +132,6 @@ public class TaskManager implements TaskOperationProtocol {
private final ExecutionMode executionMode;
-
private final JobManagerProtocol jobManager;
private final InputSplitProviderProtocol globalInputSplitProvider;
@@ -169,9 +174,12 @@ public class TaskManager implements TaskOperationProtocol {
private final AtomicBoolean shutdownStarted = new AtomicBoolean(false);
+ private volatile InstanceID registeredId;
+
/** Stores whether the task manager has already been shut down. */
private volatile boolean shutdownComplete;
+
/**
* All parameters are obtained from the
* {@link GlobalConfiguration}, which must be loaded prior to instantiating the task manager.
@@ -203,7 +211,7 @@ public class TaskManager implements TaskOperationProtocol {
jobManagerAddress = new InetSocketAddress(tmpAddress, port);
}
catch (UnknownHostException e) {
- LOG.error("Could not resolve JobManager host name.");
+ LOG.fatal("Could not resolve JobManager host name.");
throw new Exception("Could not resolve JobManager host name: " + e.getMessage(), e);
}
@@ -214,7 +222,7 @@ public class TaskManager implements TaskOperationProtocol {
try {
this.jobManager = RPC.getProxy(JobManagerProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
} catch (IOException e) {
- LOG.error("Could not connect to the JobManager: " + e.getMessage(), e);
+ LOG.fatal("Could not connect to the JobManager: " + e.getMessage(), e);
throw new Exception("Failed to initialize connection to JobManager: " + e.getMessage(), e);
}
@@ -245,7 +253,7 @@ public class TaskManager implements TaskOperationProtocol {
this.taskManagerServer = RPC.getServer(this, taskManagerAddress.getHostAddress(), ipcPort, IPC_HANDLER_COUNT);
this.taskManagerServer.start();
} catch (IOException e) {
- LOG.error("Failed to start TaskManager server. " + e.getMessage(), e);
+ LOG.fatal("Failed to start TaskManager server. " + e.getMessage(), e);
throw new Exception("Failed to start taskmanager server. " + e.getMessage(), e);
}
}
@@ -254,7 +262,7 @@ public class TaskManager implements TaskOperationProtocol {
try {
this.globalInputSplitProvider = RPC.getProxy(InputSplitProviderProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
} catch (IOException e) {
- LOG.error(e.getMessage(), e);
+ LOG.fatal(e.getMessage(), e);
throw new Exception("Failed to initialize connection to global input split provider: " + e.getMessage(), e);
}
@@ -262,7 +270,7 @@ public class TaskManager implements TaskOperationProtocol {
try {
this.lookupService = RPC.getProxy(ChannelLookupProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
} catch (IOException e) {
- LOG.error(e.getMessage(), e);
+ LOG.fatal(e.getMessage(), e);
throw new Exception("Failed to initialize channel lookup protocol. " + e.getMessage(), e);
}
@@ -270,7 +278,7 @@ public class TaskManager implements TaskOperationProtocol {
try {
this.accumulatorProtocolProxy = RPC.getProxy(AccumulatorProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
} catch (IOException e) {
- LOG.error("Failed to initialize accumulator protocol: " + e.getMessage(), e);
+ LOG.fatal("Failed to initialize accumulator protocol: " + e.getMessage(), e);
throw new Exception("Failed to initialize accumulator protocol: " + e.getMessage(), e);
}
@@ -324,17 +332,13 @@ public class TaskManager implements TaskOperationProtocol {
ConfigConstants.TASK_MANAGER_NET_NUM_OUT_THREADS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS);
- int lowWaterMark = GlobalConfiguration.getInteger(
- ConfigConstants.TASK_MANAGER_NET_NETTY_LOW_WATER_MARK,
- ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK);
-
- int highWaterMark = GlobalConfiguration.getInteger(
- ConfigConstants.TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK,
- ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK);
+ int closeAfterIdleForMs = GlobalConfiguration.getInteger(
+ ConfigConstants.TASK_MANAGER_NET_CLOSE_AFTER_IDLE_FOR_MS_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NET_CLOSE_AFTER_IDLE_FOR_MS);
networkConnectionManager = new NettyConnectionManager(
localInstanceConnectionInfo.address(), localInstanceConnectionInfo.dataPort(),
- bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark);
+ bufferSize, numInThreads, numOutThreads, closeAfterIdleForMs);
break;
}
@@ -391,7 +395,7 @@ public class TaskManager implements TaskOperationProtocol {
this.memoryManager = new DefaultMemoryManager(memorySize, this.numberOfSlots, pageSize);
} catch (Throwable t) {
- LOG.error("Unable to initialize memory manager with " + (memorySize >>> 20) + " megabytes of memory.", t);
+ LOG.fatal("Unable to initialize memory manager with " + (memorySize >>> 20) + " megabytes of memory.", t);
throw new Exception("Unable to initialize memory manager.", t);
}
}
@@ -400,15 +404,21 @@ public class TaskManager implements TaskOperationProtocol {
this.ioManager = new IOManager(tmpDirPaths);
- this.heartbeatThread = new Thread() {
- @Override
- public void run() {
- runHeartbeatLoop();
- }
- };
-
- this.heartbeatThread.setName("Heartbeat Thread");
- this.heartbeatThread.start();
+ // start the heart beats
+ {
+ final long interval = GlobalConfiguration.getInteger(
+ ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL);
+
+ this.heartbeatThread = new Thread() {
+ @Override
+ public void run() {
+ registerAndRunHeartbeatLoop(interval, MAX_LOST_HEART_BEATS);
+ }
+ };
+ this.heartbeatThread.setName("Heartbeat Thread");
+ this.heartbeatThread.start();
+ }
// --------------------------------------------------------------------
// Memory Usage
@@ -500,7 +510,7 @@ public class TaskManager implements TaskOperationProtocol {
line = parser.parse(options, args);
} catch (ParseException e) {
System.err.println("CLI Parsing failed. Reason: " + e.getMessage());
- System.exit(FAILURE_RETURN_CODE);
+ System.exit(STARTUP_FAILURE_RETURN_CODE);
}
String configDir = line.getOptionValue(configDirOpt.getOpt(), null);
@@ -524,8 +534,8 @@ public class TaskManager implements TaskOperationProtocol {
try {
new TaskManager(ExecutionMode.CLUSTER);
} catch (Exception e) {
- LOG.error("Taskmanager startup failed: " + e.getMessage(), e);
- System.exit(FAILURE_RETURN_CODE);
+ LOG.fatal("Taskmanager startup failed: " + e.getMessage(), e);
+ System.exit(STARTUP_FAILURE_RETURN_CODE);
}
// park the main thread to keep the JVM alive (all other threads may be daemon threads)
@@ -537,63 +547,7 @@ public class TaskManager implements TaskOperationProtocol {
}
}
- /**
- * This method send the periodic heartbeats.
- */
- private void runHeartbeatLoop() {
- final long interval = GlobalConfiguration.getInteger(
- ConfigConstants.TASK_MANAGER_HEARTBEAT_INTERVAL_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL);
-
- try {
- while(!shutdownStarted.get()){
- RegisterTaskManagerResult result = this.jobManager.registerTaskManager(this
- .localInstanceConnectionInfo,this.hardwareDescription,
- new IntegerRecord(this.numberOfSlots));
-
- if(result.getReturnCode() == RegisterTaskManagerResult.ReturnCode.SUCCESS){
- break;
- }
-
- try{
- Thread.sleep(50);
- }catch(InterruptedException e){
- if (!shutdownStarted.get()) {
- LOG.error("TaskManager register task manager loop was interrupted without shutdown.");
- }
- }
- }
-
- } catch (IOException e) {
- if(!shutdownStarted.get()){
- LOG.error("Registering task manager caused an exception: " + e.getMessage(), e);
- }
- return;
- }
- while (!shutdownStarted.get()) {
- // sleep until the next heart beat
- try {
- Thread.sleep(interval);
- }
- catch (InterruptedException e) {
- if (!shutdownStarted.get()) {
- LOG.error("TaskManager heart beat loop was interrupted without shutdown.");
- }
- }
-
- // send heart beat
- try {
- this.jobManager.sendHeartbeat(this.localInstanceConnectionInfo);
- } catch (IOException e) {
- if (shutdownStarted.get()) {
- break;
- } else {
- LOG.error("Sending the heart beat caused an exception: " + e.getMessage(), e);
- }
- }
- }
- }
/**
@@ -979,7 +933,7 @@ public class TaskManager implements TaskOperationProtocol {
this.jobManager.updateTaskExecutionState(new TaskExecutionState(jobID, id, newExecutionState,
optionalDescription));
} catch (IOException e) {
- LOG.error("Could not update task execution state.", e);
+ LOG.error(e);
}
}
}
@@ -1046,7 +1000,7 @@ public class TaskManager implements TaskOperationProtocol {
this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Got interrupted while awaiting the termination of the executor service.", e);
+ LOG.debug(e);
}
}
}
@@ -1065,7 +1019,6 @@ public class TaskManager implements TaskOperationProtocol {
@Override
public void logBufferUtilization() {
-
this.channelManager.logBufferUtilization();
}
@@ -1090,38 +1043,8 @@ public class TaskManager implements TaskOperationProtocol {
this.channelManager.invalidateLookupCacheEntries(channelIDs);
}
- /**
- * Checks, whether the given strings describe existing directories that are writable. If that is not
- * the case, an exception is raised.
- *
- * @param tempDirs
- * An array of strings which are checked to be paths to writable directories.
- * @throws Exception
- * Thrown, if any of the mentioned checks fails.
- */
- private static final void checkTempDirs(final String[] tempDirs) throws Exception {
-
- for (int i = 0; i < tempDirs.length; ++i) {
-
- final String dir = tempDirs[i];
- if (dir == null) {
- throw new Exception("Temporary file directory #" + (i + 1) + " is null.");
- }
-
- final File f = new File(dir);
-
- if (!f.exists()) {
- throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' does not exist.");
- }
-
- if (!f.isDirectory()) {
- throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' is not a directory.");
- }
-
- if (!f.canWrite()) {
- throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' is not writable.");
- }
- }
+ public void cancelAndClearEverything() {
+ LOG.info("Cancelling all computations and discarding all cached data.");
}
// --------------------------------------------------------------------------------------------
@@ -1136,8 +1059,152 @@ public class TaskManager implements TaskOperationProtocol {
return this.executionMode;
}
+ /**
+ * Gets the ID under which the TaskManager is currently registered at its JobManager.
+ * If the TaskManager has not been registered yet, or if it lost contact, this is null.
+ *
+ * @return The ID under which the TaskManager is currently registered.
+ */
+ public InstanceID getRegisteredId() {
+ return this.registeredId;
+ }
+
+ /**
+ * Checks if the TaskManager is properly registered and ready to receive work.
+ *
+ * @return True, if the TaskManager is registered, false otherwise.
+ */
+ public boolean isRegistered() {
+ return this.registeredId != null;
+ }
+
// --------------------------------------------------------------------------------------------
- // Memory and Garbace Collection Debugging Utilities
+ // Heartbeats
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * This method registers the TaskManager at the jobManager and sends periodic heartbeats.
+ */
+ private void registerAndRunHeartbeatLoop(long interval, int maxNonSuccessfulHeatbeats) {
+
+ while (!shutdownStarted.get()) {
+ InstanceID resultId = null;
+
+ // try to register. We try as long as we need to, because it may be that the jobmanager is not yet online
+ {
+ final long maxDelay = 10000; // the maximal delay between registration attempts
+ final long reportingDelay = 5000;
+ long currentDelay = 100; // initially, wait 100 msecs for the next registration attempt
+
+ while (!shutdownStarted.get())
+ {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to register at Jobmanager...");
+ }
+
+ try {
+ resultId = this.jobManager.registerTaskManager(this.localInstanceConnectionInfo,
+ this.hardwareDescription, this.numberOfSlots);
+ }
+ catch (IOException e) {
+ // this may be if the job manager was not yet online
+ // if this has happened for a while, report it. if it has just happened
+ // at the very beginning, this may not mean anything (JM still in startup)
+ if (currentDelay >= reportingDelay) {
+ LOG.error("Connection to JobManager failed.", e);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("Could not connect to JobManager.", e);
+ }
+ }
+
+ // check if we were accepted
+ if (resultId != null) {
+ // success
+ this.registeredId = resultId;
+ break;
+ } else {
+ // no ID was returned: the JobManager refused us or the call failed. Report and try again later
+ LOG.error("Registration attempt refused by JobManager.");
+ }
+
+ try {
+ Thread.sleep(currentDelay);
+ }
+ catch (InterruptedException e) {
+ // may be due to shutdown
+ if (!shutdownStarted.get()) {
+ LOG.error("TaskManager's registration loop was interrupted without shutdown.");
+ }
+ }
+
+ // increase the time between registration attempts, to not keep on pinging overly frequently
+ currentDelay = Math.min(2 * currentDelay, maxDelay);
+ }
+ }
+
+ // registration complete, or shutdown
+ int successiveUnsuccessfulHeartbeats = 0;
+
+ // the heart beat loop
+ while (!shutdownStarted.get()) {
+ // sleep until the next heart beat
+ try {
+ Thread.sleep(interval);
+ }
+ catch (InterruptedException e) {
+ if (!shutdownStarted.get()) {
+ LOG.error("TaskManager heart beat loop was interrupted without shutdown.");
+ }
+ }
+
+ // send heart beat
+ try {
+ boolean accepted = this.jobManager.sendHeartbeat(resultId);
+
+ if (accepted) {
+ // reset the unsuccessful heart beats
+ successiveUnsuccessfulHeartbeats = 0;
+ } else {
+ successiveUnsuccessfulHeartbeats++;
+ LOG.error("JobManager rejected heart beat.");
+ }
+ }
+ catch (IOException e) {
+ if (!shutdownStarted.get()) {
+ successiveUnsuccessfulHeartbeats++;
+ LOG.error("Sending the heart beat failed on I/O error: " + e.getMessage(), e);
+ }
+ }
+
+ if (successiveUnsuccessfulHeartbeats == maxNonSuccessfulHeatbeats) {
+ // we are done for, we cannot connect to the jobmanager any more
+ // or we are not welcome there any more
+ // what to do now? Wait for a while and try to reconnect
+ LOG.error("TaskManager has lost connection to JobManager.");
+
+ // mark us as disconnected and abort all computation
+ this.registeredId = null;
+ cancelAndClearEverything();
+
+ // wait for a while, then attempt to register again
+ try {
+ Thread.sleep(DELAY_AFTER_LOST_CONNECTION);
+ }
+ catch (InterruptedException e) {
+ if (!shutdownStarted.get()) {
+ LOG.error("TaskManager heart beat loop was interrupted without shutdown.");
+ }
+ }
+
+ // leave the heart beat loop
+ break;
+ }
+ } // end heart beat loop
+ } // end while not shutdown
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Memory and Garbage Collection Debugging Utilities
// --------------------------------------------------------------------------------------------
private String getMemoryUsageStatsAsString(MemoryMXBean memoryMXBean) {
@@ -1175,4 +1242,55 @@ public class TaskManager implements TaskOperationProtocol {
return str.toString();
}
+
+
+ // --------------------------------------------------------------------------------------------
+ // Miscellaneous Utilities
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Checks whether the given strings describe existing directories that are writable. If that is not
+ * the case, an exception is raised.
+ *
+ * @param tempDirs An array of strings which are checked to be paths to writable directories.
+ * @throws Exception Thrown, if any of the mentioned checks fails.
+ */
+ private static final void checkTempDirs(final String[] tempDirs) throws Exception {
+ for (int i = 0; i < tempDirs.length; ++i) {
+ final String dir = tempDirs[i];
+ if (dir == null) {
+ throw new Exception("Temporary file directory #" + (i + 1) + " is null.");
+ }
+
+ final File f = new File(dir);
+
+ if (!f.exists()) {
+ throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' does not exist.");
+ }
+
+ if (!f.isDirectory()) {
+ throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' is not a directory.");
+ }
+
+ if (!f.canWrite()) {
+ throw new Exception("Temporary file directory '" + f.getAbsolutePath() + "' is not writable.");
+ }
+ }
+ }
+
+ public static class EmergencyShutdownExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+ private final TaskManager tm;
+
+ public EmergencyShutdownExceptionHandler(TaskManager tm) {
+ this.tm = tm;
+ }
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.fatal("Thread " + t.getName() + " caused an unrecoverable exception.", e);
+ tm.shutdown();
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/transferenvelope/RegisterTaskManagerResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/transferenvelope/RegisterTaskManagerResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/transferenvelope/RegisterTaskManagerResult.java
deleted file mode 100644
index 27fae1c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/transferenvelope/RegisterTaskManagerResult.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.taskmanager.transferenvelope;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.util.EnumUtils;
-
-public class RegisterTaskManagerResult implements IOReadableWritable {
- public enum ReturnCode{
- SUCCESS, FAILURE
- };
-
- public RegisterTaskManagerResult(){
- this.returnCode = ReturnCode.SUCCESS;
- }
-
- public RegisterTaskManagerResult(ReturnCode returnCode){
- this.returnCode = returnCode;
- }
-
- private ReturnCode returnCode;
-
- public ReturnCode getReturnCode() { return this.returnCode; }
-
-
- @Override
- public void write(DataOutputView out) throws IOException {
- EnumUtils.writeEnum(out, this.returnCode);
- }
-
- @Override
- public void read(DataInputView in) throws IOException {
- this.returnCode = EnumUtils.readEnum(in, ReturnCode.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
index 290326c..02c814c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/LocalInstanceManagerTest.java
@@ -55,7 +55,6 @@ import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
import org.apache.flink.runtime.protocols.JobManagerProtocol;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.taskmanager.transferenvelope.RegisterTaskManagerResult;
import org.apache.flink.runtime.types.IntegerRecord;
import org.junit.After;
@@ -214,16 +213,6 @@ public class LocalInstanceManagerTest {
}
@Override
- public void sendHeartbeat(InstanceConnectionInfo instanceConnectionInfo) {}
-
- @Override
- public RegisterTaskManagerResult registerTaskManager(InstanceConnectionInfo instanceConnectionInfo,
- HardwareDescription hardwareDescription, IntegerRecord numberOfSlots)
- {
- return new RegisterTaskManagerResult(RegisterTaskManagerResult.ReturnCode.SUCCESS);
- }
-
- @Override
public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultSchedulerTest.java
index 76616b8..3a24f97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultSchedulerTest.java
@@ -16,185 +16,165 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.jobmanager.scheduler;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
-import java.io.IOException;
-import java.util.List;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
-import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.StringRecord;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.GraphConversionException;
-import org.apache.flink.runtime.io.network.api.RecordReader;
-import org.apache.flink.runtime.io.network.api.RecordWriter;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphDefinitionException;
-import org.apache.flink.runtime.jobgraph.JobInputVertex;
-import org.apache.flink.runtime.jobgraph.JobOutputVertex;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
-import org.apache.flink.runtime.jobmanager.scheduler.SchedulingException;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex2;
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobgraph.JobID;
import org.junit.Test;
/**
* This class checks the functionality of the {@link org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler} class
*/
-@SuppressWarnings("serial")
public class DefaultSchedulerTest {
-
- public static final class InputTask extends AbstractInvokable {
-
- @Override
- public void registerInputOutput() {
- new RecordWriter<StringRecord>(this);
- }
-
- @Override
- public void invoke() throws Exception {}
-
- }
-
- public static final class OutputTask extends AbstractInvokable {
-
- @Override
- public void registerInputOutput() {
- new RecordReader<StringRecord>(this, StringRecord.class);
- }
-
- @Override
- public void invoke() throws Exception {}
-
- }
-
- public static final class DummyInputFormat extends GenericInputFormat<IntValue> {
-
- @Override
- public boolean reachedEnd() throws IOException {
- return true;
- }
-
- @Override
- public IntValue nextRecord(IntValue reuse) throws IOException {
- return null;
- }
- }
-
- public static final class DummyOutputFormat implements OutputFormat<IntValue> {
-
- @Override
- public void configure(Configuration parameters) {}
-
- @Override
- public void open(int taskNumber, int numTasks) {}
-
- @Override
- public void writeRecord(IntValue record) {}
-
- @Override
- public void close() {}
- }
-
- /**
- * Constructs a sample execution graph consisting of two vertices connected by a channel of the given type.
- *
- * @param channelType
- * the channel type to connect the vertices with
- * @return a sample execution graph
- */
- private ExecutionGraph createExecutionGraph(ChannelType channelType) {
-
- final JobGraph jobGraph = new JobGraph("Job Graph");
-
- final JobInputVertex inputVertex = new JobInputVertex("Input 1", jobGraph);
- inputVertex.setInvokableClass(InputTask.class);
- inputVertex.setInputFormat(new DummyInputFormat());
- inputVertex.setNumberOfSubtasks(1);
-
- final JobOutputVertex outputVertex = new JobOutputVertex("Output 1", jobGraph);
- outputVertex.setInvokableClass(OutputTask.class);
- outputVertex.setOutputFormat(new DummyOutputFormat());
- outputVertex.setNumberOfSubtasks(1);
-
+ private int portNum = 10000;
+
+ @Test
+ public void testAddAndRemoveInstance() {
try {
- inputVertex.connectTo(outputVertex, channelType);
- } catch (JobGraphDefinitionException e) {
- fail(StringUtils.stringifyException(e));
+ DefaultScheduler scheduler = new DefaultScheduler();
+
+ Instance i1 = getRandomInstance(2);
+ Instance i2 = getRandomInstance(2);
+ Instance i3 = getRandomInstance(2);
+
+ assertEquals(0, scheduler.getNumberOfAvailableInstances());
+ scheduler.newInstanceAvailable(i1);
+ assertEquals(1, scheduler.getNumberOfAvailableInstances());
+ scheduler.newInstanceAvailable(i2);
+ assertEquals(2, scheduler.getNumberOfAvailableInstances());
+ scheduler.newInstanceAvailable(i3);
+ assertEquals(3, scheduler.getNumberOfAvailableInstances());
+
+ // cannot add available instance again
+ try {
+ scheduler.newInstanceAvailable(i2);
+ fail("Scheduler accepted instance twice");
+ }
+ catch (IllegalArgumentException e) {
+ // bueno!
+ }
+
+ // some instances die
+ assertEquals(3, scheduler.getNumberOfAvailableInstances());
+ scheduler.instanceDied(i2);
+ assertEquals(2, scheduler.getNumberOfAvailableInstances());
+
+ // try to add a dead instance
+ try {
+ scheduler.newInstanceAvailable(i2);
+ fail("Scheduler accepted dead instance");
+ }
+ catch (IllegalArgumentException e) {
+ // stimmt
+
+ }
+
+ scheduler.instanceDied(i1);
+ assertEquals(1, scheduler.getNumberOfAvailableInstances());
+ scheduler.instanceDied(i3);
+ assertEquals(0, scheduler.getNumberOfAvailableInstances());
+
+ assertFalse(i1.isAlive());
+ assertFalse(i2.isAlive());
+ assertFalse(i3.isAlive());
}
-
- try {
- LibraryCacheManager.register(jobGraph.getJobID(), new String[0]);
- return new ExecutionGraph(jobGraph, 1);
-
- } catch (GraphConversionException e) {
- fail(StringUtils.stringifyException(e));
- } catch (IOException e) {
- fail(StringUtils.stringifyException(e));
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
}
-
- return null;
}
-
- /**
- * Checks the behavior of the scheduleJob() method with a job consisting of two tasks connected via an in-memory
- * channel.
- */
+
+
@Test
- public void testScheduleJobWithInMemoryChannel() {
-
- final TestInstanceManager tim = new TestInstanceManager();
- final TestDeploymentManager tdm = new TestDeploymentManager();
- final DefaultScheduler scheduler = new DefaultScheduler(tdm, tim);
-
- final ExecutionGraph executionGraph = createExecutionGraph(ChannelType.IN_MEMORY);
-
+ public void testAssignToSlots() {
try {
+ final JobID jobId = new JobID();
+
+ DefaultScheduler scheduler = new DefaultScheduler();
+
+ scheduler.newInstanceAvailable(getRandomInstance(2));
+ scheduler.newInstanceAvailable(getRandomInstance(2));
+ scheduler.newInstanceAvailable(getRandomInstance(2));
+
+ ResourceId id1 = new ResourceId();
+ ResourceId id2 = new ResourceId();
+ ResourceId id3 = new ResourceId();
+ ResourceId id4 = new ResourceId();
+ ResourceId id5 = new ResourceId();
+ ResourceId id6 = new ResourceId();
+
+ AllocatedSlot s1 = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id1), true);
+ AllocatedSlot s2 = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id2), true);
+ AllocatedSlot s3 = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id3), true);
+ AllocatedSlot s4 = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id4), true);
+ AllocatedSlot s5 = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id5), true);
+ AllocatedSlot s6 = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id6), true);
+
+ // no more slots available, the next call should throw an exception
try {
- scheduler.scheduleJob(executionGraph);
- } catch (SchedulingException e) {
- fail(StringUtils.stringifyException(e));
- }
-
- // Wait for the deployment to complete
- tdm.waitForDeployment();
-
- assertEquals(executionGraph.getJobID(), tdm.getIDOfLastDeployedJob());
- final List<ExecutionVertex> listOfDeployedVertices = tdm.getListOfLastDeployedVertices();
- assertNotNull(listOfDeployedVertices);
- // Vertices connected via in-memory channels must be deployed in a single cycle.
- assertEquals(2, listOfDeployedVertices.size());
-
- // Check if the release of the allocated resources works properly by simulating the vertices' life cycle
- assertEquals(0, tim.getNumberOfReleaseMethodCalls());
-
- // Simulate vertex life cycle
- for (final ExecutionVertex vertex : listOfDeployedVertices) {
- vertex.updateExecutionState(ExecutionState.STARTING);
- vertex.updateExecutionState(ExecutionState.RUNNING);
- vertex.updateExecutionState(ExecutionState.FINISHING);
- vertex.updateExecutionState(ExecutionState.FINISHED);
+ scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), new ResourceId()), true);
+ fail("Scheduler accepted scheduling request without available resource.");
}
-
- assertEquals(1, tim.getNumberOfReleaseMethodCalls());
- } finally {
- try {
- LibraryCacheManager.unregister(executionGraph.getJobID());
- } catch (IOException ioe) {
- // Ignore exception here
+ catch (NoResourceAvailableException e) {
+ // expected
}
+
+ // schedule something into the same slots as before
+ AllocatedSlot s1s = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id1), true);
+ AllocatedSlot s2s = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id2), true);
+ AllocatedSlot s3s = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id3), true);
+ AllocatedSlot s4s = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id4), true);
+ AllocatedSlot s5s = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id5), true);
+ AllocatedSlot s6s = scheduler.getResourceToScheduleUnit(new ScheduledUnit(jobId, getDummyVertex(), id6), true);
+
+ assertEquals(s1, s1s);
+ assertEquals(s2, s2s);
+ assertEquals(s3, s3s);
+ assertEquals(s4, s4s);
+ assertEquals(s5, s5s);
+ assertEquals(s6, s6s);
}
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+
+
+ // --------------------------------------------------------------------------------------------
+
+ private Instance getRandomInstance(int numSlots) {
+ InetAddress address;
+ try {
+ address = InetAddress.getByName("127.0.0.1");
+ } catch (UnknownHostException e) {
+ throw new RuntimeException("Test could not create IP address for localhost loopback.");
+ }
+
+ int ipcPort = portNum++;
+ int dataPort = portNum++;
+
+ InstanceConnectionInfo ci = new InstanceConnectionInfo(address, ipcPort, dataPort);
+
+ final long GB = 1024L*1024*1024;
+ HardwareDescription resources = new HardwareDescription(4, 4*GB, 3*GB, 2*GB);
+
+ return new Instance(ci, new InstanceID(), resources, numSlots);
+ }
+
+ private ExecutionVertex2 getDummyVertex() {
+ return new ExecutionVertex2();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/LifoSetQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/LifoSetQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/LifoSetQueueTest.java
new file mode 100644
index 0000000..4a2a0cb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/LifoSetQueueTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+/**
+ * Test for the basic functionality of the {@link LifoSetQueue}.
+ */
+public class LifoSetQueueTest {
+
+ @Test
+ public void testSizeAddPollAndPeek() {
+ try {
+ LifoSetQueue<Integer> queue = new LifoSetQueue<Integer>();
+
+ // empty queue
+ assertEquals(0, queue.size());
+ assertNull(queue.poll());
+ assertNull(queue.peek());
+
+ // add some elements
+ assertTrue(queue.add(1));
+ assertTrue(queue.offer(2));
+ assertTrue(queue.offer(3));
+ assertEquals(3, queue.size());
+
+ assertEquals(3, queue.peek().intValue());
+
+ // duplicates are not inserted (size stays 3); add/offer still return true, because no capacity constraint is violated
+ assertTrue(queue.add(1));
+ assertTrue(queue.offer(1));
+ assertTrue(queue.add(3));
+ assertTrue(queue.offer(3));
+ assertTrue(queue.add(2));
+ assertTrue(queue.offer(2));
+ assertEquals(3, queue.size());
+
+ // peek and poll some elements
+ assertEquals(3, queue.peek().intValue());
+ assertEquals(3, queue.size());
+ assertEquals(3, queue.poll().intValue());
+ assertEquals(2, queue.size());
+ assertEquals(2, queue.peek().intValue());
+ assertEquals(2, queue.size());
+ assertEquals(2, queue.poll().intValue());
+ assertEquals(1, queue.size());
+ assertEquals(1, queue.peek().intValue());
+ assertEquals(1, queue.size());
+ assertEquals(1, queue.poll().intValue());
+ assertEquals(0, queue.size());
+ assertTrue(queue.isEmpty());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " : " + e.getMessage());
+ }
+ }
+
+ /**
+ * Remove is tricky, because it goes through the iterator and calls remove() on the iterator.
+ */
+ @Test
+ public void testRemove() {
+ try {
+ LifoSetQueue<String> queue = new LifoSetQueue<String>();
+ queue.add("1");
+ queue.add("2");
+ queue.add("3");
+ queue.add("4");
+ queue.add("5");
+ queue.add("6");
+ queue.add("7");
+
+ assertEquals(7, queue.size());
+ assertEquals("7", queue.peek());
+
+ // remove non-existing
+ assertFalse(queue.remove("8"));
+
+ // remove the last
+ assertTrue(queue.remove("7"));
+ // remove the first
+ assertTrue(queue.remove("1"));
+ // remove in the middle
+ assertTrue(queue.remove("3"));
+
+ assertEquals(4, queue.size());
+
+ // check that we can re-add the removed elements
+ assertTrue(queue.add("1"));
+ assertTrue(queue.add("7"));
+ assertTrue(queue.add("3"));
+ assertEquals(7, queue.size());
+
+ // check the order
+ assertEquals("3", queue.poll());
+ assertEquals("7", queue.poll());
+ assertEquals("1", queue.poll());
+ assertEquals("6", queue.poll());
+ assertEquals("5", queue.poll());
+ assertEquals("4", queue.poll());
+ assertEquals("2", queue.poll());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " : " + e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestDeploymentManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestDeploymentManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestDeploymentManager.java
deleted file mode 100644
index 81a7d60..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestDeploymentManager.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import java.util.List;
-
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.jobmanager.DeploymentManager;
-
-/**
- * This class provides an implementation of the {@DeploymentManager} interface which is used during
- * the unit tests.
- * <p>
- * This class is thread-safe.
- *
- */
-public class TestDeploymentManager implements DeploymentManager {
-
- /**
- * The ID of the job to be deployed.
- */
- private volatile JobID jobID = null;
-
- /**
- * The list of vertices to be deployed.
- */
- private volatile List<ExecutionVertex> verticesToBeDeployed = null;
-
- /**
- * Auxiliary object to synchronize on.
- */
- private final Object synchronizationObject = new Object();
-
-
- @Override
- public void deploy(final JobID jobID, final Instance instance,
- final List<ExecutionVertex> verticesToBeDeployed) {
-
- this.jobID = jobID;
- this.verticesToBeDeployed = verticesToBeDeployed;
-
- synchronized (this.synchronizationObject) {
- this.synchronizationObject.notify();
- }
- }
-
- /**
- * Returns the ID of the last deployed job.
- */
- JobID getIDOfLastDeployedJob() {
-
- return this.jobID;
- }
-
- /**
- * Returns a list of the last deployed vertices.
- *
- * @return a list of the last deployed vertices
- */
- List<ExecutionVertex> getListOfLastDeployedVertices() {
-
- return this.verticesToBeDeployed;
- }
-
- /**
- * Clears the internal state of the test deployment manager.
- */
- void clear() {
-
- this.jobID = null;
- this.verticesToBeDeployed = null;
- }
-
- /**
- * Wait for the scheduler to complete the deployment.
- */
- void waitForDeployment() {
-
- while (this.jobID == null) {
- synchronized (this.synchronizationObject) {
- try {
- this.synchronizationObject.wait(50);
- } catch (InterruptedException e) {
- // Ignore exception
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java
deleted file mode 100644
index 9286def..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/TestInstanceManager.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.jobmanager.scheduler;
-
-import java.net.Inet4Address;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.instance.*;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.topology.NetworkNode;
-import org.apache.flink.runtime.topology.NetworkTopology;
-import org.apache.flink.util.StringUtils;
-
-/**
- * A dummy implementation of an {@link org.apache.flink.runtime.instance.InstanceManager}.
- */
-public final class TestInstanceManager implements InstanceManager {
-
- /**
- * Counts the number of times the method releaseAllocatedResource is called.
- */
- private volatile int numberOfReleaseCalls = 0;
-
- /**
- * The instance listener.
- */
- private volatile InstanceListener instanceListener = null;
-
- /**
- * The list of resources allocated to a job.
- */
- private final List<AllocatedResource> allocatedResources;
-
- /**
- * The test instance
- */
- private final TestInstance testInstance;
-
- /**
- * Test implementation of {@link org.apache.flink.runtime.instance.Instance}.
- *
- */
- private static final class TestInstance extends Instance {
-
- /**
- * Constructs a new test instance.
- *
- * @param instanceConnectionInfo
- * the instance connection information
- * @param parentNode
- * the parent node in the network topology
- * @param networkTopology
- * the network topology
- * @param hardwareDescription
- * the hardware description
- * @param numberSlots
- * the number of slots available on the instance
- */
- public TestInstance(final InstanceConnectionInfo instanceConnectionInfo,
- final NetworkNode parentNode, final NetworkTopology networkTopology,
- final HardwareDescription hardwareDescription, int numberSlots) {
- super(instanceConnectionInfo, parentNode, networkTopology, hardwareDescription, numberSlots);
- }
- }
-
- /**
- * Constructs a new test instance manager
- */
- public TestInstanceManager() {
-
- final HardwareDescription hd = HardwareDescriptionFactory.construct(1, 1L, 1L);
-
- this.allocatedResources = new ArrayList<AllocatedResource>();
- try {
- final InstanceConnectionInfo ici = new InstanceConnectionInfo(Inet4Address.getLocalHost(), 1, 1);
- final NetworkTopology nt = new NetworkTopology();
- this.testInstance = new TestInstance(ici, nt.getRootNode(), nt, hd, 1);
- this.allocatedResources.add(new AllocatedResource(testInstance, new AllocationID()));
- } catch (UnknownHostException e) {
- throw new RuntimeException(StringUtils.stringifyException(e));
- }
- }
-
-
- @Override
- public void requestInstance(final JobID jobID, final Configuration conf,
- int requiredSlots) throws InstanceException {
-
- if (this.instanceListener == null) {
- throw new InstanceException("instanceListener not registered with TestInstanceManager");
- }
-
- final InstanceListener il = this.instanceListener;
-
- final Runnable runnable = new Runnable() {
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void run() {
- il.resourcesAllocated(jobID, allocatedResources);
- }
- };
-
- new Thread(runnable).start();
- }
-
-
- @Override
- public void releaseAllocatedResource(final AllocatedResource allocatedResource) throws InstanceException {
- ++this.numberOfReleaseCalls;
- }
-
- /**
- * Returns the number of times the method releaseAllocatedResource has been called.
- *
- * @return the number of times the method releaseAllocatedResource has been called
- */
- int getNumberOfReleaseMethodCalls() {
- return this.numberOfReleaseCalls;
- }
-
-
- @Override
- public void reportHeartBeat(final InstanceConnectionInfo instanceConnectionInfo) {
- throw new IllegalStateException("reportHeartBeat called on TestInstanceManager");
- }
-
- @Override
- public void registerTaskManager(final InstanceConnectionInfo instanceConnectionInfo,
- final HardwareDescription hardwareDescription, int numberSlots){
- throw new IllegalStateException("registerTaskManager called on TestInstanceManager.");
- }
-
- @Override
- public NetworkTopology getNetworkTopology(final JobID jobID) {
- throw new IllegalStateException("getNetworkTopology called on TestInstanceManager");
- }
-
-
- @Override
- public void setInstanceListener(final InstanceListener instanceListener) {
-
- this.instanceListener = instanceListener;
- }
-
- @Override
- public Instance getInstanceByName(final String name) {
- throw new IllegalStateException("getInstanceByName called on TestInstanceManager");
- }
-
- @Override
- public void shutdown() {
- throw new IllegalStateException("shutdown called on TestInstanceManager");
- }
-
- @Override
- public int getNumberOfTaskManagers() {
- throw new IllegalStateException("getNumberOfTaskTrackers called on TestInstanceManager");
- }
-
- @Override
- public int getNumberOfSlots() {
- return this.testInstance.getNumberOfSlots();
- }
-
-
- @Override
- public Map<InstanceConnectionInfo, Instance> getInstances() {
- throw new IllegalStateException("getInstances called on TestInstanceManager");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
index 5b76d53..2b2c81d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultSplitAssignerTest.java
@@ -1,17 +1,20 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flink.runtime.jobmanager.splitassigner;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2d6199ff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
index ddad0d3..fdc1cf4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableSplitAssignerTest.java
@@ -1,17 +1,20 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.flink.runtime.jobmanager.splitassigner;