You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/22 23:47:36 UTC

[15/22] git commit: Cleanup of merge with slot-based scheduler branch.

Cleanup of merge with slot-based scheduler branch.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/429493d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/429493d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/429493d0

Branch: refs/heads/master
Commit: 429493d027700c3635c8045ad1087511e456d04f
Parents: 86d206c
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jun 19 00:15:46 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jun 22 21:07:20 2014 +0200

----------------------------------------------------------------------
 .../eu/stratosphere/client/program/Client.java  |  17 +---
 .../client/program/PackagedProgram.java         |   1 +
 .../stratosphere/client/program/ClientTest.java |   3 +-
 .../compiler/costs/DefaultCostEstimator.java    |   6 +-
 .../compiler/dag/DataSourceNode.java            |   2 +-
 .../pact/compiler/DOPChangeTest.java            |   8 +-
 .../pact/compiler/IterationsCompilerTest.java   |   4 +-
 .../nephele/execution/RuntimeEnvironment.java   |  72 +++++++++++--
 .../nephele/protocols/JobManagerProtocol.java   |   2 +-
 .../stratosphere/nephele/taskmanager/Task.java  |  70 +++----------
 .../nephele/taskmanager/TaskManager.java        |  10 +-
 .../local/LocalInstanceManagerTest.java         |  19 ++--
 .../nephele/jobmanager/JobManagerITCase.java    |  19 ++--
 .../nephele/util/ServerTestUtils.java           |  48 +--------
 .../netty/InboundEnvelopeDecoderTest.java       |   2 +-
 .../confs/jobmanager/nephele-default.xml        |  51 ----------
 .../test/javaApiOperators/SumMinMaxITCase.java  |   4 +-
 .../PackagedProgramEndToEndITCase.java          |  48 +++++----
 .../test/util/testjar/KMeansForTest.java        | 102 +++++--------------
 19 files changed, 183 insertions(+), 305 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
index 31138f6..ec66f4a 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
@@ -78,9 +78,6 @@ public class Client {
 		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
 		
 		this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
-		
-		//  Disable Local Execution when using a Client
-		ContextEnvironment.disableLocalExecution();
 	}
 
 	/**
@@ -105,9 +102,6 @@ public class Client {
 		}
 
 		this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
-		
-		//  Disable Local Execution when using a Client
-		ContextEnvironment.disableLocalExecution();
 	}
 	
 	public void setPrintStatusDuringExecution(boolean print) {
@@ -152,20 +146,13 @@ public class Client {
 			ByteArrayOutputStream baes = new ByteArrayOutputStream();
 			System.setErr(new PrintStream(baes));
 			try {
+				ContextEnvironment.disableLocalExecution();
 				prog.invokeInteractiveModeForExecution();
 			}
 			catch (ProgramInvocationException e) {
-				System.setOut(originalOut);
-				System.setErr(originalErr);
-				System.err.println(baes);
-				System.out.println(baos);
 				throw e;
 			}
 			catch (Throwable t) {
-				System.setOut(originalOut);
-				System.setErr(originalErr);
-				System.err.println(baes);
-				System.out.println(baos);
 				// the invocation gets aborted with the preview plan
 				if (env.optimizerPlan != null) {
 					return env.optimizerPlan;
@@ -240,6 +227,8 @@ public class Client {
 			}
 			env.setAsContext();
 			
+			ContextEnvironment.disableLocalExecution();
+			
 			if (wait) {
 				// invoke here
 				prog.invokeInteractiveModeForExecution();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
index 51d2e34..edf36b3 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
@@ -215,6 +215,7 @@ public class PackagedProgram {
 			PreviewPlanEnvironment env = new PreviewPlanEnvironment();
 			env.setAsContext();
 			try {
+				ContextEnvironment.disableLocalExecution();
 				invokeInteractiveModeForExecution();
 			}
 			catch (ProgramInvocationException e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java
index 244ec4a..b3f8159 100644
--- a/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java
@@ -20,7 +20,6 @@ import static org.mockito.MockitoAnnotations.initMocks;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -95,7 +94,7 @@ public class ClientTest {
 		when(program.getPlanWithJars()).thenReturn(planWithJarsMock);
 		when(planWithJarsMock.getPlan()).thenReturn(planMock);
 		
-		whenNew(PactCompiler.class).withArguments(any(DataStatistics.class), any(CostEstimator.class), any(InetSocketAddress.class)).thenReturn(this.compilerMock);
+		whenNew(PactCompiler.class).withArguments(any(DataStatistics.class), any(CostEstimator.class)).thenReturn(this.compilerMock);
 		when(compilerMock.compile(planMock)).thenReturn(optimizedPlanMock);
 		
 		whenNew(NepheleJobGraphGenerator.class).withNoArguments().thenReturn(generatorMock);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
index fde5970..3c52f6a 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
@@ -34,7 +34,7 @@ public class DefaultCostEstimator extends CostEstimator {
 	 * The case of the estimation for all relative costs. We heuristically pick a very large data volume, which
 	 * will favor strategies that are less expensive on large data volumes. This is robust and 
 	 */
-	private static final long HEURISTIC_COST_BASE = 10000000000l;
+	private static final long HEURISTIC_COST_BASE = 1000000000L;
 	
 	// The numbers for the CPU effort are rather magic at the moment and should be seen rather ordinal
 	
@@ -105,9 +105,9 @@ public class DefaultCostEstimator extends CostEstimator {
 			} else {
 				costs.addNetworkCost(replicationFactor * estOutShipSize);
 			}
-			costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * replicationFactor);
+			costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 10 * replicationFactor);
 		} else {
-			costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 200);
+			costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 1000);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java
index 7234420..b6d6b71 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java
@@ -168,7 +168,7 @@ public class DataSourceNode extends OptimizerNode {
 			return this.cachedPlans;
 		}
 		
-		SourcePlanNode candidate = new SourcePlanNode(this, "DataSource("+this.getPactContract().getName()+")");
+		SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getPactContract().getName()+")");
 		candidate.updatePropertiesWithUniqueSets(getUniqueFields());
 		
 		final Costs costs = new Costs();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
index 273c42c..605f197 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
@@ -209,15 +209,15 @@ public class DOPChangeTest extends CompilerTestBase {
 		ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
 		ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
 		
-		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
-		Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn);
+		Assert.assertTrue("Invalid ship strategy for an operator.", 
+				(ShipStrategyType.PARTITION_RANDOM ==  mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) || 
+				(ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
 	}
 	
 	
 	
 	@Test
-	public void checkPropertyHandlingWithDecreasingDegreeOfParallelism()
-	{
+	public void checkPropertyHandlingWithDecreasingDegreeOfParallelism() {
 		final int degOfPar = DEFAULT_PARALLELISM;
 		
 		// construct the plan

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
index 05a863c..c6ebf50 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
@@ -153,6 +153,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 	public void testIterationPushingWorkOut() throws Exception {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
 			
 			DataSet<Tuple2<Long, Long>> input1 = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
 			
@@ -168,6 +169,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			
 			BulkIterationPlanNode bipn = (BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
 			
+			// check that work has NOT been pushed out, as the end of the step function does not produce the necessary properties
 			for (Channel c : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
 				assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
 			}
@@ -182,7 +184,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 	public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
 		
 		// open a bulk iteration
-		IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(100);
+		IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(20);
 		
 		DataSet<Tuple2<Long, Long>> changes = iteration
 				.join(edges).where(0).equalTo(0).with(new Join222())

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
index cc542c0..4e07694 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
@@ -36,6 +36,7 @@ import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
 import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPool;
 import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
 import eu.stratosphere.util.StringUtils;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -108,11 +109,6 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	private final AbstractInvokable invokable;
 
 	/**
-	 * The thread executing the task in the environment.
-	 */
-	private volatile Thread executingThread = null;
-
-	/**
 	 * The ID of the job this task belongs to.
 	 */
 	private final JobID jobID;
@@ -136,6 +132,11 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	 * The observer object for the task's execution.
 	 */
 	private volatile ExecutionObserver executionObserver = null;
+	
+	/**
+	 * The thread executing the task in the environment.
+	 */
+	private volatile Thread executingThread;
 
 	/**
 	 * The RPC proxy to report accumulators to JobManager
@@ -159,7 +160,9 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 
 	private LocalBufferPool outputBufferPool;
 
-	private Map<String,FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
+	private final Map<String,FutureTask<Path>> cacheCopyTasks;
+	
+	private volatile boolean canceled;
 
 	/**
 	 * Creates a new runtime environment object which contains the runtime information for the encapsulated Nephele
@@ -174,8 +177,9 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	 */
 	public RuntimeEnvironment(final JobID jobID, final String taskName,
 							final Class<? extends AbstractInvokable> invokableClass, final Configuration taskConfiguration,
-							final Configuration jobConfiguration) throws Exception {
-
+							final Configuration jobConfiguration)
+		throws Exception
+	{
 		this.jobID = jobID;
 		this.taskName = taskName;
 		this.invokableClass = invokableClass;
@@ -186,7 +190,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 		this.memoryManager = null;
 		this.ioManager = null;
 		this.inputSplitProvider = null;
-
+		this.cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
+		
 		this.invokable = this.invokableClass.newInstance();
 		this.invokable.setEnvironment(this);
 		this.invokable.registerInputOutput();
@@ -433,6 +438,53 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 			return this.executingThread;
 		}
 	}
+	
+	public void cancelExecution() {
+		canceled = true;
+
+		LOG.info("Canceling " + getTaskNameWithIndex());
+
+		// Request user code to shut down
+		if (this.invokable != null) {
+			try {
+				this.invokable.cancel();
+			} catch (Throwable e) {
+				LOG.error("Error while cancelling the task.", e);
+			}
+		}
+		
+		// interrupt the running thread and wait for it to die
+		executingThread.interrupt();
+		
+		try {
+			executingThread.join(5000);
+		} catch (InterruptedException e) {}
+		
+		if (!executingThread.isAlive()) {
+			return;
+		}
+		
+		// Keep interrupting the executing thread for as long as it is still alive
+		while (executingThread != null && executingThread.isAlive()) {
+			LOG.warn("Task " + getTaskName() + " did not react to cancelling signal. Sending repeated interrupt.");
+
+			if (LOG.isDebugEnabled()) {
+				StringBuilder bld = new StringBuilder("Task ").append(getTaskName()).append(" is stuck in method:\n");
+				
+				StackTraceElement[] stack = executingThread.getStackTrace();
+				for (StackTraceElement e : stack) {
+					bld.append(e).append('\n');
+				}
+				LOG.debug(bld.toString());
+			}
+			
+			executingThread.interrupt();
+			
+			try {
+				executingThread.join(1000);
+			} catch (InterruptedException e) {}
+		}
+	}
 
 	/**
 	 * Blocks until all output channels are closed.
@@ -459,7 +511,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	 */
 	private void waitForInputChannelsToBeClosed() throws IOException, InterruptedException {
 		// Wait for disconnection of all output gates
-		while (true) {
+		while (!canceled) {
 
 			// Make sure, we leave this method with an InterruptedException when the task has been canceled
 			if (this.executionObserver.isCanceled()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java
index 5070b51..4db5e14 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java
@@ -51,7 +51,7 @@ public interface JobManagerProtocol extends VersionedProtocol {
 	 * @return whether the task manager was successfully registered
 	 */
 	RegisterTaskManagerResult registerTaskManager(InstanceConnectionInfo instanceConnectionInfo,
-						HardwareDescription hardwareDescription,IntegerRecord numberOfSlots)
+						HardwareDescription hardwareDescription, IntegerRecord numberOfSlots)
 			throws IOException;
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
index 825eae1..d1a6275 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
@@ -24,14 +24,14 @@ import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.profiling.TaskManagerProfiler;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.util.StringUtils;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.util.Iterator;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public final class Task implements ExecutionObserver {
 
@@ -49,13 +49,14 @@ public final class Task implements ExecutionObserver {
 	/**
 	 * Stores whether the task has been canceled.
 	 */
-	private volatile boolean isCanceled = false;
+	private final AtomicBoolean canceled = new AtomicBoolean(false);
 
 	/**
 	 * The current execution state of the task
 	 */
 	private volatile ExecutionState executionState = ExecutionState.STARTING;
 
+	
 	private Queue<ExecutionListener> registeredListeners = new ConcurrentLinkedQueue<ExecutionListener>();
 
 	public Task(ExecutionVertexID vertexID, final RuntimeEnvironment environment, TaskManager taskManager) {
@@ -102,11 +103,11 @@ public final class Task implements ExecutionObserver {
 	}
 
 	public void cancelExecution() {
-		cancelOrKillExecution(true);
+		cancelOrKillExecution();
 	}
 
 	public void killExecution() {
-		cancelOrKillExecution(false);
+		cancelOrKillExecution();
 	}
 
 	/**
@@ -114,10 +115,8 @@ public final class Task implements ExecutionObserver {
 	 *
 	 * @param cancel <code>true/code> if the task shall be canceled, <code>false</code> if it shall be killed
 	 */
-	private void cancelOrKillExecution(boolean cancel) {
-		final Thread executingThread = this.environment.getExecutingThread();
-
-		if (executingThread == null) {
+	private void cancelOrKillExecution() {
+		if (!this.canceled.compareAndSet(false, true)) {
 			return;
 		}
 
@@ -125,45 +124,13 @@ public final class Task implements ExecutionObserver {
 			return;
 		}
 
-		LOG.info((cancel ? "Canceling " : "Killing ") + this.environment.getTaskNameWithIndex());
-
-		if (cancel) {
-			this.isCanceled = true;
-			// Change state
-			executionStateChanged(ExecutionState.CANCELING, null);
-
-			// Request user code to shut down
-			try {
-				final AbstractInvokable invokable = this.environment.getInvokable();
-				if (invokable != null) {
-					invokable.cancel();
-				}
-			} catch (Throwable e) {
-				LOG.error(StringUtils.stringifyException(e));
-			}
-		}
-
-		// Continuously interrupt the user thread until it changed to state CANCELED
-		while (true) {
-
-			executingThread.interrupt();
-
-			if (!executingThread.isAlive()) {
-				break;
-			}
-
-			try {
-				executingThread.join(1000);
-			} catch (InterruptedException e) {}
+		executionStateChanged(ExecutionState.CANCELING, null);
 
-			if (!executingThread.isAlive()) {
-				break;
-			}
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Sending repeated " + (cancel == true ? "canceling" : "killing") + " signal to " +
-						this.environment.getTaskName() + " with state " + this.executionState);
-			}
+		// Request user code to shut down
+		try {
+			this.environment.cancelExecution();
+		} catch (Throwable e) {
+			LOG.error("Error while cancelling the task.", e);
 		}
 	}
 
@@ -271,7 +238,6 @@ public final class Task implements ExecutionObserver {
 	 * @return the name of the task associated with this observer object
 	 */
 	private String getTaskName() {
-
 		return this.environment.getTaskName() + " (" + (this.environment.getIndexInSubtaskGroup() + 1) + "/"
 				+ this.environment.getCurrentNumberOfSubtasks() + ")";
 	}
@@ -279,7 +245,6 @@ public final class Task implements ExecutionObserver {
 
 	@Override
 	public void userThreadStarted(final Thread userThread) {
-
 		// Notify the listeners
 		final Iterator<ExecutionListener> it = this.registeredListeners.iterator();
 		while (it.hasNext()) {
@@ -290,7 +255,6 @@ public final class Task implements ExecutionObserver {
 
 	@Override
 	public void userThreadFinished(final Thread userThread) {
-
 		// Notify the listeners
 		final Iterator<ExecutionListener> it = this.registeredListeners.iterator();
 		while (it.hasNext()) {
@@ -307,7 +271,6 @@ public final class Task implements ExecutionObserver {
 	 */
 
 	public void registerExecutionListener(final ExecutionListener executionListener) {
-
 		this.registeredListeners.add(executionListener);
 	}
 
@@ -320,15 +283,13 @@ public final class Task implements ExecutionObserver {
 	 */
 
 	public void unregisterExecutionListener(final ExecutionListener executionListener) {
-
 		this.registeredListeners.remove(executionListener);
 	}
 
 
 	@Override
 	public boolean isCanceled() {
-
-		return this.isCanceled;
+		return this.canceled.get();
 	}
 
 	/**
@@ -337,7 +298,6 @@ public final class Task implements ExecutionObserver {
 	 * @return the runtime environment associated with this task
 	 */
 	public RuntimeEnvironment getRuntimeEnvironment() {
-
 		return this.environment;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index 5966cf9..f191df3 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -164,12 +164,15 @@ public class TaskManager implements TaskOperationProtocol {
 	private volatile boolean shutdownComplete;
 	
 	/**
-	 * Constructs a new task manager, starts its IPC service and attempts to discover the job manager to
-	 * receive an initial configuration. All parameters are obtained from the 
+	 * All parameters are obtained from the 
 	 * {@link GlobalConfiguration}, which must be loaded prior to instantiating the task manager.
 	 */
 	public TaskManager(ExecutionMode executionMode) throws Exception {
-
+		if (executionMode == null) {
+			throw new NullPointerException("Execution mode must not be null.");
+		}
+		
+		
 		LOG.info("TaskManager started as user " + UserGroupInformation.getCurrentUser().getShortUserName());
 		LOG.info("User system property: " + System.getProperty("user.name"));
 		LOG.info("Execution mode: " + executionMode);
@@ -340,6 +343,7 @@ public class TaskManager implements TaskOperationProtocol {
 
 		{
 			HardwareDescription resources = HardwareDescriptionFactory.extractFromSystem();
+			
 			numberOfSlots = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
 					Hardware.getNumberCPUCores());
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java
index a8f1331..b491c12 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java
@@ -13,21 +13,19 @@
 
 package eu.stratosphere.nephele.instance.local;
 
-import static org.junit.Assert.fail;
-
 import eu.stratosphere.nephele.instance.InstanceManager;
 import junit.framework.Assert;
 
 import org.junit.Test;
 
 import eu.stratosphere.nephele.ExecutionMode;
+import eu.stratosphere.configuration.ConfigConstants;
+import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.configuration.GlobalConfiguration;
 import eu.stratosphere.nephele.jobmanager.JobManager;
-import eu.stratosphere.nephele.util.ServerTestUtils;
 
 /**
  * Tests for the {@link LocalInstanceManager}.
- * 
  */
 public class LocalInstanceManagerTest {
 
@@ -39,12 +37,13 @@ public class LocalInstanceManagerTest {
 	public void testInstanceTypeFromConfiguration() {
 
 		try {
-			final String configDir = ServerTestUtils.getConfigDir();
-			if (configDir == null) {
-				fail("Cannot locate configuration directory");
-			}
-
-			GlobalConfiguration.loadConfiguration(configDir);
+			Configuration cfg = new Configuration();
+			cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1");
+			cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123);
+			cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
+			cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+			
+			GlobalConfiguration.includeConfiguration(cfg);
 
 			// start JobManager
 			ExecutionMode executionMode = ExecutionMode.LOCAL;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
index fa4fbfa..89f7428 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
@@ -20,6 +20,7 @@ import eu.stratosphere.core.fs.Path;
 import eu.stratosphere.nephele.ExecutionMode;
 import eu.stratosphere.nephele.client.JobClient;
 import eu.stratosphere.nephele.client.JobExecutionException;
+import eu.stratosphere.nephele.execution.RuntimeEnvironment;
 import eu.stratosphere.nephele.jobgraph.DistributionPattern;
 import eu.stratosphere.runtime.io.channels.ChannelType;
 import eu.stratosphere.nephele.jobgraph.JobFileInputVertex;
@@ -34,6 +35,7 @@ import eu.stratosphere.nephele.util.FileLineWriter;
 import eu.stratosphere.nephele.util.JarFileCreator;
 import eu.stratosphere.nephele.util.ServerTestUtils;
 import eu.stratosphere.util.LogUtils;
+
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.junit.AfterClass;
@@ -57,7 +59,8 @@ import static org.junit.Assert.fail;
 public class JobManagerITCase {
 
 	static {
-		LogUtils.initializeDefaultTestConsoleLogger();
+		// the console logger is initialized with Level.INFO; note that the tests create expected exceptions
+		LogUtils.initializeDefaultConsoleLogger(Level.INFO);
 	}
 	
 	/**
@@ -75,7 +78,13 @@ public class JobManagerITCase {
 	@BeforeClass
 	public static void startNephele() {
 		try {
-			GlobalConfiguration.loadConfiguration(ServerTestUtils.getConfigDir());
+			Configuration cfg = new Configuration();
+			cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1");
+			cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123);
+			cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
+			cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+			
+			GlobalConfiguration.includeConfiguration(cfg);
 			
 			configuration = GlobalConfiguration.getConfiguration(new String[] { ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY });
 			
@@ -301,8 +310,9 @@ public class JobManagerITCase {
 			
 			// deactivate logging of expected test exceptions
 			Logger rtLogger = Logger.getLogger(Task.class);
-			Level rtLevel = rtLogger.getEffectiveLevel();
 			rtLogger.setLevel(Level.OFF);
+			Logger envLogger = Logger.getLogger(RuntimeEnvironment.class);
+			envLogger.setLevel(Level.DEBUG);
 			
 			try {
 				jobClient.submitJobAndWait();
@@ -317,9 +327,6 @@ public class JobManagerITCase {
 
 				return;
 			}
-			finally {
-				rtLogger.setLevel(rtLevel);
-			}
 
 			fail("Expected exception but did not receive it");
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/ServerTestUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/ServerTestUtils.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/ServerTestUtils.java
index 4202880..59de8cc 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/ServerTestUtils.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/ServerTestUtils.java
@@ -38,27 +38,9 @@ import eu.stratosphere.nephele.protocols.ExtendedManagementProtocol;
 public final class ServerTestUtils {
 
 	/**
-	 * The system property key to retrieve the user directory.
-	 */
-	private static final String USER_DIR_KEY = "user.dir";
-
-	/**
-	 * The directory containing the correct configuration file to be used during the tests.
-	 */
-	private static final String CORRECT_CONF_DIR = "/confs/jobmanager";
-
-	/**
-	 * The directory the configuration directory is expected in when test are executed using Eclipse.
-	 */
-	private static final String ECLIPSE_PATH_EXTENSION = "/src/test/resources";
-
-	private static final String INTELLIJ_PATH_EXTENSION = "/stratosphere-runtime/src/test/resources";
-
-	/**
 	 * Private constructor.
 	 */
-	private ServerTestUtils() {
-	}
+	private ServerTestUtils() {}
 
 	/**
 	 * Creates a file with a random name in the given sub directory within the directory for temporary files. The
@@ -182,34 +164,6 @@ public final class ServerTestUtils {
 	}
 
 	/**
-	 * Returns the directory containing the configuration files that shall be used for the test.
-	 * 
-	 * @return the directory containing the configuration files or <code>null</code> if the configuration directory
-	 *         could not be located
-	 */
-	public static String getConfigDir() {
-
-		// This is the correct path for Maven-based tests
-		String configDir = System.getProperty(USER_DIR_KEY) + CORRECT_CONF_DIR;
-		if (new File(configDir).exists()) {
-			return configDir;
-		}
-
-		configDir = System.getProperty(USER_DIR_KEY) + ECLIPSE_PATH_EXTENSION + CORRECT_CONF_DIR;
-		if (new File(configDir).exists()) {
-			return configDir;
-		}
-
-		configDir = System.getProperty(USER_DIR_KEY) + INTELLIJ_PATH_EXTENSION + CORRECT_CONF_DIR;
-
-		if(new File(configDir).exists()){
-			return configDir;
-		}
-
-		return null;
-	}
-
-	/**
 	 * Waits until the job manager for the tests has become ready to accept jobs.
 	 * 
 	 * @param jobManager

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
index 1ee9293..1c6270a 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
@@ -354,7 +354,7 @@ public class InboundEnvelopeDecoderTest {
 		buf.readerIndex(0);
 		ByteBuf[] slices = randomSlices(buf);
 
-		ch.writeInbound((Object) slices);
+		ch.writeInbound((Object[]) slices);
 
 		for (ByteBuf slice : slices) {
 			Assert.assertEquals(1, slice.refCnt());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/test/resources/confs/jobmanager/nephele-default.xml
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/resources/confs/jobmanager/nephele-default.xml b/stratosphere-runtime/src/test/resources/confs/jobmanager/nephele-default.xml
deleted file mode 100644
index 5d93d95..0000000
--- a/stratosphere-runtime/src/test/resources/confs/jobmanager/nephele-default.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-<?xml version="1.0"?>
-<configuration>
-    <property>
-        <key>jobmanager.rpc.address</key>
-        <value>127.0.0.1</value>
-    </property>
-    <property>
-        <key>jobmanager.rpc.port</key>
-        <value>6123</value>
-    </property>
-    <property>
-	<key>taskmanager.setup.usediscovery</key>
-	<value>false</value>
-    </property>
-    <property>
-        <key>discoveryservice.magicnumber</key>
-        <value>12300</value>
-    </property>
-    <property>
-        <key>instancemanager.local.classname</key>
-        <value>eu.stratosphere.nephele.instance.local.LocalInstanceManager</value>
-    </property>
-    <property>
-	<key>jobmanager.scheduler.local.classname</key>
-	<value>eu.stratosphere.nephele.jobmanager.scheduler.local.LocalScheduler</value>
-    </property>
-    <property>
-        <key>channel.network.compressor</key>
-        <value>de.tu_berlin.cit.nephele.io.compression.lzo.LzoCompressor</value>
-    </property>
-    <property>
-        <key>channel.network.decompressor</key>
-        <value>de.tu_berlin.cit.nephele.io.compression.lzo.LzoDecompressor</value>
-    </property>
-    <property>
-        <key>channel.file.compressor</key>
-        <value>de.tu_berlin.cit.nephele.io.compression.lzo.LzoCompressor</value>
-    </property>
-    <property>
-        <key>channel.file.decompressor</key>
-        <value>de.tu_berlin.cit.nephele.io.compression.lzo.LzoDecompressor</value>
-    </property>
-    <property>
-        <key>taskmanager.memory.size</key>
-        <value>8</value>
-    </property>
-    <property>
-        <key>instancemanager.local.type</key>
-        <value>test,4,4,1024,160,0</value>
-    </property>
-</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java
index 8b7dc80..cef9c05 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java
@@ -18,13 +18,14 @@ package eu.stratosphere.test.javaApiOperators;
 
 import eu.stratosphere.api.java.DataSet;
 import eu.stratosphere.api.java.ExecutionEnvironment;
-import eu.stratosphere.api.java.aggregation.Aggregations;
 import eu.stratosphere.api.java.tuple.Tuple1;
 import eu.stratosphere.api.java.tuple.Tuple2;
 import eu.stratosphere.api.java.tuple.Tuple3;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.test.javaApiOperators.util.CollectionDataSets;
 import eu.stratosphere.test.util.JavaProgramTestBase;
+
+import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.FileNotFoundException;
@@ -32,6 +33,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 
+@RunWith(Parameterized.class)
 public class SumMinMaxITCase extends JavaProgramTestBase  {
 
 	private static int NUM_PROGRAMS = 3;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
index 0e297ec..17f4a29 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
@@ -14,7 +14,6 @@ package eu.stratosphere.test.localDistributed;
 
 import java.io.File;
 import java.io.FileWriter;
-import java.net.URL;
 
 import eu.stratosphere.client.minicluster.NepheleMiniCluster;
 import org.junit.Assert;
@@ -29,23 +28,26 @@ import eu.stratosphere.util.LogUtils;
 
 public class PackagedProgramEndToEndITCase {
 
-	private static final int DOP = 4;
-
 	static {
 		LogUtils.initializeDefaultTestConsoleLogger();
 	}
 	
 	@Test
 	public void testEverything() {
+		final int PORT = 6498;
+		
 		NepheleMiniCluster cluster = new NepheleMiniCluster();
+		
+		File points = null;
+		File clusters = null;
+		File outFile = null;
+		
 		try {
 			// set up the files
-			File points = File.createTempFile("kmeans_points", ".in");
-			File clusters = File.createTempFile("kmeans_clusters", ".in");
-			File outFile = File.createTempFile("kmeans_result", ".out");
-			points.deleteOnExit();
-			clusters.deleteOnExit();
-			outFile.deleteOnExit();
+			points = File.createTempFile("kmeans_points", ".in");
+			clusters = File.createTempFile("kmeans_clusters", ".in");
+			outFile = File.createTempFile("kmeans_result", ".out");
+			
 			outFile.delete();
 
 			FileWriter fwPoints = new FileWriter(points);
@@ -56,31 +58,39 @@ public class PackagedProgramEndToEndITCase {
 			fwClusters.write(KMeansData.INITIAL_CENTERS);
 			fwClusters.close();
 
-			URL jarFileURL = getClass().getResource("/KMeansForTest.jar");
-			String jarPath = jarFileURL.getFile();
+			String jarPath = "target/maven-test-jar.jar";
 
 			// run KMeans
 			cluster.setNumTaskTracker(2);
 			cluster.setTaskManagerNumSlots(2);
+			cluster.setJobManagerRpcPort(PORT);
 			cluster.start();
-			RemoteExecutor ex = new RemoteExecutor("localhost", 6498);
+			
+			RemoteExecutor ex = new RemoteExecutor("localhost", PORT);
 
 			ex.executeJar(jarPath,
-					"eu.stratosphere.examples.scala.testing.KMeansForTest",
-					new String[] {new Integer(DOP).toString(),
+					"eu.stratosphere.test.util.testjar.KMeansForTest",
+					new String[] {
 							points.toURI().toString(),
 							clusters.toURI().toString(),
 							outFile.toURI().toString(),
 							"25"});
 
-			points.delete();
-			clusters.delete();
-			outFile.delete();
-
 		} catch (Exception e) {
 			e.printStackTrace();
 			Assert.fail(e.getMessage());
-		} finally {
+		}
+		finally {
+			if (points != null) {
+				points.delete();
+			}
+			if (clusters != null) {
+				clusters.delete();
+			}
+			if (outFile != null) {
+				outFile.delete();
+			}
+			
 			try {
 				cluster.stop();
 			} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
index d1b249a..8047649 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
@@ -23,7 +23,6 @@ import eu.stratosphere.api.common.Program;
 import eu.stratosphere.api.java.DataSet;
 import eu.stratosphere.api.java.ExecutionEnvironment;
 import eu.stratosphere.api.java.IterativeDataSet;
-import eu.stratosphere.api.java.RemoteEnvironment;
 import eu.stratosphere.api.java.functions.MapFunction;
 import eu.stratosphere.api.java.functions.ReduceFunction;
 import eu.stratosphere.api.java.tuple.Tuple2;
@@ -31,25 +30,41 @@ import eu.stratosphere.api.java.tuple.Tuple3;
 import eu.stratosphere.configuration.Configuration;
 
 @SuppressWarnings("serial")
-public class KMeansForTest implements Program{
+public class KMeansForTest implements Program {
 	
 	// *************************************************************************
 	//     PROGRAM
 	// *************************************************************************
 	
 	
+
 	@Override
 	public Plan getPlan(String... args) {
-		if(!parseParameters(args)) {
-			throw new RuntimeException("Unable to parse the arguments");
+		if (args.length < 4) {
+			throw new IllegalArgumentException("Missing parameters");
 		}
-	
-		// set up execution environment
-		ExecutionEnvironment env = new RemoteEnvironment("localhost", 1);
+		
+		final String pointsPath = args[0];
+		final String centersPath = args[1];
+		final String outputPath = args[2];
+		final int numIterations = Integer.parseInt(args[3]);
+		
+		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setDegreeOfParallelism(4);
 		
 		// get input data
-		DataSet<Point> points = getPointDataSet(env);
-		DataSet<Centroid> centroids = getCentroidDataSet(env);
+		DataSet<Point> points = env.readCsvFile(pointsPath)
+				.fieldDelimiter('|')
+				.includeFields(true, true)
+				.types(Double.class, Double.class)
+				.map(new TuplePointConverter());
+		
+		DataSet<Centroid> centroids = env.readCsvFile(centersPath)
+				.fieldDelimiter('|')
+				.includeFields(true, true, true)
+				.types(Integer.class, Double.class, Double.class)
+				.map(new TupleCentroidConverter());
 		
 		// set number of bulk iterations for KMeans algorithm
 		IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
@@ -71,11 +86,8 @@ public class KMeansForTest implements Program{
 				.map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
 		
 		// emit result
-		if(fileOutput) {
-			clusteredPoints.writeAsCsv(outputPath, "\n", " ");
-		} else {
-			clusteredPoints.print();
-		}
+		clusteredPoints.writeAsCsv(outputPath, "\n", " ");
+
 		return env.createProgramPlan();
 	}
 	
@@ -229,66 +241,4 @@ public class KMeansForTest implements Program{
 			return new Centroid(value.f0, value.f1.div(value.f2));
 		}
 	}
-	
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-	
-	private static boolean fileOutput = false;
-	private static String pointsPath = null;
-	private static String centersPath = null;
-	private static String outputPath = null;
-	private static int numIterations = 10;
-	
-	private static boolean parseParameters(String[] programArguments) {
-		
-		if(programArguments.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if(programArguments.length == 4) {
-				pointsPath = programArguments[0];
-				centersPath = programArguments[1];
-				outputPath = programArguments[2];
-				numIterations = Integer.parseInt(programArguments[3]);
-			} else {
-				System.err.println("Usage: KMeans <points path> <centers path> <result path> <num iterations>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing K-Means example with default parameters and built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  We provide a data generator to create synthetic input files for this program.");
-			System.out.println("  Usage: KMeans <points path> <centers path> <result path> <num iterations>");
-		}
-		return true;
-	}
-	
-	private static DataSet<Point> getPointDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			// read points from CSV file
-			return env.readCsvFile(pointsPath)
-						.fieldDelimiter('|')
-						.includeFields(true, true)
-						.types(Double.class, Double.class)
-						.map(new TuplePointConverter());
-		} else {
-			throw new UnsupportedOperationException("Use file output");
-		}
-	}
-	
-	private static DataSet<Centroid> getCentroidDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			return env.readCsvFile(centersPath)
-						.fieldDelimiter('|')
-						.includeFields(true, true, true)
-						.types(Integer.class, Double.class, Double.class)
-						.map(new TupleCentroidConverter());
-		} else {
-			throw new UnsupportedOperationException("Use file output");
-		}
-	}
-
-	
-		
 }