Posted to user@flink.apache.org by Rong Rong <wa...@gmail.com> on 2019/10/01 04:50:41 UTC

Re: Flink Join Time Window

Hi Nishant,

From a brief look, I think the problem is in your second query:

>
> *Table2*...
> Table latestBadIps = tableEnv.sqlQuery("SELECT MAX(b_proctime) AS
> mb_proctime, bad_ip FROM BadIP ***GROUP BY bad_ip*** HAVING
> MIN(b_proctime) > CURRENT_TIMESTAMP - INTERVAL '2' DAY ");
> tableEnv.registerTable("LatestBadIP", latestBadIps);


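This SQL statement is a plain GROUP BY with no window, so the resulting
table is never-ending: its rows keep being updated and, as far as I can
tell, mb_proctime is no longer a time attribute. Because of that, your final
query is translated into a NonWindowJoin instead of a time-windowed join.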

If I understood you correctly, you are trying to emit a bad IP address for
as long as it was last seen within a specific time window, i.e. the past 2 days?
I assume what you are trying to do is something similar to an
OverWindowAggregate [1], for example:
"SELECT MAX(b_proctime) OVER ( PARTITION BY bad_ip ORDER BY b_proctime RANGE
BETWEEN INTERVAL '2' DAY PRECEDING AND CURRENT ROW ) FROM BadIP"
(I added the ORDER BY because Flink requires OVER windows to be ordered by a
time attribute.)
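
To make the semantics of such an OVER window more concrete, here is a minimal
sketch in plain Java. It does not use the Flink API; it only simulates what
"PARTITION BY bad_ip ... RANGE BETWEEN INTERVAL '2' DAY PRECEDING AND CURRENT
ROW" would compute, assuming the rows arrive in time order. The names
OverWindowSketch, Sighting, WindowResult and maxOverRange are made up for this
illustration, and the rowsInWindow field is an extra I added to show which
rows are still inside the window.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OverWindowSketch {

    /** One BadIP row: the address and the time it was seen (epoch millis). */
    static final class Sighting {
        final String badIp;
        final long timeMillis;

        Sighting(String badIp, long timeMillis) {
            this.badIp = badIp;
            this.timeMillis = timeMillis;
        }
    }

    /** One output row of the simulated OVER aggregate. */
    static final class WindowResult {
        final String badIp;
        final long maxTimeMillis;
        final int rowsInWindow;

        WindowResult(String badIp, long maxTimeMillis, int rowsInWindow) {
            this.badIp = badIp;
            this.maxTimeMillis = maxTimeMillis;
            this.rowsInWindow = rowsInWindow;
        }
    }

    /**
     * Emits exactly one result per input row (unlike GROUP BY, which keeps
     * updating a single row per key). Input is assumed to be sorted by time.
     */
    static List<WindowResult> maxOverRange(List<Sighting> rowsInTimeOrder, Duration range) {
        Map<String, Deque<Long>> windowPerIp = new HashMap<>();
        List<WindowResult> out = new ArrayList<>();
        for (Sighting row : rowsInTimeOrder) {
            // PARTITION BY bad_ip: each address has its own window state.
            Deque<Long> window = windowPerIp.computeIfAbsent(row.badIp, k -> new ArrayDeque<>());
            window.addLast(row.timeMillis);
            // RANGE ... PRECEDING: drop rows older than (current time - range).
            long lowerBound = row.timeMillis - range.toMillis();
            while (window.peekFirst() < lowerBound) {
                window.removeFirst();
            }
            out.add(new WindowResult(row.badIp, Collections.max(window), window.size()));
        }
        return out;
    }

    public static void main(String[] args) {
        long day = Duration.ofDays(1).toMillis();
        List<Sighting> rows = new ArrayList<>();
        rows.add(new Sighting("10.0.0.1", 0L));
        rows.add(new Sighting("10.0.0.2", day));
        rows.add(new Sighting("10.0.0.1", day));
        rows.add(new Sighting("10.0.0.1", 4 * day));
        for (WindowResult r : maxOverRange(rows, Duration.ofDays(2))) {
            System.out.println(r.badIp + " max=" + r.maxTimeMillis + " rows=" + r.rowsInWindow);
        }
    }
}
```

The point of the sketch is that the window state is bounded: anything older
than 2 days is evicted, and every input row produces one output row carrying a
timestamp, instead of a single per-key row that is updated forever.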

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#aggregations


On Mon, Sep 30, 2019 at 2:17 AM Nishant Gupta <ni...@gmail.com>
wrote:

> Hi Team,
>
> I am trying to join [kafka stream] and [badip stream grouped by bad_ip].
>
> Can someone please help me verify what is wrong in the
> highlighted query? Am I writing the time-windowed join query wrong for this
> use case, or is it a bug that I should report?
> If it is a bug, what is the workaround?
>
> *Table1*
> Table kafkaSource = tableEnv.fromDataStream(inKafkaStreamM,
> "sourceip,field1,field2, k_proctime.proctime");
> tableEnv.registerTable("KafkaSource", kafkaSource);
>
> *Table2*
> Table  badipTable = tableEnv.fromDataStream(badipStreamM, "bad_ip,
> b_proctime.proctime");
> tableEnv.registerTable("BadIP", badipTable);
>
> Table latestBadIps = tableEnv.sqlQuery("SELECT MAX(b_proctime) AS
> mb_proctime, bad_ip FROM BadIP GROUP BY bad_ip HAVING MIN(b_proctime) >
> CURRENT_TIMESTAMP - INTERVAL '2' DAY ");
> tableEnv.registerTable("LatestBadIP", latestBadIps);
>
> *Table3 - Join*
> *Success for below query*
> Table resultKafkaMalicious = tableEnv.sqlQuery("SELECT K.* FROM
> KafkaSource AS K, LatestBadIP AS LB WHERE K.sourceip = LB.bad_ip");
>
> *Failure for below query*
> Table resultKafkaMalicious = tableEnv.sqlQuery("SELECT K.* FROM
> KafkaSource AS K, LatestBadIP AS LB WHERE K.sourceip = LB.bad_ip AND
> LB.mb_proctime BETWEEN K.k_proctime - INTERVAL '4' HOUR AND K.k_proctime +
> INTERVAL '10' MINUTE");
>
> *Error:*
> 14:25:25,230 INFO  org.apache.flink.runtime.taskmanager.Task
>       - InnerJoin(where: (AND(=(sourceip, bad_ip), >=(mb_proctime,
> -(PROCTIME(k_proctime), 14400000:INTERVAL HOUR)), <=(mb_proctime,
> +(PROCTIME(k_proctime), 600000:INTERVAL MINUTE)))), join: (tlsversion,
> tlscipher, tlscurve, tlsserver_name, tlsresumed, tlslast_alert,
> tlsnext_protocol, tlsestablished, tlsclient_cert_chain_fuids, tlssubject,
> tlsissuer, tlsclient_subject, tlsclient_issuer, tlsja3, ecsversion,
> sourceip, sourceport, sourcegeolower, sourcegeoupper,
> sourcegeocountry_iso_code, sourcegeocountry_name, sourcegeoregion_name,
> sourcegeocity_name, sourcegeolocationlat, sourcegeolocationlon,
> sourcegeozipcode, sourcegeotimezone, sourcegeoisp, sourcegeodomain,
> sourcegeonetspeed, sourcegeoiddcode, sourcegeoareacode,
> sourcegeoweatherstation_code, sourcegeoweatherstation_name, sourcegeomcc,
> sourcegeomnc, sourcegeomobilebrand, sourcegeoelevation, sourcegeousagetype,
> destinationip, destinationport, destinationgeolower, destinationgeoupper,
> destinationgeocountry_iso_code, destinationgeocountry_name,
> destinationgeoregion_name, destinationgeocity_name,
> destinationgeolocationlat, destinationgeolocationlon,
> destinationgeozipcode, destinationgeotimezone, destinationgeoisp,
> destinationgeodomain, destinationgeonetspeed, destinationgeoiddcode,
> destinationgeoareacode, destinationgeoweatherstation_code,
> destinationgeoweatherstation_name, destinationgeomcc, destinationgeomnc,
> destinationgeomobilebrand, destinationgeoelevation,
> destinationgeousagetype, eventid, eventprovider, eventdataset, eventtype,
> eventaction, organizationid, timestamp_received, clientmac, transactionid,
> timestamp, message, dhcpassigned_ip, dhcplease_time, dnsrtt,
> dnsquestionclass, dnsquestionname, dnsquestiontype, dnsquery,
> dnsqtype_name, dnsresponse_code, dnsrcode_name, dnsheader_flags,
> dnsanswersdata, dnsanswersttl, dnsrejected, networkprotocol, k_proctime,
> mb_proctime, bad_ip)) -> select: (tlsversion, tlscipher, tlscurve,
> tlsserver_name, tlsresumed, tlslast_alert, tlsnext_protocol,
> tlsestablished, tlsclient_cert_chain_fuids, tlssubject, tlsissuer,
> tlsclient_subject, tlsclient_issuer, tlsja3, ecsversion, sourceip,
> sourceport, sourcegeolower, sourcegeoupper, sourcegeocountry_iso_code,
> sourcegeocountry_name, sourcegeoregion_name, sourcegeocity_name,
> sourcegeolocationlat, sourcegeolocationlon, sourcegeozipcode,
> sourcegeotimezone, sourcegeoisp, sourcegeodomain, sourcegeonetspeed,
> sourcegeoiddcode, sourcegeoareacode, sourcegeoweatherstation_code,
> sourcegeoweatherstation_name, sourcegeomcc, sourcegeomnc,
> sourcegeomobilebrand, sourcegeoelevation, sourcegeousagetype,
> destinationip, destinationport, destinationgeolower, destinationgeoupper,
> destinationgeocountry_iso_code, destinationgeocountry_name,
> destinationgeoregion_name, destinationgeocity_name,
> destinationgeolocationlat, destinationgeolocationlon,
> destinationgeozipcode, destinationgeotimezone, destinationgeoisp,
> destinationgeodomain, destinationgeonetspeed, destinationgeoiddcode,
> destinationgeoareacode, destinationgeoweatherstation_code,
> destinationgeoweatherstation_name, destinationgeomcc, destinationgeomnc,
> destinationgeomobilebrand, destinationgeoelevation,
> destinationgeousagetype, eventid, eventprovider, eventdataset, eventtype,
> eventaction, organizationid, timestamp_received, clientmac, transactionid,
> timestamp, message, dhcpassigned_ip, dhcplease_time, dnsrtt,
> dnsquestionclass, dnsquestionname, dnsquestiontype, dnsquery,
> dnsqtype_name, dnsresponse_code, dnsrcode_name, dnsheader_flags,
> dnsanswersdata, dnsanswersttl, dnsrejected, networkprotocol,
> PROCTIME(k_proctime) AS k_proctime) -> to: Tuple2 -> Map -> Sink: Unnamed
> (4/8) (94e70bf5bb5b89ac5ae933c73c4b0353) switched from RUNNING to FAILED.
> org.apache.flink.api.common.InvalidProgramException: *Table program
> cannot be compiled. This is a bug. Please file an issue.*
> at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
> at
> org.apache.flink.table.runtime.join.NonWindowJoin.compile(NonWindowJoin.scala:46)
> at
> org.apache.flink.table.runtime.join.NonWindowJoin.open(NonWindowJoin.scala:75)
> at
> org.apache.flink.table.runtime.join.NonWindowInnerJoin.open(NonWindowInnerJoin.scala:53)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.api.operators.co.LegacyKeyedCoProcessOperator.open(LegacyKeyedCoProcessOperator.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.base/java.lang.Thread.run(Thread.java:844)
> Caused by: org.codehaus.commons.compiler.CompileException: Line 957,
> Column 26: Unknown variable or type "ctx"
> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6773)
> at org.codehaus.janino.UnitCompiler.access$13500(UnitCompiler.java:215)
> at org.codehaus.janino.UnitCompiler$21.visitPackage(UnitCompiler.java:6385)
> at org.codehaus.janino.UnitCompiler$21.visitPackage(UnitCompiler.java:6382)
> at org.codehaus.janino.Java$Package.accept(Java.java:4237)
> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6768)
> at org.codehaus.janino.UnitCompiler.access$14100(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6410)
> at
> org.codehaus.janino.UnitCompiler$21$2$1.visitAmbiguousName(UnitCompiler.java:6407)
> at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4213)
> at
> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6407)
> at
> org.codehaus.janino.UnitCompiler$21$2.visitLvalue(UnitCompiler.java:6403)
> at org.codehaus.janino.Java$Lvalue.accept(Java.java:4137)
> at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
> at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
> at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
> at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7019)
> at org.codehaus.janino.UnitCompiler.access$15700(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$21$2.visitMethodInvocation(UnitCompiler.java:6430)
> at
> org.codehaus.janino.UnitCompiler$21$2.visitMethodInvocation(UnitCompiler.java:6403)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
> at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
> at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
> at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8939)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
> at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
> at
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
> at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2580)
> at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1503)
> at
> org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1487)
> at
> org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3511)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
> at
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
> at
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> at
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> at
> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> at
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
> ... 11 more
>