You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/08/24 21:12:55 UTC

[6/6] flink git commit: [hotfix] [tests] Fix mini cluster usage and logging/printing in CustomDistributionITCase

[hotfix] [tests] Fix mini cluster usage and logging/printing in CustomDistributionITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/addad1af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/addad1af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/addad1af

Branch: refs/heads/master
Commit: addad1af453a088c559db234370db565a35fbc11
Parents: 635c869
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 24 21:02:09 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Aug 24 21:19:04 2016 +0200

----------------------------------------------------------------------
 .../CustomDistributionITCase.java               | 110 +++++++++++--------
 1 file changed, 64 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/addad1af/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
index c6bc08e..ca2c156 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
@@ -30,30 +30,60 @@ import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
-import org.junit.Test;
+import org.apache.flink.util.TestLogger;
 
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 import java.io.IOException;
 
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("serial")
+public class CustomDistributionITCase extends TestLogger {
 
-public class CustomDistributionITCase {
+	// ------------------------------------------------------------------------
+	//  The mini cluster that is shared across tests
+	// ------------------------------------------------------------------------
 
-	@Test
-	public void testPartitionWithDistribution1() throws Exception{
-		/*
-		 * Test the record partitioned rightly with one field according to the customized data distribution
-		 */
+	private static ForkableFlinkMiniCluster cluster;
 
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+	@BeforeClass
+	public static void setup() throws Exception {
+		cluster = TestBaseUtils.startCluster(1, 8, false, false, true);
+	}
 
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+	@AfterClass
+	public static void teardown() throws Exception {
+		TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
+	}
+
+	@Before
+	public void prepare() {
+		TestEnvironment clusterEnv = new TestEnvironment(cluster, 1);
+		clusterEnv.setAsContext();
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Tests that records are partitioned correctly on one field according to the custom data distribution.
+	 */
+	@Test
+	public void testPartitionWithDistribution1() throws Exception {
 		final TestDataDist1 dist = new TestDataDist1();
 
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(dist.getParallelism());
 
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
 		DataSet<Boolean> result = DataSetUtils
 			.partitionByRange(input, dist, 0)
 			.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
@@ -96,13 +126,15 @@ public class CustomDistributionITCase {
 		env.execute();
 	}
 
+	/**
+	 * Tests that records are partitioned correctly on two fields according to the custom data distribution.
+	 */
 	@Test
-	public void testRangeWithDistribution2() throws Exception{
-		/*
-		 * Test the record partitioned rightly with two fields according to the customized data distribution
-		 */
+	public void testRangeWithDistribution2() throws Exception {
+		final TestDataDist2 dist = new TestDataDist2();
 
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(dist.getParallelism());
 
 		DataSet<Tuple3<Integer, Integer, String>> input = env.fromElements(
 						new Tuple3<>(1, 5, "Hi"),
@@ -122,10 +154,6 @@ public class CustomDistributionITCase {
 						new Tuple3<>(5, 3, "Hi Java again")
 			);
 
-		final TestDataDist2 dist = new TestDataDist2();
-
-		env.setParallelism(dist.getParallelism());
-
 		DataSet<Boolean> result = DataSetUtils
 			.partitionByRange(input, dist, 0, 1)
 			.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
@@ -175,18 +203,18 @@ public class CustomDistributionITCase {
 		env.execute();
 	}
 
+	/**
+	 * Tests the case where the number of partition keys is less than the number of distribution fields.
+	 */
 	@Test
-	public void testPartitionKeyLessDistribution() throws Exception{
-		/*
-		 * Test the number of partition keys less than the number of distribution fields
-		 */
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+	public void testPartitionKeyLessDistribution() throws Exception {
 		final TestDataDist2 dist = new TestDataDist2();
 
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(dist.getParallelism());
 
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
 		DataSet<Boolean> result = DataSetUtils
 			.partitionByRange(input, dist, 0)
 			.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
@@ -229,19 +257,17 @@ public class CustomDistributionITCase {
 		env.execute();
 	}
 
+	/**
+	 * Tests the case where the number of partition keys is larger than the number of distribution fields.
+	 */
 	@Test(expected = IllegalArgumentException.class)
-	public void testPartitionMoreThanDistribution() throws Exception{
-		/*
-		 * Test the number of partition keys larger than the number of distribution fields
-		 */
+	public void testPartitionMoreThanDistribution() throws Exception {
+		final TestDataDist2 dist = new TestDataDist2();
 
-		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		final TestDataDist2 dist = new TestDataDist2();
-
-		DataSet<Tuple3<Integer, Long, String>> result = DataSetUtils
-				.partitionByRange(input, dist, 0, 1, 2);
+		DataSetUtils.partitionByRange(input, dist, 0, 1, 2);
 	}
 	
 	/**
@@ -278,14 +304,10 @@ public class CustomDistributionITCase {
 		}
 
 		@Override
-		public void write(DataOutputView out) throws IOException {
-			
-		}
+		public void write(DataOutputView out) throws IOException {}
 
 		@Override
-		public void read(DataInputView in) throws IOException {
-			
-		}
+		public void read(DataInputView in) throws IOException {}
 	}
 
 	/**
@@ -323,13 +345,9 @@ public class CustomDistributionITCase {
 		}
 
 		@Override
-		public void write(DataOutputView out) throws IOException {
-			
-		}
+		public void write(DataOutputView out) throws IOException {}
 
 		@Override
-		public void read(DataInputView in) throws IOException {
-			
-		}
+		public void read(DataInputView in) throws IOException {}
 	}
 }