Posted to dev@flink.apache.org by Marjan Jordanovski <mj...@gmail.com> on 2022/12/21 17:09:09 UTC

Question about match_recognize clause in Flink

Hello,

I am using a custom-made connector to create a Source table in this way:

    create table Source (
        ts TIMESTAMP(3),
        instance STRING,
        service STRING,
        logdatetime STRING,
        threadid STRING,
        level STRING,
        log_line STRING
    ) with (
        'connector'='lokiquery',
        'host'='<lokiurl>',
        'lokiqueryparamsstring'='query={instance="test",service="test"}&limit=5000&start=2022-12-15T16:40:09.560Z&end=2022-12-15T16:58:09.570Z'
    );
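For context, the connector is custom, but its 'lokiqueryparamsstring' option appears to mirror the query string of Loki's HTTP query_range endpoint. A minimal Python sketch of how those parameters decompose (the endpoint mapping is an assumption about the custom connector, not documented behavior):

```python
from urllib.parse import parse_qs

# Assumption: 'lokiqueryparamsstring' is shaped like the query string of
# Loki's /loki/api/v1/query_range HTTP endpoint.
params_string = (
    'query={instance="test",service="test"}'
    "&limit=5000&start=2022-12-15T16:40:09.560Z&end=2022-12-15T16:58:09.570Z"
)
params = parse_qs(params_string)
# 'query' is a LogQL stream selector; 'limit' caps the returned lines;
# 'start'/'end' bound the time range, as in the DDL above.
```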

In this table I successfully store data from the specified time range in
Loki. The data arrives as a batch (not a stream).

Then I want to run another query that looks for patterns in the
log_line column of the Source table. I am doing the following:

SELECT *
FROM Source
    MATCH_RECOGNIZE (
        ORDER BY ts
        MEASURES
            START_ROW.ts AS start_ts,
            END_ROW.ts AS end_ts
        ONE ROW PER MATCH
        AFTER MATCH SKIP TO LAST END_ROW
        PATTERN (START_ROW{1} UNK_ROW+? MID_ROW{2} END_ROW{1})
        DEFINE
            START_ROW AS START_ROW.log_line SIMILAR TO '%componentId:.{2}GridInstance_grtm_gridtemplate_headache_view_null%',
            MID_ROW AS MID_ROW.log_line SIMILAR TO '%DSResponse - DSResponse: List with%',
            END_ROW AS END_ROW.log_line SIMILAR TO '%ContentRepository%'
    ) MR;
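Conceptually, MATCH_RECOGNIZE classifies each ordered row via the DEFINE predicates and then searches for the PATTERN sequence over those labels. A rough Python emulation of those semantics (with a simplified SIMILAR TO translation, a shortened START_ROW pattern, and made-up log lines; this is an illustration, not Flink's actual implementation):

```python
import re

def similar_to_regex(pattern: str) -> str:
    # Simplified SQL SIMILAR TO translation: '%' matches any string,
    # '_' matches any single character; other operators pass through.
    return pattern.replace("%", ".*").replace("_", ".")

def classify(log_line: str) -> str:
    # One symbol per row, mirroring the DEFINE clause above.
    if re.fullmatch(similar_to_regex("%ContentRepository%"), log_line):
        return "E"  # END_ROW
    if re.fullmatch(similar_to_regex("%DSResponse - DSResponse: List with%"), log_line):
        return "M"  # MID_ROW
    if re.fullmatch(similar_to_regex("%componentId:%"), log_line):
        return "S"  # START_ROW (pattern shortened for brevity)
    return "U"      # UNK_ROW has no definition, so it matches any row

# Made-up rows, already ordered by ts.
rows = [
    "componentId: xyGridInstance...",
    "some unrelated log line",
    "DSResponse - DSResponse: List with 5 items",
    "DSResponse - DSResponse: List with 5 items",
    "ContentRepository updated",
]
symbols = "".join(classify(r) for r in rows)
# PATTERN (START_ROW{1} UNK_ROW+? MID_ROW{2} END_ROW{1}) as a regex
match = re.search("SU+?M{2}E", symbols)
```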

When using Python's PyFlink, this works just fine! But when I try the
same thing in the Flink SQL CLI, I get a strange error after executing
the second statement:

[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not
enough rules to produce a node with desired properties: convention=LOGICAL,
FlinkRelDistributionTraitDef=any, sort=[].
Missing conversion is LogicalMatch[convention: NONE -> LOGICAL]
There is 1 empty subset: rel#175:RelSubset#1.LOGICAL.any.[], the relevant
part of the original plan is as follows
167:LogicalMatch(partition=[[]], order=[[0 ASC-nulls-first]],
outputFields=[[start_ts, end_ts]], allRows=[false], after=[SKIP TO
LAST(_UTF-16LE'END_ROW')],
pattern=[(((PATTERN_QUANTIFIER(_UTF-16LE'START_ROW', 1, 1, false),
PATTERN_QUANTIFIER(_UTF-16LE'UNK_ROW', 1, -1, true)),
PATTERN_QUANTIFIER(_UTF-16LE'MID_ROW', 2, 2, false)),
PATTERN_QUANTIFIER(_UTF-16LE'END_ROW', 1, 1, false))],
isStrictStarts=[false], isStrictEnds=[false], subsets=[[]],
patternDefinitions=[[SIMILAR TO(PREV(START_ROW.$6, 0),
_UTF-16LE'%componentId:.{2}GridInstance_grtm_gridtemplate_headache_view_null%'),
SIMILAR TO(PREV(MID_ROW.$6, 0), _UTF-16LE'%DSResponse - DSResponse: List
with%'), SIMILAR TO(PREV(END_ROW.$6, 0), _UTF-16LE'%ContentRepository%')]],
inputFields=[[ts, instance, service, logdatetime, threadid, level,
log_line]])
  1:LogicalTableScan(subset=[rel#166:RelSubset#0.NONE.any.[]],
table=[[default_catalog, default_database, Source]])

In Python, where this works, these are the only configs I use for the
table environment (of course I also include the JAR for my custom connector):
env_settings = EnvironmentSettings.in_batch_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().get_configuration().set_string("parallelism.default", "1")

Therefore I set the corresponding values in the Flink SQL CLI:
SET 'execution.runtime-mode' = 'batch';
SET 'parallelism.default' = '1';

But it didn't help. Does anyone have any idea what could be causing this
issue?

Thank you,
Marjan