Posted to user@flink.apache.org by zhangjun <82...@qq.com> on 2019/09/16 13:34:26 UTC

Re: Can I do a lookup in FlinkSQL?

You can try using a UDTF (user-defined table function).
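
Separately from the UDTF route, the validator error in the quoted query most likely has two causes. `ROW(ip)` wraps the column in a row type, which cannot be compared with a plain string column, and `s.`source.ip` OR ...` uses a string column as a boolean operand. A minimal sketch of the intended filter follows. It assumes that `source.ip`, `destination.ip` and `badips.ip` all have the same string type, and that the planner in your Flink version accepts IN subqueries combined with OR on a streaming table (not verified here):

```sql
-- Sketch only: column types and planner support are assumptions.
-- Each IP column is checked against the bad-IP list separately,
-- and the subquery returns the plain column instead of ROW(ip).
INSERT INTO sourceKafkaMalicious
SELECT s.*
FROM sourceKafka AS s
WHERE s.`source.ip` IN (SELECT ip FROM badips)
   OR s.`destination.ip` IN (SELECT ip FROM badips);
```

If the planner rejects the OR of two subqueries, splitting the statement into two INSERTs (one per IP column) may be an alternative, at the cost of duplicate rows when both addresses match.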





------------------ Original ------------------
From: srikanth flink <flink.devv@gmail.com>
Date: Mon, Sep 16, 2019 9:23 PM
To: dev <dev@flink.apache.org>, user <user@flink.apache.org>, user-ml <user-ml@flink.apache.org>
Subject: Re: Can I do a lookup in FlinkSQL?



Hi there,

I'm working with streaming data in FlinkSQL. I have created two tables: one
backed by a dynamic stream and the other by periodic updates.
I would like to keep the periodic table static (it is refreshed with new data
every day or so by flushing the old data), so that at any point in time the
static table contains the latest set of data.
While the dynamic table is being populated with stream data, can I do a lookup
on a column of the static table to check whether a value exists?

This is what I have done:
dynamic table: sourceKafka
static table: badips

I tried to build a list using the ROW() function and then, for each row of the
dynamic table, look up whether the value exists in that list.
Query: INSERT INTO sourceKafkaMalicious select s.* from sourceKafka as s
where s.`source.ip` OR s.`destination.ip` IN (select ROW(ip) from badips);
Response:
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Values passed to IN
operator must have compatible types

Is it possible to solve my use case? If so, where am I going wrong?

Thanks
Srikanth