You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/05/13 01:15:53 UTC
[1/2] flink git commit: [tests] Consolidate miscellaneous tests into
one IntegrationTestCase to reuse minicluster and speed up tests
Repository: flink
Updated Branches:
refs/heads/master e79ff4ebf -> 73493335f
[tests] Consolidate miscellaneous tests into one IntegrationTestCase to reuse minicluster and speed up tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf4f22ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf4f22ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf4f22ea
Branch: refs/heads/master
Commit: cf4f22ea5626c1948a69f07e4b7b3bcb4001aed7
Parents: e79ff4e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue May 12 22:59:48 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 22:59:48 2015 +0200
----------------------------------------------------------------------
.../test/misc/DisjointDataflowsITCase.java | 37 ----
.../flink/test/misc/GenericTypeInfoTest.java | 8 +-
.../test/misc/MiscellaneousIssuesITCase.java | 177 +++++++++++++++++++
.../flink/test/misc/NullValuesITCase.java | 83 ---------
4 files changed, 183 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f22ea/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
deleted file mode 100644
index 6f4baa3..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/DisjointDataflowsITCase.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.misc;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-public class DisjointDataflowsITCase extends JavaProgramTestBase {
-
- @Override
- protected void testProgram() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // generate two different flows
- env.generateSequence(1, 10).output(new DiscardingOutputFormat<Long>());
- env.generateSequence(1, 10).output(new DiscardingOutputFormat<Long>());
-
- env.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f22ea/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java b/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
index 91c6baa..fa1fcb6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/GenericTypeInfoTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.test.misc;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+
import org.junit.Assert;
import org.junit.Test;
@@ -30,7 +30,11 @@ public class GenericTypeInfoTest {
@Test
public void testSerializerTree() {
- TypeInformation<CollectionDataSets.PojoWithCollectionGeneric> ti = (TypeInformation<CollectionDataSets.PojoWithCollectionGeneric>) TypeExtractor.createTypeInfo(CollectionDataSets.PojoWithCollectionGeneric.class);
+ @SuppressWarnings("unchecked")
+ TypeInformation<CollectionDataSets.PojoWithCollectionGeneric> ti =
+ (TypeInformation<CollectionDataSets.PojoWithCollectionGeneric>)
+ TypeExtractor.createTypeInfo(CollectionDataSets.PojoWithCollectionGeneric.class);
+
String serTree = Utils.getSerializerTree(ti);
// We can not test against the entire output because the fields of 'String' differ
// between java versions
http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f22ea/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
new file mode 100644
index 0000000..01e6f62
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+import org.apache.flink.util.Collector;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the system behavior in several corner cases:
+ * - when null records are passed through the system,
+ * - when disjoint dataflows are executed,
+ * - when accumulators are used in a function chained after a non-UDF operator.
+ *
+ * The tests are bundled into one class to reuse the same test cluster. This speeds
+ * up test execution, as the majority of the test time usually goes into starting/stopping the
+ * test cluster.
+ */
+@SuppressWarnings("serial")
+public class MiscellaneousIssuesITCase {
+
+ private static ForkableFlinkMiniCluster cluster;
+
+ @BeforeClass
+ public static void startCluster() {
+ try {
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
+ config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+ cluster = new ForkableFlinkMiniCluster(config, false);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Failed to start test cluster: " + e.getMessage());
+ }
+ }
+
+ @AfterClass
+ public static void shutdownCluster() {
+ // startCluster() may have failed; do not add a NullPointerException on top of that failure
+ if (cluster == null) {
+ return;
+ }
+ try {
+ cluster.shutdown();
+ cluster = null;
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail("Failed to stop test cluster: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testNullValues() {
+ try {
+ ExecutionEnvironment env =
+ ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+
+ env.setParallelism(1);
+ env.getConfig().disableSysoutLogging();
+
+ DataSet<String> data = env.fromElements("hallo")
+ .map(new MapFunction<String, String>() {
+ @Override
+ public String map(String value) throws Exception {
+ return null;
+ }
+ });
+
+ // unique temp file instead of a fixed "/tmp" path: portable, and parallel builds do not clash
+ File outputFile = File.createTempFile("flink-null-values-test", ".txt");
+ outputFile.deleteOnExit();
+ data.writeAsText(outputFile.toURI().toString(), FileSystem.WriteMode.OVERWRITE);
+
+ try {
+ env.execute();
+ fail("this should fail due to null values.");
+ }
+ catch (ProgramInvocationException e) {
+ assertNotNull(e.getCause());
+ assertNotNull(e.getCause().getCause());
+ assertTrue(e.getCause().getCause() instanceof NullPointerException);
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDisjointDataflows() {
+ try {
+ ExecutionEnvironment env =
+ ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+
+ env.setParallelism(5);
+ env.getConfig().disableSysoutLogging();
+
+ // generate two different flows
+ env.generateSequence(1, 10).output(new DiscardingOutputFormat<Long>());
+ env.generateSequence(1, 10).output(new DiscardingOutputFormat<Long>());
+
+ // both flows must be submitted as one job; without execute() the test checks nothing
+ env.execute();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testAccumulatorsAfterNoOp() {
+
+ final String ACC_NAME = "test_accumulator";
+
+ try {
+ ExecutionEnvironment env =
+ ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
+
+ env.setParallelism(6);
+ env.getConfig().disableSysoutLogging();
+
+ // rebalance() is an operation without a user function; the flatMap after it is meant to
+ // run chained to it, which is the case FLINK-1959 makes accumulators work for
+ env.generateSequence(1, 1000000)
+ .rebalance()
+ .flatMap(new RichFlatMapFunction<Long, Long>() {
+
+ private LongCounter counter;
+
+ @Override
+ public void open(Configuration parameters) {
+ counter = getRuntimeContext().getLongCounter(ACC_NAME);
+ }
+
+ @Override
+ public void flatMap(Long value, Collector<Long> out) {
+ counter.add(1L);
+ }
+ })
+ .output(new DiscardingOutputFormat<Long>());
+
+ JobExecutionResult result = env.execute();
+
+ assertEquals(1000000L, result.getAllAccumulatorResults().get(ACC_NAME));
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf4f22ea/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
deleted file mode 100644
index 6f7d002..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/NullValuesITCase.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.misc;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests how the system behaves when null records are passed through the system.
- */
-@SuppressWarnings("serial")
-public class NullValuesITCase {
-
- @Test
- public void testNullValues() {
- ForkableFlinkMiniCluster cluster = null;
- try {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 7);
- cluster = new ForkableFlinkMiniCluster(config, false);
-
- ExecutionEnvironment env =
- ExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getJobManagerRPCPort());
-
- env.setParallelism(1);
- env.getConfig().disableSysoutLogging();
-
- DataSet<String> data = env.fromElements("hallo")
- .map(new MapFunction<String, String>() {
- @Override
- public String map(String value) throws Exception {
- return null;
- }
- });
- data.writeAsText("/tmp/myTest", FileSystem.WriteMode.OVERWRITE);
-
- try {
- env.execute();
- fail("this should fail due to null values.");
- }
- catch (ProgramInvocationException e) {
- assertNotNull(e.getCause());
- assertNotNull(e.getCause().getCause());
- assertTrue(e.getCause().getCause() instanceof NullPointerException);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-}
[2/2] flink git commit: [FLINK-1959] [runtime] Support accumulators
in chained functions after a non-UDF operation
Posted by se...@apache.org.
[FLINK-1959] [runtime] Support accumulators in chained functions after a non-UDF operation
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73493335
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73493335
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73493335
Branch: refs/heads/master
Commit: 73493335f4dbecbb4f1f9f954b08534a5e35ca90
Parents: cf4f22e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue May 12 23:00:29 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue May 12 23:00:29 2015 +0200
----------------------------------------------------------------------
.../common/accumulators/AccumulatorHelper.java | 14 +++---
.../runtime/operators/RegularPactTask.java | 46 ++++++++++++--------
2 files changed, 37 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/73493335/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index 9b0e019..3e2e359 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -40,7 +40,8 @@ public class AccumulatorHelper {
if (ownAccumulator == null) {
// Take over counter from chained task
target.put(otherEntry.getKey(), otherEntry.getValue());
- } else {
+ }
+ else {
// Both should have the same type
AccumulatorHelper.compareAccumulatorTypes(otherEntry.getKey(),
ownAccumulator.getClass(), otherEntry.getValue().getClass());
@@ -122,12 +123,18 @@ public class AccumulatorHelper {
 return builder.toString();
 }
- public static void resetAndClearAccumulators(
- Map<String, Accumulator<?, ?>> accumulators) {
- for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) {
- entry.getValue().resetLocal();
+ /**
+ * Calls {@link Accumulator#resetLocal()} on every accumulator in the given map and then
+ * removes all entries from the map. A {@code null} map is ignored.
+ */
+ public static void resetAndClearAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
+ if (accumulators == null) {
+ return;
+ }
+ for (Accumulator<?, ?> accumulator : accumulators.values()) {
+ accumulator.resetLocal();
 }
 accumulators.clear();
 }
public static Map<String, Accumulator<?, ?>> copy(final Map<String, Accumulator<?,
http://git-wip-us.apache.org/repos/asf/flink/blob/73493335/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index c844d8e..1c3328e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
@@ -70,6 +71,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -508,14 +510,13 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// JobManager. close() has been called earlier for all involved UDFs
// (using this.stub.close() and closeChainedTasks()), so UDFs can no longer
// modify accumulators;
- if (this.stub != null) {
- // collect the counters from the stub
- if (FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != null) {
- Map<String, Accumulator<?, ?>> accumulators =
- FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators();
- RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks);
- }
- }
+
+ // collect the counters from the udf in the core driver
+ Map<String, Accumulator<?, ?>> accumulators =
+ FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators();
+
+ // collect accumulators from chained tasks and report them
+ reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks);
}
catch (Exception ex) {
// close the input, but do not report any exceptions, since we already have another root cause
@@ -572,16 +573,25 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// We can merge here the accumulators from the stub and the chained
// tasks. Type conflicts can occur here if counters with same name but
// different type were used.
- for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
- if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) {
- Map<String, Accumulator<?, ?>> chainedAccumulators =
- FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators();
- AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
+
+ if (!chainedTasks.isEmpty()) {
+ if (accumulators == null) {
+ accumulators = new HashMap<String, Accumulator<?, ?>>();
+ }
+
+ for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
+ RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null);
+ if (rc != null) {
+ Map<String, Accumulator<?, ?>> chainedAccumulators = rc.getAllAccumulators();
+ if (chainedAccumulators != null) {
+ AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
+ }
+ }
}
}
// Don't report if the UDF didn't collect any accumulators
- if (accumulators.size() == 0) {
+ if (accumulators == null || accumulators.size() == 0) {
return;
}
@@ -592,9 +602,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// (e.g. in iterations) and we don't want to count twice. This may not be
// done before sending
AccumulatorHelper.resetAndClearAccumulators(accumulators);
+
for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
- if (FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null) != null) {
- AccumulatorHelper.resetAndClearAccumulators(FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null).getAllAccumulators());
+ RuntimeContext rc = FunctionUtils.getFunctionRuntimeContext(chainedTask.getStub(), null);
+ if (rc != null) {
+ AccumulatorHelper.resetAndClearAccumulators(rc.getAllAccumulators());
}
}
}
@@ -1140,7 +1152,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
} catch (InterruptedException iex) {
throw new RuntimeException("Interrupted while waiting for input " + index + " to become available.");
} catch (IOException ioex) {
- throw new RuntimeException("An I/O Exception occurred whily obaining input " + index + ".");
+ throw new RuntimeException("An I/O Exception occurred while obtaining input " + index + ".");
}
}
}