You are viewing a plain-text version of this content. Its canonical link appeared here as a hyperlink, which this plain-text copy does not preserve.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/22 23:47:36 UTC
[15/22] git commit: Cleanup of merge with slot-based scheduler branch.
Cleanup of merge with slot-based scheduler branch.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/429493d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/429493d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/429493d0
Branch: refs/heads/master
Commit: 429493d027700c3635c8045ad1087511e456d04f
Parents: 86d206c
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jun 19 00:15:46 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jun 22 21:07:20 2014 +0200
----------------------------------------------------------------------
.../eu/stratosphere/client/program/Client.java | 17 +---
.../client/program/PackagedProgram.java | 1 +
.../stratosphere/client/program/ClientTest.java | 3 +-
.../compiler/costs/DefaultCostEstimator.java | 6 +-
.../compiler/dag/DataSourceNode.java | 2 +-
.../pact/compiler/DOPChangeTest.java | 8 +-
.../pact/compiler/IterationsCompilerTest.java | 4 +-
.../nephele/execution/RuntimeEnvironment.java | 72 +++++++++++--
.../nephele/protocols/JobManagerProtocol.java | 2 +-
.../stratosphere/nephele/taskmanager/Task.java | 70 +++----------
.../nephele/taskmanager/TaskManager.java | 10 +-
.../local/LocalInstanceManagerTest.java | 19 ++--
.../nephele/jobmanager/JobManagerITCase.java | 19 ++--
.../nephele/util/ServerTestUtils.java | 48 +--------
.../netty/InboundEnvelopeDecoderTest.java | 2 +-
.../confs/jobmanager/nephele-default.xml | 51 ----------
.../test/javaApiOperators/SumMinMaxITCase.java | 4 +-
.../PackagedProgramEndToEndITCase.java | 48 +++++----
.../test/util/testjar/KMeansForTest.java | 102 +++++--------------
19 files changed, 183 insertions(+), 305 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
index 31138f6..ec66f4a 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
@@ -78,9 +78,6 @@ public class Client {
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
-
- // Disable Local Execution when using a Client
- ContextEnvironment.disableLocalExecution();
}
/**
@@ -105,9 +102,6 @@ public class Client {
}
this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
-
- // Disable Local Execution when using a Client
- ContextEnvironment.disableLocalExecution();
}
public void setPrintStatusDuringExecution(boolean print) {
@@ -152,20 +146,13 @@ public class Client {
ByteArrayOutputStream baes = new ByteArrayOutputStream();
System.setErr(new PrintStream(baes));
try {
+ ContextEnvironment.disableLocalExecution();
prog.invokeInteractiveModeForExecution();
}
catch (ProgramInvocationException e) {
- System.setOut(originalOut);
- System.setErr(originalErr);
- System.err.println(baes);
- System.out.println(baos);
throw e;
}
catch (Throwable t) {
- System.setOut(originalOut);
- System.setErr(originalErr);
- System.err.println(baes);
- System.out.println(baos);
// the invocation gets aborted with the preview plan
if (env.optimizerPlan != null) {
return env.optimizerPlan;
@@ -240,6 +227,8 @@ public class Client {
}
env.setAsContext();
+ ContextEnvironment.disableLocalExecution();
+
if (wait) {
// invoke here
prog.invokeInteractiveModeForExecution();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
index 51d2e34..edf36b3 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
@@ -215,6 +215,7 @@ public class PackagedProgram {
PreviewPlanEnvironment env = new PreviewPlanEnvironment();
env.setAsContext();
try {
+ ContextEnvironment.disableLocalExecution();
invokeInteractiveModeForExecution();
}
catch (ProgramInvocationException e) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java
index 244ec4a..b3f8159 100644
--- a/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/program/ClientTest.java
@@ -20,7 +20,6 @@ import static org.mockito.MockitoAnnotations.initMocks;
import static org.powermock.api.mockito.PowerMockito.whenNew;
import java.io.IOException;
-import java.net.InetSocketAddress;
import org.junit.Before;
import org.junit.Test;
@@ -95,7 +94,7 @@ public class ClientTest {
when(program.getPlanWithJars()).thenReturn(planWithJarsMock);
when(planWithJarsMock.getPlan()).thenReturn(planMock);
- whenNew(PactCompiler.class).withArguments(any(DataStatistics.class), any(CostEstimator.class), any(InetSocketAddress.class)).thenReturn(this.compilerMock);
+ whenNew(PactCompiler.class).withArguments(any(DataStatistics.class), any(CostEstimator.class)).thenReturn(this.compilerMock);
when(compilerMock.compile(planMock)).thenReturn(optimizedPlanMock);
whenNew(NepheleJobGraphGenerator.class).withNoArguments().thenReturn(generatorMock);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
index fde5970..3c52f6a 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
@@ -34,7 +34,7 @@ public class DefaultCostEstimator extends CostEstimator {
* The case of the estimation for all relative costs. We heuristically pick a very large data volume, which
* will favor strategies that are less expensive on large data volumes. This is robust and
*/
- private static final long HEURISTIC_COST_BASE = 10000000000l;
+ private static final long HEURISTIC_COST_BASE = 1000000000L;
// The numbers for the CPU effort are rather magic at the moment and should be seen rather ordinal
@@ -105,9 +105,9 @@ public class DefaultCostEstimator extends CostEstimator {
} else {
costs.addNetworkCost(replicationFactor * estOutShipSize);
}
- costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * replicationFactor);
+ costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 10 * replicationFactor);
} else {
- costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 200);
+ costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 1000);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java
index 7234420..b6d6b71 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java
@@ -168,7 +168,7 @@ public class DataSourceNode extends OptimizerNode {
return this.cachedPlans;
}
- SourcePlanNode candidate = new SourcePlanNode(this, "DataSource("+this.getPactContract().getName()+")");
+ SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getPactContract().getName()+")");
candidate.updatePropertiesWithUniqueSets(getUniqueFields());
final Costs costs = new Costs();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
index 273c42c..605f197 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/DOPChangeTest.java
@@ -209,15 +209,15 @@ public class DOPChangeTest extends CompilerTestBase {
ShipStrategyType mapIn = map2Node.getInput().getShipStrategy();
ShipStrategyType reduceIn = red2Node.getInput().getShipStrategy();
- Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.FORWARD, mapIn);
- Assert.assertEquals("Invalid ship strategy for an operator.", ShipStrategyType.PARTITION_HASH, reduceIn);
+ Assert.assertTrue("Invalid ship strategy for an operator.",
+ (ShipStrategyType.PARTITION_RANDOM == mapIn && ShipStrategyType.PARTITION_HASH == reduceIn) ||
+ (ShipStrategyType.PARTITION_HASH == mapIn && ShipStrategyType.FORWARD == reduceIn));
}
@Test
- public void checkPropertyHandlingWithDecreasingDegreeOfParallelism()
- {
+ public void checkPropertyHandlingWithDecreasingDegreeOfParallelism() {
final int degOfPar = DEFAULT_PARALLELISM;
// construct the plan
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
index 05a863c..c6ebf50 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/IterationsCompilerTest.java
@@ -153,6 +153,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
public void testIterationPushingWorkOut() throws Exception {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
DataSet<Tuple2<Long, Long>> input1 = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
@@ -168,6 +169,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
BulkIterationPlanNode bipn = (BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();
+ // check that work has not! been pushed out, as the end of the step function does not produce the necessary properties
for (Channel c : bipn.getPartialSolutionPlanNode().getOutgoingChannels()) {
assertEquals(ShipStrategyType.PARTITION_HASH, c.getShipStrategy());
}
@@ -182,7 +184,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
// open a bulk iteration
- IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(100);
+ IterativeDataSet<Tuple2<Long, Long>> iteration = vertices.iterate(20);
DataSet<Tuple2<Long, Long>> changes = iteration
.join(edges).where(0).equalTo(0).with(new Join222())
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
index cc542c0..4e07694 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/execution/RuntimeEnvironment.java
@@ -36,6 +36,7 @@ import eu.stratosphere.runtime.io.network.bufferprovider.GlobalBufferPool;
import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPool;
import eu.stratosphere.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
import eu.stratosphere.util.StringUtils;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -108,11 +109,6 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
private final AbstractInvokable invokable;
/**
- * The thread executing the task in the environment.
- */
- private volatile Thread executingThread = null;
-
- /**
* The ID of the job this task belongs to.
*/
private final JobID jobID;
@@ -136,6 +132,11 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
* The observer object for the task's execution.
*/
private volatile ExecutionObserver executionObserver = null;
+
+ /**
+ * The thread executing the task in the environment.
+ */
+ private volatile Thread executingThread;
/**
* The RPC proxy to report accumulators to JobManager
@@ -159,7 +160,9 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
private LocalBufferPool outputBufferPool;
- private Map<String,FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
+ private final Map<String,FutureTask<Path>> cacheCopyTasks;
+
+ private volatile boolean canceled;
/**
* Creates a new runtime environment object which contains the runtime information for the encapsulated Nephele
@@ -174,8 +177,9 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
*/
public RuntimeEnvironment(final JobID jobID, final String taskName,
final Class<? extends AbstractInvokable> invokableClass, final Configuration taskConfiguration,
- final Configuration jobConfiguration) throws Exception {
-
+ final Configuration jobConfiguration)
+ throws Exception
+ {
this.jobID = jobID;
this.taskName = taskName;
this.invokableClass = invokableClass;
@@ -186,7 +190,8 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
this.memoryManager = null;
this.ioManager = null;
this.inputSplitProvider = null;
-
+ this.cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
+
this.invokable = this.invokableClass.newInstance();
this.invokable.setEnvironment(this);
this.invokable.registerInputOutput();
@@ -433,6 +438,53 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
return this.executingThread;
}
}
+
+ public void cancelExecution() {
+ canceled = true;
+
+ LOG.info("Canceling " + getTaskNameWithIndex());
+
+ // Request user code to shut down
+ if (this.invokable != null) {
+ try {
+ this.invokable.cancel();
+ } catch (Throwable e) {
+ LOG.error("Error while cancelling the task.", e);
+ }
+ }
+
+ // interrupt the running thread and wait for it to die
+ executingThread.interrupt();
+
+ try {
+ executingThread.join(5000);
+ } catch (InterruptedException e) {}
+
+ if (!executingThread.isAlive()) {
+ return;
+ }
+
+ // Continuously interrupt the user thread until it changed to state CANCELED
+ while (executingThread != null && executingThread.isAlive()) {
+ LOG.warn("Task " + getTaskName() + " did not react to cancelling signal. Sending repeated interrupt.");
+
+ if (LOG.isDebugEnabled()) {
+ StringBuilder bld = new StringBuilder("Task ").append(getTaskName()).append(" is stuck in method:\n");
+
+ StackTraceElement[] stack = executingThread.getStackTrace();
+ for (StackTraceElement e : stack) {
+ bld.append(e).append('\n');
+ }
+ LOG.debug(bld.toString());
+ }
+
+ executingThread.interrupt();
+
+ try {
+ executingThread.join(1000);
+ } catch (InterruptedException e) {}
+ }
+ }
/**
* Blocks until all output channels are closed.
@@ -459,7 +511,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
*/
private void waitForInputChannelsToBeClosed() throws IOException, InterruptedException {
// Wait for disconnection of all output gates
- while (true) {
+ while (!canceled) {
// Make sure, we leave this method with an InterruptedException when the task has been canceled
if (this.executionObserver.isCanceled()) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java
index 5070b51..4db5e14 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/protocols/JobManagerProtocol.java
@@ -51,7 +51,7 @@ public interface JobManagerProtocol extends VersionedProtocol {
* @return whether the task manager was successfully registered
*/
RegisterTaskManagerResult registerTaskManager(InstanceConnectionInfo instanceConnectionInfo,
- HardwareDescription hardwareDescription,IntegerRecord numberOfSlots)
+ HardwareDescription hardwareDescription, IntegerRecord numberOfSlots)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
index 825eae1..d1a6275 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/Task.java
@@ -24,14 +24,14 @@ import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.profiling.TaskManagerProfiler;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.util.StringUtils;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
public final class Task implements ExecutionObserver {
@@ -49,13 +49,14 @@ public final class Task implements ExecutionObserver {
/**
* Stores whether the task has been canceled.
*/
- private volatile boolean isCanceled = false;
+ private final AtomicBoolean canceled = new AtomicBoolean(false);
/**
* The current execution state of the task
*/
private volatile ExecutionState executionState = ExecutionState.STARTING;
+
private Queue<ExecutionListener> registeredListeners = new ConcurrentLinkedQueue<ExecutionListener>();
public Task(ExecutionVertexID vertexID, final RuntimeEnvironment environment, TaskManager taskManager) {
@@ -102,11 +103,11 @@ public final class Task implements ExecutionObserver {
}
public void cancelExecution() {
- cancelOrKillExecution(true);
+ cancelOrKillExecution();
}
public void killExecution() {
- cancelOrKillExecution(false);
+ cancelOrKillExecution();
}
/**
@@ -114,10 +115,8 @@ public final class Task implements ExecutionObserver {
*
* @param cancel <code>true/code> if the task shall be canceled, <code>false</code> if it shall be killed
*/
- private void cancelOrKillExecution(boolean cancel) {
- final Thread executingThread = this.environment.getExecutingThread();
-
- if (executingThread == null) {
+ private void cancelOrKillExecution() {
+ if (!this.canceled.compareAndSet(false, true)) {
return;
}
@@ -125,45 +124,13 @@ public final class Task implements ExecutionObserver {
return;
}
- LOG.info((cancel ? "Canceling " : "Killing ") + this.environment.getTaskNameWithIndex());
-
- if (cancel) {
- this.isCanceled = true;
- // Change state
- executionStateChanged(ExecutionState.CANCELING, null);
-
- // Request user code to shut down
- try {
- final AbstractInvokable invokable = this.environment.getInvokable();
- if (invokable != null) {
- invokable.cancel();
- }
- } catch (Throwable e) {
- LOG.error(StringUtils.stringifyException(e));
- }
- }
-
- // Continuously interrupt the user thread until it changed to state CANCELED
- while (true) {
-
- executingThread.interrupt();
-
- if (!executingThread.isAlive()) {
- break;
- }
-
- try {
- executingThread.join(1000);
- } catch (InterruptedException e) {}
+ executionStateChanged(ExecutionState.CANCELING, null);
- if (!executingThread.isAlive()) {
- break;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sending repeated " + (cancel == true ? "canceling" : "killing") + " signal to " +
- this.environment.getTaskName() + " with state " + this.executionState);
- }
+ // Request user code to shut down
+ try {
+ this.environment.cancelExecution();
+ } catch (Throwable e) {
+ LOG.error("Error while cancelling the task.", e);
}
}
@@ -271,7 +238,6 @@ public final class Task implements ExecutionObserver {
* @return the name of the task associated with this observer object
*/
private String getTaskName() {
-
return this.environment.getTaskName() + " (" + (this.environment.getIndexInSubtaskGroup() + 1) + "/"
+ this.environment.getCurrentNumberOfSubtasks() + ")";
}
@@ -279,7 +245,6 @@ public final class Task implements ExecutionObserver {
@Override
public void userThreadStarted(final Thread userThread) {
-
// Notify the listeners
final Iterator<ExecutionListener> it = this.registeredListeners.iterator();
while (it.hasNext()) {
@@ -290,7 +255,6 @@ public final class Task implements ExecutionObserver {
@Override
public void userThreadFinished(final Thread userThread) {
-
// Notify the listeners
final Iterator<ExecutionListener> it = this.registeredListeners.iterator();
while (it.hasNext()) {
@@ -307,7 +271,6 @@ public final class Task implements ExecutionObserver {
*/
public void registerExecutionListener(final ExecutionListener executionListener) {
-
this.registeredListeners.add(executionListener);
}
@@ -320,15 +283,13 @@ public final class Task implements ExecutionObserver {
*/
public void unregisterExecutionListener(final ExecutionListener executionListener) {
-
this.registeredListeners.remove(executionListener);
}
@Override
public boolean isCanceled() {
-
- return this.isCanceled;
+ return this.canceled.get();
}
/**
@@ -337,7 +298,6 @@ public final class Task implements ExecutionObserver {
* @return the runtime environment associated with this task
*/
public RuntimeEnvironment getRuntimeEnvironment() {
-
return this.environment;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index 5966cf9..f191df3 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -164,12 +164,15 @@ public class TaskManager implements TaskOperationProtocol {
private volatile boolean shutdownComplete;
/**
- * Constructs a new task manager, starts its IPC service and attempts to discover the job manager to
- * receive an initial configuration. All parameters are obtained from the
+ * All parameters are obtained from the
* {@link GlobalConfiguration}, which must be loaded prior to instantiating the task manager.
*/
public TaskManager(ExecutionMode executionMode) throws Exception {
-
+ if (executionMode == null) {
+ throw new NullPointerException("Execution mode must not be null.");
+ }
+
+
LOG.info("TaskManager started as user " + UserGroupInformation.getCurrentUser().getShortUserName());
LOG.info("User system property: " + System.getProperty("user.name"));
LOG.info("Execution mode: " + executionMode);
@@ -340,6 +343,7 @@ public class TaskManager implements TaskOperationProtocol {
{
HardwareDescription resources = HardwareDescriptionFactory.extractFromSystem();
+
numberOfSlots = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
Hardware.getNumberCPUCores());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java
index a8f1331..b491c12 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/instance/local/LocalInstanceManagerTest.java
@@ -13,21 +13,19 @@
package eu.stratosphere.nephele.instance.local;
-import static org.junit.Assert.fail;
-
import eu.stratosphere.nephele.instance.InstanceManager;
import junit.framework.Assert;
import org.junit.Test;
import eu.stratosphere.nephele.ExecutionMode;
+import eu.stratosphere.configuration.ConfigConstants;
+import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.jobmanager.JobManager;
-import eu.stratosphere.nephele.util.ServerTestUtils;
/**
* Tests for the {@link LocalInstanceManager}.
- *
*/
public class LocalInstanceManagerTest {
@@ -39,12 +37,13 @@ public class LocalInstanceManagerTest {
public void testInstanceTypeFromConfiguration() {
try {
- final String configDir = ServerTestUtils.getConfigDir();
- if (configDir == null) {
- fail("Cannot locate configuration directory");
- }
-
- GlobalConfiguration.loadConfiguration(configDir);
+ Configuration cfg = new Configuration();
+ cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1");
+ cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+
+ GlobalConfiguration.includeConfiguration(cfg);
// start JobManager
ExecutionMode executionMode = ExecutionMode.LOCAL;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
index fa4fbfa..89f7428 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/jobmanager/JobManagerITCase.java
@@ -20,6 +20,7 @@ import eu.stratosphere.core.fs.Path;
import eu.stratosphere.nephele.ExecutionMode;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.client.JobExecutionException;
+import eu.stratosphere.nephele.execution.RuntimeEnvironment;
import eu.stratosphere.nephele.jobgraph.DistributionPattern;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobFileInputVertex;
@@ -34,6 +35,7 @@ import eu.stratosphere.nephele.util.FileLineWriter;
import eu.stratosphere.nephele.util.JarFileCreator;
import eu.stratosphere.nephele.util.ServerTestUtils;
import eu.stratosphere.util.LogUtils;
+
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.AfterClass;
@@ -57,7 +59,8 @@ import static org.junit.Assert.fail;
public class JobManagerITCase {
static {
- LogUtils.initializeDefaultTestConsoleLogger();
+ // no logging, because the tests create expected exception
+ LogUtils.initializeDefaultConsoleLogger(Level.INFO);
}
/**
@@ -75,7 +78,13 @@ public class JobManagerITCase {
@BeforeClass
public static void startNephele() {
try {
- GlobalConfiguration.loadConfiguration(ServerTestUtils.getConfigDir());
+ Configuration cfg = new Configuration();
+ cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1");
+ cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
+ cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+
+ GlobalConfiguration.includeConfiguration(cfg);
configuration = GlobalConfiguration.getConfiguration(new String[] { ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY });
@@ -301,8 +310,9 @@ public class JobManagerITCase {
// deactivate logging of expected test exceptions
Logger rtLogger = Logger.getLogger(Task.class);
- Level rtLevel = rtLogger.getEffectiveLevel();
rtLogger.setLevel(Level.OFF);
+ Logger envLogger = Logger.getLogger(RuntimeEnvironment.class);
+ envLogger.setLevel(Level.DEBUG);
try {
jobClient.submitJobAndWait();
@@ -317,9 +327,6 @@ public class JobManagerITCase {
return;
}
- finally {
- rtLogger.setLevel(rtLevel);
- }
fail("Expected exception but did not receive it");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/ServerTestUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/ServerTestUtils.java b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/ServerTestUtils.java
index 4202880..59de8cc 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/ServerTestUtils.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/nephele/util/ServerTestUtils.java
@@ -38,27 +38,9 @@ import eu.stratosphere.nephele.protocols.ExtendedManagementProtocol;
public final class ServerTestUtils {
/**
- * The system property key to retrieve the user directory.
- */
- private static final String USER_DIR_KEY = "user.dir";
-
- /**
- * The directory containing the correct configuration file to be used during the tests.
- */
- private static final String CORRECT_CONF_DIR = "/confs/jobmanager";
-
- /**
- * The directory the configuration directory is expected in when test are executed using Eclipse.
- */
- private static final String ECLIPSE_PATH_EXTENSION = "/src/test/resources";
-
- private static final String INTELLIJ_PATH_EXTENSION = "/stratosphere-runtime/src/test/resources";
-
- /**
* Private constructor.
*/
- private ServerTestUtils() {
- }
+ private ServerTestUtils() {} // final utility class: the private constructor prevents instantiation
/**
* Creates a file with a random name in the given sub directory within the directory for temporary files. The
@@ -182,34 +164,6 @@ public final class ServerTestUtils {
}
/**
- * Returns the directory containing the configuration files that shall be used for the test.
- *
- * @return the directory containing the configuration files or <code>null</code> if the configuration directory
- * could not be located
- */
- public static String getConfigDir() {
-
- // This is the correct path for Maven-based tests
- String configDir = System.getProperty(USER_DIR_KEY) + CORRECT_CONF_DIR;
- if (new File(configDir).exists()) {
- return configDir;
- }
-
- configDir = System.getProperty(USER_DIR_KEY) + ECLIPSE_PATH_EXTENSION + CORRECT_CONF_DIR;
- if (new File(configDir).exists()) {
- return configDir;
- }
-
- configDir = System.getProperty(USER_DIR_KEY) + INTELLIJ_PATH_EXTENSION + CORRECT_CONF_DIR;
-
- if(new File(configDir).exists()){
- return configDir;
- }
-
- return null;
- }
-
- /**
* Waits until the job manager for the tests has become ready to accept jobs.
*
* @param jobManager
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
index 1ee9293..1c6270a 100644
--- a/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
+++ b/stratosphere-runtime/src/test/java/eu/stratosphere/runtime/io/network/netty/InboundEnvelopeDecoderTest.java
@@ -354,7 +354,7 @@ public class InboundEnvelopeDecoderTest {
buf.readerIndex(0);
ByteBuf[] slices = randomSlices(buf);
- ch.writeInbound((Object) slices);
+ ch.writeInbound((Object[]) slices); // assumes writeInbound(Object...): Object[] spreads each slice as its own message; (Object) passed the whole array as one message
for (ByteBuf slice : slices) {
Assert.assertEquals(1, slice.refCnt());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-runtime/src/test/resources/confs/jobmanager/nephele-default.xml
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/test/resources/confs/jobmanager/nephele-default.xml b/stratosphere-runtime/src/test/resources/confs/jobmanager/nephele-default.xml
deleted file mode 100644
index 5d93d95..0000000
--- a/stratosphere-runtime/src/test/resources/confs/jobmanager/nephele-default.xml
+++ /dev/null
@@ -1,51 +0,0 @@
-<?xml version="1.0"?>
-<configuration>
- <property>
- <key>jobmanager.rpc.address</key>
- <value>127.0.0.1</value>
- </property>
- <property>
- <key>jobmanager.rpc.port</key>
- <value>6123</value>
- </property>
- <property>
- <key>taskmanager.setup.usediscovery</key>
- <value>false</value>
- </property>
- <property>
- <key>discoveryservice.magicnumber</key>
- <value>12300</value>
- </property>
- <property>
- <key>instancemanager.local.classname</key>
- <value>eu.stratosphere.nephele.instance.local.LocalInstanceManager</value>
- </property>
- <property>
- <key>jobmanager.scheduler.local.classname</key>
- <value>eu.stratosphere.nephele.jobmanager.scheduler.local.LocalScheduler</value>
- </property>
- <property>
- <key>channel.network.compressor</key>
- <value>de.tu_berlin.cit.nephele.io.compression.lzo.LzoCompressor</value>
- </property>
- <property>
- <key>channel.network.decompressor</key>
- <value>de.tu_berlin.cit.nephele.io.compression.lzo.LzoDecompressor</value>
- </property>
- <property>
- <key>channel.file.compressor</key>
- <value>de.tu_berlin.cit.nephele.io.compression.lzo.LzoCompressor</value>
- </property>
- <property>
- <key>channel.file.decompressor</key>
- <value>de.tu_berlin.cit.nephele.io.compression.lzo.LzoDecompressor</value>
- </property>
- <property>
- <key>taskmanager.memory.size</key>
- <value>8</value>
- </property>
- <property>
- <key>instancemanager.local.type</key>
- <value>test,4,4,1024,160,0</value>
- </property>
-</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java
index 8b7dc80..cef9c05 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/SumMinMaxITCase.java
@@ -18,13 +18,14 @@ package eu.stratosphere.test.javaApiOperators;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
-import eu.stratosphere.api.java.aggregation.Aggregations;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.test.javaApiOperators.util.CollectionDataSets;
import eu.stratosphere.test.util.JavaProgramTestBase;
+
+import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.FileNotFoundException;
@@ -32,6 +33,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
+@RunWith(Parameterized.class) // Parameterized was already imported; this annotation makes JUnit actually use that runner for the class
public class SumMinMaxITCase extends JavaProgramTestBase {
private static int NUM_PROGRAMS = 3;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
index 0e297ec..17f4a29 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
@@ -14,7 +14,6 @@ package eu.stratosphere.test.localDistributed;
import java.io.File;
import java.io.FileWriter;
-import java.net.URL;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import org.junit.Assert;
@@ -29,23 +28,26 @@ import eu.stratosphere.util.LogUtils;
public class PackagedProgramEndToEndITCase {
- private static final int DOP = 4;
-
static {
LogUtils.initializeDefaultTestConsoleLogger();
}
@Test
public void testEverything() {
+ final int PORT = 6498;
+
NepheleMiniCluster cluster = new NepheleMiniCluster();
+
+ File points = null;
+ File clusters = null;
+ File outFile = null;
+
try {
// set up the files
- File points = File.createTempFile("kmeans_points", ".in");
- File clusters = File.createTempFile("kmeans_clusters", ".in");
- File outFile = File.createTempFile("kmeans_result", ".out");
- points.deleteOnExit();
- clusters.deleteOnExit();
- outFile.deleteOnExit();
+ points = File.createTempFile("kmeans_points", ".in");
+ clusters = File.createTempFile("kmeans_clusters", ".in");
+ outFile = File.createTempFile("kmeans_result", ".out");
+
outFile.delete();
FileWriter fwPoints = new FileWriter(points);
@@ -56,31 +58,39 @@ public class PackagedProgramEndToEndITCase {
fwClusters.write(KMeansData.INITIAL_CENTERS);
fwClusters.close();
- URL jarFileURL = getClass().getResource("/KMeansForTest.jar");
- String jarPath = jarFileURL.getFile();
+ String jarPath = "target/maven-test-jar.jar";
// run KMeans
cluster.setNumTaskTracker(2);
cluster.setTaskManagerNumSlots(2);
+ cluster.setJobManagerRpcPort(PORT);
cluster.start();
- RemoteExecutor ex = new RemoteExecutor("localhost", 6498);
+
+ RemoteExecutor ex = new RemoteExecutor("localhost", PORT);
ex.executeJar(jarPath,
- "eu.stratosphere.examples.scala.testing.KMeansForTest",
- new String[] {new Integer(DOP).toString(),
+ "eu.stratosphere.test.util.testjar.KMeansForTest",
+ new String[] {
points.toURI().toString(),
clusters.toURI().toString(),
outFile.toURI().toString(),
"25"});
- points.delete();
- clusters.delete();
- outFile.delete();
-
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
- } finally {
+ }
+ finally {
+ if (points != null) { // each temp file stays null if setup failed before it was created
+ points.delete();
+ }
+ if (clusters != null) { // guard the temp file itself, not the (never-null) mini cluster
+ clusters.delete();
+ }
+ if (outFile != null) {
+ outFile.delete();
+ }
+
try {
cluster.stop();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/429493d0/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
index d1b249a..8047649 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
@@ -23,7 +23,6 @@ import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.IterativeDataSet;
-import eu.stratosphere.api.java.RemoteEnvironment;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.functions.ReduceFunction;
import eu.stratosphere.api.java.tuple.Tuple2;
@@ -31,25 +30,41 @@ import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.configuration.Configuration;
@SuppressWarnings("serial")
-public class KMeansForTest implements Program{
+public class KMeansForTest implements Program {
// *************************************************************************
// PROGRAM
// *************************************************************************
+
@Override
public Plan getPlan(String... args) {
- if(!parseParameters(args)) {
- throw new RuntimeException("Unable to parse the arguments");
+ if (args.length < 4) { // expected: <points path> <centers path> <result path> <num iterations>; extra args are ignored
+ throw new IllegalArgumentException("Missing parameters");
}
-
- // set up execution environment
- ExecutionEnvironment env = new RemoteEnvironment("localhost", 1);
+
+ final String pointsPath = args[0];
+ final String centersPath = args[1];
+ final String outputPath = args[2];
+ final int numIterations = Integer.parseInt(args[3]);
+
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // only used to build the plan returned by createProgramPlan() below
+ env.setDegreeOfParallelism(4); // parallelism is hard-coded, not read from args
// get input data
- DataSet<Point> points = getPointDataSet(env);
- DataSet<Centroid> centroids = getCentroidDataSet(env);
+ DataSet<Point> points = env.readCsvFile(pointsPath)
+ .fieldDelimiter('|')
+ .includeFields(true, true)
+ .types(Double.class, Double.class)
+ .map(new TuplePointConverter());
+
+ DataSet<Centroid> centroids = env.readCsvFile(centersPath)
+ .fieldDelimiter('|')
+ .includeFields(true, true, true)
+ .types(Integer.class, Double.class, Double.class)
+ .map(new TupleCentroidConverter());
// set number of bulk iterations for KMeans algorithm
IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
@@ -71,11 +86,8 @@ public class KMeansForTest implements Program{
.map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
// emit result
- if(fileOutput) {
- clusteredPoints.writeAsCsv(outputPath, "\n", " ");
- } else {
- clusteredPoints.print();
- }
+ clusteredPoints.writeAsCsv(outputPath, "\n", " "); // row delimiter "\n", field delimiter " "; output is always file-based now
+
return env.createProgramPlan();
}
@@ -229,66 +241,4 @@ public class KMeansForTest implements Program{
return new Centroid(value.f0, value.f1.div(value.f2));
}
}
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static boolean fileOutput = false;
- private static String pointsPath = null;
- private static String centersPath = null;
- private static String outputPath = null;
- private static int numIterations = 10;
-
- private static boolean parseParameters(String[] programArguments) {
-
- if(programArguments.length > 0) {
- // parse input arguments
- fileOutput = true;
- if(programArguments.length == 4) {
- pointsPath = programArguments[0];
- centersPath = programArguments[1];
- outputPath = programArguments[2];
- numIterations = Integer.parseInt(programArguments[3]);
- } else {
- System.err.println("Usage: KMeans <points path> <centers path> <result path> <num iterations>");
- return false;
- }
- } else {
- System.out.println("Executing K-Means example with default parameters and built-in default data.");
- System.out.println(" Provide parameters to read input data from files.");
- System.out.println(" See the documentation for the correct format of input files.");
- System.out.println(" We provide a data generator to create synthetic input files for this program.");
- System.out.println(" Usage: KMeans <points path> <centers path> <result path> <num iterations>");
- }
- return true;
- }
-
- private static DataSet<Point> getPointDataSet(ExecutionEnvironment env) {
- if(fileOutput) {
- // read points from CSV file
- return env.readCsvFile(pointsPath)
- .fieldDelimiter('|')
- .includeFields(true, true)
- .types(Double.class, Double.class)
- .map(new TuplePointConverter());
- } else {
- throw new UnsupportedOperationException("Use file output");
- }
- }
-
- private static DataSet<Centroid> getCentroidDataSet(ExecutionEnvironment env) {
- if(fileOutput) {
- return env.readCsvFile(centersPath)
- .fieldDelimiter('|')
- .includeFields(true, true, true)
- .types(Integer.class, Double.class, Double.class)
- .map(new TupleCentroidConverter());
- } else {
- throw new UnsupportedOperationException("Use file output");
- }
- }
-
-
-
}