Posted to dev@flink.apache.org by Stephan Ewen <se...@apache.org> on 2014/11/03 23:19:28 UTC

Static context environment

We implement the "context-dependent switching" of the execution
environments (cluster / local / test) with static variables in the
ExecutionEnvironment.

That means that these environments are potentially shared between multiple
threads that run programs (and also between programs that run one after the
other in the same JVM).

This may lead to exceptions, as we sometimes see in the tests when using
forked test execution: a later test in the same JVM may access the same
environment object as the earlier ones. In particular, we see that
half-finished programs may still be associated with the execution
environment, so that programs get mixed up, producing hard-to-understand
cast exceptions (see trace below).
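
To illustrate the failure mode, here is a minimal sketch in plain Java. The
names (SharedEnvironmentLeak, Env, CONTEXT_ENV) and the operator strings are
hypothetical stand-ins, not Flink's actual classes. Only the pattern, one
environment object held in a static field, follows the description above.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedEnvironmentLeak {

    /** Hypothetical stand-in for an environment that collects a program's operators. */
    public static class Env {
        public final List<String> operators = new ArrayList<>();
    }

    /** The environment itself is static, so every caller in the JVM gets the same object. */
    private static final Env CONTEXT_ENV = new Env();

    public static Env getEnvironment() {
        return CONTEXT_ENV;
    }

    public static void main(String[] args) {
        // "Program A" registers an operator but is never finished or executed.
        getEnvironment().operators.add("DeltaIteration");

        // "Program B" runs later in the same JVM and expects only its own operator.
        Env env = getEnvironment();
        env.operators.add("GroupReduce");

        // B also sees A's leftover operator: prints [DeltaIteration, GroupReduce]
        System.out.println(env.operators);
    }
}
```

A program that assumes its first operator is the one it just added would pick
up the leftover one instead, which is the kind of mix-up that can surface as a
cast exception.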

So far this is only relevant to tests with forked execution, but it may
become relevant to users who build different programs at the same time.

I propose to change the static members from environments to environment
factories. That way, we can switch the type of environment depending on the
context as before, and we guarantee that each call to
"ExecutionEnvironment.getEnvironment()" returns a dedicated, fresh
environment.
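
A rough sketch of what the factory variant could look like, in plain Java.
Everything here (EnvironmentFactorySketch, Env, setContextFactory, and the use
of java.util.function.Supplier as the factory type) is an assumption made for
illustration and not the actual Flink code. It only shows the idea that the
static member holds a factory and every lookup creates a new environment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class EnvironmentFactorySketch {

    /** Hypothetical stand-in for an execution environment. */
    public static class Env {
        public final String kind;
        public final List<String> operators = new ArrayList<>();

        public Env(String kind) {
            this.kind = kind;
        }
    }

    /** The static member is a factory, not an environment; the default creates local environments. */
    private static volatile Supplier<Env> contextFactory = () -> new Env("local");

    /** Called by the surrounding context (e.g. a test harness) to switch the environment type. */
    public static void setContextFactory(Supplier<Env> factory) {
        contextFactory = factory;
    }

    /** Each call asks the current factory for a new environment. */
    public static Env getEnvironment() {
        return contextFactory.get();
    }

    public static void main(String[] args) {
        setContextFactory(() -> new Env("test"));

        Env a = getEnvironment();
        a.operators.add("DeltaIteration"); // half-finished program A

        Env b = getEnvironment();
        b.operators.add("GroupReduce");    // program B starts from a clean environment

        System.out.println(a != b);                      // prints true
        System.out.println(b.kind + " " + b.operators);  // prints test [GroupReduce]
    }
}
```

The context still decides which kind of environment is created, but state
left behind by one program can no longer reach the next one.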


Running org.apache.flink.api.scala.operators.translation.DistinctTranslationTest
java.lang.ClassCastException: org.apache.flink.api.common.operators.base.DeltaIterationBase cannot be cast to org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
	at org.apache.flink.api.scala.operators.translation.DistinctTranslationTest.testCombinable(DistinctTranslationTest.scala:39)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:264)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:124)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:200)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:153)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.027 sec <<< FAILURE! - in org.apache.flink.api.scala.operators.translation.DistinctTranslationTest
testCombinable(org.apache.flink.api.scala.operators.translation.DistinctTranslationTest)  Time elapsed: 0.024 sec  <<< FAILURE!
java.lang.AssertionError: org.apache.flink.api.common.operators.base.DeltaIterationBase cannot be cast to org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
	at org.junit.Assert.fail(Assert.java:88)
	at org.apache.flink.api.scala.operators.translation.DistinctTranslationTest.testCombinable(DistinctTranslationTest.scala:46)

Re: Static context environment

Posted by Ufuk Celebi <uc...@apache.org>.
+1 as well and nice catch with the exception.

On 03 Nov 2014, at 14:32, Aljoscha Krettek <al...@apache.org> wrote:

> +1
> 
> Yes, this seems like a very good idea and the Environment is very
> lightweight, so this would not worsen performance.


Re: Static context environment

Posted by Aljoscha Krettek <al...@apache.org>.
+1

Yes, this seems like a very good idea and the Environment is very
lightweight, so this would not worsen performance.
