Posted to user@flink.apache.org by Caizhi Weng <ts...@gmail.com> on 2021/12/01 03:24:05 UTC

Re: REST API for detached minicluster based integration test

Hi!

I see. So to test your watermark strategy, you would like to fetch the
watermarks downstream.

I would suggest taking a look at
org.apache.flink.streaming.api.operators.AbstractStreamOperator. This class
has a processWatermark method, which is called whenever a watermark flows
through the operator. You can write your own testing operator by extending
this class and wrapping it in an
org.apache.flink.streaming.api.transformations.OneInputTransformation. That
way you do not need to fetch watermarks from the metrics at all. If
processWatermark is never called, no watermark ever arrives, and
you might want to check your watermark strategy implementation.
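Here is a minimal sketch of that suggestion. It assumes the Flink 1.12 DataStream API, and WatermarkRecordingOperator and SEEN_WATERMARKS are hypothetical names. The operator forwards records unchanged and records every watermark that reaches processWatermark. The list is static because the task runs a deserialized copy of the operator, so this only works while the MiniCluster runs in the test's JVM. DataStream#transform wraps the operator in a OneInputTransformation, so there is no need to build one by hand.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Hypothetical pass-through test operator that records every watermark it receives. */
public class WatermarkRecordingOperator<T> extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T> {

  // Static because the running task uses a deserialized copy of this operator;
  // only meaningful when the MiniCluster shares the test's JVM.
  public static final List<Long> SEEN_WATERMARKS = new CopyOnWriteArrayList<>();

  @Override
  public void processElement(StreamRecord<T> element) {
    output.collect(element); // forward records unchanged
  }

  @Override
  public void processWatermark(Watermark mark) throws Exception {
    SEEN_WATERMARKS.add(mark.getTimestamp());
    // Keep the default behavior: advance event-time timers and forward the watermark.
    super.processWatermark(mark);
  }
}
```

You can attach it with something like `stream.transform("watermark-recorder", stream.getType(), new WatermarkRecordingOperator<>())`. Clear SEEN_WATERMARKS between tests.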

On Wed, Dec 1, 2021 at 4:14 AM Jin Yi <ji...@promoted.ai> wrote:

> thanks for the reply caizhi!
>
> we're on flink 1.12.3. in the test, i'm using a custom watermark strategy
> derived from BoundedOutOfOrdernessWatermarks that, after a period with no
> events, emits watermarks based on processing time to keep the
> timer-reliant operators happy. basically, everything runs on event time,
> but each input still emits watermarks periodically when no events are
> coming in through it.
>
> we started with test data carrying its own event times and simply used
> StreamExecutionEnvironment.fromCollection with a timestamp assigner that
> extracts the timestamp from the test event data. however, done this way,
> the minicluster terminates the job (and the test completes) as soon as all
> the input is processed, even though timers that are meant to produce
> additional output are still outstanding in the operators. that's why i
> cobbled together the posted WaitingSourceFunction: it lets me decide when
> the input sources complete by signaling them to close/cancel based on some
> check of the job's state (which is where this thread starts).
>
> what's the best way to achieve this? i would love to set up the inputs so
> that watermarks get emitted appropriately while the test data is being
> processed, and also for a defined period after all the "input" is complete,
> so that the timer-reliant operators get their timers triggered. thanks!
>
> On Mon, Nov 29, 2021 at 6:37 PM Caizhi Weng <ts...@gmail.com> wrote:
>
>> Hi!
>>
>> I believe metrics are enabled by default, even for a mini cluster. Which
>> Flink version are you using, and how do you set your watermark strategy?
>> Could you share the user code that creates the DataStream / SQL and gets
>> the job graph?
>>
>> I'm also curious why you need to extract the output watermarks just to
>> stop the source. You can control both the records and the watermark
>> strategy from the source. From my point of view, constructing some test
>> data with specific row times would be enough.
>>
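One way to follow the suggestion of controlling the records and the watermarks from the source is sketched below, using the Flink 1.12 SourceFunction API and hypothetical names. TimestampedTestSource emits each element with its event time and emits a watermark after each element. Once the data is exhausted, it keeps advancing the watermark in fixed steps up to an endWatermark chosen by the test, which mimics the "watermarks for a defined period after the input" asked for above. Zero out-of-orderness and the step size are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * Hypothetical test source: emits timestamped records with watermarks, then keeps
 * advancing event time up to endWatermark so that pending event-time timers can fire.
 */
public class TimestampedTestSource<T> implements SourceFunction<T> {

  private final ArrayList<T> elements; // elements must be Serializable
  private final SerializableTimestampAssigner<T> timestampOf;
  private final long endWatermark; // event time to reach after the last record
  private final long step; // watermark increment once the data is exhausted
  private volatile boolean running = true;

  public TimestampedTestSource(
      List<T> elements, SerializableTimestampAssigner<T> timestampOf, long endWatermark, long step) {
    if (step <= 0) {
      throw new IllegalArgumentException("step must be positive");
    }
    this.elements = new ArrayList<>(elements);
    this.timestampOf = timestampOf;
    this.endWatermark = endWatermark;
    this.step = step;
  }

  @Override
  public void run(SourceContext<T> ctx) throws Exception {
    if (elements.isEmpty()) {
      // No data: jump straight to the end watermark.
      synchronized (ctx.getCheckpointLock()) {
        ctx.emitWatermark(new Watermark(endWatermark));
      }
      return;
    }
    long maxTimestamp = Long.MIN_VALUE;
    for (T element : elements) {
      if (!running) {
        return;
      }
      long ts = timestampOf.extractTimestamp(element, Long.MIN_VALUE);
      synchronized (ctx.getCheckpointLock()) {
        ctx.collectWithTimestamp(element, ts);
        maxTimestamp = Math.max(maxTimestamp, ts);
        // Watermark(t) means "no more records with timestamp <= t", so use max - 1
        // to keep a later record with an equal timestamp on time.
        ctx.emitWatermark(new Watermark(maxTimestamp - 1));
      }
    }
    // After the data ends, keep event time moving in steps up to endWatermark.
    long watermark = maxTimestamp - 1;
    while (running && watermark < endWatermark) {
      watermark = Math.min(endWatermark, watermark + step);
      synchronized (ctx.getCheckpointLock()) {
        ctx.emitWatermark(new Watermark(watermark));
      }
    }
  }

  @Override
  public void cancel() {
    running = false;
  }
}
```

It could be wired in with `env.addSource(source, "test-source", typeInfo)`. Leave out any later assignTimestampsAndWatermarks, because in 1.12 that operator ignores upstream watermarks other than the final Long.MAX_VALUE one. As far as I know, Flink also emits that Long.MAX_VALUE watermark when a SourceFunction's run() returns normally (not when it is cancelled), which fires every event-time timer still pending at that point.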
>> On Tue, Nov 30, 2021 at 3:34 AM Jin Yi <ji...@promoted.ai> wrote:
>>
>>> bump. a more general question: what do people do for more end-to-end,
>>> full integration tests of event-time-based jobs with timers?
>>>
>>> On Tue, Nov 23, 2021 at 11:26 AM Jin Yi <ji...@promoted.ai> wrote:
>>>
>>>> i am writing an integration test that executes a streaming flink job on
>>>> faked, "unbounded" input. because the job uses event-time timers to
>>>> produce some of its output, i want to control when the source function(s)
>>>> complete: they should be told to finish once the maximum output
>>>> watermark of the job's operators is beyond some job completion watermark
>>>> relative to the maximum input timestamp.
>>>>
>>>> here is the faked, "unbounded" source function class:
>>>>
>>>>   private static class WaitingSourceFunction<OUT> extends FromElementsFunction<OUT> {
>>>>
>>>>     // volatile: cancel() is called from a different thread than run()
>>>>     private volatile boolean isWaiting;
>>>>
>>>>     private TypeInformation<OUT> typeInfo;
>>>>
>>>>     private WaitingSourceFunction(
>>>>         StreamExecutionEnvironment env, Collection<OUT> data, TypeInformation<OUT> typeInfo)
>>>>         throws IOException {
>>>>       super(typeInfo.createSerializer(env.getConfig()), data);
>>>>       this.isWaiting = true;
>>>>       this.typeInfo = typeInfo;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void cancel() {
>>>>       super.cancel();
>>>>       isWaiting = false;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void run(SourceContext<OUT> ctx) throws Exception {
>>>>       super.run(ctx);  // emit the test elements
>>>>       // then keep the source (and therefore the job) alive until cancel()
>>>>       while (isWaiting) {
>>>>         TimeUnit.SECONDS.sleep(10);
>>>>       }
>>>>     }
>>>>
>>>>     public long getEndWatermark() {
>>>>       // TODO
>>>>       return 1000000;
>>>>     }
>>>>   }
>>>>
>>>> and here is the function where i want to busy-wait (currently hacked up
>>>> to print info that shows my problem):
>>>>
>>>>   private void waitForDone(String jobName, WaitingSourceFunction<?>... functions)
>>>>       throws Exception {
>>>>     JobExecutionResult jobResult = env.execute(jobName);
>>>>     RestClient restClient = new RestClient(
>>>>         RestClientConfiguration.fromConfiguration(getClientConfiguration()),
>>>>         scheduledExecutorService);
>>>>     URI restUri = MiniClusterExtension.flinkCluster.getRestAddres();
>>>>
>>>>     System.out.printf("** JOB: %s %s\n", jobName, jobResult.getJobID());
>>>>
>>>>     long currentWatermark = 0;
>>>>     long lastInputWatermark = Arrays.stream(functions)
>>>>         .mapToLong(f -> f.getEndWatermark())
>>>>         .max().getAsLong();
>>>>     for (int i = 0; i < 3; i++) {
>>>>     //while (currentWatermark < (lastInputWatermark + 1000)) {
>>>>       // list the job's vertices
>>>>       JobDetailsHeaders getVertexHeaders = JobDetailsHeaders.getInstance();
>>>>       JobMessageParameters getVertexParams =
>>>>           getVertexHeaders.getUnresolvedMessageParameters();
>>>>       getVertexParams.jobPathParameter.resolve(jobResult.getJobID());
>>>>       List<JobVertexID> vertexIds = restClient
>>>>           .sendRequest(restUri.getHost(), restUri.getPort(),
>>>>               getVertexHeaders, getVertexParams, EmptyRequestBody.getInstance())
>>>>           .get().getJobVertexInfos().stream()
>>>>           .map(v -> v.getJobVertexID())
>>>>           .collect(Collectors.toUnmodifiableList());
>>>>
>>>>       // for each vertex, read the metrics whose id ends in "Watermark"
>>>>       // and keep the highest value seen so far
>>>>       for (JobVertexID vertexId : vertexIds) {
>>>>         JobVertexMetricsHeaders getWatermarkHeader = JobVertexMetricsHeaders.getInstance();
>>>>         JobVertexMetricsMessageParameters getWatermarkParams =
>>>>             getWatermarkHeader.getUnresolvedMessageParameters();
>>>>         getWatermarkParams.jobPathParameter.resolve(jobResult.getJobID());
>>>>         getWatermarkParams.jobVertexIdPathParameter.resolve(vertexId);
>>>>         System.out.printf("** LOG VERTEX: %s\n", vertexId);
>>>>         try {
>>>>           long maxWatermark = restClient
>>>>               .sendRequest(restUri.getHost(), restUri.getPort(),
>>>>                   getWatermarkHeader, getWatermarkParams, EmptyRequestBody.getInstance())
>>>>               .get().getMetrics().stream()
>>>>               .filter(m -> m.getId().endsWith("Watermark"))
>>>>               .mapToLong(m -> {
>>>>                 System.out.printf("** LOG METRIC: %s\n", m);
>>>>                 return Long.parseLong(StringUtil.isBlank(m.getValue()) ? "0" : m.getValue());
>>>>               })
>>>>               .max().orElse(0);
>>>>           currentWatermark = Math.max(currentWatermark, maxWatermark);
>>>>         } catch (Exception e) {
>>>>           System.out.printf("** LOG ERROR: %s\n", e);
>>>>         }
>>>>       }
>>>>       System.out.printf("** SLEEP: %s %s %s\n", i, currentWatermark, lastInputWatermark);
>>>>       TimeUnit.SECONDS.sleep(1);
>>>>     }
>>>>
>>>>     System.out.println("** CANCEL SOURCES");
>>>>     for (WaitingSourceFunction<?> function : functions) {
>>>>       function.cancel();
>>>>     }
>>>>   }
>>>>
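A note on the `function.cancel()` loop at the end of waitForDone above. In a MiniCluster the source functions are serialized into the job graph, so the running tasks execute deserialized copies. Calling cancel() on the instances held by the test therefore most likely only changes those local copies. A common workaround is a static flag that both the test thread and the deployed copy can see. It is sketched here with hypothetical names and relies on the MiniCluster running in the same JVM as the test.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;

/** Hypothetical variant of WaitingSourceFunction that is released through a JVM-wide flag. */
public class ReleasableSourceFunction<OUT> extends FromElementsFunction<OUT> {

  // Static, so it is shared with the task's deserialized copy (same JVM only).
  // Reset it to false before each test.
  public static final AtomicBoolean RELEASED = new AtomicBoolean(false);

  private volatile boolean waiting = true;

  public ReleasableSourceFunction(TypeSerializer<OUT> serializer, Iterable<OUT> data)
      throws IOException {
    super(serializer, data);
  }

  @Override
  public void run(SourceContext<OUT> ctx) throws Exception {
    super.run(ctx); // emit the test elements first
    // Keep the job alive until the test releases the sources or the job is cancelled.
    while (waiting && !RELEASED.get()) {
      TimeUnit.MILLISECONDS.sleep(100);
    }
  }

  @Override
  public void cancel() {
    super.cancel();
    waiting = false;
  }
}
```

With this variant, the loop at the end of waitForDone would become a single `ReleasableSourceFunction.RELEASED.set(true);`. Separately, if the job is not actually submitted detached, `env.execute(jobName)` only returns once the job has finished. `StreamExecutionEnvironment#executeAsync(jobName)` instead returns a JobClient right away, and its getJobID() can feed the REST calls.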
>>>> THE PROBLEM: the logging output of the test clearly shows that the
>>>> watermark metrics are all null throughout the wait loop:
>>>> https://paste-bin.xyz/16195
>>>>
>>>> I also tried using JobVertexWatermarksHeaders instead of
>>>> JobVertexMetricsHeaders for the REST GET on the job vertices to get
>>>> watermark information, but the response bodies were empty.
>>>>
>>>> QUESTIONS:
>>>> 1.  shouldn't the watermark metrics be populated for a job running in
>>>> the minicluster?
>>>> 2.  do i have to enable metrics somehow?
>>>> 3.  is there a better way to extract the output watermarks of a running
>>>> flink job?
>>>>
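A hedged note on question 2. As far as I know, the REST handlers do not read metrics live. They serve them from a cache in the cluster that is refreshed every `metrics.fetcher.update-interval` (10 s by default), so a short wait loop may see no values yet. The sketch below shows a MiniCluster with a shorter refresh interval. It assumes JUnit 4 and flink-test-utils' MiniClusterWithClientResource as in Flink 1.12 (the thread's MiniClusterExtension is the author's own wrapper), and the factory name is hypothetical.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;

/** Hypothetical factory for a MiniCluster whose REST metric cache refreshes quickly. */
public final class FastMetricsMiniCluster {

  private FastMetricsMiniCluster() {}

  public static MiniClusterWithClientResource create() {
    Configuration config = new Configuration();
    // REST metric values come from a cache refreshed at this interval (default 10 s).
    config.setLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, 1000L);
    return new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setConfiguration(config)
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(2)
            .build());
  }
}
```

In a JUnit 4 test this would typically be held in a `@ClassRule` field. With a custom extension, the same Configuration entry would go wherever that extension builds its cluster.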
>>>

Re: REST API for detached minicluster based integration test

Posted by Jin Yi <ji...@promoted.ai>.
so i went ahead and put some logging in the WatermarkGenerator.onEvent and
.onPeriodicEmit methods of the test source's watermark generator, and i do
see watermarks with values coming through those methods. they're
just not being returned as expected via the rest api.
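Here is a hedged explanation. As far as I know, GET /jobs/:jobid/vertices/:vertexid/metrics without a `get` query parameter only lists the ids of the available metrics, which would explain the null values in the log. Values come back only for the ids named in `get`. The sketch below does both requests with the same RestClient classes as waitForDone. It assumes Flink 1.12's JobVertexMetricsMessageParameters exposes that query parameter as the field metricsFilterParameter, and VertexWatermarks is a hypothetical helper.

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.Metric;

/** Hypothetical helper: highest watermark metric value reported for one job vertex. */
public final class VertexWatermarks {

  private VertexWatermarks() {}

  public static long maxWatermark(
      RestClient client, String host, int port, JobID jobId, JobVertexID vertexId)
      throws Exception {
    JobVertexMetricsHeaders headers = JobVertexMetricsHeaders.getInstance();

    // Step 1: without "get", the response lists metric ids only (values are null).
    JobVertexMetricsMessageParameters listParams = headers.getUnresolvedMessageParameters();
    listParams.jobPathParameter.resolve(jobId);
    listParams.jobVertexIdPathParameter.resolve(vertexId);
    List<String> ids = client
        .sendRequest(host, port, headers, listParams, EmptyRequestBody.getInstance())
        .get().getMetrics().stream()
        .map(Metric::getId)
        .filter(id -> id.endsWith("Watermark"))
        .collect(Collectors.toList());
    if (ids.isEmpty()) {
      return Long.MIN_VALUE; // no watermark metrics available yet
    }

    // Step 2: name those ids in the "get" query parameter to receive their values.
    JobVertexMetricsMessageParameters valueParams = headers.getUnresolvedMessageParameters();
    valueParams.jobPathParameter.resolve(jobId);
    valueParams.jobVertexIdPathParameter.resolve(vertexId);
    valueParams.metricsFilterParameter.resolve(ids);
    return client
        .sendRequest(host, port, headers, valueParams, EmptyRequestBody.getInstance())
        .get().getMetrics().stream()
        .map(Metric::getValue)
        .filter(v -> v != null && !v.isEmpty())
        .mapToLong(Long::parseLong)
        .max() // mirrors the thread's code; min over subtasks is the stricter check
        .orElse(Long.MIN_VALUE);
  }
}
```

In waitForDone, `VertexWatermarks.maxWatermark(restClient, restUri.getHost(), restUri.getPort(), jobResult.getJobID(), vertexId)` would replace the metrics request inside the vertex loop. To decide that a vertex has passed the end watermark, the minimum over its subtasks is the more conservative condition.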

On Tue, Nov 30, 2021 at 7:24 PM Caizhi Weng <ts...@gmail.com> wrote:
