You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/10 13:42:43 UTC
[1/7] flink git commit: [FLINK-6010] [docs] Correct IntelliJ IDEA
Plugins path in 'Installing the Scala plugin' section
Repository: flink
Updated Branches:
refs/heads/master 7456d78d2 -> 206ea2119
[FLINK-6010] [docs] Correct IntelliJ IDEA Plugins path in 'Installing the Scala plugin' section
This closes #3504
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a87b5270
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a87b5270
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a87b5270
Branch: refs/heads/master
Commit: a87b5270fb62043fd2a3d049ec525889cee04070
Parents: 7456d78
Author: Bowen Li <bo...@gmail.com>
Authored: Thu Mar 9 13:55:07 2017 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 10 10:50:04 2017 +0100
----------------------------------------------------------------------
docs/internals/ide_setup.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a87b5270/docs/internals/ide_setup.md
----------------------------------------------------------------------
diff --git a/docs/internals/ide_setup.md b/docs/internals/ide_setup.md
index f170588..329f4cd 100644
--- a/docs/internals/ide_setup.md
+++ b/docs/internals/ide_setup.md
@@ -59,7 +59,7 @@ The IntelliJ installation setup offers to install the Scala plugin.
If it is not installed, follow these instructions before importing Flink
to enable support for Scala projects and files:
-1. Go to IntelliJ plugins settings (File -> Settings -> Plugins) and
+1. Go to IntelliJ plugins settings (IntelliJ IDEA -> Preferences -> Plugins) and
click on "Install Jetbrains plugin...".
2. Select and install the "Scala" plugin.
3. Restart IntelliJ
[4/7] flink git commit: [FLINK-5980] [core] Expose max-parallelism
value in RuntimeContext.
Posted by se...@apache.org.
[FLINK-5980] [core] Expose max-parallelism value in RuntimeContext.
This closes #3487
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f47e31b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f47e31b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f47e31b
Branch: refs/heads/master
Commit: 6f47e31babe159ec28c8619263d8ea874e56f8fc
Parents: bb1197d
Author: biao.liub <bi...@alibaba-inc.com>
Authored: Tue Mar 7 20:11:10 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 10 13:30:57 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/api/common/TaskInfo.java | 16 ++++++++--------
.../flink/api/common/functions/RuntimeContext.java | 8 ++++++++
.../functions/util/AbstractRuntimeUDFContext.java | 7 ++++++-
.../optimizer/plantranslate/JobGraphGenerator.java | 3 +++
.../runtime/executiongraph/ExecutionJobVertex.java | 9 +++++++--
.../runtime/executiongraph/TaskInformation.java | 10 +++++-----
.../org/apache/flink/runtime/taskmanager/Task.java | 2 +-
.../api/functions/async/RichAsyncFunction.java | 5 +++++
.../api/operators/AbstractStreamOperator.java | 5 +++--
.../util/AbstractStreamOperatorTestHarness.java | 2 +-
.../KeyedOneInputStreamOperatorTestHarness.java | 2 +-
11 files changed, 48 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
index 5627ca8..33f2e0c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
@@ -31,20 +31,20 @@ public class TaskInfo {
private final String taskName;
private final String taskNameWithSubtasks;
- private final int numberOfKeyGroups;
+ private final int maxNumberOfParallelSubtasks;
private final int indexOfSubtask;
private final int numberOfParallelSubtasks;
private final int attemptNumber;
- public TaskInfo(String taskName, int numberOfKeyGroups, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) {
+ public TaskInfo(String taskName, int maxNumberOfParallelSubtasks, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) {
checkArgument(indexOfSubtask >= 0, "Task index must be a non-negative number.");
- checkArgument(numberOfKeyGroups >= 1, "Max parallelism must be a positive number.");
- checkArgument(numberOfKeyGroups >= numberOfParallelSubtasks, "Max parallelism must be >= than parallelism.");
+ checkArgument(maxNumberOfParallelSubtasks >= 1, "Max parallelism must be a positive number.");
+ checkArgument(maxNumberOfParallelSubtasks >= numberOfParallelSubtasks, "Max parallelism must be >= than parallelism.");
checkArgument(numberOfParallelSubtasks >= 1, "Parallelism must be a positive number.");
checkArgument(indexOfSubtask < numberOfParallelSubtasks, "Task index must be less than parallelism.");
checkArgument(attemptNumber >= 0, "Attempt number must be a non-negative number.");
this.taskName = checkNotNull(taskName, "Task Name must not be null.");
- this.numberOfKeyGroups = numberOfKeyGroups;
+ this.maxNumberOfParallelSubtasks = maxNumberOfParallelSubtasks;
this.indexOfSubtask = indexOfSubtask;
this.numberOfParallelSubtasks = numberOfParallelSubtasks;
this.attemptNumber = attemptNumber;
@@ -61,10 +61,10 @@ public class TaskInfo {
}
/**
- * Gets the number of key groups aka the max parallelism aka the max number of subtasks.
+ * Gets the max parallelism aka the max number of subtasks.
*/
- public int getNumberOfKeyGroups() {
- return numberOfKeyGroups;
+ public int getMaxNumberOfParallelSubtasks() {
+ return maxNumberOfParallelSubtasks;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 98ad018..2978f3a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -77,6 +77,14 @@ public interface RuntimeContext {
int getNumberOfParallelSubtasks();
/**
+ * Gets the number of max-parallelism with which the parallel task runs.
+ *
+ * @return The max-parallelism with which the parallel task runs.
+ */
+ @PublicEvolving
+ int getMaxNumberOfParallelSubtasks();
+
+ /**
* Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
* parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
*
http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 2538799..bcd0ad0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -97,10 +97,15 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
}
@Override
+ public int getMaxNumberOfParallelSubtasks() {
+ return taskInfo.getMaxNumberOfParallelSubtasks();
+ }
+
+ @Override
public int getIndexOfThisSubtask() {
return taskInfo.getIndexOfThisSubtask();
}
-
+
@Override
public MetricGroup getMetricGroup() {
return metrics;
http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 61e5327..bbc944e 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -394,6 +394,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
// set parallelism
int pd = node.getParallelism();
vertex.setParallelism(pd);
+ vertex.setMaxParallelism(pd);
vertex.setSlotSharingGroup(sharingGroup);
@@ -1276,6 +1277,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
final JobVertex sync = new JobVertex("Sync(" + bulkNode.getNodeName() + ")");
sync.setInvokableClass(IterationSynchronizationSinkTask.class);
sync.setParallelism(1);
+ sync.setMaxParallelism(1);
this.auxVertices.add(sync);
final TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
@@ -1412,6 +1414,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
final JobVertex sync = new JobVertex("Sync (" + iterNode.getNodeName() + ")");
sync.setInvokableClass(IterationSynchronizationSinkTask.class);
sync.setParallelism(1);
+ sync.setMaxParallelism(1);
this.auxVertices.add(sync);
syncConfig = new TaskConfig(sync.getConfiguration());
http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 754148e..852d530 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.Archiveable;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -223,10 +224,14 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
}
private void setMaxParallelismInternal(int maxParallelism) {
+ if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+ maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+ }
+
Preconditions.checkArgument(maxParallelism > 0
&& maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
- "Overriding max parallelism is not in valid bounds (1.." +
- KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + "), found:" + maxParallelism);
+ "Overriding max parallelism is not in valid bounds (1..%s), found: %s",
+ KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, maxParallelism);
this.maxParallelism = maxParallelism;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
index ccec118..7fb68e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
@@ -42,7 +42,7 @@ public class TaskInformation implements Serializable {
private final int numberOfSubtasks;
/** The maximum parallelism == number of key groups */
- private final int numberOfKeyGroups;
+ private final int maxNumberOfSubtaks;
/** Class name of the invokable to run */
private final String invokableClassName;
@@ -54,13 +54,13 @@ public class TaskInformation implements Serializable {
JobVertexID jobVertexId,
String taskName,
int numberOfSubtasks,
- int numberOfKeyGroups,
+ int maxNumberOfSubtaks,
String invokableClassName,
Configuration taskConfiguration) {
this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
this.taskName = Preconditions.checkNotNull(taskName);
this.numberOfSubtasks = Preconditions.checkNotNull(numberOfSubtasks);
- this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups);
+ this.maxNumberOfSubtaks = Preconditions.checkNotNull(maxNumberOfSubtaks);
this.invokableClassName = Preconditions.checkNotNull(invokableClassName);
this.taskConfiguration = Preconditions.checkNotNull(taskConfiguration);
}
@@ -77,8 +77,8 @@ public class TaskInformation implements Serializable {
return numberOfSubtasks;
}
- public int getNumberOfKeyGroups() {
- return numberOfKeyGroups;
+ public int getMaxNumberOfSubtaks() {
+ return maxNumberOfSubtaks;
}
public String getInvokableClassName() {
http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 8732c60..b0f0eb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -291,7 +291,7 @@ public class Task implements Runnable, TaskActions {
this.taskInfo = new TaskInfo(
taskInformation.getTaskName(),
- taskInformation.getNumberOfKeyGroups(),
+ taskInformation.getMaxNumberOfSubtaks(),
subtaskIndex,
taskInformation.getNumberOfSubtasks(),
attemptNumber);
http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
index 7971460..8c788b4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
@@ -120,6 +120,11 @@ public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction im
}
@Override
+ public int getMaxNumberOfParallelSubtasks() {
+ return runtimeContext.getMaxNumberOfParallelSubtasks();
+ }
+
+ @Override
public int getIndexOfThisSubtask() {
return runtimeContext.getIndexOfThisSubtask();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a81056f..6e6b147 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -283,13 +283,14 @@ public abstract class AbstractStreamOperator<OUT>
// create a keyed state backend if there is keyed state, as indicated by the presence of a key serializer
if (null != keySerializer) {
KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
- container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(),
+ container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),
container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),
container.getEnvironment().getTaskInfo().getIndexOfThisSubtask());
this.keyedStateBackend = container.createKeyedStateBackend(
keySerializer,
- container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(),
+ // The maximum parallelism == number of key group
+ container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),
subTaskKeyGroupRange);
this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 07424f7..c984eed 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -339,7 +339,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
}
if (operatorStateHandles != null) {
- int numKeyGroups = getEnvironment().getTaskInfo().getNumberOfKeyGroups();
+ int numKeyGroups = getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks();
int numSubtasks = getEnvironment().getTaskInfo().getNumberOfParallelSubtasks();
int subtaskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index effb44c..d45ae21 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -211,7 +211,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
@Override
public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
if (operatorStateHandles != null) {
- int numKeyGroups = getEnvironment().getTaskInfo().getNumberOfKeyGroups();
+ int numKeyGroups = getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks();
int numSubtasks = getEnvironment().getTaskInfo().getNumberOfParallelSubtasks();
int subtaskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
[3/7] flink git commit: [FLINK-5976] [tests] Deduplicate Tokenizer in
tests
Posted by se...@apache.org.
[FLINK-5976] [tests] Deduplicate Tokenizer in tests
This closes #3485
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5e6da0b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5e6da0b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5e6da0b
Branch: refs/heads/master
Commit: e5e6da0b38495d4030fd0b790a81ca70f9ce84b2
Parents: 6f47e31
Author: liuyuzhong7 <li...@gmail.com>
Authored: Tue Mar 7 19:15:40 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 10 13:30:57 2017 +0100
----------------------------------------------------------------------
.../clients/examples/LocalExecutorITCase.java | 29 ++++------------
.../hadoop/mapred/WordCountMapredITCase.java | 19 +----------
.../mapreduce/WordCountMapreduceITCase.java | 19 +----------
.../api/outputformat/CsvOutputFormatITCase.java | 21 +-----------
.../outputformat/TextOutputFormatITCase.java | 3 +-
.../flink/test/testfunctions/Tokenizer.java | 36 ++++++++++++++++++++
6 files changed, 48 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e5e6da0b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
index 4ed28a8..95d1ab2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
@@ -20,15 +20,12 @@ package org.apache.flink.test.clients.examples;
import java.io.File;
import java.io.FileWriter;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.LocalExecutor;
import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.Collector;
+import org.apache.flink.test.testfunctions.Tokenizer;
import org.junit.Assert;
import org.junit.Test;
@@ -45,7 +42,7 @@ public class LocalExecutorITCase {
File outFile = File.createTempFile("wctext", ".out");
inFile.deleteOnExit();
outFile.deleteOnExit();
-
+
try (FileWriter fw = new FileWriter(inFile)) {
fw.write(WordCountData.TEXT);
}
@@ -64,27 +61,15 @@ public class LocalExecutorITCase {
Assert.fail(e.getMessage());
}
}
-
+
private Plan getWordCountPlan(File inFile, File outFile, int parallelism) {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.readTextFile(inFile.getAbsolutePath())
- .flatMap(new Tokenizer())
- .groupBy(0)
- .sum(1)
- .writeAsCsv(outFile.getAbsolutePath());
+ .flatMap(new Tokenizer())
+ .groupBy(0)
+ .sum(1)
+ .writeAsCsv(outFile.getAbsolutePath());
return env.createProgramPlan();
}
-
- public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
- String[] tokens = value.toLowerCase().split("\\W+");
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(new Tuple2<>(token, 1));
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5e6da0b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
index 9528d94..7c1b30e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.hadoop.mapred;
-import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -27,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.util.Collector;
+import org.apache.flink.test.testfunctions.Tokenizer;
import org.apache.flink.util.OperatingSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
@@ -115,20 +114,4 @@ public class WordCountMapredITCase extends JavaProgramTestBase {
words.output(hadoopOutputFormat);
env.execute("Hadoop Compat WordCount");
}
-
- public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
- // normalize and split the line
- String[] tokens = value.toLowerCase().split("\\W+");
-
- // emit the pairs
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(new Tuple2<String, Integer>(token, 1));
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5e6da0b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
index 64062d2..fbc1994 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.test.hadoop.mapreduce;
-import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -27,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.util.Collector;
+import org.apache.flink.test.testfunctions.Tokenizer;
import org.apache.flink.util.OperatingSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
@@ -115,20 +114,4 @@ public class WordCountMapreduceITCase extends JavaProgramTestBase {
words.output(hadoopOutputFormat);
env.execute("Hadoop Compat WordCount");
}
-
- public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
- // normalize and split the line
- String[] tokens = value.toLowerCase().split("\\W+");
-
- // emit the pairs
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(new Tuple2<String, Integer>(token, 1));
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5e6da0b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
index c2155ac..61dddb8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
@@ -17,13 +17,12 @@
package org.apache.flink.test.streaming.api.outputformat;
-import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.Collector;
+import org.apache.flink.test.testfunctions.Tokenizer;
public class CsvOutputFormatITCase extends StreamingProgramTestBase {
@@ -56,23 +55,5 @@ public class CsvOutputFormatITCase extends StreamingProgramTestBase {
.replaceAll("[\\\\(\\\\)]", ""), resultPath);
}
- public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
- throws Exception {
- // normalize and split the line
- String[] tokens = value.toLowerCase().split("\\W+");
-
- // emit the pairs
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(new Tuple2<String, Integer>(token, 1));
- }
- }
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e5e6da0b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
index 2940e6d..6ea864e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.StreamingProgramTestBase;
import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.testfunctions.Tokenizer;
public class TextOutputFormatITCase extends StreamingProgramTestBase {
@@ -39,7 +40,7 @@ public class TextOutputFormatITCase extends StreamingProgramTestBase {
DataStream<String> text = env.fromElements(WordCountData.TEXT);
DataStream<Tuple2<String, Integer>> counts = text
- .flatMap(new CsvOutputFormatITCase.Tokenizer())
+ .flatMap(new Tokenizer())
.keyBy(0).sum(1);
counts.writeAsText(resultPath);
http://git-wip-us.apache.org/repos/asf/flink/blob/e5e6da0b/flink-tests/src/test/java/org/apache/flink/test/testfunctions/Tokenizer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/testfunctions/Tokenizer.java b/flink-tests/src/test/java/org/apache/flink/test/testfunctions/Tokenizer.java
new file mode 100644
index 0000000..9b3764d
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/testfunctions/Tokenizer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.testfunctions;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+public final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+ @Override
+ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+ String[] tokens = value.toLowerCase().split("\\W+");
+ for (String token : tokens) {
+ if (token.length() > 0) {
+ out.collect(new Tuple2<>(token, 1));
+ }
+ }
+ }
+}
[2/7] flink git commit: [FLINK-6005] [misc] Fix some ArrayList
initializations without initial size
Posted by se...@apache.org.
[FLINK-6005] [misc] Fix some ArrayList initializations without initial size
This closes #3499
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb1197d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb1197d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb1197d9
Branch: refs/heads/master
Commit: bb1197d9162a630c87d47defc5d0c42de6726feb
Parents: a87b527
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Mar 9 12:24:33 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 10 10:58:07 2017 +0100
----------------------------------------------------------------------
.../api/common/typeutils/base/ListSerializerTest.java | 5 +++--
.../apache/flink/graph/library/SummarizationITCase.java | 5 +++--
.../java/org/apache/flink/graph/generator/TestUtils.java | 10 ++++++----
.../io/network/serialization/LargeRecordsTest.java | 8 ++++----
4 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bb1197d9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
index 28cdc13..2de6bec 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
@@ -71,8 +71,9 @@ public class ListSerializerTest extends SerializerTestBase<List<Long>> {
list7.add(rnd.nextLong());
}
- final List<Long> list8 = new ArrayList<>();
- for (int i = 0; i < rnd.nextInt(200); i++) {
+ int list8Len = rnd.nextInt(200);
+ final List<Long> list8 = new ArrayList<>(list8Len);
+ for (int i = 0; i < list8Len; i++) {
list8.add(rnd.nextLong());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb1197d9/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
index 43514dc..fe4cd24 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
@@ -185,8 +185,9 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
}
private List<Long> getListFromIdRange(String idRange) {
- List<Long> result = new ArrayList<>();
- for (String id : ID_SEPARATOR.split(idRange)) {
+ String[] split = ID_SEPARATOR.split(idRange);
+ List<Long> result = new ArrayList<>(split.length);
+ for (String id : split) {
result.add(Long.parseLong(id));
}
return result;
http://git-wip-us.apache.org/repos/asf/flink/blob/bb1197d9/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
index 2cf9eac..23ad31c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
@@ -57,9 +57,10 @@ public final class TestUtils {
private static <K, VV, EV> void compareVertices(Graph<K, VV, EV> graph, String expectedVertices)
throws Exception {
if (expectedVertices != null) {
- List<String> resultVertices = new ArrayList<>();
+ List<Vertex<K, VV>> vertices = graph.getVertices().collect();
+ List<String> resultVertices = new ArrayList<>(vertices.size());
- for (Vertex<K, VV> vertex : graph.getVertices().collect()) {
+ for (Vertex<K, VV> vertex : vertices) {
resultVertices.add(vertex.f0.toString());
}
@@ -70,9 +71,10 @@ public final class TestUtils {
private static <K, VV, EV> void compareEdges(Graph<K, VV, EV> graph, String expectedEdges)
throws Exception {
if (expectedEdges != null) {
- List<String> resultEdges = new ArrayList<>();
+ List<Edge<K, EV>> edges = graph.getEdges().collect();
+ List<String> resultEdges = new ArrayList<>(edges.size());
- for (Edge<K, EV> edge : graph.getEdges().collect()) {
+ for (Edge<K, EV> edge : edges) {
resultEdges.add(edge.f0.toString() + "," + edge.f1.toString());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb1197d9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
index 1574fe9..25523db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
@@ -54,8 +54,8 @@ public class LargeRecordsTest {
final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
- List<SerializationTestType> originalRecords = new ArrayList<SerializationTestType>();
- List<SerializationTestType> deserializedRecords = new ArrayList<SerializationTestType>();
+ List<SerializationTestType> originalRecords = new ArrayList<SerializationTestType>((NUM_RECORDS + 1) / 2);
+ List<SerializationTestType> deserializedRecords = new ArrayList<SerializationTestType>((NUM_RECORDS + 1) / 2);
LargeObjectType genLarge = new LargeObjectType();
@@ -154,8 +154,8 @@ public class LargeRecordsTest {
final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
- List<SerializationTestType> originalRecords = new ArrayList<SerializationTestType>();
- List<SerializationTestType> deserializedRecords = new ArrayList<SerializationTestType>();
+ List<SerializationTestType> originalRecords = new ArrayList<>((NUM_RECORDS + 1) / 2);
+ List<SerializationTestType> deserializedRecords = new ArrayList<>((NUM_RECORDS + 1) / 2);
LargeObjectType genLarge = new LargeObjectType();
[7/7] flink git commit: [FLINK-4545] [network] Small adjustments to
LocalBufferPool with limited the number of used buffers
Posted by se...@apache.org.
[FLINK-4545] [network] Small adjustments to LocalBufferPool with limited the number of used buffers
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/206ea211
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/206ea211
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/206ea211
Branch: refs/heads/master
Commit: 206ea2119ce80c7d4920a471eb58c1d30abbd995
Parents: 11e2aa6
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 10 12:25:17 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 10 13:30:58 2017 +0100
----------------------------------------------------------------------
.../io/network/buffer/LocalBufferPool.java | 7 ++---
.../io/network/buffer/NetworkBufferPool.java | 27 +++++++++++++-------
.../runtime/taskmanager/TaskManagerTest.java | 2 +-
3 files changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/206ea211/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index a587997..b485fd1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -58,7 +58,7 @@ class LocalBufferPool implements BufferPool {
* The currently available memory segments. These are segments, which have been requested from
* the network buffer pool and are currently not handed out as Buffer instances.
*/
- private final Queue<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
+ private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
/**
* Buffer availability listeners, which need to be notified when a Buffer becomes available.
@@ -297,8 +297,9 @@ class LocalBufferPool implements BufferPool {
@Override
public void setNumBuffers(int numBuffers) throws IOException {
synchronized (availableMemorySegments) {
- checkArgument(numBuffers >= numberOfRequiredMemorySegments, "Buffer pool needs at least " +
- numberOfRequiredMemorySegments + " buffers, but tried to set to " + numBuffers + ".");
+ checkArgument(numBuffers >= numberOfRequiredMemorySegments,
+ "Buffer pool needs at least %s buffers, but tried to set to %s",
+ numberOfRequiredMemorySegments, numBuffers);
if (numBuffers > maxNumberOfMemorySegments) {
currentPoolSize = maxNumberOfMemorySegments;
http://git-wip-us.apache.org/repos/asf/flink/blob/206ea211/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 7668759..5f2da03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -22,13 +22,13 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
@@ -50,7 +50,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
private final int memorySegmentSize;
- private final Queue<MemorySegment> availableMemorySegments;
+ private final ArrayBlockingQueue<MemorySegment> availableMemorySegments;
private volatile boolean isDestroyed;
@@ -124,9 +124,10 @@ public class NetworkBufferPool implements BufferPoolFactory {
return availableMemorySegments.poll();
}
- // This is not safe with regard to destroy calls, but it does not hurt, because destroy happens
- // only once at clean up time (task manager shutdown).
public void recycle(MemorySegment segment) {
+ // Adds the segment back to the queue, which does not immediately free the memory
+ // however, since this happens when references to the global pool are also released,
+ // making the availableMemorySegments queue and its contained object reclaimable
availableMemorySegments.add(segment);
}
@@ -260,7 +261,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
assert Thread.holdsLock(factoryLock);
// All buffers, which are not among the required ones
- int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+ final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
if (numAvailableMemorySegment == 0) {
// in this case, we need to redistribute buffers so that every pool gets its minimum
@@ -278,7 +279,8 @@ public class NetworkBufferPool implements BufferPoolFactory {
* a ratio that we use to distribute the buffers.
*/
- int totalCapacity = 0;
+ long totalCapacity = 0; // long to avoid int overflow
+
for (LocalBufferPool bufferPool : allBufferPools) {
int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
bufferPool.getNumberOfRequiredMemorySegments();
@@ -290,9 +292,13 @@ public class NetworkBufferPool implements BufferPoolFactory {
return; // necessary to avoid div by zero when nothing to re-distribute
}
- int memorySegmentsToDistribute = Math.min(numAvailableMemorySegment, totalCapacity);
+ // since one of the arguments of 'min(a,b)' is a positive int, this is actually
+ // guaranteed to be within the 'int' domain
+ // (we use a checked downCast to handle possible bugs more gracefully).
+ final int memorySegmentsToDistribute = MathUtils.checkedDownCast(
+ Math.min(numAvailableMemorySegment, totalCapacity));
- int totalPartsUsed = 0; // of totalCapacity
+ long totalPartsUsed = 0; // of totalCapacity
int numDistributedMemorySegment = 0;
for (LocalBufferPool bufferPool : allBufferPools) {
int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
@@ -307,7 +313,10 @@ public class NetworkBufferPool implements BufferPoolFactory {
// avoid remaining buffers by looking at the total capacity that should have been
// re-distributed up until here
- int mySize = memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment;
+ // the downcast will always succeed, because both arguments of the subtraction are in the 'int' domain
+ final int mySize = MathUtils.checkedDownCast(
+ memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);
+
numDistributedMemorySegment += mySize;
bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/206ea211/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 0b39e20..730595c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -991,7 +991,7 @@ public class TaskManagerTest extends TestLogger {
config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
TaskManagerServicesConfiguration tmConfig =
- TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getByName("localhost"), true);
+ TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLoopbackAddress(), true);
assertEquals(tmConfig.getNetworkConfig().partitionRequestInitialBackoff(), 100);
assertEquals(tmConfig.getNetworkConfig().partitionRequestMaxBackoff(), 200);
[5/7] flink git commit: [FLINK-4545] [network] Allow LocalBufferPool
to limited the number of used buffers
Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 4d462d0..7c51bc2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
@@ -32,6 +33,7 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -152,10 +154,17 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
StreamGraph streamGraph = env.getStreamGraph();
streamGraph.setJobName("test job");
JobGraph jobGraph = streamGraph.getJobGraph();
+ List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
assertEquals(2, jobGraph.getNumberOfVertices());
- assertEquals(1, jobGraph.getVerticesAsArray()[0].getParallelism());
- assertEquals(1, jobGraph.getVerticesAsArray()[1].getParallelism());
+ assertEquals(1, verticesSorted.get(0).getParallelism());
+ assertEquals(1, verticesSorted.get(1).getParallelism());
+
+ JobVertex sourceVertex = verticesSorted.get(0);
+ JobVertex mapSinkVertex = verticesSorted.get(1);
+
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType());
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED, mapSinkVertex.getInputs().get(0).getSource().getResultType());
}
/**
@@ -191,8 +200,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
.print();
JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
- JobVertex sourceVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
- JobVertex mapPrintVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
+ List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+ JobVertex sourceVertex = verticesSorted.get(0);
+ JobVertex mapPrintVertex = verticesSorted.get(1);
+
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType());
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED, mapPrintVertex.getInputs().get(0).getSource().getResultType());
StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
StreamConfig mapConfig = new StreamConfig(mapPrintVertex.getConfiguration());
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index ec55f19..4344a46 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -51,8 +51,8 @@ public class BarrierBufferMassiveRandomTest {
try {
ioMan = new IOManagerAsync();
- BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100);
- BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100);
+ BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, 100);
+ BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, 100);
RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
new BufferPool[] { pool1, pool2 },
[6/7] flink git commit: [FLINK-4545] [network] Allow LocalBufferPool
to limited the number of used buffers
Posted by se...@apache.org.
[FLINK-4545] [network] Allow LocalBufferPool to limited the number of used buffers
Use "a * <number of channels> + b" buffers for bounded pipelined partitions:
Default: a = 2, b = 8
* 1 buffer for in-flight data in the subpartition/input channel
* 1 buffer for parallel serialization
* + some extra buffers (8 seems a good default given bandwidth-delay products of current networks)
This closes #3480
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11e2aa6d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11e2aa6d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11e2aa6d
Branch: refs/heads/master
Commit: 11e2aa6dcdbf42992cda57a5b50d5c29b4facf2d
Parents: e5e6da0
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Feb 10 14:53:09 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 10 13:30:58 2017 +0100
----------------------------------------------------------------------
.../flink/configuration/TaskManagerOptions.java | 14 ++
.../BackPressureStatsTrackerITCase.java | 2 +-
.../InputGateDeploymentDescriptor.java | 15 ++
.../runtime/executiongraph/ExecutionVertex.java | 7 +-
.../runtime/io/network/NetworkEnvironment.java | 23 ++-
.../runtime/io/network/buffer/BufferPool.java | 10 +-
.../io/network/buffer/BufferPoolFactory.java | 7 +-
.../io/network/buffer/LocalBufferPool.java | 57 ++++++-
.../io/network/buffer/NetworkBufferPool.java | 64 +++++--
.../netty/PartitionRequestServerHandler.java | 2 +-
.../io/network/partition/ResultPartition.java | 10 ++
.../network/partition/ResultPartitionType.java | 32 +++-
.../partition/consumer/SingleInputGate.java | 18 +-
.../taskexecutor/TaskManagerServices.java | 4 +-
.../TaskManagerServicesConfiguration.java | 17 +-
.../NetworkEnvironmentConfiguration.scala | 2 +
.../io/network/MockNetworkEnvironment.java | 2 +-
.../io/network/NetworkEnvironmentTest.java | 165 ++++++++++++++++++
.../io/network/api/writer/RecordWriterTest.java | 2 +-
.../network/buffer/BufferPoolFactoryTest.java | 170 ++++++++++++++++---
.../io/network/buffer/LocalBufferPoolTest.java | 58 +++++++
.../network/buffer/NetworkBufferPoolTest.java | 96 ++++++++---
.../partition/InputGateConcurrentTest.java | 6 +-
.../partition/InputGateFairnessTest.java | 3 +-
.../consumer/LocalInputChannelTest.java | 6 +-
.../partition/consumer/SingleInputGateTest.java | 24 ++-
.../partition/consumer/TestSingleInputGate.java | 2 +
.../partition/consumer/UnionInputGateTest.java | 15 +-
...askManagerComponentsStartupShutdownTest.java | 7 +-
.../runtime/taskmanager/TaskManagerTest.java | 29 +++-
.../api/graph/StreamingJobGraphGenerator.java | 6 +-
.../graph/StreamingJobGraphGeneratorTest.java | 21 ++-
.../io/BarrierBufferMassiveRandomTest.java | 4 +-
33 files changed, 790 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index b7ee20a..b891e35 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -53,6 +53,20 @@ public class TaskManagerOptions {
key("taskmanager.net.request-backoff.max")
.defaultValue(10000);
+ /**
+ * Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel).
+ *
+ * Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization
+ */
+ public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
+ key("taskmanager.net.memory.buffers-per-channel")
+ .defaultValue(2);
+
+ /** Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate). */
+ public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
+ key("taskmanager.net.memory.extra-buffers-per-gate")
+ .defaultValue(8);
+
// ------------------------------------------------------------------------
// Task Options
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 1943129..46f8be6 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -108,7 +108,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
//
// 1) Consume all buffers at first (no buffers for the test task)
//
- testBufferPool = networkBufferPool.createBufferPool(1);
+ testBufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
final List<Buffer> buffers = new ArrayList<>();
while (true) {
Buffer buffer = testBufferPool.requestBuffer();
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
index dde1ed7..9bf724a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.deployment;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -46,6 +47,9 @@ public class InputGateDeploymentDescriptor implements Serializable {
*/
private final IntermediateDataSetID consumedResultId;
+ /** The type of the partition the input gate is going to consume. */
+ private final ResultPartitionType consumedPartitionType;
+
/**
* The index of the consumed subpartition of each consumed partition. This index depends on the
* {@link DistributionPattern} and the subtask indices of the producing and consuming task.
@@ -57,10 +61,12 @@ public class InputGateDeploymentDescriptor implements Serializable {
public InputGateDeploymentDescriptor(
IntermediateDataSetID consumedResultId,
+ ResultPartitionType consumedPartitionType,
int consumedSubpartitionIndex,
InputChannelDeploymentDescriptor[] inputChannels) {
this.consumedResultId = checkNotNull(consumedResultId);
+ this.consumedPartitionType = checkNotNull(consumedPartitionType);
checkArgument(consumedSubpartitionIndex >= 0);
this.consumedSubpartitionIndex = consumedSubpartitionIndex;
@@ -72,6 +78,15 @@ public class InputGateDeploymentDescriptor implements Serializable {
return consumedResultId;
}
+ /**
+ * Returns the type of this input channel's consumed result partition.
+ *
+ * @return consumed result partition type
+ */
+ public ResultPartitionType getConsumedPartitionType() {
+ return consumedPartitionType;
+ }
+
public int getConsumedSubpartitionIndex() {
return consumedSubpartitionIndex;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index ca8e07c..21af73a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -687,9 +688,11 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
int queueToRequest = subTaskIndex % numConsumerEdges;
- IntermediateDataSetID resultId = edges[0].getSource().getIntermediateResult().getId();
+ IntermediateResult consumedIntermediateResult = edges[0].getSource().getIntermediateResult();
+ final IntermediateDataSetID resultId = consumedIntermediateResult.getId();
+ final ResultPartitionType partitionType = consumedIntermediateResult.getResultType();
- consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, queueToRequest, partitions));
+ consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, partitionType, queueToRequest, partitions));
}
SerializedValue<JobInformation> serializedJobInformation = getExecutionGraph().getSerializedJobInformation();
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 8e85ffe..4d4b305 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -73,6 +73,11 @@ public class NetworkEnvironment {
private final int partitionRequestMaxBackoff;
+ /** Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel). */
+ private final int networkBuffersPerChannel;
+ /** Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate). */
+ private final int extraNetworkBuffersPerGate;
+
private boolean isShutdown;
public NetworkEnvironment(
@@ -84,7 +89,9 @@ public class NetworkEnvironment {
KvStateServer kvStateServer,
IOMode defaultIOMode,
int partitionRequestInitialBackoff,
- int partitionRequestMaxBackoff) {
+ int partitionRequestMaxBackoff,
+ int networkBuffersPerChannel,
+ int extraNetworkBuffersPerGate) {
this.networkBufferPool = checkNotNull(networkBufferPool);
this.connectionManager = checkNotNull(connectionManager);
@@ -100,6 +107,8 @@ public class NetworkEnvironment {
this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
isShutdown = false;
+ this.networkBuffersPerChannel = networkBuffersPerChannel;
+ this.extraNetworkBuffersPerGate = extraNetworkBuffersPerGate;
}
// --------------------------------------------------------------------------------------------
@@ -171,7 +180,11 @@ public class NetworkEnvironment {
BufferPool bufferPool = null;
try {
- bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions());
+ int maxNumberOfMemorySegments = partition.getPartitionType().isBounded() ?
+ partition.getNumberOfSubpartitions() * networkBuffersPerChannel +
+ extraNetworkBuffersPerGate : Integer.MAX_VALUE;
+ bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(),
+ maxNumberOfMemorySegments);
partition.registerBufferPool(bufferPool);
resultPartitionManager.registerResultPartition(partition);
@@ -198,7 +211,11 @@ public class NetworkEnvironment {
BufferPool bufferPool = null;
try {
- bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels());
+ int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
+ gate.getNumberOfInputChannels() * networkBuffersPerChannel +
+ extraNetworkBuffersPerGate : Integer.MAX_VALUE;
+ bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
+ maxNumberOfMemorySegments);
gate.setBufferPool(bufferPool);
} catch (Throwable t) {
if (bufferPool != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
index 8784b14..a4928ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
@@ -50,6 +50,13 @@ public interface BufferPool extends BufferProvider, BufferRecycler {
int getNumberOfRequiredMemorySegments();
/**
+ * Returns the maximum number of memory segments this buffer pool should use
+ *
+ * @return maximum number of memory segments to use or <tt>-1</tt> if unlimited
+ */
+ int getMaxNumberOfMemorySegments();
+
+ /**
* Returns the current size of this buffer pool.
*
* <p> The size of the buffer pool can change dynamically at runtime.
@@ -59,7 +66,7 @@ public interface BufferPool extends BufferProvider, BufferRecycler {
/**
* Sets the current size of this buffer pool.
*
- * <p> The size needs to be greater or equals to the guaranteed number of memory segments.
+ * <p> The size needs to be greater or equal to the guaranteed number of memory segments.
*/
void setNumBuffers(int numBuffers) throws IOException;
@@ -72,5 +79,4 @@ public interface BufferPool extends BufferProvider, BufferRecycler {
* Returns the number of used buffers of this buffer pool.
*/
int bestEffortGetNumOfUsedBuffers();
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
index e953158..ffed432 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
@@ -30,8 +30,13 @@ public interface BufferPoolFactory {
* buffers.
*
* <p> The buffer pool is of dynamic size with at least <tt>numRequiredBuffers</tt> buffers.
+ *
+ * @param numRequiredBuffers
+ * minimum number of network buffers in this pool
+ * @param maxUsedBuffers
+ * maximum number of network buffers this pool offers
*/
- BufferPool createBufferPool(int numRequiredBuffers) throws IOException;
+ BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException;
/**
* Destroy callback for updating factory book keeping.
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index d6a4cf7..a587997 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.util.event.EventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayDeque;
@@ -44,6 +46,7 @@ import static org.apache.flink.util.Preconditions.checkState;
* match its new size.
*/
class LocalBufferPool implements BufferPool {
+ private static final Logger LOG = LoggerFactory.getLogger(LocalBufferPool.class);
/** Global network buffer pool to get buffers from. */
private final NetworkBufferPool networkBufferPool;
@@ -63,6 +66,9 @@ class LocalBufferPool implements BufferPool {
*/
private final Queue<EventListener<Buffer>> registeredListeners = new ArrayDeque<EventListener<Buffer>>();
+ /** Maximum number of network buffers to allocate. */
+ private final int maxNumberOfMemorySegments;
+
/** The current size of this pool */
private int currentPoolSize;
@@ -86,9 +92,37 @@ class LocalBufferPool implements BufferPool {
* minimum number of network buffers
*/
LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments) {
+ this(networkBufferPool, numberOfRequiredMemorySegments, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Local buffer pool based on the given <tt>networkBufferPool</tt> with a minimal and maximal
+ * number of network buffers being available.
+ *
+ * @param networkBufferPool
+ * global network buffer pool to get buffers from
+ * @param numberOfRequiredMemorySegments
+ * minimum number of network buffers
+ * @param maxNumberOfMemorySegments
+ * maximum number of network buffers to allocate
+ */
+ LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments,
+ int maxNumberOfMemorySegments) {
+ checkArgument(maxNumberOfMemorySegments >= numberOfRequiredMemorySegments,
+ "Maximum number of memory segments (%s) should not be smaller than minimum (%s).",
+ maxNumberOfMemorySegments, numberOfRequiredMemorySegments);
+
+ checkArgument(maxNumberOfMemorySegments > 0,
+ "Maximum number of memory segments (%s) should be larger than 0.",
+ maxNumberOfMemorySegments);
+
+ LOG.debug("Using a local buffer pool with {}-{} buffers",
+ numberOfRequiredMemorySegments, maxNumberOfMemorySegments);
+
this.networkBufferPool = networkBufferPool;
this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments;
this.currentPoolSize = numberOfRequiredMemorySegments;
+ this.maxNumberOfMemorySegments = maxNumberOfMemorySegments;
}
// ------------------------------------------------------------------------
@@ -113,6 +147,11 @@ class LocalBufferPool implements BufferPool {
}
@Override
+ public int getMaxNumberOfMemorySegments() {
+ return maxNumberOfMemorySegments;
+ }
+
+ @Override
public int getNumberOfAvailableMemorySegments() {
synchronized (availableMemorySegments) {
return availableMemorySegments.size();
@@ -160,6 +199,7 @@ class LocalBufferPool implements BufferPool {
boolean askToRecycle = owner != null;
+ // fill availableMemorySegments with at least one element, wait if required
while (availableMemorySegments.isEmpty()) {
if (isDestroyed) {
throw new IllegalStateException("Buffer pool is destroyed.");
@@ -257,9 +297,14 @@ class LocalBufferPool implements BufferPool {
@Override
public void setNumBuffers(int numBuffers) throws IOException {
synchronized (availableMemorySegments) {
- checkArgument(numBuffers >= numberOfRequiredMemorySegments, "Buffer pool needs at least " + numberOfRequiredMemorySegments + " buffers, but tried to set to " + numBuffers + ".");
+ checkArgument(numBuffers >= numberOfRequiredMemorySegments, "Buffer pool needs at least " +
+ numberOfRequiredMemorySegments + " buffers, but tried to set to " + numBuffers + ".");
- currentPoolSize = numBuffers;
+ if (numBuffers > maxNumberOfMemorySegments) {
+ currentPoolSize = maxNumberOfMemorySegments;
+ } else {
+ currentPoolSize = numBuffers;
+ }
returnExcessMemorySegments();
@@ -274,7 +319,10 @@ class LocalBufferPool implements BufferPool {
@Override
public String toString() {
synchronized (availableMemorySegments) {
- return String.format("[size: %d, required: %d, requested: %d, available: %d, listeners: %d, destroyed: %s]", currentPoolSize, numberOfRequiredMemorySegments, numberOfRequestedMemorySegments, availableMemorySegments.size(), registeredListeners.size(), isDestroyed);
+ return String.format(
+ "[size: %d, required: %d, requested: %d, available: %d, max: %d, listeners: %d, destroyed: %s]",
+ currentPoolSize, numberOfRequiredMemorySegments, numberOfRequestedMemorySegments,
+ availableMemorySegments.size(), maxNumberOfMemorySegments, registeredListeners.size(), isDestroyed);
}
}
@@ -296,8 +344,7 @@ class LocalBufferPool implements BufferPool {
return;
}
- networkBufferPool.recycle(segment);
- numberOfRequestedMemorySegments--;
+ returnMemorySegment(segment);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 5345fbb..7668759 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -180,7 +180,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
// ------------------------------------------------------------------------
@Override
- public BufferPool createBufferPool(int numRequiredBuffers) throws IOException {
+ public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException {
// It is necessary to use a separate lock from the one used for buffer
// requests to ensure deadlock freedom for failure cases.
synchronized (factoryLock) {
@@ -205,7 +205,8 @@ public class NetworkBufferPool implements BufferPoolFactory {
// We are good to go, create a new buffer pool and redistribute
// non-fixed size buffers.
- LocalBufferPool localBufferPool = new LocalBufferPool(this, numRequiredBuffers);
+ LocalBufferPool localBufferPool =
+ new LocalBufferPool(this, numRequiredBuffers, maxUsedBuffers);
allBufferPools.add(localBufferPool);
@@ -236,7 +237,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
/**
* Destroys all buffer pools that allocate their buffers from this
- * buffer pool (created via {@link #createBufferPool(int)}).
+ * buffer pool (created via {@link #createBufferPool(int, int)}).
*/
public void destroyAllBufferPools() {
synchronized (factoryLock) {
@@ -258,27 +259,60 @@ public class NetworkBufferPool implements BufferPoolFactory {
private void redistributeBuffers() throws IOException {
assert Thread.holdsLock(factoryLock);
- int numManagedBufferPools = allBufferPools.size();
+ // All buffers, which are not among the required ones
+ int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
- if (numManagedBufferPools == 0) {
- return; // necessary to avoid div by zero when no managed pools
+ if (numAvailableMemorySegment == 0) {
+ // in this case, we need to redistribute buffers so that every pool gets its minimum
+ for (LocalBufferPool bufferPool : allBufferPools) {
+ bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
+ }
+ return;
}
- // All buffers, which are not among the required ones
- int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+ /**
+ * With buffer pools being potentially limited, let's distribute the available memory
+ * segments based on the capacity of each buffer pool, i.e. the maximum number of segments
+ * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools
+ * it may be less. Based on this and the sum of all these values (totalCapacity), we build
+ * a ratio that we use to distribute the buffers.
+ */
- // Available excess (not required) buffers per pool
- int numExcessBuffersPerPool = numAvailableMemorySegment / numManagedBufferPools;
+ int totalCapacity = 0;
+ for (LocalBufferPool bufferPool : allBufferPools) {
+ int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
+ bufferPool.getNumberOfRequiredMemorySegments();
+ totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
+ }
- // Distribute leftover buffers in round robin fashion
- int numLeftoverBuffers = numAvailableMemorySegment % numManagedBufferPools;
+ // no capacity to receive additional buffers?
+ if (totalCapacity == 0) {
+ return; // necessary to avoid div by zero when nothing to re-distribute
+ }
- int bufferPoolIndex = 0;
+ int memorySegmentsToDistribute = Math.min(numAvailableMemorySegment, totalCapacity);
+ int totalPartsUsed = 0; // of totalCapacity
+ int numDistributedMemorySegment = 0;
for (LocalBufferPool bufferPool : allBufferPools) {
- int leftoverBuffers = bufferPoolIndex++ < numLeftoverBuffers ? 1 : 0;
+ int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
+ bufferPool.getNumberOfRequiredMemorySegments();
+
+ // shortcut
+ if (excessMax == 0) {
+ continue;
+ }
+
+ totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);
- bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + numExcessBuffersPerPool + leftoverBuffers);
+ // avoid remaining buffers by looking at the total capacity that should have been
+ // re-distributed up until here
+ int mySize = memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment;
+ numDistributedMemorySegment += mySize;
+ bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
}
+
+ assert (totalPartsUsed == totalCapacity);
+ assert (numDistributedMemorySegment == memorySegmentsToDistribute);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index 36c1234..6f56877 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -67,7 +67,7 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
- bufferPool = networkBufferPool.createBufferPool(1);
+ bufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 3d92584..eb1418b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -160,6 +160,7 @@ public class ResultPartition implements BufferPoolOwner {
break;
case PIPELINED:
+ case PIPELINED_BOUNDED:
for (int i = 0; i < subpartitions.length; i++) {
subpartitions[i] = new PipelinedSubpartition(i, this);
}
@@ -237,6 +238,15 @@ public class ResultPartition implements BufferPoolOwner {
return totalBuffers;
}
+ /**
+ * Returns the type of this result partition.
+ *
+ * @return result partition type
+ */
+ public ResultPartitionType getPartitionType() {
+ return partitionType;
+ }
+
// ------------------------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index 43d3a52..256387c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -20,9 +20,22 @@ package org.apache.flink.runtime.io.network.partition;
public enum ResultPartitionType {
- BLOCKING(false, false),
+ BLOCKING(false, false, false),
- PIPELINED(true, true);
+ PIPELINED(true, true, false),
+
+ /**
+ * Pipelined partitions with a bounded (local) buffer pool.
+ *
+ * For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much
+ * data is being buffered and checkpoint barriers are delayed. In contrast to limiting the
+ * overall network buffer pool size, this, however, still allows to be flexible with regards
+ * to the total number of partitions by selecting an appropriately big network buffer pool size.
+ *
+ * For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
+ * no checkpoint barriers.
+ */
+ PIPELINED_BOUNDED(true, true, true);
/** Can the partition be consumed while being produced? */
private final boolean isPipelined;
@@ -30,12 +43,16 @@ public enum ResultPartitionType {
/** Does the partition produce back pressure when not consumed? */
private final boolean hasBackPressure;
+ /** Does this partition use a limited number of (network) buffers? */
+ private final boolean isBounded;
+
/**
* Specifies the behaviour of an intermediate result partition at runtime.
*/
- ResultPartitionType(boolean isPipelined, boolean hasBackPressure) {
+ ResultPartitionType(boolean isPipelined, boolean hasBackPressure, boolean isBounded) {
this.isPipelined = isPipelined;
this.hasBackPressure = hasBackPressure;
+ this.isBounded = isBounded;
}
public boolean hasBackPressure() {
@@ -49,4 +66,13 @@ public enum ResultPartitionType {
public boolean isPipelined() {
return isPipelined;
}
+
+ /**
+ * Whether this partition uses a limited number of (network) buffers or not.
+ *
+ * @return <tt>true</tt> if the number of buffers should be bound to some limit
+ */
+ public boolean isBounded() {
+ return isBounded;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index d546559..afe8722 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -115,6 +116,9 @@ public class SingleInputGate implements InputGate {
*/
private final IntermediateDataSetID consumedResultId;
+ /** The type of the partition the input gate is consuming. */
+ private final ResultPartitionType consumedPartitionType;
+
/**
* The index of the consumed subpartition of each consumed partition. This index depends on the
* {@link DistributionPattern} and the subtask indices of the producing and consuming task.
@@ -166,6 +170,7 @@ public class SingleInputGate implements InputGate {
String owningTaskName,
JobID jobId,
IntermediateDataSetID consumedResultId,
+ final ResultPartitionType consumedPartitionType,
int consumedSubpartitionIndex,
int numberOfInputChannels,
TaskActions taskActions,
@@ -175,6 +180,7 @@ public class SingleInputGate implements InputGate {
this.jobId = checkNotNull(jobId);
this.consumedResultId = checkNotNull(consumedResultId);
+ this.consumedPartitionType = checkNotNull(consumedPartitionType);
checkArgument(consumedSubpartitionIndex >= 0);
this.consumedSubpartitionIndex = consumedSubpartitionIndex;
@@ -201,6 +207,15 @@ public class SingleInputGate implements InputGate {
return consumedResultId;
}
+ /**
+ * Returns the type of this input channel's consumed result partition.
+ *
+ * @return consumed result partition type
+ */
+ public ResultPartitionType getConsumedPartitionType() {
+ return consumedPartitionType;
+ }
+
BufferProvider getBufferProvider() {
return bufferPool;
}
@@ -571,6 +586,7 @@ public class SingleInputGate implements InputGate {
TaskIOMetricGroup metrics) {
final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId());
+ final ResultPartitionType consumedPartitionType = checkNotNull(igdd.getConsumedPartitionType());
final int consumedSubpartitionIndex = igdd.getConsumedSubpartitionIndex();
checkArgument(consumedSubpartitionIndex >= 0);
@@ -578,7 +594,7 @@ public class SingleInputGate implements InputGate {
final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());
final SingleInputGate inputGate = new SingleInputGate(
- owningTaskName, jobId, consumedResultId, consumedSubpartitionIndex,
+ owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex,
icdd.length, taskActions, metrics);
// Create the input channels. There is one input channel for each consumed partition.
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 19c5c01..e3c8345 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -370,7 +370,9 @@ public class TaskManagerServices {
kvStateServer,
networkEnvironmentConfiguration.ioMode(),
networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
- networkEnvironmentConfiguration.partitionRequestMaxBackoff());
+ networkEnvironmentConfiguration.partitionRequestMaxBackoff(),
+ networkEnvironmentConfiguration.networkBuffersPerChannel(),
+ networkEnvironmentConfiguration.extraNetworkBuffersPerGate());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 2c76372..8ad318a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.HeapMemorySegment;
import org.apache.flink.core.memory.HybridMemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
@@ -311,13 +312,25 @@ public class TaskManagerServicesConfiguration {
ioMode = IOManager.IOMode.SYNC;
}
+ int initialRequestBackoff = configuration.getInteger(
+ TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+ int maxRequestBackoff = configuration.getInteger(
+ TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+ int buffersPerChannel = configuration.getInteger(
+ TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+ int extraBuffersPerGate = configuration.getInteger(
+ TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
return new NetworkEnvironmentConfiguration(
numNetworkBuffers,
pageSize,
memType,
ioMode,
- 500,
- 3000,
+ initialRequestBackoff,
+ maxRequestBackoff,
+ buffersPerChannel,
+ extraBuffersPerGate,
nettyConfig);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 6c7ca3e..4ecfe59 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -29,4 +29,6 @@ case class NetworkEnvironmentConfiguration(
ioMode: IOMode,
partitionRequestInitialBackoff : Int,
partitionRequestMaxBackoff : Int,
+ networkBuffersPerChannel: Int,
+ extraNetworkBuffersPerGate: Int,
nettyConfig: NettyConfig = null)
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
index dcdf44c..3bbe6d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
@@ -36,7 +36,7 @@ public class MockNetworkEnvironment {
static {
try {
- when(networkBufferPool.createBufferPool(anyInt())).thenReturn(mock(BufferPool.class));
+ when(networkBufferPool.createBufferPool(anyInt(), anyInt())).thenReturn(mock(BufferPool.class));
when(networkEnvironment.getNetworkBufferPool()).thenReturn(networkBufferPool);
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
new file mode 100644
index 0000000..b956691
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Various tests for the {@link NetworkEnvironment} class.
+ */
+public class NetworkEnvironmentTest {
+ private final static int numBuffers = 1024;
+
+ private final static int memorySegmentSize = 128;
+
+ /**
+ * Verifies that {@link NetworkEnvironment#registerTask(Task)} sets up (un)bounded buffer pool
+ * instances for various types of input and output channels.
+ */
+ @Test
+ public void testRegisterTaskUsesBoundedBuffers() throws Exception {
+
+ final NetworkEnvironment network = new NetworkEnvironment(
+ new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP),
+ new LocalConnectionManager(),
+ new ResultPartitionManager(),
+ new TaskEventDispatcher(),
+ new KvStateRegistry(),
+ null,
+ IOManager.IOMode.SYNC,
+ 0,
+ 0,
+ 2,
+ 8);
+
+ // result partitions
+ ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
+ ResultPartition rp2 = createResultPartition(ResultPartitionType.BLOCKING, 2);
+ ResultPartition rp3 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 2);
+ ResultPartition rp4 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 8);
+ final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
+ final ResultPartitionWriter[] resultPartitionWriters = new ResultPartitionWriter[] {
+ new ResultPartitionWriter(rp1), new ResultPartitionWriter(rp2),
+ new ResultPartitionWriter(rp3), new ResultPartitionWriter(rp4)};
+
+ // input gates
+ final SingleInputGate[] inputGates = new SingleInputGate[] {
+ createSingleInputGateMock(ResultPartitionType.PIPELINED, 2),
+ createSingleInputGateMock(ResultPartitionType.BLOCKING, 2),
+ createSingleInputGateMock(ResultPartitionType.PIPELINED_BOUNDED, 2),
+ createSingleInputGateMock(ResultPartitionType.PIPELINED_BOUNDED, 8)};
+
+ // overall task to register
+ Task task = mock(Task.class);
+ when(task.getProducedPartitions()).thenReturn(resultPartitions);
+ when(task.getAllWriters()).thenReturn(resultPartitionWriters);
+ when(task.getAllInputGates()).thenReturn(inputGates);
+
+ network.registerTask(task);
+
+ assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
+ assertEquals(Integer.MAX_VALUE, rp2.getBufferPool().getMaxNumberOfMemorySegments());
+ assertEquals(2 * 2 + 8, rp3.getBufferPool().getMaxNumberOfMemorySegments());
+ assertEquals(8 * 2 + 8, rp4.getBufferPool().getMaxNumberOfMemorySegments());
+
+ network.shutdown();
+ }
+
+ /**
+ * Helper to create simple {@link ResultPartition} instance for use by a {@link Task} inside
+ * {@link NetworkEnvironment#registerTask(Task)}.
+ *
+ * @param partitionType
+ * the produced partition type
+ * @param channels
+ * the nummer of output channels
+ *
+ * @return instance with minimal data set and some mocks so that it is useful for {@link
+ * NetworkEnvironment#registerTask(Task)}
+ */
+ private static ResultPartition createResultPartition(
+ final ResultPartitionType partitionType, final int channels) {
+ return new ResultPartition(
+ "TestTask-" + partitionType + ":" + channels,
+ mock(TaskActions.class),
+ new JobID(),
+ new ResultPartitionID(),
+ partitionType,
+ channels,
+ channels,
+ mock(ResultPartitionManager.class),
+ mock(ResultPartitionConsumableNotifier.class),
+ mock(IOManager.class),
+ false);
+ }
+
+ /**
+ * Helper to create a mock of a {@link SingleInputGate} for use by a {@link Task} inside
+ * {@link NetworkEnvironment#registerTask(Task)}.
+ *
+ * @param partitionType
+ * the consumed partition type
+ * @param channels
+ * the nummer of input channels
+ *
+ * @return mock with minimal functionality necessary by {@link NetworkEnvironment#registerTask(Task)}
+ */
+ private static SingleInputGate createSingleInputGateMock(
+ final ResultPartitionType partitionType, final int channels) {
+ SingleInputGate ig = mock(SingleInputGate.class);
+ when(ig.getConsumedPartitionType()).thenReturn(partitionType);
+ when(ig.getNumberOfInputChannels()).thenReturn(channels);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(final InvocationOnMock invocation) throws Throwable {
+ BufferPool bp = invocation.getArgumentAt(0, BufferPool.class);
+ if (partitionType == ResultPartitionType.PIPELINED_BOUNDED) {
+ assertEquals(channels * 2 + 8, bp.getMaxNumberOfMemorySegments());
+ } else {
+ assertEquals(Integer.MAX_VALUE, bp.getMaxNumberOfMemorySegments());
+ }
+ return null;
+ }
+ }).when(ig).setBufferPool(any(BufferPool.class));
+ return ig;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index ca1d0a5..7d83fb5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -177,7 +177,7 @@ public class RecordWriterTest {
try {
buffers = new NetworkBufferPool(1, 1024, MemoryType.HEAP);
- bufferPool = spy(buffers.createBufferPool(1));
+ bufferPool = spy(buffers.createBufferPool(1, Integer.MAX_VALUE));
ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);
when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferPool));
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
index 49808c9..ce76a6d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Random;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
public class BufferPoolFactoryTest {
@@ -48,61 +49,190 @@ public class BufferPoolFactoryTest {
public void verifyAllBuffersReturned() {
String msg = "Did not return all buffers to network buffer pool after test.";
assertEquals(msg, numBuffers, networkBufferPool.getNumberOfAvailableMemorySegments());
+ // in case buffers have actually been requested, we must release them again
+ networkBufferPool.destroy();
}
@Test(expected = IOException.class)
public void testRequireMoreThanPossible() throws IOException {
- networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2);
+ networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, Integer.MAX_VALUE);
+ }
+
+ @Test
+ public void testBoundedPools() throws IOException {
+ BufferPool lbp = networkBufferPool.createBufferPool(1, 1);
+ assertEquals(1, lbp.getNumBuffers());
+
+ lbp = networkBufferPool.createBufferPool(1, 2);
+ assertEquals(2, lbp.getNumBuffers());
}
@Test
public void testSingleManagedPoolGetsAll() throws IOException {
- BufferPool lbp = networkBufferPool.createBufferPool(1);
+ BufferPool lbp = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), lbp.getNumBuffers());
}
@Test
+ public void testSingleManagedPoolGetsAllExceptFixedOnes() throws IOException {
+ BufferPool fixed = networkBufferPool.createBufferPool(24, 24);
+
+ BufferPool lbp = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
+
+ assertEquals(24, fixed.getNumBuffers());
+ assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() - fixed.getNumBuffers(), lbp.getNumBuffers());
+ }
+
+ @Test
public void testUniformDistribution() throws IOException {
- BufferPool first = networkBufferPool.createBufferPool(0);
- BufferPool second = networkBufferPool.createBufferPool(0);
+ BufferPool first = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
+ assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), first.getNumBuffers());
+ BufferPool second = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, second.getNumBuffers());
}
+ /**
+ * Tests that buffers, once given to an initial buffer pool, get re-distributed to a second one
+ * in case both buffer pools request half of the available buffer count.
+ */
@Test
- public void testAllDistributed() {
- Random random = new Random();
+ public void testUniformDistributionAllBuffers() throws IOException {
+ BufferPool first = networkBufferPool
+ .createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() / 2, Integer.MAX_VALUE);
+ assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), first.getNumBuffers());
- try {
- List<BufferPool> pools = new ArrayList<BufferPool>();
+ BufferPool second = networkBufferPool
+ .createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() / 2, Integer.MAX_VALUE);
+ assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
+ assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2,
+ second.getNumBuffers());
+ }
- int numPools = numBuffers / 32;
- for (int i = 0; i < numPools; i++) {
- pools.add(networkBufferPool.createBufferPool(random.nextInt(7 + 1)));
+ @Test
+ public void testUniformDistributionBounded1() throws IOException {
+ BufferPool first = networkBufferPool.createBufferPool(0, networkBufferPool.getTotalNumberOfMemorySegments());
+ assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), first.getNumBuffers());
+
+ BufferPool second = networkBufferPool.createBufferPool(0, networkBufferPool.getTotalNumberOfMemorySegments());
+ assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
+ assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, second.getNumBuffers());
+ }
+
+ @Test
+ public void testUniformDistributionBounded2() throws IOException {
+ BufferPool first = networkBufferPool.createBufferPool(0, 10);
+ assertEquals(10, first.getNumBuffers());
+
+ BufferPool second = networkBufferPool.createBufferPool(0, 10);
+ assertEquals(10, first.getNumBuffers());
+ assertEquals(10, second.getNumBuffers());
+ }
+
+ @Test
+ public void testUniformDistributionBounded3() throws IOException {
+ NetworkBufferPool globalPool = new NetworkBufferPool(3, 128, MemoryType.HEAP);
+ BufferPool first = globalPool.createBufferPool(0, 10);
+ assertEquals(3, first.getNumBuffers());
+
+ BufferPool second = globalPool.createBufferPool(0, 10);
+ // the order of which buffer pool received 2 or 1 buffer is undefined
+ assertEquals(3, first.getNumBuffers() + second.getNumBuffers());
+ assertNotEquals(3, first.getNumBuffers());
+ assertNotEquals(3, second.getNumBuffers());
+
+ BufferPool third = globalPool.createBufferPool(0, 10);
+ assertEquals(1, first.getNumBuffers());
+ assertEquals(1, second.getNumBuffers());
+ assertEquals(1, third.getNumBuffers());
+
+ // similar to #verifyAllBuffersReturned()
+ String msg = "Did not return all buffers to network buffer pool after test.";
+ assertEquals(msg, 3, globalPool.getNumberOfAvailableMemorySegments());
+ // in case buffers have actually been requested, we must release them again
+ globalPool.destroy();
+ }
+
+ @Test
+ public void testBufferRedistributionMixed1() throws IOException {
+ // try multiple times for various orders during redistribution
+ for (int i = 0; i < 1_000; ++i) {
+ BufferPool first = networkBufferPool.createBufferPool(0, 10);
+ assertEquals(10, first.getNumBuffers());
+
+ BufferPool second = networkBufferPool.createBufferPool(0, 10);
+ assertEquals(10, first.getNumBuffers());
+ assertEquals(10, second.getNumBuffers());
+
+ BufferPool third = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
+ // note: exact buffer distribution depends on the order during the redistribution
+ for (BufferPool bp : new BufferPool[] {first, second, third}) {
+ int size = networkBufferPool.getTotalNumberOfMemorySegments() *
+ Math.min(networkBufferPool.getTotalNumberOfMemorySegments(),
+ bp.getMaxNumberOfMemorySegments()) /
+ (networkBufferPool.getTotalNumberOfMemorySegments() + 20);
+ if (bp.getNumBuffers() != size && bp.getNumBuffers() != (size + 1)) {
+ fail("wrong buffer pool size after redistribution: " + bp.getNumBuffers());
+ }
}
- int numDistributedBuffers = 0;
- for (BufferPool pool : pools) {
- numDistributedBuffers += pool.getNumBuffers();
+ BufferPool fourth = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
+ // note: exact buffer distribution depends on the order during the redistribution
+ for (BufferPool bp : new BufferPool[] {first, second, third, fourth}) {
+ int size = networkBufferPool.getTotalNumberOfMemorySegments() *
+ Math.min(networkBufferPool.getTotalNumberOfMemorySegments(),
+ bp.getMaxNumberOfMemorySegments()) /
+ (2 * networkBufferPool.getTotalNumberOfMemorySegments() + 20);
+ if (bp.getNumBuffers() != size && bp.getNumBuffers() != (size + 1)) {
+ fail("wrong buffer pool size after redistribution: " + bp.getNumBuffers());
+ }
}
- assertEquals(numBuffers, numDistributedBuffers);
+ verifyAllBuffersReturned();
+ setupNetworkBufferPool();
}
- catch (Throwable t) {
- t.printStackTrace();
- fail(t.getMessage());
+ }
+
+ @Test
+ public void testAllDistributed() throws IOException {
+ // try multiple times for various orders during redistribution
+ for (int i = 0; i < 1_000; ++i) {
+ Random random = new Random();
+
+ List<BufferPool> pools = new ArrayList<BufferPool>();
+
+ int numPools = numBuffers / 32;
+ long maxTotalUsed = 0;
+ for (int j = 0; j < numPools; j++) {
+ int numRequiredBuffers = random.nextInt(7 + 1);
+ // make unbounded buffers more likely:
+ int maxUsedBuffers = random.nextBoolean() ? Integer.MAX_VALUE :
+ Math.max(1, random.nextInt(10) + numRequiredBuffers);
+ pools.add(networkBufferPool.createBufferPool(numRequiredBuffers, maxUsedBuffers));
+ maxTotalUsed = Math.min(numBuffers, maxTotalUsed + maxUsedBuffers);
+
+ // after every iteration, all buffers (up to maxTotalUsed) must be distributed
+ int numDistributedBuffers = 0;
+ for (BufferPool pool : pools) {
+ numDistributedBuffers += pool.getNumBuffers();
+ }
+ assertEquals(maxTotalUsed, numDistributedBuffers);
+ }
+
+ verifyAllBuffersReturned();
+ setupNetworkBufferPool();
}
}
@Test
public void testCreateDestroy() throws IOException {
- BufferPool first = networkBufferPool.createBufferPool(0);
+ BufferPool first = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), first.getNumBuffers());
- BufferPool second = networkBufferPool.createBufferPool(0);
+ BufferPool second = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 93731e1..a186d56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -328,6 +328,64 @@ public class LocalBufferPoolTest {
}
}
+ @Test
+ public void testBoundedBuffer() throws Exception {
+ localBufferPool.lazyDestroy();
+
+ localBufferPool = new LocalBufferPool(networkBufferPool, 1, 2);
+ assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+ assertEquals(2, localBufferPool.getMaxNumberOfMemorySegments());
+
+ Buffer buffer1, buffer2;
+
+ // check min number of buffers:
+ localBufferPool.setNumBuffers(1);
+ assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+ assertNotNull(buffer1 = localBufferPool.requestBuffer());
+ assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+ assertNull(localBufferPool.requestBuffer());
+ assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+ buffer1.recycle();
+ assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+
+ // check max number of buffers:
+ localBufferPool.setNumBuffers(2);
+ assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+ assertNotNull(buffer1 = localBufferPool.requestBuffer());
+ assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+ assertNotNull(buffer2 = localBufferPool.requestBuffer());
+ assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+ assertNull(localBufferPool.requestBuffer());
+ assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+ buffer1.recycle();
+ assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+ buffer2.recycle();
+ assertEquals(2, localBufferPool.getNumberOfAvailableMemorySegments());
+
+ // try to set too large buffer size:
+ localBufferPool.setNumBuffers(3);
+ assertEquals(2, localBufferPool.getNumberOfAvailableMemorySegments());
+ assertNotNull(buffer1 = localBufferPool.requestBuffer());
+ assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+ assertNotNull(buffer2 = localBufferPool.requestBuffer());
+ assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+ assertNull(localBufferPool.requestBuffer());
+ assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+ buffer1.recycle();
+ assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+ buffer2.recycle();
+ assertEquals(2, localBufferPool.getNumberOfAvailableMemorySegments());
+
+ // decrease size again
+ localBufferPool.setNumBuffers(1);
+ assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+ assertNotNull(buffer1 = localBufferPool.requestBuffer());
+ assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+ assertNull(localBufferPool.requestBuffer());
+ buffer1.recycle();
+ assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+ }
+
// ------------------------------------------------------------------------
// Helpers
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index ab32685..7c6a543 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.core.memory.MemoryType;
import org.junit.Test;
+import java.util.ArrayList;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -47,7 +49,23 @@ public class NetworkBufferPoolTest {
assertTrue(globalPool.isDestroyed());
try {
- globalPool.createBufferPool(2);
+ globalPool.createBufferPool(2, 2);
+ fail("Should throw an IllegalStateException");
+ }
+ catch (IllegalStateException e) {
+ // yippie!
+ }
+
+ try {
+ globalPool.createBufferPool(2, 10);
+ fail("Should throw an IllegalStateException");
+ }
+ catch (IllegalStateException e) {
+ // yippie!
+ }
+
+ try {
+ globalPool.createBufferPool(2, Integer.MAX_VALUE);
fail("Should throw an IllegalStateException");
}
catch (IllegalStateException e) {
@@ -60,39 +78,51 @@ public class NetworkBufferPoolTest {
}
}
+
@Test
public void testDestroyAll() {
try {
- NetworkBufferPool globalPool = new NetworkBufferPool(8, 128, MemoryType.HEAP);
-
- BufferPool lbp = globalPool.createBufferPool(5);
-
- assertEquals(5, lbp.getNumberOfRequiredMemorySegments());
-
- Buffer[] buffers = {
- lbp.requestBuffer(),
- lbp.requestBuffer(),
- lbp.requestBuffer(),
- lbp.requestBuffer(),
- lbp.requestBuffer(),
- lbp.requestBuffer(),
- lbp.requestBuffer(),
- lbp.requestBuffer()
- };
-
- for (Buffer b : buffers) {
- assertNotNull(b);
- assertNotNull(b.getMemorySegment());
+ NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
+
+ BufferPool fixedPool = globalPool.createBufferPool(2, 2);
+ BufferPool boundedPool = globalPool.createBufferPool(0, 1);
+ BufferPool nonFixedPool = globalPool.createBufferPool(5, Integer.MAX_VALUE);
+
+ assertEquals(2, fixedPool.getNumberOfRequiredMemorySegments());
+ assertEquals(0, boundedPool.getNumberOfRequiredMemorySegments());
+ assertEquals(5, nonFixedPool.getNumberOfRequiredMemorySegments());
+
+ // actually, the buffer pool sizes may be different due to rounding and based on the internal order of
+ // the buffer pools - the total number of retrievable buffers should be equal to the number of buffers
+ // in the NetworkBufferPool though
+
+ ArrayList<Buffer> buffers = new ArrayList<>(globalPool.getTotalNumberOfMemorySegments());
+ collectBuffers:
+ for (int i = 0; i < 10; ++i) {
+ for (BufferPool bp : new BufferPool[] { fixedPool, boundedPool, nonFixedPool }) {
+ Buffer buffer = bp.requestBuffer();
+ if (buffer != null) {
+ assertNotNull(buffer.getMemorySegment());
+ buffers.add(buffer);
+ continue collectBuffers;
+ }
+ }
}
- assertNull(lbp.requestBuffer());
+ assertEquals(globalPool.getTotalNumberOfMemorySegments(), buffers.size());
+
+ assertNull(fixedPool.requestBuffer());
+ assertNull(boundedPool.requestBuffer());
+ assertNull(nonFixedPool.requestBuffer());
// destroy all allocated ones
globalPool.destroyAllBufferPools();
// check the destroyed status
assertFalse(globalPool.isDestroyed());
- assertTrue(lbp.isDestroyed());
+ assertTrue(fixedPool.isDestroyed());
+ assertTrue(boundedPool.isDestroyed());
+ assertTrue(nonFixedPool.isDestroyed());
assertEquals(0, globalPool.getNumberOfRegisteredBufferPools());
@@ -107,7 +137,23 @@ public class NetworkBufferPoolTest {
// can request no more buffers
try {
- lbp.requestBuffer();
+ fixedPool.requestBuffer();
+ fail("Should fail with an IllegalStateException");
+ }
+ catch (IllegalStateException e) {
+ // yippie!
+ }
+
+ try {
+ boundedPool.requestBuffer();
+ fail("Should fail with an IllegalStateException");
+ }
+ catch (IllegalStateException e) {
+ // that's the way we like it, aha, aha
+ }
+
+ try {
+ nonFixedPool.requestBuffer();
fail("Should fail with an IllegalStateException");
}
catch (IllegalStateException e) {
@@ -115,7 +161,7 @@ public class NetworkBufferPoolTest {
}
// can create a new pool now
- assertNotNull(globalPool.createBufferPool(8));
+ assertNotNull(globalPool.createBufferPool(10, Integer.MAX_VALUE));
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 27177c9..64f82f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -60,7 +60,7 @@ public class InputGateConcurrentTest {
final SingleInputGate gate = new SingleInputGate(
"Test Task Name",
new JobID(),
- new IntermediateDataSetID(),
+ new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0, numChannels,
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
@@ -95,7 +95,7 @@ public class InputGateConcurrentTest {
final SingleInputGate gate = new SingleInputGate(
"Test Task Name",
new JobID(),
- new IntermediateDataSetID(),
+ new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0,
numChannels,
mock(TaskActions.class),
@@ -144,7 +144,7 @@ public class InputGateConcurrentTest {
final SingleInputGate gate = new SingleInputGate(
"Test Task Name",
new JobID(),
- new IntermediateDataSetID(),
+ new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0,
numChannels,
mock(TaskActions.class),
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 7e1d792..f933840 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -347,7 +347,8 @@ public class InputGateFairnessTest {
TaskActions taskActions,
TaskIOMetricGroup metrics) {
- super(owningTaskName, jobId, consumedResultId, consumedSubpartitionIndex,
+ super(owningTaskName, jobId, consumedResultId, ResultPartitionType.PIPELINED,
+ consumedSubpartitionIndex,
numberOfInputChannels, taskActions, metrics);
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 15ff2da..18c3038 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -128,7 +128,7 @@ public class LocalInputChannelTest {
// Create a buffer pool for this partition
partition.registerBufferPool(
- networkBuffers.createBufferPool(producerBufferPoolSize));
+ networkBuffers.createBufferPool(producerBufferPoolSize, producerBufferPoolSize));
// Create the producer
partitionProducers[i] = new TestPartitionProducer(
@@ -162,7 +162,7 @@ public class LocalInputChannelTest {
i,
parallelism,
numberOfBuffersPerChannel,
- networkBuffers.createBufferPool(parallelism),
+ networkBuffers.createBufferPool(parallelism, parallelism),
partitionManager,
new TaskEventDispatcher(),
partitionIds)));
@@ -284,6 +284,7 @@ public class LocalInputChannelTest {
"test task name",
new JobID(),
new IntermediateDataSetID(),
+ ResultPartitionType.PIPELINED,
0,
1,
mock(TaskActions.class),
@@ -481,6 +482,7 @@ public class LocalInputChannelTest {
"Test Name",
new JobID(),
new IntermediateDataSetID(),
+ ResultPartitionType.PIPELINED,
subpartitionIndex,
numberOfInputChannels,
mock(TaskActions.class),
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index a25b8d5..2d1b4b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -73,7 +74,13 @@ public class SingleInputGateTest {
public void testBasicGetNextLogic() throws Exception {
// Setup
final SingleInputGate inputGate = new SingleInputGate(
- "Test Task Name", new JobID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ "Test Task Name", new JobID(),
+ new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
+ 0, 2,
+ mock(TaskActions.class),
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+
+ assertEquals(ResultPartitionType.PIPELINED, inputGate.getConsumedPartitionType());
final TestInputChannel[] inputChannels = new TestInputChannel[]{
new TestInputChannel(inputGate, 0),
@@ -127,7 +134,12 @@ public class SingleInputGateTest {
// Setup reader with one local and one unknown input channel
final IntermediateDataSetID resultId = new IntermediateDataSetID();
- final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ final SingleInputGate inputGate = new SingleInputGate(
+ "Test Task Name", new JobID(),
+ resultId, ResultPartitionType.PIPELINED,
+ 0, 2,
+ mock(TaskActions.class),
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
final BufferPool bufferPool = mock(BufferPool.class);
when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
@@ -179,6 +191,7 @@ public class SingleInputGateTest {
"t1",
new JobID(),
new IntermediateDataSetID(),
+ ResultPartitionType.PIPELINED,
0,
1,
mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
@@ -218,6 +231,7 @@ public class SingleInputGateTest {
"InputGate",
new JobID(),
new IntermediateDataSetID(),
+ ResultPartitionType.PIPELINED,
0,
1,
mock(TaskActions.class),
@@ -303,7 +317,9 @@ public class SingleInputGateTest {
partitionIds[2],
ResultPartitionLocation.createUnknown())};
- InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), 0, channelDescs);
+ InputGateDeploymentDescriptor gateDesc =
+ new InputGateDeploymentDescriptor(new IntermediateDataSetID(),
+ ResultPartitionType.PIPELINED, 0, channelDescs);
int initialBackoff = 137;
int maxBackoff = 1001;
@@ -324,6 +340,8 @@ public class SingleInputGateTest {
mock(TaskActions.class),
new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType());
+
Map<IntermediateResultPartitionID, InputChannel> channelMap = gate.getInputChannels();
assertEquals(3, channelMap.size());
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index fe3b087..18ad490 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
@@ -55,6 +56,7 @@ public class TestSingleInputGate {
"Test Task Name",
new JobID(),
new IntermediateDataSetID(),
+ ResultPartitionType.PIPELINED,
0,
numberOfInputChannels,
mock(TaskActions.class),
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index c05df0a..bc1dd07 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.taskmanager.TaskActions;
@@ -42,8 +43,18 @@ public class UnionInputGateTest {
public void testBasicGetNextLogic() throws Exception {
// Setup
final String testTaskName = "Test Task";
- final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
- final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ final SingleInputGate ig1 = new SingleInputGate(
+ testTaskName, new JobID(),
+ new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
+ 0, 3,
+ mock(TaskActions.class),
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ final SingleInputGate ig2 = new SingleInputGate(
+ testTaskName, new JobID(),
+ new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
+ 0, 5,
+ mock(TaskActions.class),
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index e26e176..cecfe6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -113,7 +113,8 @@ public class TaskManagerComponentsStartupShutdownTest {
false); // exit-jvm-on-fatal-error
final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
- 32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, null);
+ 32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC,
+ 0, 0, 2, 8, null);
ResourceID taskManagerId = ResourceID.generate();
@@ -130,7 +131,9 @@ public class TaskManagerComponentsStartupShutdownTest {
null,
netConf.ioMode(),
netConf.partitionRequestInitialBackoff(),
- netConf.partitionRequestMaxBackoff());
+ netConf.partitionRequestMaxBackoff(),
+ netConf.networkBuffersPerChannel(),
+ netConf.extraNetworkBuffersPerGate());
network.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 356d693..0b39e20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -69,6 +69,7 @@ import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
import org.apache.flink.runtime.messages.TaskMessages.StopTask;
import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -91,6 +92,7 @@ import scala.concurrent.duration.FiniteDuration;
import scala.util.Failure;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
@@ -637,7 +639,7 @@ public class TaskManagerTest extends TestLogger {
InputGateDeploymentDescriptor ircdd =
new InputGateDeploymentDescriptor(
- new IntermediateDataSetID(),
+ new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0, new InputChannelDeploymentDescriptor[]{
new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
}
@@ -782,7 +784,7 @@ public class TaskManagerTest extends TestLogger {
InputGateDeploymentDescriptor ircdd =
new InputGateDeploymentDescriptor(
- new IntermediateDataSetID(),
+ new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0, new InputChannelDeploymentDescriptor[]{
new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
}
@@ -936,7 +938,7 @@ public class TaskManagerTest extends TestLogger {
new InputChannelDeploymentDescriptor(partitionId, loc)};
final InputGateDeploymentDescriptor igdd =
- new InputGateDeploymentDescriptor(resultId, 0, icdd);
+ new InputGateDeploymentDescriptor(resultId, ResultPartitionType.PIPELINED, 0, icdd);
final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
jid, "TestJob", vid, eid,
@@ -978,6 +980,25 @@ public class TaskManagerTest extends TestLogger {
}};
}
+ @Test
+ public void testTaskManagerServicesConfiguration() throws Exception {
+
+ // set some non-default values
+ final Configuration config = new Configuration();
+ config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+ config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+ config.setInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL, 10);
+ config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
+
+ TaskManagerServicesConfiguration tmConfig =
+ TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getByName("localhost"), true);
+
+ assertEquals(tmConfig.getNetworkConfig().partitionRequestInitialBackoff(), 100);
+ assertEquals(tmConfig.getNetworkConfig().partitionRequestMaxBackoff(), 200);
+ assertEquals(tmConfig.getNetworkConfig().networkBuffersPerChannel(), 10);
+ assertEquals(tmConfig.getNetworkConfig().extraNetworkBuffersPerGate(), 100);
+ }
+
/**
* Tests that repeated local {@link PartitionNotFoundException}s ultimately fail the receiver.
*/
@@ -1031,7 +1052,7 @@ public class TaskManagerTest extends TestLogger {
new InputChannelDeploymentDescriptor(partitionId, loc)};
final InputGateDeploymentDescriptor igdd =
- new InputGateDeploymentDescriptor(resultId, 0, icdd);
+ new InputGateDeploymentDescriptor(resultId, ResultPartitionType.PIPELINED, 0, icdd);
final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
jid, "TestJob", vid, eid,
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 003eff9..c18b527 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -402,17 +402,17 @@ public class StreamingJobGraphGenerator {
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.POINTWISE,
- ResultPartitionType.PIPELINED);
+ ResultPartitionType.PIPELINED_BOUNDED);
} else if (partitioner instanceof RescalePartitioner){
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.POINTWISE,
- ResultPartitionType.PIPELINED);
+ ResultPartitionType.PIPELINED_BOUNDED);
} else {
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.ALL_TO_ALL,
- ResultPartitionType.PIPELINED);
+ ResultPartitionType.PIPELINED_BOUNDED);
}
// set strategy name so that web interface can show it.
jobEdge.setShipStrategyName(partitioner.toString());