Posted to issues@flink.apache.org by "Stephan Ewen (JIRA)" <ji...@apache.org> on 2014/11/03 23:24:34 UTC
[jira] [Created] (FLINK-1207) Switch ContextEnvironment to Environment factory
Stephan Ewen created FLINK-1207:
-----------------------------------
Summary: Switch ContextEnvironment to Environment factory
Key: FLINK-1207
URL: https://issues.apache.org/jira/browse/FLINK-1207
Project: Flink
Issue Type: Bug
Components: Java API
Affects Versions: 0.8-incubating
Reporter: Stephan Ewen
Fix For: 0.8-incubating
We implement the "context-dependent switching" of execution environments (cluster / local / test) with static variables in ExecutionEnvironment.
That means that these environments are potentially shared between multiple threads that run programs, and also between programs that run one after the other in the same JVM.
This may lead to exceptions, as we sometimes see in the tests when using forked test execution: a later test in the same JVM may access the same environment object as earlier ones. In particular, we see that half-finished programs may still be associated with the execution environment, so that operators from different programs get mixed, producing hard-to-understand class cast exceptions (see trace below).
So far this is only relevant to tests with forked execution, but it may become relevant to users who build different programs at the same time.
I propose to change the static members from environments to environment factories. That way, we can still switch the type of environment depending on the context as before, and we guarantee that each call to "ExecutionEnvironment.getEnvironment()" returns a dedicated, fresh environment.
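The proposal could be sketched roughly as follows. This is a minimal illustration, not Flink's actual code: every class and method name here (SketchEnvironment, SketchEnvironmentFactory, SketchEnvironments, initializeContextEnvironment) is hypothetical and chosen only to show the idea of a static factory replacing a static environment instance.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for an execution environment (not Flink's class).
abstract class SketchEnvironment {
    // Operators registered by the program that is being built on this instance.
    private final List<String> operators = new ArrayList<>();

    void register(String operator) {
        operators.add(operator);
    }

    List<String> registeredOperators() {
        return operators;
    }

    abstract String kind();
}

final class LocalSketchEnvironment extends SketchEnvironment {
    @Override
    String kind() {
        return "local";
    }
}

final class ClusterSketchEnvironment extends SketchEnvironment {
    @Override
    String kind() {
        return "cluster";
    }
}

// Hypothetical factory interface: every call creates a new environment.
interface SketchEnvironmentFactory {
    SketchEnvironment createEnvironment();
}

final class SketchEnvironments {
    // The static member holds a factory, not an environment instance,
    // so no program state is shared between callers.
    private static volatile SketchEnvironmentFactory contextFactory =
            LocalSketchEnvironment::new;

    private SketchEnvironments() {}

    // Assumed hook for the surrounding context (cluster client, test harness)
    // to switch the type of environment that is handed out.
    static void initializeContextEnvironment(SketchEnvironmentFactory factory) {
        contextFactory = factory;
    }

    // Returns a dedicated, fresh environment of the currently selected type.
    static SketchEnvironment getEnvironment() {
        return contextFactory.createEnvironment();
    }
}
```

With this shape, a half-finished program that registered operators on one environment cannot leak into the next caller, because the next call to getEnvironment() builds a new instance with an empty operator list. The context can still select the environment type by installing a different factory.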
Running org.apache.flink.api.scala.operators.translation.DistinctTranslationTest
java.lang.ClassCastException: org.apache.flink.api.common.operators.base.DeltaIterationBase cannot be cast to org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
at org.apache.flink.api.scala.operators.translation.DistinctTranslationTest.testCombinable(DistinctTranslationTest.scala:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:264)
at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:124)
at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:200)
at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:153)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.027 sec <<< FAILURE! - in org.apache.flink.api.scala.operators.translation.DistinctTranslationTest
testCombinable(org.apache.flink.api.scala.operators.translation.DistinctTranslationTest) Time elapsed: 0.024 sec <<< FAILURE!
java.lang.AssertionError: org.apache.flink.api.common.operators.base.DeltaIterationBase cannot be cast to org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
at org.junit.Assert.fail(Assert.java:88)
at org.apache.flink.api.scala.operators.translation.DistinctTranslationTest.testCombinable(DistinctTranslationTest.scala:46)