You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/13 01:15:53 UTC

[1/2] flink git commit: [tests] Consolidate miscellaneous tests into one IntegrationTestCase to reuse minicluster and speed up tests

Repository: flink
Updated Branches:
  refs/heads/master e79ff4ebf -> 73493335f


[tests] Consolidate miscellaneous tests into one IntegrationTestCase to reuse minicluster and speed up tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf4f22ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf4f22ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf4f22ea

Branch: refs/heads/master
Commit: cf4f22ea5626c1948a69f07e4b7b3bcb4001aed7
Parents: e79ff4e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue May 12 22:59:48 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 22:59:48 2015 +0200

----------------------------------------------------------------------
 .../test/misc/DisjointDataflowsITCase.java      |  37 ----
 .../flink/test/misc/GenericTypeInfoTest.java    |   8 +-
 .../test/misc/MiscellaneousIssuesITCase.java    | 177 +++++++++++++++++++
 .../flink/test/misc/NullValuesITCase.java       |  83 ---------
 4 files changed, 183 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f22ea/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
deleted file mode 100644
index 6f4baa3..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.misc;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class DisjointDataflowsITCase extends JavaProgramTestBase {
-
-	@Override
-	protected void testProgram() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		// generate two different flows
-		env.generateSequence(1, 10).output(new DiscardingOutputFormat<Long>());
-		env.generateSequence(1, 10).output(new DiscardingOutputFormat<Long>());
-		
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f22ea/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java b/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
index 91c6baa..fa1fcb6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.test.misc;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -30,7 +30,11 @@ public class GenericTypeInfoTest {
 
 	@Test
 	public void testSerializerTree() {
-		TypeInformation<CollectionDataSets.PojoWithCollectionGeneric> ti = (TypeInformation<CollectionDataSets.PojoWithCollectionGeneric>) TypeExtractor.createTypeInfo(CollectionDataSets.PojoWithCollectionGeneric.class);
+		@SuppressWarnings("unchecked")
+		TypeInformation<CollectionDataSets.PojoWithCollectionGeneric> ti = 
+				(TypeInformation<CollectionDataSets.PojoWithCollectionGeneric>) 
+						TypeExtractor.createTypeInfo(CollectionDataSets.PojoWithCollectionGeneric.class);
+		
 		String serTree = Utils.getSerializerTree(ti);
 		// We can not test against the entire output because the fields of 'String' differ
 		// between java versions

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f22ea/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
new file mode 100644
index 0000000..01e6f62
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+import org.apache.flink.util.Collector;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the system behavior in multiple corner cases
+ *   - when null records are passed through the system.
+ *   - when disjoint dataflows are executed
+ *   - when accumulators are used chained after a non-udf operator.
+ *   
+ * The tests are bundled into one class to reuse the same test cluster. This speeds
+ * up test execution, as the majority of the test time usually goes into starting/stopping the
+ * test cluster.
+ */
+@SuppressWarnings("serial")
+public class MiscellaneousIssuesITCase {
+
+	private static ForkableFlinkMiniCluster cluster;
+	
+	@BeforeClass
+	public static void startCluster() {
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
+			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+			cluster = new ForkableFlinkMiniCluster(config, false);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Failed to start test cluster: " + e.getMessage());
+		}
+	}
+	
+	@AfterClass
+	public static void shutdownCluster() {
+		try {
+			cluster.shutdown();
+			cluster = null;
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail("Failed to stop test cluster: " + e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testNullValues() {
+		try {
+			ExecutionEnvironment env =
+					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+
+			env.setParallelism(1);
+			env.getConfig().disableSysoutLogging();
+
+			DataSet<String> data = env.fromElements("hallo")
+					.map(new MapFunction<String, String>() {
+						@Override
+						public String map(String value) throws Exception {
+							return null;
+						}
+					});
+			data.writeAsText("/tmp/myTest", FileSystem.WriteMode.OVERWRITE);
+
+			try {
+				env.execute();
+				fail("this should fail due to null values.");
+			}
+			catch (ProgramInvocationException e) {
+				assertNotNull(e.getCause());
+				assertNotNull(e.getCause().getCause());
+				assertTrue(e.getCause().getCause() instanceof NullPointerException);
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testDisjointDataflows() {
+		try {
+			ExecutionEnvironment env =
+					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+
+			env.setParallelism(5);
+			env.getConfig().disableSysoutLogging();
+
+			// define two disjoint flows; they are only declared here and run on execute()
+			env.generateSequence(1, 10).output(new DiscardingOutputFormat<Long>());
+			env.generateSequence(1, 10).output(new DiscardingOutputFormat<Long>());
+			env.execute();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testAccumulatorsAfterNoOp() {
+		
+		final String ACC_NAME = "test_accumulator";
+		
+		try {
+			ExecutionEnvironment env =
+					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+
+			env.setParallelism(6);
+			env.getConfig().disableSysoutLogging();
+			
+			env.generateSequence(1, 1000000)
+					.rebalance()
+					.flatMap(new RichFlatMapFunction<Long, Long>() {
+
+						private LongCounter counter;
+
+						@Override
+						public void open(Configuration parameters) {
+							counter = getRuntimeContext().getLongCounter(ACC_NAME);
+						}
+
+						@Override
+						public void flatMap(Long value, Collector<Long> out) {
+							counter.add(1L);
+						}
+					})
+					.output(new DiscardingOutputFormat<Long>());
+
+			JobExecutionResult result = env.execute();
+			
+			assertEquals(1000000L, result.getAllAccumulatorResults().get(ACC_NAME));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f22ea/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
deleted file mode 100644
index 6f7d002..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.misc;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests how the system behaves when null records are passed through the system.
- */
-@SuppressWarnings("serial")
-public class NullValuesITCase {
-
-	@Test
-	public void testNullValues() {
-		ForkableFlinkMiniCluster cluster = null;
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 7);
-			cluster = new ForkableFlinkMiniCluster(config, false);
-
-			ExecutionEnvironment env =
-					ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
-
-			env.setParallelism(1);
-			env.getConfig().disableSysoutLogging();
-
-			DataSet<String> data = env.fromElements("hallo")
-					.map(new MapFunction<String, String>() {
-						@Override
-						public String map(String value) throws Exception {
-							return null;
-						}
-					});
-			data.writeAsText("/tmp/myTest", FileSystem.WriteMode.OVERWRITE);
-
-			try {
-				env.execute();
-				fail("this should fail due to null values.");
-			}
-			catch (ProgramInvocationException e) {
-				assertNotNull(e.getCause());
-				assertNotNull(e.getCause().getCause());
-				assertTrue(e.getCause().getCause() instanceof NullPointerException);
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			if (cluster != null) {
-				cluster.shutdown();
-			}
-		}
-	}
-}


[2/2] flink git commit: [FLINK-1959] [runtime] Support accumulators in chained functions after a non-UDF operation

Posted by se...@apache.org.
[FLINK-1959] [runtime] Support accumulators in chained functions after a non-UDF operation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73493335
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73493335
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73493335

Branch: refs/heads/master
Commit: 73493335f4dbecbb4f1f9f954b08534a5e35ca90
Parents: cf4f22e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue May 12 23:00:29 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 23:00:29 2015 +0200

----------------------------------------------------------------------
 .../common/accumulators/AccumulatorHelper.java  | 14 +++---
 .../runtime/operators/RegularPactTask.java      | 46 ++++++++++++--------
 2 files changed, 37 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/73493335/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index 9b0e019..3e2e359 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -40,7 +40,8 @@ public class AccumulatorHelper {
 			if (ownAccumulator == null) {
 				// Take over counter from chained task
 				target.put(otherEntry.getKey(), otherEntry.getValue());
-			} else {
+			}
+			else {
 				// Both should have the same type
 				AccumulatorHelper.compareAccumulatorTypes(otherEntry.getKey(),
 						ownAccumulator.getClass(), otherEntry.getValue().getClass());
@@ -122,12 +123,13 @@ public class AccumulatorHelper {
 		return builder.toString();
 	}
 
-	public static void resetAndClearAccumulators(
-			Map<String, Accumulator<?, ?>> accumulators) {
-		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) {
-			entry.getValue().resetLocal();
+	public static void resetAndClearAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
+		if (accumulators != null) {
+			for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) {
+				entry.getValue().resetLocal();
+			}
+			accumulators.clear();
 		}
-		accumulators.clear();
 	}
 
 	public static Map<String, Accumulator<?, ?>> copy(final Map<String, Accumulator<?,

http://git-wip-us.apache.org/repos/asf/flink/blob/73493335/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index c844d8e..1c3328e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
@@ -70,6 +71,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -508,14 +510,13 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 			// JobManager. close() has been called earlier for all involved UDFs
 			// (using this.stub.close() and closeChainedTasks()), so UDFs can no longer
 			// modify accumulators;
-			if (this.stub != null) {
-				// collect the counters from the stub
-				if (FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != null) {
-					Map<String, Accumulator<?, ?>> accumulators =
-							FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators();
-					RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks);
-				}
-			}
+
+			// collect the counters from the udf in the core driver
+			Map<String, Accumulator<?, ?>> accumulators =
+					FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators();
+			
+			// collect accumulators from chained tasks and report them
+			reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks);
 		}
 		catch (Exception ex) {
 			// close the input, but do not report any exceptions, since we already have another root cause
@@ -572,16 +573,25 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		// We can merge here the accumulators from the stub and the chained
 		// tasks. Type conflicts can occur here if counters with same name but
 		// different type were used.
-		for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
-			if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) {
-				Map<String, Accumulator<?, ?>> chainedAccumulators =
-						FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators();
-				AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
+		
+		if (!chainedTasks.isEmpty()) {
+			if (accumulators == null) {
+				accumulators = new HashMap<String, Accumulator<?, ?>>();
+			}
+			
+			for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
+				RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null);
+				if (rc != null) {
+					Map<String, Accumulator<?, ?>> chainedAccumulators = rc.getAllAccumulators();
+					if (chainedAccumulators != null) {
+						AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
+					}
+				}
 			}
 		}
 
 		// Don't report if the UDF didn't collect any accumulators
-		if (accumulators.size() == 0) {
+		if (accumulators == null || accumulators.size() == 0) {
 			return;
 		}
 
@@ -592,9 +602,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		// (e.g. in iterations) and we don't want to count twice. This may not be
 		// done before sending
 		AccumulatorHelper.resetAndClearAccumulators(accumulators);
+		
 		for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
-			if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) {
-				AccumulatorHelper.resetAndClearAccumulators(FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators());
+			RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null);
+			if (rc != null) {
+				AccumulatorHelper.resetAndClearAccumulators(rc.getAllAccumulators());
 			}
 		}
 	}
@@ -1140,7 +1152,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 			} catch (InterruptedException iex) {
 				throw new RuntimeException("Interrupted while waiting for input " + index + " to become available.");
 			} catch (IOException ioex) {
-				throw new RuntimeException("An I/O Exception occurred whily obaining input " + index + ".");
+				throw new RuntimeException("An I/O Exception occurred while obtaining input " + index + ".");
 			}
 		}
 	}