You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/24 21:12:55 UTC
[6/6] flink git commit: [hotfix] [tests] Fix mini cluster usage and
logging/printing in CustomDistributionITCase
[hotfix] [tests] Fix mini cluster usage and logging/printing in CustomDistributionITCase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/addad1af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/addad1af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/addad1af
Branch: refs/heads/master
Commit: addad1af453a088c559db234370db565a35fbc11
Parents: 635c869
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 24 21:02:09 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 24 21:19:04 2016 +0200
----------------------------------------------------------------------
.../CustomDistributionITCase.java | 110 +++++++++++--------
1 file changed, 64 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/addad1af/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
index c6bc08e..ca2c156 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
@@ -30,30 +30,60 @@ import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.Collector;
-import org.junit.Test;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.fail;
+@SuppressWarnings("serial")
+public class CustomDistributionITCase extends TestLogger {
-public class CustomDistributionITCase {
+ // ------------------------------------------------------------------------
+ // The mini cluster that is shared across tests
+ // ------------------------------------------------------------------------
- @Test
- public void testPartitionWithDistribution1() throws Exception{
- /*
- * Test the record partitioned rightly with one field according to the customized data distribution
- */
+ private static ForkableFlinkMiniCluster cluster;
- ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ @BeforeClass
+ public static void setup() throws Exception {
+ cluster = TestBaseUtils.startCluster(1, 8, false, false, true);
+ }
- DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+ @AfterClass
+ public static void teardown() throws Exception {
+ TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+ }
+
+ @Before
+ public void prepare() {
+ TestEnvironment clusterEnv = new TestEnvironment(cluster, 1);
+ clusterEnv.setAsContext();
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Test the record partitioned rightly with one field according to the customized data distribution
+ */
+ @Test
+ public void testPartitionWithDistribution1() throws Exception {
final TestDataDist1 dist = new TestDataDist1();
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(dist.getParallelism());
+ DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
DataSet<Boolean> result = DataSetUtils
.partitionByRange(input, dist, 0)
.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
@@ -96,13 +126,15 @@ public class CustomDistributionITCase {
env.execute();
}
+ /**
+ * Test the record partitioned rightly with two fields according to the customized data distribution
+ */
@Test
- public void testRangeWithDistribution2() throws Exception{
- /*
- * Test the record partitioned rightly with two fields according to the customized data distribution
- */
+ public void testRangeWithDistribution2() throws Exception {
+ final TestDataDist2 dist = new TestDataDist2();
- ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(dist.getParallelism());
DataSet<Tuple3<Integer, Integer, String>> input = env.fromElements(
new Tuple3<>(1, 5, "Hi"),
@@ -122,10 +154,6 @@ public class CustomDistributionITCase {
new Tuple3<>(5, 3, "Hi Java again")
);
- final TestDataDist2 dist = new TestDataDist2();
-
- env.setParallelism(dist.getParallelism());
-
DataSet<Boolean> result = DataSetUtils
.partitionByRange(input, dist, 0, 1)
.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
@@ -175,18 +203,18 @@ public class CustomDistributionITCase {
env.execute();
}
+ /*
+ * Test the number of partition keys less than the number of distribution fields
+ */
@Test
- public void testPartitionKeyLessDistribution() throws Exception{
- /*
- * Test the number of partition keys less than the number of distribution fields
- */
- ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-
- DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+ public void testPartitionKeyLessDistribution() throws Exception {
final TestDataDist2 dist = new TestDataDist2();
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(dist.getParallelism());
+ DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
DataSet<Boolean> result = DataSetUtils
.partitionByRange(input, dist, 0)
.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
@@ -229,19 +257,17 @@ public class CustomDistributionITCase {
env.execute();
}
+ /*
+ * Test the number of partition keys larger than the number of distribution fields
+ */
@Test(expected = IllegalArgumentException.class)
- public void testPartitionMoreThanDistribution() throws Exception{
- /*
- * Test the number of partition keys larger than the number of distribution fields
- */
+ public void testPartitionMoreThanDistribution() throws Exception {
+ final TestDataDist2 dist = new TestDataDist2();
- ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
- final TestDataDist2 dist = new TestDataDist2();
-
- DataSet<Tuple3<Integer, Long, String>> result = DataSetUtils
- .partitionByRange(input, dist, 0, 1, 2);
+ DataSetUtils.partitionByRange(input, dist, 0, 1, 2);
}
/**
@@ -278,14 +304,10 @@ public class CustomDistributionITCase {
}
@Override
- public void write(DataOutputView out) throws IOException {
-
- }
+ public void write(DataOutputView out) throws IOException {}
@Override
- public void read(DataInputView in) throws IOException {
-
- }
+ public void read(DataInputView in) throws IOException {}
}
/**
@@ -323,13 +345,9 @@ public class CustomDistributionITCase {
}
@Override
- public void write(DataOutputView out) throws IOException {
-
- }
+ public void write(DataOutputView out) throws IOException {}
@Override
- public void read(DataInputView in) throws IOException {
-
- }
+ public void read(DataInputView in) throws IOException {}
}
}