Posted to user@flink.apache.org by Rion Williams <ri...@gmail.com> on 2021/03/04 04:13:57 UTC

Defining GlobalJobParameters in Flink Unit Testing Harnesses

Hi all,

Earlier today I asked a few questions regarding the many testing constructs
available within Flink, and I believe I have things headed in a good direction
at present. I did run into a specific case, though, that either may not be
supported or just isn't documented well enough for me to determine what is
going wrong.

Basically, I have a KeyedProcessFunction that reads some global-level
configuration via GlobalJobParameters within its open function:

override fun open(configuration: Configuration) {
    // Omitted for brevity

    val parameters = runtimeContext.executionConfig.globalJobParameters as? ParameterTool
    if (parameters != null) {
        processingTimeBuffer = parameters.getLong("processingTimeBuffer", 0L)
    }
}

This works just as expected within the actual pipeline itself when set
similarly:

streamEnvironment.config.globalJobParameters = parameters
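
For reference, the surrounding pipeline setup looks roughly like this (a
simplified sketch; the argument handling and variable names here are just
for illustration):

val streamEnvironment = StreamExecutionEnvironment.getExecutionEnvironment()

// ParameterTool extends ExecutionConfig.GlobalJobParameters, so it can be set as
// the global job parameters here and cast back to ParameterTool inside open().
val parameters = ParameterTool.fromArgs(args)
streamEnvironment.config.globalJobParameters = parameters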

However, I don't see an effective way to set this against a TestHarness. I've
made several attempts, but I can never seem to populate the globalJobParameters
property within the KeyedProcessFunction itself:

// Attempt 1
magicWindowHarness.operator.runtimeContext.executionConfig.globalJobParameters = ParameterTool.fromMap(...)

// Attempt 2
magicWindowHarness.executionConfig.globalJobParameters = ParameterTool.fromMap(...)

// Attempt 3
magicWindowHarness.environment.executionConfig.globalJobParameters = ParameterTool.fromMap(...)

// Attempt 4
val env = StreamExecutionEnvironment.
env.config.globalJobParameters = ParameterTool.fromMap(...)

Is this supported, or am I simply going about it the wrong way? Or perhaps
just missing a piece of the puzzle?

Thanks much,

Rion

Re: Defining GlobalJobParameters in Flink Unit Testing Harnesses

Posted by Chesnay Schepler <ch...@apache.org>.
Hmm... at a glance I don't see anything else that would prevent things from
working. I couldn't find a place where we copy the ExecutionConfig, so
you should be modifying the very config that is passed to your function.

It seems we'll inevitably have to dive into the source and debug which
objects are being accessed / passed to the function.
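
As a quick sanity check (just a sketch, nothing I've actually run against your
code), logging the identity of the ExecutionConfig on both sides would show
whether the function is handed the same object that the test mutates:

// Temporary debug output inside MagicWindowFunction.open()
println("function config: " + System.identityHashCode(runtimeContext.executionConfig))
println("function params: " + runtimeContext.executionConfig.globalJobParameters)

// Temporary debug output in the test, before calling testHarness.open()
println("harness config:  " + System.identityHashCode(testHarness.executionConfig))

If the two identity hash codes differ, the harness is passing the function a
different ExecutionConfig than the one being modified.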


Re: Defining GlobalJobParameters in Flink Unit Testing Harnesses

Posted by Rion Williams <ri...@gmail.com>.
Thanks Chesnay!

I tried giving that a shot, but I still wasn't able to access the
globalJobParameters from within the open function in my KeyedProcessFunction.
You can see the implementation below, which I believe should be correct:

object CustomProcessFunctionTestHarness {
    fun <K, IN, OUT> forKeyedProcessFunction(
        function: KeyedProcessFunction<K, IN, OUT>,
        keySelector: KeySelector<IN, K>,
        keyType: TypeInformation<K>,
        parameters: ParameterTool
    ): KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> {
        val testHarness: KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> =
            KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>(
                KeyedProcessOperator<K, IN, OUT>(Preconditions.checkNotNull(function)),
                keySelector,
                keyType,
                1,
                1,
                0
            )

        // Adjust execution configuration via parameters
        testHarness.executionConfig.globalJobParameters = parameters
        testHarness.open()

        return testHarness
    }
}

Usage-wise, the declaration looks as you might expect:

bufferedMagicWindowHarness = CustomProcessFunctionTestHarness.forKeyedProcessFunction(
    MagicWindowFunction(),
    { log -> log.getKey() },
    TypeInformation.of(String::class.java),
    ParameterTool.fromArgs(arrayOf("--processingTimeBuffer", "60000"))
)

And then, as I described earlier, these are read via the following in the
open() function:

val parameters = runtimeContext.executionConfig.globalJobParameters as? ParameterTool
if (parameters != null) {
    processingTimeBuffer = parameters.getLong("processingTimeBuffer")
}

Does anything look out of place here? I haven't gone spelunking into the
source code for this yet, but I'm assuming that I'm setting the correct
values on the execution configuration.

Thanks again,

Rion


Re: Defining GlobalJobParameters in Flink Unit Testing Harnesses

Posted by Chesnay Schepler <ch...@apache.org>.
The reason your attempts have failed is that
ProcessFunctionTestHarnesses.forKeyedProcessFunction automatically calls
open(), thus any mutations on the harness happen too late.

I'd suggest taking a look at the implementation of that method and
essentially copying the code.
You can then call the harness constructor manually and mutate the
execution config before calling open().
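
A minimal sketch of what I mean, reusing the MagicWindowFunction and key
selector from your earlier messages (untested, and the parameter value is
just an example):

val testHarness = KeyedOneInputStreamOperatorTestHarness<String, Log, FileOutput>(
    KeyedProcessOperator(MagicWindowFunction()),
    { log -> log.getKey() },
    TypeInformation.of(String::class.java)
)

// Mutate the execution config *before* open(), so the function's open() can read
// runtimeContext.executionConfig.globalJobParameters.
testHarness.executionConfig.globalJobParameters =
    ParameterTool.fromMap(mapOf("processingTimeBuffer" to "60000"))

testHarness.open()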


Re: Defining GlobalJobParameters in Flink Unit Testing Harnesses

Posted by Rion Williams <ri...@gmail.com>.
Absolutely,

I think it's gone through quite a few iterations, but this is the current
state of it (defined in a @Before function as part of scaffolding out the
tests):

private lateinit var magicWindowHarness: KeyedOneInputStreamOperatorTestHarness<String, Log, FileOutput>

@Before
fun init() {
    magicWindowHarness = ProcessFunctionTestHarnesses.forKeyedProcessFunction(
        MagicWindowFunction(),
        { log -> log.getKey() },
        TypeInformation.of(String::class.java)
    )
}

I've also tried a few variants of that with a separate declaration for the
function itself, etc.



Re: Defining GlobalJobParameters in Flink Unit Testing Harnesses

Posted by Chesnay Schepler <ch...@apache.org>.
Could you show us how you create the test harness?
