Posted to user@flink.apache.org by Michael Kobit <mk...@gmail.com> on 2017/09/20 21:25:26 UTC

LocalStreamEnvironment - configuration doesn't seem to be used in RichFunction operators

I'm new to Flink and in the process of trying to write a few operators and
tests for them. One of the issues I've run into is "how do I properly set
up the dependencies for an operator". I've discovered the serialization
constraints and learned some about the execution model as I've started to
progress through it, but I'm still struggling to find an analog for
dependency injection in Flink.

I was experimenting with different ways to supply configuration for the
*Rich* functions so that they set themselves up and tear themselves down
with their dependencies on open/close. I wanted to "inject" a dependency,
say an HTTP client that caches, and then mock that dependency for a local
test instead of actually making HTTP calls. It seemed like it could be done
by setting and getting the correct implementation types from the config
using some custom injector type (analogous to Spring or Guice dependency
injection). I know I have to deal with serialization of the operators, which
is why I was thinking I could do this in open/close and have the magical
injector be serializable (and possibly be part of the config). This may or
may not be a bad idea already, but bear with me (any feedback is very
appreciated).

I was doing some local testing using StreamExecutionEnvironment, but wasn't
able to actually pass in configuration options to the local stream
execution.

I tried it these ways:

   1. Create with a config
   - StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
   2. Configure the created LocalStreamEnvironment
   by env.getConfig().setGlobalJobParameters(configuration)
   3. Configure the DataStreamSource<Integer>
   by source.getExecutionConfig().setGlobalJobParameters(configuration)
   4. Configure the SingleOutputStreamOperator
   by mapped.getExecutionConfig().setGlobalJobParameters(configuration)

All 4 of those failed, so I feel like I am doing something wrong here, and
wanted to reach out.

Here is the example code in which all of those tests fail:

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

import java.util.Iterator;

public class FlinkInspection {

    @Test
    public void issueWithLocalStreamEnvironmentCreateWithConfiguration() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
        DataStreamSource<Integer> source = env.fromElements(1, 2);

        SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());

        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();

        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    @Test
    public void issueWithLocalStreamEnvironmentConfiguredWithWithConfiguration() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        env.getConfig().setGlobalJobParameters(configuration);
        DataStreamSource<Integer> source = env.fromElements(1, 2);

        SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());

        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();

        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    @Test
    public void issueWithLocalStreamEnvironmentConfiguringDataStreamSource() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        DataStreamSource<Integer> source = env.fromElements(1, 2);
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);
        source.getExecutionConfig().setGlobalJobParameters(configuration);

        SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());

        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();

        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    @Test
    public void issueWithLocalStreamEnvironmentConfiguringDataStreamWithOperator() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        DataStreamSource<Integer> source = env.fromElements(1, 2);
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);

        SingleOutputStreamOperator<Integer> mapped = source.map(new ConfigurationRetrievingOperator());
        mapped.getExecutionConfig().setGlobalJobParameters(configuration);

        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();

        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    static class ConfigurationRetrievingOperator extends RichMapFunction<Integer, Integer> {

        private int factor = -1;

        @Override
        public Integer map(final Integer value) throws Exception {
            return value * factor;
        }

        @Override
        public void open(final Configuration parameters) throws Exception {
            factor = parameters.getInteger("key", 0);
        }
    }
}


   1. Any suggestions on how I should think about the dependency injection?
   2. Are there ways to customize the Configuration that is passed into
   Rich functions?
   3. Is this an issue with LocalStreamEnvironment, or am I doing something
   completely wrong?

Re: LocalStreamEnvironment - configuration doesn't seem to be used in RichFunction operators

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Yes, I agree that the behavior can be quite surprising. If it is not already stated somewhere in the docs, we should update them.

> - Pass in a Serializable "injector"/proxy object in the constructor
> - In the "open" (or body of the function) get the things/initialize stuff I want that may or may not be Serializable, e.g. an HTTP client or database connection from that object
> - Don't use the Configuration instance since it doesn't do anything anyways

Yes, I think you’re on the right track with this :)

Cheers,
Gordon



Re: LocalStreamEnvironment - configuration doesn't seem to be used in RichFunction operators

Posted by Michael Kobit <mk...@gmail.com>.
Thanks for the response.

It is a bit surprising that it is always a new instance, given the various
API signatures that take in a Configuration instance. The best practices
docs
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-the-parameters-in-your-flink-program)
also sort of mention it, but I just noticed most of those seem like they
are for the DataSet API rather than the DataStream API (I don't know if
there is a big difference between the programming APIs there). I'm still
new to these things, so I could be making invalid assumptions, too.

I think I have a simple idea for how to get dependency-style injection
working anyway:

   - Pass in a Serializable "injector"/proxy object in the constructor
   - In the "open" (or body of the function) get the things/initialize
   stuff I want that may or may not be Serializable, e.g. an HTTP client or
   database connection from that object
   - Don't use the Configuration instance since it doesn't do anything
   anyways
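
The three points above can be sketched roughly as follows. This is only an illustration under stated assumptions, not code from a real job: LookupClient, LookupClientFactory, FakeLookupClientFactory, EnrichingFunction and InjectorDemo are all hypothetical names, and EnrichingFunction is a plain-JDK stand-in for a RichMapFunction<Integer, String> so that the sketch runs without Flink on the classpath. The roundTrip helper imitates the serialization that happens when a job is deployed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class InjectorDemo {
    // Ships the function through Java serialization, then runs its lifecycle.
    static String runAfterShipping(EnrichingFunction function, int input) {
        EnrichingFunction shipped = roundTrip(function);
        shipped.open();
        try {
            return shipped.map(input);
        } finally {
            shipped.close();
        }
    }

    // Imitates deployment: serialize the object and read back a copy.
    static <T extends Serializable> T roundTrip(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        EnrichingFunction function = new EnrichingFunction(new FakeLookupClientFactory());
        System.out.println(runAfterShipping(function, 1)); // prints "fake-1"
    }
}

// Hypothetical dependency that is not serializable (for example an HTTP client).
interface LookupClient {
    String lookup(int id);
}

// Hypothetical serializable "injector": it travels with the function and only
// creates the real dependency once the function is running.
interface LookupClientFactory extends Serializable {
    LookupClient create();
}

// Test double: hands out a canned client instead of making HTTP calls.
class FakeLookupClientFactory implements LookupClientFactory {
    private static final long serialVersionUID = 1L;

    @Override
    public LookupClient create() {
        return id -> "fake-" + id;
    }
}

// Stand-in for a RichMapFunction<Integer, String> (assumption: no Flink here).
class EnrichingFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    private final LookupClientFactory factory;
    // transient: never serialized, rebuilt in open() after deserialization.
    private transient LookupClient client;

    EnrichingFunction(LookupClientFactory factory) {
        this.factory = factory;
    }

    // Mirrors open(); the Configuration argument is left out on purpose.
    void open() {
        client = factory.create();
    }

    String map(Integer value) {
        return client.lookup(value);
    }

    // Mirrors close(): drop the dependency.
    void close() {
        client = null;
    }
}
```

A production factory would implement the same interface and build the real client in create(), so a test only has to swap the factory passed to the constructor.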

I haven't thought through any possible security holes or considerations
with this approach yet.

Thanks for the response, that clears up my confusion - now just to explore
and find some better ways to test this stuff!


Re: LocalStreamEnvironment - configuration doesn't seem to be used in RichFunction operators

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

Passing a Configuration instance into the open method is a leftover artifact of the DataStream API that remains only for API backwards compatibility.
There is no way to modify what configuration is retrieved there (it is always a new, empty Configuration).

Normally, to inject dependencies into your operators, you would simply supply them through the constructor of the operator and store them as class fields for later use in the operator's work methods.
Make sure that they are serializable, as the operator will need to be serialized when deploying the job. I’m assuming that this should be possible for you anyway, since you were trying to write that information into the Configuration.
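
As a minimal sketch of the constructor approach, the multiplier from the test in the original post could be passed in directly. The names MultiplyingFunction and ConstructorInjectionDemo are hypothetical, and MultiplyingFunction is a plain-JDK stand-in for a RichMapFunction<Integer, Integer> so that the example runs without Flink; the roundTrip helper only imitates the serialization step of job deployment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class ConstructorInjectionDemo {
    // Imitates deployment: the function object is serialized and a copy is
    // deserialized, as would happen before it runs on a worker.
    static <T extends Serializable> T roundTrip(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MultiplyingFunction shipped = roundTrip(new MultiplyingFunction(10));
        System.out.println(shipped.map(1)); // prints 10
        System.out.println(shipped.map(2)); // prints 20
    }
}

// Stand-in for a RichMapFunction<Integer, Integer> (assumption: no Flink here).
class MultiplyingFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    // Supplied through the constructor and shipped with the serialized function,
    // instead of being read from the Configuration passed to open().
    private final int factor;

    MultiplyingFunction(int factor) {
        this.factor = factor;
    }

    Integer map(Integer value) {
        return value * factor;
    }
}
```

In the test from the original post, this would correspond to source.map(new ConfigurationRetrievingOperator(10)) with a matching constructor, rather than reading "key" in open().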

Hope this helps!

Cheers,
Gordon
