You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/22 23:47:32 UTC
[11/22] git commit: Rework the Taskmanager to a slot based model and
remove legacy cloud code
Rework the TaskManager to a slot-based model and remove legacy cloud code
Squashed commit of the following:
- Post merge cleanup
- Renamed fractionMemory into memoryFraction.
- Removed LocalScheduler and QueueScheduler and replaced them with a unified DefaultScheduler.
- Removed the old LocalInstanceManager and ClusterManager and introduced a unified DefaultInstanceManager in their place.
- Removed connection IDs from execution edges
- Removed InstanceType, InstanceRequestMap, InstanceTypeDescription, InstanceTypeDescriptionFactory, InstanceTypeFactory, PendingRequestsMap
- Fixed problems with test cases.
- Introduced a simple slot system for scheduling.
- Removed subtasks per instance
- Added registerTaskManager to the JobManager RPC calls. registerTaskManager is called only once, at which point the hardware description information is sent.
Add: Merged the cloud model removal with the new network stack
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/86d206c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/86d206c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/86d206c4
Branch: refs/heads/master
Commit: 86d206c41922a1b7b8c2839b65d3568f9be55e0c
Parents: 7b6b5a2
Author: Till Rohrmann <ti...@gmail.com>
Authored: Sun Jun 1 16:03:27 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jun 22 21:07:10 2014 +0200
----------------------------------------------------------------------
.../api/avro/AvroExternalJarProgramITCase.java | 1 +
.../eu/stratosphere/client/LocalExecutor.java | 9 +
.../client/minicluster/NepheleMiniCluster.java | 23 +-
.../eu/stratosphere/client/program/Client.java | 5 +-
.../client/CliFrontendListCancelTest.java | 11 +-
.../stratosphere/client/testjar/WordCount.java | 3 +-
.../eu/stratosphere/compiler/PactCompiler.java | 473 +---------
.../compiler/costs/DefaultCostEstimator.java | 18 +-
.../dag/AbstractPartialSolutionNode.java | 5 -
.../compiler/dag/BinaryUnionNode.java | 48 +-
.../compiler/dag/BulkIterationNode.java | 20 +-
.../stratosphere/compiler/dag/DataSinkNode.java | 18 +-
.../compiler/dag/DataSourceNode.java | 16 -
.../compiler/dag/GroupReduceNode.java | 1 -
.../compiler/dag/OptimizerNode.java | 51 +-
.../stratosphere/compiler/dag/ReduceNode.java | 1 -
.../compiler/dag/SingleInputNode.java | 41 +-
.../stratosphere/compiler/dag/SinkJoiner.java | 1 -
.../stratosphere/compiler/dag/TwoInputNode.java | 56 +-
.../compiler/dag/WorksetIterationNode.java | 12 +-
.../RequestedGlobalProperties.java | 8 +-
.../AllGroupWithPartialPreGroupProperties.java | 3 +-
.../compiler/operators/AllReduceProperties.java | 3 +-
.../GroupReduceWithCombineProperties.java | 6 +-
.../operators/PartialGroupProperties.java | 6 +-
.../compiler/operators/ReduceProperties.java | 3 +-
.../eu/stratosphere/compiler/plan/Channel.java | 59 +-
.../eu/stratosphere/compiler/plan/PlanNode.java | 25 +-
.../plandump/PlanJSONDumpGenerator.java | 3 -
.../plantranslate/NepheleJobGraphGenerator.java | 80 +-
.../pact/compiler/CompilerTestBase.java | 24 +-
.../configuration/ConfigConstants.java | 12 +-
.../java/eu/stratosphere/util/ClassUtils.java | 1 +
.../event/job/VertexAssignmentEvent.java | 32 +-
.../nephele/executiongraph/ExecutionEdge.java | 9 -
.../nephele/executiongraph/ExecutionGraph.java | 181 ++--
.../executiongraph/ExecutionGroupVertex.java | 184 +---
.../nephele/executiongraph/ExecutionStage.java | 112 +--
.../nephele/executiongraph/ExecutionVertex.java | 1 -
.../executiongraph/InternalJobStatus.java | 1 +
.../executiongraph/ManagementGraphFactory.java | 7 +-
.../nephele/instance/AbstractInstance.java | 297 ------
.../nephele/instance/AllocatedResource.java | 38 +-
.../nephele/instance/AllocatedSlot.java | 65 ++
.../nephele/instance/AllocationID.java | 4 +-
.../instance/DefaultInstanceManager.java | 393 ++++++++
.../nephele/instance/DummyInstance.java | 14 +-
.../stratosphere/nephele/instance/Hardware.java | 24 +
.../stratosphere/nephele/instance/Instance.java | 362 +++++++
.../nephele/instance/InstanceManager.java | 145 +--
.../nephele/instance/InstanceNotifier.java | 71 ++
.../nephele/instance/InstanceRequestMap.java | 184 ----
.../nephele/instance/InstanceType.java | 199 ----
.../instance/InstanceTypeDescription.java | 137 ---
.../InstanceTypeDescriptionFactory.java | 46 -
.../nephele/instance/InstanceTypeFactory.java | 91 --
.../nephele/instance/LocalInstanceManager.java | 60 ++
.../instance/cluster/AllocatedSlice.java | 120 ---
.../instance/cluster/ClusterInstance.java | 181 ----
.../cluster/ClusterInstanceNotifier.java | 71 --
.../instance/cluster/ClusterManager.java | 945 -------------------
.../instance/cluster/PendingRequestsMap.java | 97 --
.../nephele/instance/local/LocalInstance.java | 37 -
.../instance/local/LocalInstanceManager.java | 418 --------
.../instance/local/LocalInstanceNotifier.java | 70 --
.../nephele/jobgraph/AbstractJobVertex.java | 100 +-
.../nephele/jobmanager/DeploymentManager.java | 8 +-
.../nephele/jobmanager/EventCollector.java | 10 +-
.../nephele/jobmanager/JobManager.java | 98 +-
.../nephele/jobmanager/JobManagerUtils.java | 54 +-
.../scheduler/AbstractExecutionListener.java | 166 ----
.../jobmanager/scheduler/AbstractScheduler.java | 662 -------------
.../scheduler/DefaultExecutionListener.java | 127 +++
.../jobmanager/scheduler/DefaultScheduler.java | 762 +++++++++++++++
.../jobmanager/scheduler/RecoveryLogic.java | 248 -----
.../scheduler/local/LocalExecutionListener.java | 33 -
.../scheduler/local/LocalScheduler.java | 213 -----
.../scheduler/queue/QueueExecutionListener.java | 40 -
.../scheduler/queue/QueueScheduler.java | 216 -----
.../splitassigner/InputSplitManager.java | 2 +-
.../LocatableInputSplitAssigner.java | 4 +-
.../splitassigner/LocatableInputSplitList.java | 20 +-
.../file/FileInputSplitAssigner.java | 4 +-
.../splitassigner/file/FileInputSplitList.java | 20 +-
.../managementgraph/ManagementGraph.java | 4 +-
.../managementgraph/ManagementVertex.java | 35 +-
.../eu/stratosphere/nephele/net/NetUtils.java | 2 +
.../profiling/impl/JobProfilingData.java | 6 +-
.../protocols/ExtendedManagementProtocol.java | 23 +-
.../nephele/protocols/JobManagerProtocol.java | 19 +-
.../services/iomanager/ChannelAccess.java | 1 +
.../services/memorymanager/MemoryManager.java | 17 +-
.../memorymanager/spi/DefaultMemoryManager.java | 39 +-
.../nephele/taskmanager/TaskManager.java | 123 ++-
.../RegisterTaskManagerResult.java | 50 +
.../nephele/topology/NetworkNode.java | 10 -
.../eu/stratosphere/nephele/util/IOUtils.java | 1 +
.../pact/runtime/cache/FileCache.java | 9 +-
.../hash/BuildFirstHashMatchIterator.java | 8 +-
.../BuildFirstReOpenableHashMatchIterator.java | 8 +-
.../hash/BuildSecondHashMatchIterator.java | 8 +-
.../pact/runtime/hash/InMemoryPartition.java | 2 +
.../iterative/task/IterationHeadPactTask.java | 5 +-
.../pact/runtime/shipping/ShipStrategyType.java | 23 +-
.../runtime/sort/AsynchronousPartialSorter.java | 11 +-
.../AsynchronousPartialSorterCollector.java | 7 +-
.../sort/CombiningUnilateralSortMerger.java | 18 +-
.../pact/runtime/sort/UnilateralSortMerger.java | 18 +-
.../AbstractCachedBuildSideMatchDriver.java | 2 +-
.../pact/runtime/task/CrossDriver.java | 3 +-
.../pact/runtime/task/DataSinkTask.java | 2 +-
.../runtime/task/GroupReduceCombineDriver.java | 4 +-
.../pact/runtime/task/MatchDriver.java | 38 +-
.../pact/runtime/task/ReduceCombineDriver.java | 3 +-
.../pact/runtime/task/RegularPactTask.java | 12 +-
.../SynchronousChainedCombineDriver.java | 2 +-
.../pact/runtime/task/util/TaskConfig.java | 68 +-
.../runtime/io/channels/InputChannel.java | 9 +-
.../runtime/io/gates/InputGate.java | 2 +
.../runtime/io/network/RemoteReceiver.java | 20 +-
.../nephele/event/job/ManagementEventTest.java | 4 +-
.../executiongraph/ExecutionGraphTest.java | 258 +----
.../instance/cluster/ClusterManagerTest.java | 273 ------
.../cluster/ClusterManagerTestUtils.java | 66 --
.../cluster/DefaultInstanceManagerTest.java | 232 +++++
.../DefaultInstanceManagerTestUtils.java | 66 ++
.../instance/cluster/HostInClusterTest.java | 130 ++-
.../cluster/PendingRequestsMapTest.java | 91 --
.../local/LocalInstanceManagerTest.java | 17 +-
.../nephele/jobmanager/JobManagerITCase.java | 16 +-
.../scheduler/queue/DefaultSchedulerTest.java | 185 ++++
.../scheduler/queue/QueueSchedulerTest.java | 186 ----
.../scheduler/queue/TestDeploymentManager.java | 4 +-
.../scheduler/queue/TestInstanceManager.java | 118 +--
.../managementgraph/ManagementGraphTest.java | 11 +-
.../services/iomanager/IOManagerITCase.java | 2 +-
.../IOManagerPerformanceBenchmark.java | 2 +-
.../services/iomanager/IOManagerTest.java | 2 +-
.../memorymanager/MemorySegmentTest.java | 2 +-
.../nephele/util/ServerTestUtils.java | 17 +-
.../runtime/hash/HashMatchIteratorITCase.java | 14 +-
.../pact/runtime/hash/HashTableITCase.java | 2 +-
.../runtime/hash/ReOpenableHashTableITCase.java | 4 +-
.../pact/runtime/io/ChannelViewsTest.java | 8 +-
.../pact/runtime/io/SpillingBufferTest.java | 2 +-
.../event/EventWithAggregatorsTest.java | 2 +
.../resettable/BlockResettableIteratorTest.java | 2 +-
...lockResettableMutableObjectIteratorTest.java | 2 +-
.../sort/AsynchonousPartialSorterITCase.java | 14 +-
.../CombiningUnilateralSortMergerITCase.java | 8 +-
.../pact/runtime/sort/ExternalSortITCase.java | 12 +-
.../sort/MassiveStringSortingITCase.java | 4 +-
.../sort/SortMergeMatchIteratorITCase.java | 2 +-
.../runtime/task/CombineTaskExternalITCase.java | 8 +-
.../pact/runtime/task/CombineTaskTest.java | 10 +-
.../runtime/task/CrossTaskExternalITCase.java | 7 +-
.../pact/runtime/task/CrossTaskTest.java | 36 +-
.../pact/runtime/task/DataSinkTaskTest.java | 47 +-
.../runtime/task/MatchTaskExternalITCase.java | 14 +-
.../pact/runtime/task/MatchTaskTest.java | 56 +-
.../runtime/task/ReduceTaskExternalITCase.java | 8 +-
.../pact/runtime/task/ReduceTaskTest.java | 3 +-
.../runtime/task/chaining/ChainTaskTest.java | 19 +-
.../task/drivers/ReduceCombineDriverTest.java | 10 +-
.../runtime/task/drivers/TestTaskContext.java | 2 +-
.../pact/runtime/test/util/DriverTestBase.java | 8 +-
.../pact/runtime/test/util/MockEnvironment.java | 9 +-
.../netty/InboundEnvelopeDecoderTest.java | 2 +-
.../test/compiler/util/CompilerTestBase.java | 26 +-
.../test/util/AbstractTestBase.java | 48 +-
.../test/util/JavaProgramTestBase.java | 2 +
.../test/util/RecordAPITestBase.java | 3 +
.../test/accumulators/AccumulatorITCase.java | 7 +-
.../BroadcastVarsNepheleITCase.java | 16 +-
.../KMeansIterativeNepheleITCase.java | 30 +-
.../test/cancelling/CancellingTestBase.java | 10 +-
.../test/cancelling/MapCancelingITCase.java | 13 +-
.../cancelling/MatchJoinCancelingITCase.java | 17 +-
.../clients/examples/LocalExecutorITCase.java | 10 +-
.../exampleJavaPrograms/WordCountITCase.java | 4 +-
.../ComputeEdgeDegreesITCase.java | 2 +-
.../ConnectedComponentsITCase.java | 2 +-
.../EnumTrianglesOnEdgesWithDegreesITCase.java | 2 +-
.../TransitiveClosureNaiveITCase.java | 2 +-
.../WebLogAnalysisITCase.java | 2 +-
.../exampleScalaPrograms/WordCountITCase.java | 2 +-
.../WordCountPactValueITCase.java | 2 +-
.../WordCountWithCountFunctionITCase.java | 2 +-
.../test/failingPrograms/TaskFailureITCase.java | 8 +-
.../CoGroupConnectedComponentsITCase.java | 6 +-
.../iterative/ConnectedComponentsITCase.java | 6 +-
...ectedComponentsWithDeferredUpdateITCase.java | 3 +-
...tedComponentsWithSolutionSetFirstITCase.java | 7 +-
.../test/iterative/DanglingPageRankITCase.java | 3 +-
.../test/iterative/DeltaPageRankITCase.java | 3 +-
.../DependencyConnectedComponentsITCase.java | 5 +-
...IterationTerminationWithTerminationTail.java | 6 +-
.../IterationTerminationWithTwoTails.java | 6 +-
.../IterationWithAllReducerITCase.java | 6 +-
.../iterative/IterationWithChainingITCase.java | 3 +-
.../iterative/IterationWithUnionITCase.java | 3 +-
.../test/iterative/IterativeKMeansITCase.java | 6 +-
.../test/iterative/KMeansITCase.java | 8 +-
.../test/iterative/LineRankITCase.java | 5 +-
.../test/iterative/PageRankITCase.java | 3 +-
.../ConnectedComponentsNepheleITCase.java | 54 +-
.../nephele/DanglingPageRankNepheleITCase.java | 7 +-
...nglingPageRankWithCombinerNepheleITCase.java | 7 +-
.../IterationWithChainingNepheleITCase.java | 17 +-
.../test/iterative/nephele/JobGraphUtils.java | 20 +-
.../CustomCompensatableDanglingPageRank.java | 57 +-
...mpensatableDanglingPageRankWithCombiner.java | 59 +-
.../CompensatableDanglingPageRank.java | 55 +-
.../PackagedProgramEndToEndITCase.java | 15 +-
.../test/operators/UnionSinkITCase.java | 3 +-
.../recordJobTests/CollectionSourceTest.java | 8 +-
.../ComputeEdgeDegreesITCase.java | 3 +-
.../EnumTrianglesOnEdgesWithDegreesITCase.java | 3 +-
.../recordJobTests/EnumTrianglesRDFITCase.java | 4 +-
.../recordJobTests/GlobalSortingITCase.java | 5 +-
.../GlobalSortingMixedOrderITCase.java | 62 +-
.../recordJobTests/GroupOrderReduceITCase.java | 3 +-
.../recordJobTests/MergeOnlyJoinITCase.java | 1 +
.../test/recordJobTests/PairwiseSPITCase.java | 4 +-
.../test/recordJobTests/TPCHQuery10ITCase.java | 2 +-
.../test/recordJobTests/TPCHQuery3ITCase.java | 3 +-
.../TPCHQuery3WithUnionITCase.java | 6 +-
.../test/recordJobTests/TPCHQuery4ITCase.java | 6 +-
.../test/recordJobTests/TPCHQuery9ITCase.java | 6 +-
.../recordJobTests/TPCHQueryAsterixITCase.java | 6 +-
.../test/recordJobTests/TeraSortITCase.java | 7 +-
.../recordJobTests/WebLogAnalysisITCase.java | 6 +-
.../test/recordJobTests/WordCountITCase.java | 6 +-
.../WordCountUnionReduceITCase.java | 6 +-
.../test/runtime/NetworkStackThroughput.java | 49 +-
235 files changed, 3917 insertions(+), 7900 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java
index a766fcb..e398acf 100644
--- a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java
+++ b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java
@@ -47,6 +47,7 @@ public class AvroExternalJarProgramITCase {
try {
testMiniCluster = new NepheleMiniCluster();
testMiniCluster.setJobManagerRpcPort(TEST_JM_PORT);
+ testMiniCluster.setTaskManagerNumSlots(4);
testMiniCluster.start();
String jarFile = JAR_FILE;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java
index 89f996a..b017220 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java
@@ -42,6 +42,8 @@ public class LocalExecutor extends PlanExecutor {
private static boolean DEFAULT_OVERWRITE = false;
+ private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
+
private final Object lock = new Object(); // we lock to ensure singleton execution
private NepheleMiniCluster nephele;
@@ -54,6 +56,8 @@ public class LocalExecutor extends PlanExecutor {
private int taskManagerDataPort = -1;
+ private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
+
private String configDir;
private String hdfsConfigFile;
@@ -129,6 +133,10 @@ public class LocalExecutor extends PlanExecutor {
public void setDefaultAlwaysCreateDirectory(boolean defaultAlwaysCreateDirectory) {
this.defaultAlwaysCreateDirectory = defaultAlwaysCreateDirectory;
}
+
+ public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
+
+ public int getTaskManagerNumSlots() { return this.taskManagerNumSlots; }
// --------------------------------------------------------------------------------------------
@@ -157,6 +165,7 @@ public class LocalExecutor extends PlanExecutor {
}
nephele.setDefaultOverwriteFiles(defaultOverwriteFiles);
nephele.setDefaultAlwaysCreateDirectory(defaultAlwaysCreateDirectory);
+ nephele.setTaskManagerNumSlots(taskManagerNumSlots);
// start it up
this.nephele.start();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java
index 79e5c64..4daca26 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java
@@ -46,6 +46,8 @@ public class NepheleMiniCluster {
private static final boolean DEFAULT_LAZY_MEMORY_ALLOCATION = true;
+ private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1;
+
// --------------------------------------------------------------------------------------------
private final Object startStopLock = new Object();
@@ -56,7 +58,9 @@ public class NepheleMiniCluster {
private int taskManagerDataPort = DEFAULT_TM_DATA_PORT;
- private int numTaskManager = DEFAULT_NUM_TASK_MANAGER;
+ private int numTaskTracker = DEFAULT_NUM_TASK_MANAGER;
+
+ private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
private long memorySize = DEFAULT_MEMORY_SIZE;
@@ -149,9 +153,13 @@ public class NepheleMiniCluster {
this.defaultAlwaysCreateDirectory = defaultAlwaysCreateDirectory;
}
- public void setNumTaskManager(int numTaskManager) { this.numTaskManager = numTaskManager; }
+ public void setNumTaskTracker(int numTaskTracker) { this.numTaskTracker = numTaskTracker; }
+
+ public int getNumTaskTracker() { return numTaskTracker; }
- public int getNumTaskManager() { return numTaskManager; }
+ public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
+
+ public int getTaskManagerNumSlots() { return taskManagerNumSlots; }
// ------------------------------------------------------------------------
// Life cycle and Job Submission
@@ -172,7 +180,7 @@ public class NepheleMiniCluster {
} else {
Configuration conf = getMiniclusterDefaultConfig(jobManagerRpcPort, taskManagerRpcPort,
taskManagerDataPort, memorySize, hdfsConfigFile, lazyMemoryAllocation, defaultOverwriteFiles,
- defaultAlwaysCreateDirectory, numTaskManager);
+ defaultAlwaysCreateDirectory, taskManagerNumSlots, numTaskTracker);
GlobalConfiguration.includeConfiguration(conf);
}
@@ -196,7 +204,7 @@ public class NepheleMiniCluster {
// start the job manager
jobManager = new JobManager(ExecutionMode.LOCAL);
- waitForJobManagerToBecomeReady(numTaskManager);
+ waitForJobManagerToBecomeReady(numTaskTracker);
}
}
@@ -236,7 +244,8 @@ public class NepheleMiniCluster {
public static Configuration getMiniclusterDefaultConfig(int jobManagerRpcPort, int taskManagerRpcPort,
int taskManagerDataPort, long memorySize, String hdfsConfigFile, boolean lazyMemory,
- boolean defaultOverwriteFiles, boolean defaultAlwaysCreateDirectory, int numTaskManager)
+ boolean defaultOverwriteFiles, boolean defaultAlwaysCreateDirectory,
+ int taskManagerNumSlots, int numTaskManager)
{
final Configuration config = new Configuration();
@@ -284,6 +293,8 @@ public class NepheleMiniCluster {
config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize/numTaskManager);
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManager);
+
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
return config;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
index 00790f4..31138f6 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
@@ -77,7 +77,7 @@ public class Client {
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress());
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
- this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator(), jobManagerAddress);
+ this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
// Disable Local Execution when using a Client
ContextEnvironment.disableLocalExecution();
@@ -104,8 +104,7 @@ public class Client {
throw new CompilerException("Cannot find port to job manager's RPC service in the global configuration.");
}
- final InetSocketAddress jobManagerAddress = new InetSocketAddress(address, port);
- this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator(), jobManagerAddress);
+ this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
// Disable Local Execution when using a Client
ContextEnvironment.disableLocalExecution();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendListCancelTest.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendListCancelTest.java
index 7ccd420..ba02fa9 100644
--- a/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendListCancelTest.java
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendListCancelTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.junit.Assert;
@@ -34,8 +33,6 @@ import eu.stratosphere.nephele.client.JobProgressResult;
import eu.stratosphere.nephele.client.JobSubmissionResult;
import eu.stratosphere.nephele.event.job.AbstractEvent;
import eu.stratosphere.nephele.event.job.RecentJobEvent;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.managementgraph.ManagementGraph;
@@ -202,18 +199,18 @@ public class CliFrontendListCancelTest {
}
@Override
- public Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes() throws IOException {
+ public void logBufferUtilization(JobID jobID) throws IOException {
throw new UnsupportedOperationException();
}
@Override
- public void logBufferUtilization(JobID jobID) throws IOException {
+ public NetworkTopology getNetworkTopology(JobID jobID) throws IOException {
throw new UnsupportedOperationException();
}
@Override
- public NetworkTopology getNetworkTopology(JobID jobID) throws IOException {
- throw new UnsupportedOperationException();
+ public int getAvailableSlots() {
+ return 1;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java
index e827805..5218dc2 100644
--- a/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java
@@ -70,9 +70,10 @@ public class WordCount {
* FlatMapFunction. The function takes a line (String) and splits it into
* multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
*/
- @SuppressWarnings("serial")
public static final class Tokenizer extends FlatMapFunction<String, Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
index 2076902..bf3d6af 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java
@@ -13,8 +13,6 @@
package eu.stratosphere.compiler;
-import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
@@ -90,11 +88,6 @@ import eu.stratosphere.compiler.postpass.OptimizerPostPass;
import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.nephele.instance.InstanceType;
-import eu.stratosphere.nephele.instance.InstanceTypeDescription;
-import eu.stratosphere.nephele.ipc.RPC;
-import eu.stratosphere.nephele.net.NetUtils;
-import eu.stratosphere.nephele.protocols.ExtendedManagementProtocol;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.util.InstantiationUtil;
@@ -340,24 +333,10 @@ public class PactCompiler {
private final CostEstimator costEstimator;
/**
- * The connection used to connect to the job-manager.
- */
- private final InetSocketAddress jobManagerAddress;
-
- /**
- * The maximum number of machines (instances) to use, per the configuration.
- */
- private int maxMachines;
-
- /**
* The default degree of parallelism for jobs compiled by this compiler.
*/
private int defaultDegreeOfParallelism;
- /**
- * The maximum number of subtasks that should share an instance.
- */
- private int maxIntraNodeParallelism;
// ------------------------------------------------------------------------
// Constructor & Setup
@@ -420,106 +399,29 @@ public class PactCompiler {
* The <tt>CostEstimator</tt> to use to cost the individual operations.
*/
public PactCompiler(DataStatistics stats, CostEstimator estimator) {
- this(stats, estimator, null);
- }
-
- /**
- * Creates a new compiler instance that uses the statistics object to determine properties about the input.
- * Given those statistics, the compiler can make better choices for the execution strategies.
- * as if no filesystem was given. It uses the given cost estimator to compute the costs of the individual
- * operations.
- * <p>
- * The given socket-address is used to connect to the job manager to obtain system characteristics, like available
- * memory. If that parameter is null, then the address is obtained from the global configuration.
- *
- * @param stats
- * The statistics to be used to determine the input properties.
- * @param estimator
- * The <tt>CostEstimator</tt> to use to cost the individual operations.
- * @param jobManagerConnection
- * The address of the job manager that is queried for system characteristics.
- */
- public PactCompiler(DataStatistics stats, CostEstimator estimator, InetSocketAddress jobManagerConnection) {
this.statistics = stats;
this.costEstimator = estimator;
Configuration config = GlobalConfiguration.getConfiguration();
- // determine the maximum number of instances to use
- this.maxMachines = -1;
-
// determine the default parallelization degree
this.defaultDegreeOfParallelism = config.getInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
-
- // determine the default intra-node parallelism
- int maxInNodePar = config.getInteger(ConfigConstants.PARALLELIZATION_MAX_INTRA_NODE_DEGREE_KEY,
- ConfigConstants.DEFAULT_MAX_INTRA_NODE_PARALLELIZATION_DEGREE);
- if (maxInNodePar == 0 || maxInNodePar < -1) {
- LOG.error("Invalid maximum degree of intra-node parallelism: " + maxInNodePar +
- ". Ignoring parameter.");
- maxInNodePar = ConfigConstants.DEFAULT_MAX_INTRA_NODE_PARALLELIZATION_DEGREE;
- }
- this.maxIntraNodeParallelism = maxInNodePar;
-
- // assign the connection to the job-manager
- if (jobManagerConnection != null) {
- this.jobManagerAddress = jobManagerConnection;
- } else {
- final String address = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
- if (address == null) {
- throw new CompilerException(
- "Cannot find address to job manager's RPC service in the global configuration.");
- }
-
- final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
- if (port < 0) {
- throw new CompilerException(
- "Cannot find port to job manager's RPC service in the global configuration.");
- }
-
- this.jobManagerAddress = new InetSocketAddress(address, port);
- }
}
// ------------------------------------------------------------------------
// Getters / Setters
// ------------------------------------------------------------------------
- public int getMaxMachines() {
- return maxMachines;
- }
-
- public void setMaxMachines(int maxMachines) {
- if (maxMachines == -1 || maxMachines > 0) {
- this.maxMachines = maxMachines;
- } else {
- throw new IllegalArgumentException();
- }
- }
-
public int getDefaultDegreeOfParallelism() {
return defaultDegreeOfParallelism;
}
public void setDefaultDegreeOfParallelism(int defaultDegreeOfParallelism) {
- if (defaultDegreeOfParallelism == -1 || defaultDegreeOfParallelism > 0) {
+ if (defaultDegreeOfParallelism > 0) {
this.defaultDegreeOfParallelism = defaultDegreeOfParallelism;
} else {
- throw new IllegalArgumentException();
- }
- }
-
- public int getMaxIntraNodeParallelism() {
- return maxIntraNodeParallelism;
- }
-
- public void setMaxIntraNodeParallelism(int maxIntraNodeParallelism) {
- if (maxIntraNodeParallelism == -1 || maxIntraNodeParallelism > 0) {
- this.maxIntraNodeParallelism = maxIntraNodeParallelism;
- } else {
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException("Default parallelism cannot be zero or negative.");
}
}
@@ -550,14 +452,9 @@ public class PactCompiler {
// -------------------- try to get the connection to the job manager ----------------------
// --------------------------to obtain instance information --------------------------------
final OptimizerPostPass postPasser = getPostPassFromPlan(program);
- return compile(program, getInstanceTypeInfo(), postPasser);
- }
-
- public OptimizedPlan compile(Plan program, InstanceTypeDescription type) throws CompilerException {
- final OptimizerPostPass postPasser = getPostPassFromPlan(program);
- return compile(program, type, postPasser);
+ return compile(program, postPasser);
}
-
+
/**
* Translates the given pact plan in to an OptimizedPlan, where all nodes have their local strategy assigned
* and all channels have a shipping strategy assigned. The process goes through several phases:
@@ -569,8 +466,6 @@ public class PactCompiler {
* </ol>
*
* @param program The program to be translated.
- * @param type The instance type to schedule the execution on. Used also to determine the amount of memory
- * available to the tasks.
* @param postPasser The function to be used for post passing the optimizer's plan and setting the
* data type specific serialization routines.
* @return The optimized plan.
@@ -579,8 +474,8 @@ public class PactCompiler {
* Thrown, if the plan is invalid or the optimizer encountered an inconsistent
* situation during the compilation process.
*/
- private OptimizedPlan compile(Plan program, InstanceTypeDescription type, OptimizerPostPass postPasser) throws CompilerException {
- if (program == null || type == null || postPasser == null) {
+ private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws CompilerException {
+ if (program == null || postPasser == null) {
throw new NullPointerException();
}
@@ -588,73 +483,14 @@ public class PactCompiler {
if (LOG.isDebugEnabled()) {
LOG.debug("Beginning compilation of program '" + program.getJobName() + '\'');
}
-
- final String instanceName = type.getInstanceType().getIdentifier();
-
- // we subtract some percentage of the memory to accommodate for rounding errors
- final long memoryPerInstance = (long) (type.getHardwareDescription().getSizeOfFreeMemory() * 0.96f);
- final int numInstances = type.getMaximumNumberOfAvailableInstances();
-
- // determine the maximum number of machines to use
- int maxMachinesJob = program.getMaxNumberMachines();
-
- if (maxMachinesJob < 1) {
- maxMachinesJob = this.maxMachines;
- } else if (this.maxMachines >= 1) {
- // check if the program requested more than the global config allowed
- if (maxMachinesJob > this.maxMachines && LOG.isWarnEnabled()) {
- LOG.warn("Maximal number of machines specified in program (" + maxMachinesJob
- + ") exceeds the maximum number in the global configuration (" + this.maxMachines
- + "). Using the global configuration value.");
- }
-
- maxMachinesJob = Math.min(maxMachinesJob, this.maxMachines);
- }
-
- // adjust the maximum number of machines the the number of available instances
- if (maxMachinesJob < 1) {
- maxMachinesJob = numInstances;
- } else if (maxMachinesJob > numInstances) {
- maxMachinesJob = numInstances;
- if (LOG.isInfoEnabled()) {
- LOG.info("Maximal number of machines decreased to " + maxMachinesJob +
- " because no more instances are available.");
- }
- }
// set the default degree of parallelism
int defaultParallelism = program.getDefaultParallelism() > 0 ?
program.getDefaultParallelism() : this.defaultDegreeOfParallelism;
-
- if (this.maxIntraNodeParallelism > 0) {
- if (defaultParallelism < 1) {
- defaultParallelism = maxMachinesJob * this.maxIntraNodeParallelism;
- }
- else if (defaultParallelism > maxMachinesJob * this.maxIntraNodeParallelism) {
- int oldParallelism = defaultParallelism;
- defaultParallelism = maxMachinesJob * this.maxIntraNodeParallelism;
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Decreasing default degree of parallelism from " + oldParallelism +
- " to " + defaultParallelism + " to fit a maximum number of " + maxMachinesJob +
- " instances with a intra-parallelism of " + this.maxIntraNodeParallelism);
- }
- }
- } else if (defaultParallelism < 1) {
- defaultParallelism = maxMachinesJob;
- if (LOG.isInfoEnabled()) {
- LOG.info("No default parallelism specified. Using default parallelism of " + defaultParallelism + " (One task per instance)");
- }
- }
// log the output
if (LOG.isDebugEnabled()) {
- LOG.debug("Using a default degree of parallelism of " + defaultParallelism +
- ", a maximum intra-node parallelism of " + this.maxIntraNodeParallelism + '.');
- if (this.maxMachines > 0) {
- LOG.debug("The execution is limited to a maximum number of " + maxMachinesJob + " machines.");
- }
-
+ LOG.debug("Using a default degree of parallelism of " + defaultParallelism + '.');
}
// the first step in the compilation is to create the optimizer plan representation
@@ -666,7 +502,7 @@ public class PactCompiler {
// 4) It makes estimates about the data volume of the data sources and
// propagates those estimates through the plan
- GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(maxMachinesJob, defaultParallelism);
+ GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism);
program.accept(graphCreator);
// if we have a plan with multiple data sinks, add logical optimizer nodes that have two data-sinks as children
@@ -689,8 +525,7 @@ public class PactCompiler {
// now that we have all nodes created and recorded which ones consume memory, tell the nodes their minimal
// guaranteed memory, for further cost estimations. we assume an equal distribution of memory among consumer tasks
- rootNode.accept(new IdAndMemoryAndEstimatesVisitor(this.statistics,
- graphCreator.getMemoryConsumerCount() == 0 ? 0 : memoryPerInstance / graphCreator.getMemoryConsumerCount()));
+ rootNode.accept(new IdAndEstimatesVisitor(this.statistics));
// Now that the previous step is done, the next step is to traverse the graph again for the two
// steps that cannot directly be performed during the plan enumeration, because we are dealing with DAGs
@@ -733,9 +568,8 @@ public class PactCompiler {
dp.resolveDeadlocks(bestPlanSinks);
// finalize the plan
- OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program, memoryPerInstance);
- plan.setInstanceTypeName(instanceName);
-
+ OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
+
// swap the binary unions for n-ary unions. this changes no strategies or memory consumers whatsoever, so
// we can do this after the plan finalization
plan.accept(new BinaryUnionReplacer());
@@ -755,7 +589,7 @@ public class PactCompiler {
* from the plan can be traversed.
*/
public static List<DataSinkNode> createPreOptimizedPlan(Plan program) {
- GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(-1, 1);
+ GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(1);
program.accept(graphCreator);
return graphCreator.sinks;
}
@@ -783,22 +617,18 @@ public class PactCompiler {
private final List<DataSinkNode> sinks; // all data sink nodes in the optimizer plan
- private final int maxMachines; // the maximum number of machines to use
-
private final int defaultParallelism; // the default degree of parallelism
- private int numMemoryConsumers;
-
private final GraphCreatingVisitor parent; // reference to enclosing creator, in case of a recursive translation
private final boolean forceDOP;
- private GraphCreatingVisitor(int maxMachines, int defaultParallelism) {
- this(null, false, maxMachines, defaultParallelism, null);
+ private GraphCreatingVisitor(int defaultParallelism) {
+ this(null, false, defaultParallelism, null);
}
- private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP, int maxMachines,
+ private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP,
int defaultParallelism, HashMap<Operator<?>, OptimizerNode> closure) {
if (closure == null){
con2node = new HashMap<Operator<?>, OptimizerNode>();
@@ -807,7 +637,6 @@ public class PactCompiler {
}
this.sources = new ArrayList<DataSourceNode>(4);
this.sinks = new ArrayList<DataSinkNode>(2);
- this.maxMachines = maxMachines;
this.defaultParallelism = defaultParallelism;
this.parent = parent;
this.forceDOP = forceDOP;
@@ -878,7 +707,6 @@ public class PactCompiler {
// catch this for the recursive translation of step functions
BulkPartialSolutionNode p = new BulkPartialSolutionNode(holder, containingIterationNode);
p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
- p.setSubtasksPerInstance(containingIterationNode.getSubtasksPerInstance());
n = p;
}
else if (c instanceof WorksetPlaceHolder) {
@@ -890,7 +718,6 @@ public class PactCompiler {
// catch this for the recursive translation of step functions
WorksetNode p = new WorksetNode(holder, containingIterationNode);
p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
- p.setSubtasksPerInstance(containingIterationNode.getSubtasksPerInstance());
n = p;
}
else if (c instanceof SolutionSetPlaceHolder) {
@@ -902,18 +729,14 @@ public class PactCompiler {
// catch this for the recursive translation of step functions
SolutionSetNode p = new SolutionSetNode(holder, containingIterationNode);
p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
- p.setSubtasksPerInstance(containingIterationNode.getSubtasksPerInstance());
n = p;
}
else {
- throw new IllegalArgumentException("Unknown operator type: " + c.getClass() + " " + c);
+ throw new IllegalArgumentException("Unknown operator type: " + c);
}
this.con2node.put(c, n);
- // record the potential memory consumption
- this.numMemoryConsumers += n.isMemoryConsumer() ? 1 : 0;
-
// set the parallelism only if it has not been set before. some nodes have a fixed DOP, such as the
// key-less reducer (all-reduce)
if (n.getDegreeOfParallelism() < 1) {
@@ -931,19 +754,6 @@ public class PactCompiler {
n.setDegreeOfParallelism(par);
}
- // check if we need to set the instance sharing accordingly such that
- // the maximum number of machines is not exceeded
- if (n.getSubtasksPerInstance() < 1) {
- int tasksPerInstance = 1;
- if (this.maxMachines > 0) {
- int p = n.getDegreeOfParallelism();
- tasksPerInstance = (p / this.maxMachines) + (p % this.maxMachines == 0 ? 0 : 1);
- }
-
- // we group together n tasks per machine, depending on config and the above computed
- // value required to obey the maximum number of machines
- n.setSubtasksPerInstance(tasksPerInstance);
- }
return true;
}
@@ -966,7 +776,7 @@ public class PactCompiler {
// first, recursively build the data flow for the step function
final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true,
- this.maxMachines, iterNode.getDegreeOfParallelism(), closure);
+ iterNode.getDegreeOfParallelism(), closure);
BulkPartialSolutionNode partialSolution = null;
@@ -994,9 +804,6 @@ public class PactCompiler {
iterNode.setNextPartialSolution(rootOfStepFunction, terminationCriterion);
iterNode.setPartialSolution(partialSolution);
- // account for the nested memory consumers
- this.numMemoryConsumers += recursiveCreator.numMemoryConsumers;
-
// go over the contained data flow and mark the dynamic path nodes
StaticDynamicPathIdentifier identifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
rootOfStepFunction.accept(identifier);
@@ -1013,7 +820,7 @@ public class PactCompiler {
// first, recursively build the data flow for the step function
final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true,
- this.maxMachines, iterNode.getDegreeOfParallelism(), closure);
+ iterNode.getDegreeOfParallelism(), closure);
// descend from the solution set delta. check that it depends on both the workset
// and the solution set. If it does depend on both, this descend should create both nodes
iter.getSolutionSetDelta().accept(recursiveCreator);
@@ -1067,19 +874,12 @@ public class PactCompiler {
iterNode.setPartialSolution(solutionSetNode, worksetNode);
iterNode.setNextPartialSolution(solutionSetDeltaNode, nextWorksetNode);
- // account for the nested memory consumers
- this.numMemoryConsumers += recursiveCreator.numMemoryConsumers;
-
// go over the contained data flow and mark the dynamic path nodes
StaticDynamicPathIdentifier pathIdentifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
nextWorksetNode.accept(pathIdentifier);
iterNode.getSolutionSetDelta().accept(pathIdentifier);
}
}
-
- int getMemoryConsumerCount() {
- return this.numMemoryConsumers;
- }
};
private static final class StaticDynamicPathIdentifier implements Visitor<OptimizerNode> {
@@ -1107,17 +907,14 @@ public class PactCompiler {
* Simple visitor that sets the minimal guaranteed memory per task based on the amount of available memory,
* the number of memory consumers, and on the task's degree of parallelism.
*/
- private static final class IdAndMemoryAndEstimatesVisitor implements Visitor<OptimizerNode> {
+ private static final class IdAndEstimatesVisitor implements Visitor<OptimizerNode> {
private final DataStatistics statistics;
-
- private final long memoryPerTaskPerInstance;
-
+
private int id = 1;
- private IdAndMemoryAndEstimatesVisitor(DataStatistics statistics, long memoryPerTaskPerInstance) {
+ private IdAndEstimatesVisitor(DataStatistics statistics) {
this.statistics = statistics;
- this.memoryPerTaskPerInstance = memoryPerTaskPerInstance;
}
@@ -1128,11 +925,6 @@ public class PactCompiler {
return false;
}
- // assign minimum memory share, for lower bound estimates
- final long mem = visitable.isMemoryConsumer() ?
- this.memoryPerTaskPerInstance / visitable.getSubtasksPerInstance() : 0;
- visitable.setMinimalMemoryPerSubTask(mem);
-
return true;
}
@@ -1234,8 +1026,6 @@ public class PactCompiler {
private final Deque<IterationPlanNode> stackOfIterationNodes;
- private long memoryPerInstance; // the amount of memory per instance
-
private int memoryConsumerWeights; // a counter of all memory consumers
/**
@@ -1248,12 +1038,7 @@ public class PactCompiler {
this.stackOfIterationNodes = new ArrayDeque<IterationPlanNode>();
}
- private OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName, Plan originalPlan, long memPerInstance) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Available memory per instance: " + memPerInstance);
- }
-
- this.memoryPerInstance = memPerInstance;
+ private OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName, Plan originalPlan) {
this.memoryConsumerWeights = 0;
// traverse the graph
@@ -1263,44 +1048,36 @@ public class PactCompiler {
// assign the memory to each node
if (this.memoryConsumerWeights > 0) {
- final long memoryPerInstanceAndWeight = this.memoryPerInstance / this.memoryConsumerWeights;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Memory per consumer weight: " + memoryPerInstanceAndWeight);
- }
-
for (PlanNode node : this.allNodes) {
// assign memory to the driver strategy of the node
final int consumerWeight = node.getMemoryConsumerWeight();
if (consumerWeight > 0) {
- final long mem = memoryPerInstanceAndWeight * consumerWeight / node.getSubtasksPerInstance();
- node.setMemoryPerSubTask(mem);
+ final double relativeMem = (double)consumerWeight / this.memoryConsumerWeights;
+ node.setRelativeMemoryPerSubtask(relativeMem);
if (LOG.isDebugEnabled()) {
- final long mib = mem >> 20;
- LOG.debug("Assigned " + mib + " MiBytes memory to each subtask of " +
- node.getPactContract().getName() + " (" + mib * node.getDegreeOfParallelism() +
- " MiBytes total.)");
+ LOG.debug("Assigned " + relativeMem + " of total memory to each subtask of " +
+ node.getPactContract().getName() + ".");
}
}
// assign memory to the local and global strategies of the channels
for (Channel c : node.getInputs()) {
if (c.getLocalStrategy().dams()) {
- final long mem = memoryPerInstanceAndWeight / node.getSubtasksPerInstance();
- c.setMemoryLocalStrategy(mem);
+ final double relativeMem = 1.0 / this.memoryConsumerWeights;
+ c.setRelativeMemoryLocalStrategy(relativeMem);
if (LOG.isDebugEnabled()) {
- final long mib = mem >> 20;
- LOG.debug("Assigned " + mib + " MiBytes memory to each local strategy instance of " +
- c + " (" + mib * node.getDegreeOfParallelism() + " MiBytes total.)");
+ LOG.debug("Assigned " + relativeMem + " of total memory to each local strategy " +
+ "instance of " + c + ".");
}
}
if (c.getTempMode() != TempMode.NONE) {
- final long mem = memoryPerInstanceAndWeight / node.getSubtasksPerInstance();
- c.setTempMemory(mem);
+ final double relativeMem = 1.0/ this.memoryConsumerWeights;
+ c.setRelativeTempMemory(relativeMem);
if (LOG.isDebugEnabled()) {
- final long mib = mem >> 20;
- LOG.debug("Assigned " + mib + " MiBytes memory to each instance of the temp table for " +
- c + " (" + mib * node.getDegreeOfParallelism() + " MiBytes total.)");
+ LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the temp " +
+ "table" +
+ " " +
+ "for " + c + ".");
}
}
}
@@ -1525,182 +1302,4 @@ public class PactCompiler {
throw new CompilerException("Class '" + className + "' is not an optimizer post passer.", ccex);
}
}
-
- private InstanceTypeDescription getInstanceTypeInfo() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connecting compiler to JobManager to dertermine instance information.");
- }
-
- // create the connection in a separate thread, such that this thread
- // can abort, if an unsuccessful connection occurs.
- Map<InstanceType, InstanceTypeDescription> instances = null;
-
- JobManagerConnector jmc = new JobManagerConnector(this.jobManagerAddress);
- Thread connectorThread = new Thread(jmc, "Compiler - JobManager connector.");
- connectorThread.setDaemon(true);
- connectorThread.start();
-
- // connect and get the result
- try {
- jmc.waitForCompletion();
- instances = jmc.instances;
- if (instances == null) {
- throw new NullPointerException("Returned instance map is <null>");
- }
- }
- catch (IOException e) {
- throw new CompilerException(e.getMessage());
- }
- catch (Throwable t) {
- throw new CompilerException("Cannot connect to the JobManager to determine the available TaskManagers. "
- + "Check if the JobManager is running (using the web interface or log files). Reason: " +
- t.getMessage(), t);
- }
-
- // determine which type to run on
- return getType(instances);
- }
-
- /**
- * This utility method picks the instance type to be used for executing programs.
- * <p>
- *
- * @param types The available types.
- * @return The type to be used for scheduling.
- *
- * @throws CompilerException
- * @throws IllegalArgumentException
- */
- private InstanceTypeDescription getType(Map<InstanceType, InstanceTypeDescription> types)
- throws CompilerException
- {
- if (types == null || types.size() < 1) {
- throw new IllegalArgumentException("No instance type found.");
- }
-
- InstanceTypeDescription retValue = null;
- long totalMemory = 0;
- int numInstances = 0;
-
- final Iterator<InstanceTypeDescription> it = types.values().iterator();
- while(it.hasNext())
- {
- final InstanceTypeDescription descr = it.next();
-
- // skip instances for which no hardware description is available
- // this means typically that no
- if (descr.getHardwareDescription() == null || descr.getInstanceType() == null) {
- continue;
- }
-
- final int curInstances = descr.getMaximumNumberOfAvailableInstances();
- final long curMemory = curInstances * descr.getHardwareDescription().getSizeOfFreeMemory();
-
- // get, if first, or if it has more instances and not less memory, or if it has significantly more memory
- // and the same number of cores still
- if ( (retValue == null) ||
- (curInstances > numInstances && (int) (curMemory * 1.2f) > totalMemory) ||
- (curInstances * retValue.getInstanceType().getNumberOfCores() >= numInstances &&
- (int) (curMemory * 1.5f) > totalMemory)
- )
- {
- retValue = descr;
- numInstances = curInstances;
- totalMemory = curMemory;
- }
- }
-
- if (retValue == null) {
- throw new CompilerException("No instance currently registered at the job-manager. Retry later.\n" +
- "If the system has recently started, it may take a few seconds until the instances register.");
- }
-
- return retValue;
- }
-
- /**
- * Utility class for an asynchronous connection to the job manager to determine the available instances.
- */
- private static final class JobManagerConnector implements Runnable {
-
- private static final long MAX_MILLIS_TO_WAIT = 10000;
-
- private final InetSocketAddress jobManagerAddress;
-
- private final Object lock = new Object();
-
- private volatile Map<InstanceType, InstanceTypeDescription> instances;
-
- private volatile Throwable error;
-
-
- private JobManagerConnector(InetSocketAddress jobManagerAddress) {
- this.jobManagerAddress = jobManagerAddress;
- }
-
-
- public Map<InstanceType, InstanceTypeDescription> waitForCompletion() throws Throwable {
- long start = System.currentTimeMillis();
- long remaining = MAX_MILLIS_TO_WAIT;
-
- if (this.error != null) {
- throw this.error;
- }
- if (this.instances != null) {
- return this.instances;
- }
-
- do {
- try {
- synchronized (this.lock) {
- this.lock.wait(remaining);
- }
- } catch (InterruptedException iex) {}
- }
- while (this.error == null && this.instances == null &&
- (remaining = MAX_MILLIS_TO_WAIT + start - System.currentTimeMillis()) > 0);
-
- if (this.error != null) {
- throw this.error;
- }
- if (this.instances != null) {
- return this.instances;
- }
-
- throw new IOException("Could not connect to the JobManager at " + jobManagerAddress +
- ". Please make sure that the Job Manager is started properly.");
- }
-
-
- @Override
- public void run() {
- ExtendedManagementProtocol jobManagerConnection = null;
-
- try {
- jobManagerConnection = RPC.getProxy(ExtendedManagementProtocol.class,
- this.jobManagerAddress, NetUtils.getSocketFactory());
-
- this.instances = jobManagerConnection.getMapOfAvailableInstanceTypes();
- if (this.instances == null) {
- throw new IOException("Returned instance map was <null>");
- }
- } catch (Throwable t) {
- this.error = t;
- } finally {
- // first of all, signal completion
- synchronized (this.lock) {
- this.lock.notifyAll();
- }
-
- if (jobManagerConnection != null) {
- try {
- RPC.stopProxy(jobManagerConnection);
- } catch (Throwable t) {
- LOG.error("Could not cleanly shut down connection from compiler to job manager,", t);
- }
- }
- jobManagerConnection = null;
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
index 058af1a..fde5970 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
@@ -95,14 +95,20 @@ public class DefaultCostEstimator extends CostEstimator {
@Override
public void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs) {
- // assumption: we need ship the whole data over the network to each node.
- final long estOutShipSize = estimates.getEstimatedOutputSize();
- if (estOutShipSize <= 0) {
- costs.setNetworkCost(Costs.UNKNOWN);
+ // if the replication factor is zero or negative, we cannot calculate broadcast costs
+
+ if (replicationFactor > 0) {
+ // assumption: we need to ship the whole data over the network to each node.
+ final long estOutShipSize = estimates.getEstimatedOutputSize();
+ if (estOutShipSize <= 0) {
+ costs.setNetworkCost(Costs.UNKNOWN);
+ } else {
+ costs.addNetworkCost(replicationFactor * estOutShipSize);
+ }
+ costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * replicationFactor);
} else {
- costs.addNetworkCost(replicationFactor * estOutShipSize);
+ costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 200);
}
- costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * replicationFactor * 100);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java
index 8fd6f79..2f7cb2b 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java
@@ -42,11 +42,6 @@ public abstract class AbstractPartialSolutionNode extends OptimizerNode {
public abstract IterationNode getIterationNode();
// --------------------------------------------------------------------------------------------
-
- @Override
- public boolean isMemoryConsumer() {
- return false;
- }
public boolean isOnDynamicPath() {
return true;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java
index 70752b5..50ec01b 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java
@@ -122,20 +122,12 @@ public class BinaryUnionNode extends TwoInputNode {
final RequestedLocalProperties noLocalProps = new RequestedLocalProperties();
final int dop = getDegreeOfParallelism();
- final int subPerInstance = getSubtasksPerInstance();
- final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1);
final int inDop1 = getFirstPredecessorNode().getDegreeOfParallelism();
- final int inSubPerInstance1 = getFirstPredecessorNode().getSubtasksPerInstance();
- final int inNumInstances1 = inDop1 / inSubPerInstance1 + (inDop1 % inSubPerInstance1 == 0 ? 0 : 1);
final int inDop2 = getSecondPredecessorNode().getDegreeOfParallelism();
- final int inSubPerInstance2 = getSecondPredecessorNode().getSubtasksPerInstance();
- final int inNumInstances2 = inDop2 / inSubPerInstance2 + (inDop2 % inSubPerInstance2 == 0 ? 0 : 1);
-
- final boolean globalDopChange1 = numInstances != inNumInstances1;
- final boolean globalDopChange2 = numInstances != inNumInstances2;
- final boolean localDopChange1 = numInstances == inNumInstances1 & subPerInstance != inSubPerInstance1;
- final boolean localDopChange2 = numInstances == inNumInstances2 & subPerInstance != inSubPerInstance2;
-
+
+ final boolean dopChange1 = dop != inDop1;
+ final boolean dopChange2 = dop != inDop2;
+
// enumerate all pairwise combination of the children's plans together with
// all possible operator strategy combination
@@ -154,15 +146,11 @@ public class BinaryUnionNode extends TwoInputNode {
Channel c1 = new Channel(child1, this.input1.getMaterializationMode());
if (this.input1.getShipStrategy() == null) {
// free to choose the ship strategy
- igps.parameterizeChannel(c1, globalDopChange1, localDopChange1);
+ igps.parameterizeChannel(c1, dopChange1);
// if the DOP changed, make sure that we cancel out properties, unless the
// ship strategy preserves/establishes them even under changing DOPs
- if (globalDopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
- c1.getGlobalProperties().reset();
- }
- if (localDopChange1 && !(c1.getShipStrategy().isNetworkStrategy() ||
- c1.getShipStrategy().compensatesForLocalDOPChanges())) {
+ if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
c1.getGlobalProperties().reset();
}
} else {
@@ -173,10 +161,8 @@ public class BinaryUnionNode extends TwoInputNode {
c1.setShipStrategy(this.input1.getShipStrategy());
}
- if (globalDopChange1) {
+ if (dopChange1) {
c1.adjustGlobalPropertiesForFullParallelismChange();
- } else if (localDopChange1) {
- c1.adjustGlobalPropertiesForLocalParallelismChange();
}
}
@@ -184,15 +170,11 @@ public class BinaryUnionNode extends TwoInputNode {
Channel c2 = new Channel(child2, this.input2.getMaterializationMode());
if (this.input2.getShipStrategy() == null) {
// free to choose the ship strategy
- igps.parameterizeChannel(c2, globalDopChange2, localDopChange2);
+ igps.parameterizeChannel(c2, dopChange2);
// if the DOP changed, make sure that we cancel out properties, unless the
// ship strategy preserves/establishes them even under changing DOPs
- if (globalDopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
- c2.getGlobalProperties().reset();
- }
- if (localDopChange2 && !(c2.getShipStrategy().isNetworkStrategy() ||
- c2.getShipStrategy().compensatesForLocalDOPChanges())) {
+ if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
c2.getGlobalProperties().reset();
}
} else {
@@ -203,10 +185,8 @@ public class BinaryUnionNode extends TwoInputNode {
c2.setShipStrategy(this.input2.getShipStrategy());
}
- if (globalDopChange2) {
+ if (dopChange2) {
c2.adjustGlobalPropertiesForFullParallelismChange();
- } else if (localDopChange2) {
- c2.adjustGlobalPropertiesForLocalParallelismChange();
}
}
@@ -224,20 +204,20 @@ public class BinaryUnionNode extends TwoInputNode {
if (c1.getShipStrategy() == ShipStrategyType.FORWARD && c2.getShipStrategy() != ShipStrategyType.FORWARD) {
// adjust c2 to c1
c2 = c2.clone();
- p1.parameterizeChannel(c2,globalDopChange2);
+ p1.parameterizeChannel(c2,dopChange2);
} else if (c2.getShipStrategy() == ShipStrategyType.FORWARD && c1.getShipStrategy() != ShipStrategyType.FORWARD) {
// adjust c1 to c2
c1 = c1.clone();
- p2.parameterizeChannel(c1,globalDopChange1);
+ p2.parameterizeChannel(c1,dopChange1);
} else if (c1.getShipStrategy() == ShipStrategyType.FORWARD && c2.getShipStrategy() == ShipStrategyType.FORWARD) {
boolean adjustC1 = c1.getEstimatedOutputSize() <= 0 || c2.getEstimatedOutputSize() <= 0 ||
c1.getEstimatedOutputSize() <= c2.getEstimatedOutputSize();
if (adjustC1) {
c2 = c2.clone();
- p1.parameterizeChannel(c2, globalDopChange2);
+ p1.parameterizeChannel(c2, dopChange2);
} else {
c1 = c1.clone();
- p2.parameterizeChannel(c1, globalDopChange1);
+ p2.parameterizeChannel(c1, dopChange1);
}
} else {
// this should never happen, as it implies both realize a different strategy, which is
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
index f6720ea..bfbca15 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java
@@ -65,9 +65,9 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
// --------------------------------------------------------------------------------------------
/**
- * Creates a new node with a single input for the optimizer plan.
+ * Creates a new node for the bulk iteration.
*
- * @param iteration The PACT that the node represents.
+ * @param iteration The bulk iteration the node represents.
*/
public BulkIterationNode(BulkIterationBase<?> iteration) {
super(iteration);
@@ -124,14 +124,12 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
public void setNextPartialSolution(OptimizerNode nextPartialSolution, OptimizerNode terminationCriterion) {
// check if the root of the step function has the same DOP as the iteration
- if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism() ||
- nextPartialSolution.getSubtasksPerInstance() != getSubtasksPerInstance() )
+ if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism())
{
// add a no-op to the root to express the re-partitioning
NoOpNode noop = new NoOpNode();
noop.setDegreeOfParallelism(getDegreeOfParallelism());
- noop.setSubtasksPerInstance(getSubtasksPerInstance());
-
+
PactConnection noOpConn = new PactConnection(nextPartialSolution, noop);
noop.setIncomingConnection(noOpConn);
nextPartialSolution.addOutgoingConnection(noOpConn);
@@ -198,12 +196,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
protected List<OperatorDescriptorSingle> getPossibleProperties() {
return Collections.<OperatorDescriptorSingle>singletonList(new NoOpDescriptor());
}
-
- @Override
- public boolean isMemoryConsumer() {
- return true;
- }
-
+
@Override
public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
final InterestingProperties intProps = getInterestingProperties().clone();
@@ -306,12 +299,11 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode
else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
// attach a no-op node through which we create the properties of the original input
Channel toNoOp = new Channel(candidate);
- globPropsReq.parameterizeChannel(toNoOp, false, false);
+ globPropsReq.parameterizeChannel(toNoOp, false);
locPropsReq.parameterizeChannel(toNoOp);
UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST);
rebuildPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism());
- rebuildPropertiesNode.setSubtasksPerInstance(candidate.getSubtasksPerInstance());
SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP);
rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
index fe823d2..d4f9d67 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java
@@ -87,11 +87,6 @@ public class DataSinkNode extends OptimizerNode {
}
@Override
- public boolean isMemoryConsumer() {
- return getPactContract().getPartitionOrdering() != null || getPactContract().getLocalOrder() != null;
- }
-
- @Override
public List<PactConnection> getIncomingConnections() {
return Collections.singletonList(this.input);
}
@@ -194,21 +189,16 @@ public class DataSinkNode extends OptimizerNode {
List<PlanNode> outputPlans = new ArrayList<PlanNode>();
final int dop = getDegreeOfParallelism();
- final int subPerInstance = getSubtasksPerInstance();
final int inDop = getPredecessorNode().getDegreeOfParallelism();
- final int inSubPerInstance = getPredecessorNode().getSubtasksPerInstance();
- final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1);
- final int inNumInstances = inDop / inSubPerInstance + (inDop % inSubPerInstance == 0 ? 0 : 1);
-
- final boolean globalDopChange = numInstances != inNumInstances;
- final boolean localDopChange = numInstances == inNumInstances & subPerInstance != inSubPerInstance;
-
+
+ final boolean dopChange = dop != inDop;
+
InterestingProperties ips = this.input.getInterestingProperties();
for (PlanNode p : subPlans) {
for (RequestedGlobalProperties gp : ips.getGlobalProperties()) {
for (RequestedLocalProperties lp : ips.getLocalProperties()) {
Channel c = new Channel(p);
- gp.parameterizeChannel(c, globalDopChange, localDopChange);
+ gp.parameterizeChannel(c, dopChange);
lp.parameterizeChannel(c);
c.setRequiredLocalProps(lp);
c.setRequiredGlobalProps(gp);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java
index 17c11c9..7234420 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java
@@ -55,7 +55,6 @@ public class DataSourceNode extends OptimizerNode {
if (NonParallelInput.class.isAssignableFrom(pactContract.getUserCodeWrapper().getUserCodeClass())) {
setDegreeOfParallelism(1);
- setSubtasksPerInstance(1);
this.sequentialInput = true;
} else {
this.sequentialInput = false;
@@ -78,27 +77,12 @@ public class DataSourceNode extends OptimizerNode {
}
@Override
- public boolean isMemoryConsumer() {
- return false;
- }
-
-
- @Override
public void setDegreeOfParallelism(int degreeOfParallelism) {
// if unsplittable, DOP remains at 1
if (!this.sequentialInput) {
super.setDegreeOfParallelism(degreeOfParallelism);
}
}
-
-
- @Override
- public void setSubtasksPerInstance(int instancesPerMachine) {
- // if unsplittable, DOP remains at 1
- if (!this.sequentialInput) {
- super.setSubtasksPerInstance(instancesPerMachine);
- }
- }
@Override
public List<PactConnection> getIncomingConnections() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/GroupReduceNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/GroupReduceNode.java
index 6eb2903..4d7230e 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/GroupReduceNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/GroupReduceNode.java
@@ -46,7 +46,6 @@ public class GroupReduceNode extends SingleInputNode {
if (this.keys == null) {
// case of a key-less reducer. force a parallelism of 1
setDegreeOfParallelism(1);
- setSubtasksPerInstance(1);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
index b2c9330..85a6568 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java
@@ -262,13 +262,6 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
*/
@Override
public abstract void accept(Visitor<OptimizerNode> visitor);
-
- /**
- * Checks, whether this node requires memory for its tasks or not.
- *
- * @return True, if this node contains logic that requires memory usage, false otherwise.
- */
- public abstract boolean isMemoryConsumer();
/**
* Checks whether a field is modified by the user code or whether it is kept unchanged.
@@ -408,7 +401,7 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
* @param degreeOfParallelism
* The degree of parallelism to set.
* @throws IllegalArgumentException
- * If the degree of parallelism is smaller than one.
+ * If the degree of parallelism is smaller than one and not -1.
*/
public void setDegreeOfParallelism(int degreeOfParallelism) {
if (degreeOfParallelism < 1) {
@@ -416,48 +409,6 @@ public abstract class OptimizerNode implements Visitable<OptimizerNode>, Estimat
}
this.degreeOfParallelism = degreeOfParallelism;
}
-
- /**
- * Gets the number of parallel instances of the contract that are
- * to be executed on the same compute instance (logical machine).
- *
- * @return The number of subtask instances per machine.
- */
- public int getSubtasksPerInstance() {
- return this.subtasksPerInstance;
- }
-
- /**
- * Sets the number of parallel task instances of the contract that are
- * to be executed on the same computing instance (logical machine).
- *
- * @param instancesPerMachine The instances per machine.
- * @throws IllegalArgumentException If the number of instances per machine is smaller than one.
- */
- public void setSubtasksPerInstance(int instancesPerMachine) {
- if (instancesPerMachine < 1) {
- throw new IllegalArgumentException();
- }
- this.subtasksPerInstance = instancesPerMachine;
- }
-
- /**
- * Gets the minimal guaranteed memory per subtask for tasks represented by this OptimizerNode.
- *
- * @return The minimal guaranteed memory per subtask, in bytes.
- */
- public long getMinimalMemoryPerSubTask() {
- return this.minimalMemoryPerSubTask;
- }
-
- /**
- * Sets the minimal guaranteed memory per subtask for tasks represented by this OptimizerNode.
- *
- * @param minimalGuaranteedMemory The minimal guaranteed memory per subtask, in bytes.
- */
- public void setMinimalMemoryPerSubTask(long minimalGuaranteedMemory) {
- this.minimalMemoryPerSubTask = minimalGuaranteedMemory;
- }
/**
* Gets the amount of memory that all subtasks of this task have jointly available.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/ReduceNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/ReduceNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/ReduceNode.java
index 2190060..409d027 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/ReduceNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/ReduceNode.java
@@ -36,7 +36,6 @@ public class ReduceNode extends SingleInputNode {
if (this.keys == null) {
// case of a key-less reducer. force a parallelism of 1
setDegreeOfParallelism(1);
- setSubtasksPerInstance(1);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
index 8bf3f16..0b872a7 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
@@ -206,22 +206,6 @@ public abstract class SingleInputNode extends OptimizerNode {
protected abstract List<OperatorDescriptorSingle> getPossibleProperties();
-
- @Override
- public boolean isMemoryConsumer() {
- for (OperatorDescriptorSingle dps : getPossibleProperties()) {
- if (dps.getStrategy().firstDam().isMaterializing()) {
- return true;
- }
- for (RequestedLocalProperties rlp : dps.getPossibleLocalProperties()) {
- if (!rlp.isTrivial()) {
- return true;
- }
- }
- }
- return false;
- }
-
@Override
public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
// get what we inherit and what is preserved by our user code
@@ -284,30 +268,21 @@ public abstract class SingleInputNode extends OptimizerNode {
final ArrayList<PlanNode> outputPlans = new ArrayList<PlanNode>();
final int dop = getDegreeOfParallelism();
- final int subPerInstance = getSubtasksPerInstance();
final int inDop = getPredecessorNode().getDegreeOfParallelism();
- final int inSubPerInstance = getPredecessorNode().getSubtasksPerInstance();
- final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1);
- final int inNumInstances = inDop / inSubPerInstance + (inDop % inSubPerInstance == 0 ? 0 : 1);
-
- final boolean globalDopChange = numInstances != inNumInstances;
- final boolean localDopChange = numInstances == inNumInstances & subPerInstance != inSubPerInstance;
-
+
+ final boolean dopChange = inDop != dop;
+
// create all candidates
for (PlanNode child : subPlans) {
if (this.inConn.getShipStrategy() == null) {
// pick the strategy ourselves
for (RequestedGlobalProperties igps: intGlobal) {
final Channel c = new Channel(child, this.inConn.getMaterializationMode());
- igps.parameterizeChannel(c, globalDopChange, localDopChange);
+ igps.parameterizeChannel(c, dopChange);
// if the DOP changed, make sure that we cancel out properties, unless the
// ship strategy preserves/establishes them even under changing DOPs
- if (globalDopChange && !c.getShipStrategy().isNetworkStrategy()) {
- c.getGlobalProperties().reset();
- }
- if (localDopChange && !(c.getShipStrategy().isNetworkStrategy() ||
- c.getShipStrategy().compensatesForLocalDOPChanges())) {
+ if (dopChange && !c.getShipStrategy().isNetworkStrategy()) {
c.getGlobalProperties().reset();
}
@@ -332,12 +307,10 @@ public abstract class SingleInputNode extends OptimizerNode {
c.setShipStrategy(this.inConn.getShipStrategy());
}
- if (globalDopChange) {
+ if (dopChange) {
c.adjustGlobalPropertiesForFullParallelismChange();
- } else if (localDopChange) {
- c.adjustGlobalPropertiesForLocalParallelismChange();
}
-
+
// check whether we meet any of the accepted properties
for (RequestedGlobalProperties rgps: allValidGlobals) {
if (rgps.isMetBy(c.getGlobalProperties())) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SinkJoiner.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SinkJoiner.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SinkJoiner.java
index 2c765a5..a711ac5 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SinkJoiner.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SinkJoiner.java
@@ -42,7 +42,6 @@ public class SinkJoiner extends TwoInputNode {
this.input2 = conn2;
setDegreeOfParallelism(1);
- setSubtasksPerInstance(1);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
index 9898c81..97a92d0 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
@@ -251,22 +251,6 @@ public abstract class TwoInputNode extends OptimizerNode {
}
protected abstract List<OperatorDescriptorDual> getPossibleProperties();
-
- @Override
- public boolean isMemoryConsumer() {
- for (OperatorDescriptorDual dpd : this.possibleProperties) {
- if (dpd.getStrategy().firstDam().isMaterializing() ||
- dpd.getStrategy().secondDam().isMaterializing()) {
- return true;
- }
- for (LocalPropertiesPair prp : dpd.getPossibleLocalProperties()) {
- if (!(prp.getProperties1().isTrivial() && prp.getProperties2().isTrivial())) {
- return true;
- }
- }
- }
- return false;
- }
@Override
public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
@@ -348,20 +332,12 @@ public abstract class TwoInputNode extends OptimizerNode {
final ArrayList<PlanNode> outputPlans = new ArrayList<PlanNode>();
final int dop = getDegreeOfParallelism();
- final int subPerInstance = getSubtasksPerInstance();
- final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1);
final int inDop1 = getFirstPredecessorNode().getDegreeOfParallelism();
- final int inSubPerInstance1 = getFirstPredecessorNode().getSubtasksPerInstance();
- final int inNumInstances1 = inDop1 / inSubPerInstance1 + (inDop1 % inSubPerInstance1 == 0 ? 0 : 1);
final int inDop2 = getSecondPredecessorNode().getDegreeOfParallelism();
- final int inSubPerInstance2 = getSecondPredecessorNode().getSubtasksPerInstance();
- final int inNumInstances2 = inDop2 / inSubPerInstance2 + (inDop2 % inSubPerInstance2 == 0 ? 0 : 1);
-
- final boolean globalDopChange1 = numInstances != inNumInstances1;
- final boolean globalDopChange2 = numInstances != inNumInstances2;
- final boolean localDopChange1 = numInstances == inNumInstances1 & subPerInstance != inSubPerInstance1;
- final boolean localDopChange2 = numInstances == inNumInstances2 & subPerInstance != inSubPerInstance2;
-
+
+ final boolean dopChange1 = dop != inDop1;
+ final boolean dopChange2 = dop != inDop2;
+
// enumerate all pairwise combination of the children's plans together with
// all possible operator strategy combination
@@ -380,15 +356,11 @@ public abstract class TwoInputNode extends OptimizerNode {
final Channel c1 = new Channel(child1, this.input1.getMaterializationMode());
if (this.input1.getShipStrategy() == null) {
// free to choose the ship strategy
- igps1.parameterizeChannel(c1, globalDopChange1, localDopChange1);
+ igps1.parameterizeChannel(c1, dopChange1);
// if the DOP changed, make sure that we cancel out properties, unless the
// ship strategy preserves/establishes them even under changing DOPs
- if (globalDopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
- c1.getGlobalProperties().reset();
- }
- if (localDopChange1 && !(c1.getShipStrategy().isNetworkStrategy() ||
- c1.getShipStrategy().compensatesForLocalDOPChanges())) {
+ if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) {
c1.getGlobalProperties().reset();
}
} else {
@@ -399,10 +371,8 @@ public abstract class TwoInputNode extends OptimizerNode {
c1.setShipStrategy(this.input1.getShipStrategy());
}
- if (globalDopChange1) {
+ if (dopChange1) {
c1.adjustGlobalPropertiesForFullParallelismChange();
- } else if (localDopChange1) {
- c1.adjustGlobalPropertiesForLocalParallelismChange();
}
}
@@ -411,15 +381,11 @@ public abstract class TwoInputNode extends OptimizerNode {
final Channel c2 = new Channel(child2, this.input2.getMaterializationMode());
if (this.input2.getShipStrategy() == null) {
// free to choose the ship strategy
- igps2.parameterizeChannel(c2, globalDopChange2, localDopChange2);
+ igps2.parameterizeChannel(c2, dopChange2);
// if the DOP changed, make sure that we cancel out properties, unless the
// ship strategy preserves/establishes them even under changing DOPs
- if (globalDopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
- c2.getGlobalProperties().reset();
- }
- if (localDopChange2 && !(c2.getShipStrategy().isNetworkStrategy() ||
- c2.getShipStrategy().compensatesForLocalDOPChanges())) {
+ if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) {
c2.getGlobalProperties().reset();
}
} else {
@@ -430,10 +396,8 @@ public abstract class TwoInputNode extends OptimizerNode {
c2.setShipStrategy(this.input2.getShipStrategy());
}
- if (globalDopChange2) {
+ if (dopChange2) {
c2.adjustGlobalPropertiesForFullParallelismChange();
- } else if (localDopChange2) {
- c2.adjustGlobalPropertiesForLocalParallelismChange();
}
}