You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Chennet Steven <St...@live.com> on 2019/06/21 03:34:25 UTC
为何会报"Window can only be defined over a time attribute column."??
Flink 1.4.2
执行Sql SELECT Tag, SUM(PowerX) FROM TimeTable GROUP BY TUMBLE(UptMs, INTERVAL '1' HOUR),Tag
会报异常,但是感觉UptMs列类型是TimeStamp啊?
Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:69)
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:82)
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:87)
at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:317)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:506)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:385)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:251)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:210)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:197)
at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:257)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:665)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
at com.teld.bdp.tablewindow.TubleApp$.main(TubleApp.scala:56)
at com.teld.bdp.tablewindow.TubleApp.main(TubleApp.scala)
Disconnected from the target VM, address: '127.0.0.1:60155', transport: 'socket'
代码如下
package com.teld.bdp.tablewindow
import java.io.Serializable
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.api.java.tuple.Tuple3
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row
import org.apache.flink.table.api.TableEnvironment
// Flink 1.4.2
// Reproduction case (Flink 1.4.2) for:
//   org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
object TubleApp {
// Builds a local event-time streaming job, registers a (Tag, PowerX, UptTime)
// stream as a table, converts the Long column to SQL_TIMESTAMP via a UDF and
// then attempts a TUMBLE window over the converted column.
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.createLocalEnvironment(1)
// Event time is selected, but no timestamps/watermarks are assigned anywhere below.
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tableEnv = TableEnvironment.getTableEnvironment(senv)
tableEnv.registerFunction("TOTIMESTAMP", new ConvertLongToTimeStamp())
// Sample data: wall-clock time and the corresponding epoch milliseconds.
// 2019-06-20 10:27:57 1560997677000
// 2019-06-20 11:27:57 1561001277000
// 2019-06-20 12:27:57 1561004877000
val source = senv.fromElements(
new Tuple3("a", 1, 1560997677000L),
new Tuple3("b", 1, 1560997677000L),
new Tuple3("a", 2, 1561001277000L),
new Tuple3("b", 2, 1561001277000L),
new Tuple3("a", 3, 1561004877000L),
new Tuple3("b", 3, 1561004877000L)
)
import org.apache.flink.table.api.scala._
// NOTE(review): UptTime is registered as a plain Long field here. For the
// TUMBLE below to be valid, a time attribute has to be declared at registration
// time (e.g. assign timestamps/watermarks on `source` and use 'UptTime.rowtime)
// -- confirm against the Flink "time attributes" documentation for 1.4.x.
tableEnv.registerDataStream("Temp", source, 'Tag, 'PowerX, 'UptTime)
//----------------------------- source table -----------------------
val sourceTable = tableEnv.sqlQuery("select * from Temp")
// Printed schema:
// root
// |-- Tag: String
// |-- PowerX: Integer
// |-- UptTime: Long
sourceTable.printSchema()
tableEnv.registerTable("SourceTable", sourceTable)
//----------------------------- convert the Long column to a TIMESTAMP -----------------------
// The UDF only changes the declared SQL type; the result is an ordinary
// TIMESTAMP column, not a rowtime/proctime time attribute.
val timeTable = tableEnv.sqlQuery("SELECT Tag, PowerX, UptTime, TOTIMESTAMP(UptTime) AS UptMs FROM SourceTable")
// Printed schema:
// root
// |-- Tag: String
// |-- PowerX: Integer
// |-- UptTime: Long
// |-- UptMs: Timestamp
timeTable.printSchema()
tableEnv.registerTable("TimeTable", timeTable)
tableEnv.toAppendStream[Row](timeTable).print()
//------------------------------ aggregation -------------------------------------
val aggTable = tableEnv.sqlQuery("SELECT Tag, SUM(PowerX) FROM TimeTable GROUP BY TUMBLE(UptMs, INTERVAL '1' HOUR),Tag")
// root
// |-- Tag: String
// |-- EXPR$1: Integer
aggTable.printSchema()
// Why does translating the aggregated table throw?  TUMBLE requires a *time
// attribute* column, and UptMs is a plain TIMESTAMP expression (see note above).
// Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
tableEnv.toRetractStream[Row](aggTable).print()
senv.execute("abc")
}
}
// Scalar UDF: passes an epoch-millisecond Long through unchanged while telling
// the planner the result type is SQL_TIMESTAMP.
// NOTE(review): this changes only the declared SQL type of the expression; the
// resulting column is not a rowtime/proctime time attribute, so it cannot be
// used as the time column of a group window.
class ConvertLongToTimeStamp extends ScalarFunction with Serializable {
// Identity at runtime: the Long value is returned as-is.
def eval(uptTime: Long): Long = uptTime
// Planner-visible result type is SQL_TIMESTAMP even though eval returns Long.
override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = Types.SQL_TIMESTAMP
}
From stevenchen
webchat 38798579
Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
Posted by Dian Fu <di...@gmail.com>.
1)在Table API & SQL中,RuntimeContext是不暴露给用户用的,所以是private
2)窗口之间聚合值的差值,可以看看cep能否满足需求,可以参考文档: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/match_recognize.html <https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/match_recognize.html>
> 在 2019年11月13日,下午3:35,Chennet Steven <St...@live.com> 写道:
>
> 场景是一分钟为窗口计算每分钟传感器的最高温度,同时计算当前分钟与上一分钟最高温度的差值,使用flink-sql。想写一个Table的自定义UDAF,在UDAF中使用State存储上一分钟的最高温度,但是发现UDAF的open函数的FunctionContext中对于RuntimeContext是一个private,无法使用。同时DataView是 UDAF中ACC的属性,而ACC每个窗口new一个新的,无法将上一个窗口的结果通过ACC/DataView保留到下一个窗口,大佬,我所理解的对么?
> 请教大佬计算两个窗口之间的聚合值得差值这种场景在FlinkSql中实现的方案是啥?
>
> From stevenchen
> webchat 38798579
>
> 发件人: Dian Fu<ma...@gmail.com>
> 发送时间: Thursday, November 7, 2019 19:41
> 收件人: user-zh@flink.apache.org<ma...@flink.apache.org>
> 主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
>
> 可以参考一下Flink代码里已有的例子:https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java <https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java>
>> 在 2019年11月7日,下午7:06,Chennet Steven <St...@live.com> 写道:
>>
>> 在flink1.9的flink-table-common中找到DataView这个接口和子类ListView,MapView,但是没有看懂如何在自定义函数中使用,
>> 能否给个example或者是test代码的链接啊?
>>
>> From stevenchen
>> webchat 38798579
>>
>> ________________________________
>> 发件人: wenlong.lwl <we...@gmail.com>
>> 发送时间: Thursday, November 7, 2019 2:13:43 PM
>> 收件人: user-zh@flink.apache.org <us...@flink.apache.org>
>> 主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
>>
>> 可以试试1.9,引入了DataView的机制,可以在Acc中使用state了。
>>
>> On Thu, 7 Nov 2019 at 09:22, Chennet Steven <st...@live.com> wrote:
>>
>>> 尝试在Flink的自定义聚合函数中使用State,发现open函数中通过FunctionContext无法获取到RuntimeContext
>>> 如何在聚合函数中使用State?
>>>
>>>
>>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
>>> TypeInformation}
>>> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
>>> import org.apache.flink.api.java.typeutils.TupleTypeInfo
>>> import org.apache.flink.table.functions.{AggregateFunction,
>>> FunctionContext}
>>> import java.lang.{Iterable => JIterable}
>>>
>>>
>>> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>>>
>>> class IntDiffSumFunction extends AggregateFunction[Int,
>>> IntDiffSumAccumulator] {
>>>
>>> override def open(context: FunctionContext): Unit = {
>>> // Flink1.7.2 这里没法获取到 RuntimeContext,没有办法初始化State
>>> //getRuntimeContext.getState(desc)
>>> val a = this.hashCode()
>>> print(s"hashCode:$a")
>>> super.open(context)
>>> }
>>>
>>> override def createAccumulator(): IntDiffSumAccumulator = {
>>> val acc = new IntDiffSumAccumulator()
>>> acc.f0 = 0
>>> acc.f1 = false
>>> acc
>>> }
>>>
>>> def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
>>> accumulator.f0 += value
>>> accumulator.f1 = true
>>> }
>>>
>>> override def getValue(accumulator: IntDiffSumAccumulator): Int = {
>>> if (accumulator.f1) {
>>>
>>> accumulator.f0
>>> } else {
>>> Int.MinValue
>>> }
>>> }
>>>
>>> def merge(acc: IntDiffSumAccumulator, its:
>>> JIterable[IntDiffSumAccumulator]) = {
>>> val iter = its.iterator()
>>> while (true) {
>>> val a = iter.next()
>>> if (a.f1) {
>>> acc.f0 += a.f0
>>> acc.f1 = true
>>> }
>>> }
>>> }
>>>
>>> def resetAccumulator(acc: IntDiffSumAccumulator) = {
>>> acc.f0 = 0
>>> acc.f1 = false
>>> }
>>>
>>> override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
>>> new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
>>> BasicTypeInfo.BOOLEAN_TYPE_INFO)
>>> }
>>>
>>>
>>> From stevenchen
>>> webchat 38798579
>>>
>>>
>>>
>
回复: Flink1.9.1,TableApi如何读取Kafka08Json的数据
Posted by Chennet Steven <st...@live.com>.
Kafka的一个value如下
{
"BillID": "230c95c6-346c-4070-9b49-b3bbbf6691db",
"BillCode": "201912230300118165",
"UptTimeMs": 1577091480000,
"SOC": 0.86,
"HighestVoltage": 4.019999980926514
}
其中 "UptTimeMs": 1577091480000 是到1970的毫秒值
代码修改如下
// Repro (Flink 1.9): reads JSON from Kafka 0.8 and registers it as a table with
// an event-time attribute "rowtime" extracted from the epoch-ms field UptTimeMs.
// Fails at runtime with:
//   java.time.format.DateTimeParseException: Text '1577085840000' could not be parsed at index 0
// because the format schema below declares UptTimeMs as SQL_TIMESTAMP, and the
// JSON format parses SQL_TIMESTAMP values with the RFC 3339 *string* format,
// while the incoming value is a numeric epoch-millisecond ("UptTimeMs": 1577091480000).
private static void FunA() throws Exception {
Configuration localConfig = new Configuration();
// Start the local web UI alongside the mini cluster.
localConfig.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConfig);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment ste = StreamTableEnvironment.create(env);
Kafka kafka08 = new Kafka()
.version("0.8")
.topic("BDP-ChargingMinuteMetric")
.startFromEarliest()
.property("zookeeper.connect", "def:2182/ ")
.property("bootstrap.servers", "abc:9095")
.property("group.id", "abc");
// Table schema: "rowtime" is the event-time attribute, extracted from the
// UptTimeMs field with a 1s bounded-out-of-orderness watermark.
Schema schema = new Schema()
.field("rowtime", Types.SQL_TIMESTAMP)
.rowtime(new Rowtime().timestampsFromField("UptTimeMs").watermarksPeriodicBounded(1000))
.field("BillID", Types.STRING)
.field("SOC", Types.DOUBLE)
.field("HighestVoltage", Types.DOUBLE);
// Physical JSON format schema.
// NOTE(review): declaring UptTimeMs as SQL_TIMESTAMP here is what triggers the
// RFC 3339 string parse; declaring it as a LONG/BIGINT field and letting the
// Rowtime extractor convert it should avoid the DateTimeParseException --
// verify against the flink-json format documentation.
TypeInformation<?>[] types = new TypeInformation<?>[]{Types.SQL_TIMESTAMP, Types.STRING, Types.DOUBLE, Types.DOUBLE};
// NOTE(review): the sample JSON payload uses the key "BillID", but "BillId" is
// declared here; with failOnMissingField(true) this mismatch would also make
// deserialization fail.
String[] fieldNames = new String[]{"UptTimeMs", "BillId", "SOC", "HighestVoltage"};
TypeInformation<Row> typeInformation = new RowTypeInfo(types, fieldNames);
FormatDescriptor formatDescriptor = new Json().failOnMissingField(true).schema(typeInformation).deriveSchema();
ste.connect(kafka08).withFormat(formatDescriptor).withSchema(schema).inAppendMode().registerTableSource("SourceTable");
Table table = ste.sqlQuery("select * from SourceTable");
DataStream<Row> rowDataStream = ste.toAppendStream(table, Row.class);
rowDataStream.print();
ste.execute("ABC");
}
提示错误
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.table.executor.StreamExecutor.execute(StreamExecutor.java:50)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:410)
at com.teld.demo.Kafka08App.FunA(Kafka08App.java:75)
at com.teld.demo.Kafka08App.main(Kafka08App.java:23)
Caused by: java.lang.Exception: java.io.IOException: Failed to deserialize JSON object.
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to deserialize JSON object.
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:374)
Caused by: java.time.format.DateTimeParseException: Text '1577085840000' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
查看源代码发现:
https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
发现rowtime的必须是Sql_TimeStamp类型,而Sql_TimeStamp使用RFC3339_TIMESTAMP_FORMAT来解析,这需要一个String类型的字符串啊,如果是到1970的毫秒值,这个地方该如何做啊?
From stevenchen
webchat 38798579
发件人: Chennet Steven<ma...@live.com>
发送时间: Tuesday, December 24, 2019 15:53
收件人: user-zh@flink.apache.org<ma...@flink.apache.org>
主题: Flink1.9.1,TableApi如何读取Kafka08Json的数据
刚从1.7升级到1.9,感觉kafka的读取方式有了变化,没找到example关于1.9读取kafka的example,谁能给个demo的地址啊?
下面这个代码在1.9下没有跑通过,提示
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'UptTimeMs' could not be resolved by the field mapping.
at org.apache.flink.table.sources.TableSourceValidation.resolveField(TableSourceValidation.java:245)
at org.apache.flink.table.sources.TableSourceValidation.lambda$validateTimestampExtractorArguments$6(TableSourceValidation.java:202)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
at org.apache.flink.table.sources.TableSourceValidation.validateTimestampExtractorArguments(TableSourceValidation.java:204)
at org.apache.flink.table.sources.TableSourceValidation.validateTableSource(TableSourceValidation.java:70)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.validateTableSource(TableEnvironmentImpl.java:435)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.validateTableSource(StreamTableEnvironmentImpl.java:329)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSourceInternal(TableEnvironmentImpl.java:516)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSource(TableEnvironmentImpl.java:200)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:70)
at com.teld.demo.Kafka08App.FunA(Kafka08App.java:69)
at com.teld.demo.Kafka08App.main(Kafka08App.java:23)
代码如下
// Repro (Flink 1.9): registers a Kafka 0.8 JSON source whose first table-schema
// field "UptTimeMs" is also the rowtime-extraction field.  Registration fails with:
//   org.apache.flink.table.api.ValidationException: Field 'UptTimeMs' could not be resolved by the field mapping.
// NOTE(review): the rowtime extractor references "UptTimeMs" while the same name
// is declared as the table's own SQL_TIMESTAMP time-attribute field, so the
// mapping between the format schema and the table schema does not resolve --
// compare with the Schema/Rowtime descriptor documentation for 1.9.
private static void FunA() throws Exception {
Configuration localConfig = new Configuration();
// Start the local web UI alongside the mini cluster.
localConfig.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConfig);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment ste = StreamTableEnvironment.create(env);
Kafka kafka08 = new Kafka()
.version("0.8")
.topic("BDP-ChargingMinuteMetric")
.startFromEarliest()
.property("zookeeper.connect", "abc:2182/kafka08")
.property("bootstrap.servers", "def:9095")
.property("group.id", "abc");
// Table schema: UptTimeMs doubles as the event-time attribute, with a 1s
// bounded-out-of-orderness watermark.
Schema schema = new Schema()
.field("UptTimeMs", Types.SQL_TIMESTAMP)
.rowtime(new Rowtime().timestampsFromField("UptTimeMs").watermarksPeriodicBounded(1000))
.field("BillId", Types.STRING)
.field("SOC", Types.DOUBLE)
.field("HighestVoltage", Types.DOUBLE);
// Physical JSON format schema, mirroring the table schema field for field.
TypeInformation<?>[] types = new TypeInformation<?>[]{Types.SQL_TIMESTAMP, Types.STRING, Types.DOUBLE, Types.DOUBLE};
String[] fieldNames = new String[]{"UptTimeMs", "BillId", "SOC", "HighestVoltage"};
TypeInformation<Row> typeInformation = new RowTypeInfo(types, fieldNames);
FormatDescriptor formatDescriptor = new Json().failOnMissingField(false).schema(typeInformation).deriveSchema();
ste.connect(kafka08).withFormat(formatDescriptor).withSchema(schema).inAppendMode().registerTableSource("SourceTable");
Table table = ste.sqlQuery("select * from SourceTable");
DataStream<Row> rowDataStream = ste.toAppendStream(table, Row.class);
rowDataStream.print();
ste.execute("ABC");
}
From stevenchen
webchat 38798579
Flink1.9.1,TableApi如何读取Kafka08Json的数据
Posted by Chennet Steven <st...@live.com>.
刚从1.7升级到1.9,感觉kafka的读取方式有了变化,没找到example关于1.9读取kafka的example,谁能给个demo的地址啊?
下面这个代码在1.9下没有跑通过,提示
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'UptTimeMs' could not be resolved by the field mapping.
at org.apache.flink.table.sources.TableSourceValidation.resolveField(TableSourceValidation.java:245)
at org.apache.flink.table.sources.TableSourceValidation.lambda$validateTimestampExtractorArguments$6(TableSourceValidation.java:202)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
at org.apache.flink.table.sources.TableSourceValidation.validateTimestampExtractorArguments(TableSourceValidation.java:204)
at org.apache.flink.table.sources.TableSourceValidation.validateTableSource(TableSourceValidation.java:70)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.validateTableSource(TableEnvironmentImpl.java:435)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.validateTableSource(StreamTableEnvironmentImpl.java:329)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSourceInternal(TableEnvironmentImpl.java:516)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerTableSource(TableEnvironmentImpl.java:200)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:70)
at com.teld.demo.Kafka08App.FunA(Kafka08App.java:69)
at com.teld.demo.Kafka08App.main(Kafka08App.java:23)
代码如下
// Original post's repro (Flink 1.9, upgraded from 1.7): registers a Kafka 0.8
// JSON source; the first table-schema field "UptTimeMs" is also named as the
// rowtime-extraction field.  Registration fails with:
//   org.apache.flink.table.api.ValidationException: Field 'UptTimeMs' could not be resolved by the field mapping.
// NOTE(review): the rowtime extractor references "UptTimeMs" while the same name
// is declared as the table's own SQL_TIMESTAMP time-attribute field, so the
// mapping between the format schema and the table schema does not resolve --
// compare with the Schema/Rowtime descriptor documentation for 1.9.
private static void FunA() throws Exception {
Configuration localConfig = new Configuration();
// Start the local web UI alongside the mini cluster.
localConfig.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConfig);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment ste = StreamTableEnvironment.create(env);
Kafka kafka08 = new Kafka()
.version("0.8")
.topic("BDP-ChargingMinuteMetric")
.startFromEarliest()
.property("zookeeper.connect", "hdpjntest.chinacloudapp.cn:2182/kafka08")
.property("bootstrap.servers", "telddruidteal.chinacloudapp.cn:9095")
.property("group.id", "abc")
// Table schema: UptTimeMs doubles as the event-time attribute, with a 1s
// bounded-out-of-orderness watermark.
;
Schema schema = new Schema()
.field("UptTimeMs", Types.SQL_TIMESTAMP)
.rowtime(new Rowtime().timestampsFromField("UptTimeMs").watermarksPeriodicBounded(1000))
.field("BillId", Types.STRING)
.field("SOC", Types.DOUBLE)
.field("HighestVoltage", Types.DOUBLE);
// Physical JSON format schema, mirroring the table schema field for field.
TypeInformation<?>[] types = new TypeInformation<?>[]{Types.SQL_TIMESTAMP, Types.STRING, Types.DOUBLE, Types.DOUBLE};
String[] fieldNames = new String[]{"UptTimeMs", "BillId", "SOC", "HighestVoltage"};
TypeInformation<Row> typeInformation = new RowTypeInfo(types, fieldNames);
FormatDescriptor formatDescriptor = new Json().failOnMissingField(false).schema(typeInformation).deriveSchema();
ste.connect(kafka08).withFormat(formatDescriptor).withSchema(schema).inAppendMode().registerTableSource("SourceTable");
Table table = ste.sqlQuery("select * from SourceTable");
DataStream<Row> rowDataStream = ste.toAppendStream(table, Row.class);
rowDataStream.print();
ste.execute("ABC");
}
From stevenchen
webchat 38798579
回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
Posted by "Yuan,Youjun" <yu...@baidu.com>.
SQL没有表达这种“最早一分钟”的逻辑。
如果在你的消息的开头,插入一个temperature=0的消息,那么你得到的第一个输出diff_temperature=0,不知道这种方式是否可以接受。
发件人: Chennet Steven <st...@live.com>
发送时间: Thursday, November 14, 2019 5:32 PM
收件人: user-zh@flink.apache.org; Yuan,Youjun <yu...@baidu.com>
主题: 回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
Yuan, 非常感谢大佬的回复和方案,我代码尝试了,这个方案的确可行,但在计算最早一分钟 diff_temperature时候,由于没有更早的分钟数据,这个diff_temperature会被计算成第一分钟的t,是否能有方法将他设置为null?
运行得到如下结果:
{"deviceid":"dev1","diff_temperature":1.3,"ts":59999} ---- 这分钟的1.3 是否能有方法设置为null?
{"deviceid":"dev1","diff_temperature":0.3,"ts":119999}
{"deviceid":"dev1","diff_temperature":0.1,"ts":179999}
From stevenchen
webchat 38798579
________________________________
发件人: Yuan,Youjun <yu...@baidu.com>>
发送时间: Wednesday, November 13, 2019 11:34:53 PM
收件人: user-zh@flink.apache.org<ma...@flink.apache.org> <us...@flink.apache.org>>
主题: 回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
这个场景应可以通过标准的SQL完成计算。大致思路如下:
1,内层查询统计每个设备一分钟的最大温度,max(temp) as max_temperature + tumble窗口
2,外层通过row over窗口,拿到当前分钟的max_temperature,和前后2分钟最大温度的和,即SUM(max_temperature) AS sum_temperature
3,最外层,就直接select 2 * max_temperature - sum_temperature就是你需要的前后2个分钟最大温度的差了。
假设输入消息有三个字段:
Ts: 时间戳
Deviceid:设备编号
Temp: 设备温度
完整的SQL如下:
INSERT INTO mysink
SELECT ts, deviceid, 2 * max_temperature - sum_temperature AS diff_temperature
FROM (
SELECT deviceid, ts, max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature
FROM (
SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS max_temperature from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), deviceid
)
)
我用如下测试数据:
"20000,dev1,1.2",
"50000,dev1,1.3",
"60000,dev1,1.4",
"100000,dev1,1.5",
"110000,dev1,1.6",
"120000,dev1,1.7"
运行得到如下结果:
{"deviceid":"dev1","diff_temperature":1.3,"ts":59999}
{"deviceid":"dev1","diff_temperature":0.3,"ts":119999}
{"deviceid":"dev1","diff_temperature":0.1,"ts":179999}
如果你想完整地验证我的方法,你可以:
1,登陆 http://creek.baidubce.com/
2,在作业订阅输入框,输入邮件末尾的作业定义(json)
3,点击生成可执行文件,在弹出的对话框中,选择你的电脑的OS和CPU ARCH,并且点击确定
耐心等待几秒钟,系统会生成完整的可执行文件,你直接执行它,便可从控制台看到计算结果。如果你需要验证更多的数据,请修改source的Type=STDIN,这样你可以从命令行下输入你的数据了。
作业定义(json):
{
"注释":{
"说明": "内层query计算每个设备每分钟的最大温度,max+tumble窗口;外层query计算同一设备前后2分钟最大温度差, Row over窗口;计算温度差的计算方法为:当前窗口最大温度 x 2 - 前后2个窗口最大温度的和。本例采用预先配置的输入数据,即source type=COLLECTION,如果需要尝试更多的输入,可以将type改为STDIN,即从标准输入读入数据。",
"输入示例": "1000,dev1,2.3",
"输出示例": {"deviceid":"dev1","diff_temperature":1.3,"ts":59999}
},
"sources": [{
"schema": {
"format": "CSV",
"fields": [{
"name": "ts",
"type": "SQL_TIMESTAMP"
},
{
"name": "deviceid",
"type": "STRING"
},
{
"name": "temp",
"type": "DOUBLE"
}]
},
"watermark": 0,
"name": "mysrc",
"eventTime": "ts",
"type": "COLLECTION",
"attr": {
"input": [
"10000,dev1,1.1",
"20000,dev1,1.2",
"50000,dev1,1.3",
"60000,dev1,1.4",
"100000,dev1,1.5",
"110000,dev1,1.6",
"120000,dev1,1.7"
]
}
}],
"sink": {
"schema": {
"format": "JSON"
},
"name": "mysink",
"type": "STDOUT"
},
"name": "demojob",
"timeType": "EVENTTIME",
"sql": "INSERT INTO mysink SELECT ts, deviceid, 2 * max_temperature - sum_temperature AS diff_temperature FROM ( SELECT deviceid, ts, max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature FROM (SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS max_temperature from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), deviceid)) "
}
-----邮件原件-----
发件人: Chennet Steven <st...@live.com>>
发送时间: Wednesday, November 13, 2019 3:36 PM
收件人: user-zh@flink.apache.org<ma...@flink.apache.org>
主题: 回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
场景是一分钟为窗口计算每分钟传感器的最高温度,同时计算当前分钟与上一分钟最高温度的差值,使用flink-sql。想写一个Table的自定义UDAF,在UDAF中使用State存储上一分钟的最高温度,但是发现UDAF的open函数的FunctionContext中对于RuntimeContext是一个private,无法使用。同时DataView是 UDAF中ACC的属性,而ACC每个窗口new一个新的,无法将上一个窗口的结果通过ACC/DataView保留到下一个窗口,大佬,我所理解的对么?
请教大佬计算两个窗口之间的聚合值得差值这种场景在FlinkSql中实现的方案是啥?
From stevenchen
webchat 38798579
发件人: Dian Fu<ma...@gmail.com>
发送时间: Thursday, November 7, 2019 19:41
收件人: user-zh@flink.apache.org<ma...@flink.apache.org>>
主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
可以参考一下Flink代码里已有的例子:https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java <https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java>
> 在 2019年11月7日,下午7:06,Chennet Steven <St...@live.com>> 写道:
>
> 在flink1.9的flink-table-common中找到DataView这个接口和子类ListView,MapView,但是没有看懂如
> 何在自定义函数中使用,
> 能否给个example或者是test代码的链接啊?
>
> From stevenchen
> webchat 38798579
>
> ________________________________
> 发件人: wenlong.lwl <we...@gmail.com>>
> 发送时间: Thursday, November 7, 2019 2:13:43 PM
> 收件人: user-zh@flink.apache.org<ma...@flink.apache.org> <us...@flink.apache.org>>
> 主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
>
> 可以试试1.9,引入了DataView的机制,可以在Acc中使用state了。
>
> On Thu, 7 Nov 2019 at 09:22, Chennet Steven <st...@live.com>> wrote:
>
>> 尝试在Flink的自定义聚合函数中使用State,发现open函数中通过FunctionContext无法获取到RuntimeContex
>> t
>> 如何在聚合函数中使用State?
>>
>>
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
>> TypeInformation}
>> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import
>> org.apache.flink.api.java.typeutils.TupleTypeInfo
>> import org.apache.flink.table.functions.{AggregateFunction,
>> FunctionContext}
>> import java.lang.{Iterable => JIterable}
>>
>>
>> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>>
>> class IntDiffSumFunction extends AggregateFunction[Int,
>> IntDiffSumAccumulator] {
>>
>> override def open(context: FunctionContext): Unit = {
>> // Flink1.7.2 这里没法获取到 RuntimeContext,没有办法初始化State
>> //getRuntimeContext.getState(desc)
>> val a = this.hashCode()
>> print(s"hashCode:$a")
>> super.open(context)
>> }
>>
>> override def createAccumulator(): IntDiffSumAccumulator = {
>> val acc = new IntDiffSumAccumulator()
>> acc.f0 = 0
>> acc.f1 = false
>> acc
>> }
>>
>> def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
>> accumulator.f0 += value
>> accumulator.f1 = true
>> }
>>
>> override def getValue(accumulator: IntDiffSumAccumulator): Int = {
>> if (accumulator.f1) {
>>
>> accumulator.f0
>> } else {
>> Int.MinValue
>> }
>> }
>>
>> def merge(acc: IntDiffSumAccumulator, its:
>> JIterable[IntDiffSumAccumulator]) = {
>> val iter = its.iterator()
>> while (true) {
>> val a = iter.next()
>> if (a.f1) {
>> acc.f0 += a.f0
>> acc.f1 = true
>> }
>> }
>> }
>>
>> def resetAccumulator(acc: IntDiffSumAccumulator) = {
>> acc.f0 = 0
>> acc.f1 = false
>> }
>>
>> override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
>> new
>> TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
>> BasicTypeInfo.BOOLEAN_TYPE_INFO)
>> }
>>
>>
>> From stevenchen
>> webchat 38798579
>>
>>
>>
回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
Posted by Chennet Steven <st...@live.com>.
Yuan, 非常感谢大佬的回复和方案,我代码尝试了,这个方案的确可行,但在计算最早一分钟 diff_temperature时候,由于没有更早的分钟数据,这个diff_temperature会被计算成第一分钟的t,是否能有方法将他设置为null?
运行得到如下结果:
{"deviceid":"dev1","diff_temperature":1.3,"ts":59999} ---- 这分钟的1.3 是否能有方法设置为null?
{"deviceid":"dev1","diff_temperature":0.3,"ts":119999}
{"deviceid":"dev1","diff_temperature":0.1,"ts":179999}
From stevenchen
webchat 38798579
________________________________
发件人: Yuan,Youjun <yu...@baidu.com>
发送时间: Wednesday, November 13, 2019 11:34:53 PM
收件人: user-zh@flink.apache.org <us...@flink.apache.org>
主题: 回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
这个场景应可以通过标准的SQL完成计算。大致思路如下:
1,内层查询统计每个设备一分钟的最大温度,max(temp) as max_temperature + tumble窗口
2,外层通过row over窗口,拿到当前分钟的max_temperature,和前后2分钟最大温度的和,即SUM(max_temperature) AS sum_temperature
3,最外层,就直接select 2 * max_temperature - sum_temperature就是你需要的前后2个分钟最大温度的差了。
假设输入消息有三个字段:
Ts: 时间戳
Deviceid:设备编号
Temp: 设备温度
完整的SQL如下:
INSERT INTO mysink
SELECT ts, deviceid, 2 * max_temperature - sum_temperature AS diff_temperature
FROM (
SELECT deviceid, ts, max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature
FROM (
SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS max_temperature from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), deviceid
)
)
我用如下测试数据:
"20000,dev1,1.2",
"50000,dev1,1.3",
"60000,dev1,1.4",
"100000,dev1,1.5",
"110000,dev1,1.6",
"120000,dev1,1.7"
运行得到如下结果:
{"deviceid":"dev1","diff_temperature":1.3,"ts":59999}
{"deviceid":"dev1","diff_temperature":0.3,"ts":119999}
{"deviceid":"dev1","diff_temperature":0.1,"ts":179999}
如果你想完整地验证我的方法,你可以:
1,登陆 http://creek.baidubce.com/
2,在作业订阅输入框,输入邮件末尾的作业定义(json)
3,点击生成可执行文件,在弹出的对话框中,选择你的电脑的OS和CPU ARCH,并且点击确定
耐心等待几秒钟,系统会生成完整的可执行文件,你直接执行它,便可从控制台看到计算结果。如果你需要验证更多的数据,请修改source的Type=STDIN,这样你可以从命令行下输入你的数据了。
作业定义(json):
{
"注释":{
"说明": "内层query计算每个设备每分钟的最大温度,max+tumble窗口;外层query计算同一设备前后2分钟最大温度差, Row over窗口;计算温度差的计算方法为:当前窗口最大温度 x 2 - 前后2个窗口最大温度的和。本例采用预先配置的输入数据,即source type=COLLECTION,如果需要尝试更多的输入,可以将type改为STDIN,即从标准输入读入数据。",
"输入示例": "1000,dev1,2.3",
"输出示例": {"deviceid":"dev1","diff_temperature":1.3,"ts":59999}
},
"sources": [{
"schema": {
"format": "CSV",
"fields": [{
"name": "ts",
"type": "SQL_TIMESTAMP"
},
{
"name": "deviceid",
"type": "STRING"
},
{
"name": "temp",
"type": "DOUBLE"
}]
},
"watermark": 0,
"name": "mysrc",
"eventTime": "ts",
"type": "COLLECTION",
"attr": {
"input": [
"10000,dev1,1.1",
"20000,dev1,1.2",
"50000,dev1,1.3",
"60000,dev1,1.4",
"100000,dev1,1.5",
"110000,dev1,1.6",
"120000,dev1,1.7"
]
}
}],
"sink": {
"schema": {
"format": "JSON"
},
"name": "mysink",
"type": "STDOUT"
},
"name": "demojob",
"timeType": "EVENTTIME",
"sql": "INSERT INTO mysink SELECT ts, deviceid, 2 * max_temperature - sum_temperature AS diff_temperature FROM ( SELECT deviceid, ts, max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature FROM (SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS max_temperature from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), deviceid)) "
}
-----邮件原件-----
发件人: Chennet Steven <st...@live.com>
发送时间: Wednesday, November 13, 2019 3:36 PM
收件人: user-zh@flink.apache.org
主题: 回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
场景是一分钟为窗口计算每分钟传感器的最高温度,同时计算当前分钟与上一分钟最高温度的差值,使用flink-sql。想写一个Table的自定义UDAF,在UDAF中使用State存储上一分钟的最高温度,但是发现UDAF的open函数的FunctionContext中对于RuntimeContext是一个private,无法使用。同时DataView是 UDAF中ACC的属性,而ACC每个窗口new一个新的,无法将上一个窗口的结果通过ACC/DataView保留到下一个窗口,大佬,我所理解的对么?
请教大佬计算两个窗口之间的聚合值的差值这种场景在FlinkSql中实现的方案是啥?
From stevenchen
webchat 38798579
发件人: Dian Fu<ma...@gmail.com>
发送时间: Thursday, November 7, 2019 19:41
收件人: user-zh@flink.apache.org<ma...@flink.apache.org>
主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
可以参考一下Flink代码里已有的例子:https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java <https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java>
> 在 2019年11月7日,下午7:06,Chennet Steven <St...@live.com> 写道:
>
> 在flink1.9的flink-table-common中找到DataView这个接口和子类ListView,MapView,但是没有看懂如
> 何在自定义函数中使用,
> 能否给个example或者是test代码的链接啊?
>
> From stevenchen
> webchat 38798579
>
> ________________________________
> 发件人: wenlong.lwl <we...@gmail.com>
> 发送时间: Thursday, November 7, 2019 2:13:43 PM
> 收件人: user-zh@flink.apache.org <us...@flink.apache.org>
> 主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
>
> 可以试试1.9,引入了DataView的机制,可以在Acc中使用state了。
>
> On Thu, 7 Nov 2019 at 09:22, Chennet Steven <st...@live.com> wrote:
>
>> 尝试在Flink的自定义聚合函数中使用State,发现open函数中通过FunctionContext无法获取到RuntimeContex
>> t
>> 如何在聚合函数中使用State?
>>
>>
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
>> TypeInformation}
>> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import
>> org.apache.flink.api.java.typeutils.TupleTypeInfo
>> import org.apache.flink.table.functions.{AggregateFunction,
>> FunctionContext}
>> import java.lang.{Iterable => JIterable}
>>
>>
>> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>>
>> class IntDiffSumFunction extends AggregateFunction[Int,
>> IntDiffSumAccumulator] {
>>
>> override def open(context: FunctionContext): Unit = {
>> // Flink1.7.2 这里没法获取到 RuntimeContext,没有办法初始化State
>> //getRuntimeContext.getState(desc)
>> val a = this.hashCode()
>> print(s"hashCode:$a")
>> super.open(context)
>> }
>>
>> override def createAccumulator(): IntDiffSumAccumulator = {
>> val acc = new IntDiffSumAccumulator()
>> acc.f0 = 0
>> acc.f1 = false
>> acc
>> }
>>
>> def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
>> accumulator.f0 += value
>> accumulator.f1 = true
>> }
>>
>> override def getValue(accumulator: IntDiffSumAccumulator): Int = {
>> if (accumulator.f1) {
>>
>> accumulator.f0
>> } else {
>> Int.MinValue
>> }
>> }
>>
>> def merge(acc: IntDiffSumAccumulator, its:
>> JIterable[IntDiffSumAccumulator]) = {
>> val iter = its.iterator()
>> while (true) {
>> val a = iter.next()
>> if (a.f1) {
>> acc.f0 += a.f0
>> acc.f1 = true
>> }
>> }
>> }
>>
>> def resetAccumulator(acc: IntDiffSumAccumulator) = {
>> acc.f0 = 0
>> acc.f1 = false
>> }
>>
>> override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
>> new
>> TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
>> BasicTypeInfo.BOOLEAN_TYPE_INFO)
>> }
>>
>>
>> From stevenchen
>> webchat 38798579
>>
>>
>>
回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
Posted by "Yuan,Youjun" <yu...@baidu.com>.
这个场景应可以通过标准的SQL完成计算。大致思路如下:
1,内层查询统计每个设备一分钟的最大温度,max(temp) as max_temperature + tumble窗口
2,外层通过row over窗口,拿到当前分钟的max_temperature,和前后2分钟最大温度的和,即SUM(max_temperature) AS sum_temperature
3,最外层,就直接select 2 * max_temperature - sum_temperature就是你需要的前后2个分钟最大温度的差了。
假设输入消息有三个字段:
Ts: 时间戳
Deviceid:设备编号
Temp: 设备温度
完整的SQL如下:
INSERT INTO mysink
SELECT ts, deviceid, 2 * max_temperature - sum_temperature AS diff_temperature
FROM (
SELECT deviceid, ts, max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature
FROM (
SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS max_temperature from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), deviceid
)
)
我用如下测试数据:
"20000,dev1,1.2",
"50000,dev1,1.3",
"60000,dev1,1.4",
"100000,dev1,1.5",
"110000,dev1,1.6",
"120000,dev1,1.7"
运行得到如下结果:
{"deviceid":"dev1","diff_temperature":1.3,"ts":59999}
{"deviceid":"dev1","diff_temperature":0.3,"ts":119999}
{"deviceid":"dev1","diff_temperature":0.1,"ts":179999}
如果你想完整地验证我的方法,你可以:
1,登陆 http://creek.baidubce.com/
2,在作业订阅输入框,输入邮件末尾的作业定义(json)
3,点击生成可执行文件,在弹出的对话框中,选择你的电脑的OS和CPU ARCH,并且点击确定
耐心等待几秒钟,系统会生成完整的可执行文件,你直接执行它,便可从控制台看到计算结果。如果你需要验证更多的数据,请修改source的Type=STDIN,这样你可以从命令行下输入你的数据了。
作业定义(json):
{
"注释":{
"说明": "内层query计算每个设备每分钟的最大温度,max+tumble窗口;外层query计算同一设备前后2分钟最大温度差, Row over窗口;计算温度差的计算方法为:当前窗口最大温度 x 2 - 前后2个窗口最大温度的和。本例采用预先配置的输入数据,即source type=COLLECTION,如果需要尝试更多的输入,可以将type改为STDIN,即从标准输入读入数据。",
"输入示例": "1000,dev1,2.3",
"输出示例": {"deviceid":"dev1","diff_temperature":1.3,"ts":59999}
},
"sources": [{
"schema": {
"format": "CSV",
"fields": [{
"name": "ts",
"type": "SQL_TIMESTAMP"
},
{
"name": "deviceid",
"type": "STRING"
},
{
"name": "temp",
"type": "DOUBLE"
}]
},
"watermark": 0,
"name": "mysrc",
"eventTime": "ts",
"type": "COLLECTION",
"attr": {
"input": [
"10000,dev1,1.1",
"20000,dev1,1.2",
"50000,dev1,1.3",
"60000,dev1,1.4",
"100000,dev1,1.5",
"110000,dev1,1.6",
"120000,dev1,1.7"
]
}
}],
"sink": {
"schema": {
"format": "JSON"
},
"name": "mysink",
"type": "STDOUT"
},
"name": "demojob",
"timeType": "EVENTTIME",
"sql": "INSERT INTO mysink SELECT ts, deviceid, 2 * max_temperature - sum_temperature AS diff_temperature FROM ( SELECT deviceid, ts, max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature FROM (SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS max_temperature from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), deviceid)) "
}
-----邮件原件-----
发件人: Chennet Steven <st...@live.com>
发送时间: Wednesday, November 13, 2019 3:36 PM
收件人: user-zh@flink.apache.org
主题: 回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
场景是一分钟为窗口计算每分钟传感器的最高温度,同时计算当前分钟与上一分钟最高温度的差值,使用flink-sql。想写一个Table的自定义UDAF,在UDAF中使用State存储上一分钟的最高温度,但是发现UDAF的open函数的FunctionContext中对于RuntimeContext是一个private,无法使用。同时DataView是 UDAF中ACC的属性,而ACC每个窗口new一个新的,无法将上一个窗口的结果通过ACC/DataView保留到下一个窗口,大佬,我所理解的对么?
请教大佬计算两个窗口之间的聚合值的差值这种场景在FlinkSql中实现的方案是啥?
From stevenchen
webchat 38798579
发件人: Dian Fu<ma...@gmail.com>
发送时间: Thursday, November 7, 2019 19:41
收件人: user-zh@flink.apache.org<ma...@flink.apache.org>
主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
可以参考一下Flink代码里已有的例子:https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java <https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java>
> 在 2019年11月7日,下午7:06,Chennet Steven <St...@live.com> 写道:
>
> 在flink1.9的flink-table-common中找到DataView这个接口和子类ListView,MapView,但是没有看懂如
> 何在自定义函数中使用,
> 能否给个example或者是test代码的链接啊?
>
> From stevenchen
> webchat 38798579
>
> ________________________________
> 发件人: wenlong.lwl <we...@gmail.com>
> 发送时间: Thursday, November 7, 2019 2:13:43 PM
> 收件人: user-zh@flink.apache.org <us...@flink.apache.org>
> 主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
>
> 可以试试1.9,引入了DataView的机制,可以在Acc中使用state了。
>
> On Thu, 7 Nov 2019 at 09:22, Chennet Steven <st...@live.com> wrote:
>
>> 尝试在Flink的自定义聚合函数中使用State,发现open函数中通过FunctionContext无法获取到RuntimeContex
>> t
>> 如何在聚合函数中使用State?
>>
>>
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
>> TypeInformation}
>> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import
>> org.apache.flink.api.java.typeutils.TupleTypeInfo
>> import org.apache.flink.table.functions.{AggregateFunction,
>> FunctionContext}
>> import java.lang.{Iterable => JIterable}
>>
>>
>> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>>
>> class IntDiffSumFunction extends AggregateFunction[Int,
>> IntDiffSumAccumulator] {
>>
>> override def open(context: FunctionContext): Unit = {
>> // Flink1.7.2 这里没法获取到 RuntimeContext,没有办法初始化State
>> //getRuntimeContext.getState(desc)
>> val a = this.hashCode()
>> print(s"hashCode:$a")
>> super.open(context)
>> }
>>
>> override def createAccumulator(): IntDiffSumAccumulator = {
>> val acc = new IntDiffSumAccumulator()
>> acc.f0 = 0
>> acc.f1 = false
>> acc
>> }
>>
>> def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
>> accumulator.f0 += value
>> accumulator.f1 = true
>> }
>>
>> override def getValue(accumulator: IntDiffSumAccumulator): Int = {
>> if (accumulator.f1) {
>>
>> accumulator.f0
>> } else {
>> Int.MinValue
>> }
>> }
>>
>> def merge(acc: IntDiffSumAccumulator, its:
>> JIterable[IntDiffSumAccumulator]) = {
>> val iter = its.iterator()
>> while (true) {
>> val a = iter.next()
>> if (a.f1) {
>> acc.f0 += a.f0
>> acc.f1 = true
>> }
>> }
>> }
>>
>> def resetAccumulator(acc: IntDiffSumAccumulator) = {
>> acc.f0 = 0
>> acc.f1 = false
>> }
>>
>> override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
>> new
>> TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
>> BasicTypeInfo.BOOLEAN_TYPE_INFO)
>> }
>>
>>
>> From stevenchen
>> webchat 38798579
>>
>>
>>
回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
Posted by Chennet Steven <st...@live.com>.
场景是一分钟为窗口计算每分钟传感器的最高温度,同时计算当前分钟与上一分钟最高温度的差值,使用flink-sql。想写一个Table的自定义UDAF,在UDAF中使用State存储上一分钟的最高温度,但是发现UDAF的open函数的FunctionContext中对于RuntimeContext是一个private,无法使用。同时DataView是 UDAF中ACC的属性,而ACC每个窗口new一个新的,无法将上一个窗口的结果通过ACC/DataView保留到下一个窗口,大佬,我所理解的对么?
请教大佬计算两个窗口之间的聚合值的差值这种场景在FlinkSql中实现的方案是啥?
From stevenchen
webchat 38798579
发件人: Dian Fu<ma...@gmail.com>
发送时间: Thursday, November 7, 2019 19:41
收件人: user-zh@flink.apache.org<ma...@flink.apache.org>
主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
可以参考一下Flink代码里已有的例子:https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java <https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java>
> 在 2019年11月7日,下午7:06,Chennet Steven <St...@live.com> 写道:
>
> 在flink1.9的flink-table-common中找到DataView这个接口和子类ListView,MapView,但是没有看懂如何在自定义函数中使用,
> 能否给个example或者是test代码的链接啊?
>
> From stevenchen
> webchat 38798579
>
> ________________________________
> 发件人: wenlong.lwl <we...@gmail.com>
> 发送时间: Thursday, November 7, 2019 2:13:43 PM
> 收件人: user-zh@flink.apache.org <us...@flink.apache.org>
> 主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
>
> 可以试试1.9,引入了DataView的机制,可以在Acc中使用state了。
>
> On Thu, 7 Nov 2019 at 09:22, Chennet Steven <st...@live.com> wrote:
>
>> 尝试在Flink的自定义聚合函数中使用State,发现open函数中通过FunctionContext无法获取到RuntimeContext
>> 如何在聚合函数中使用State?
>>
>>
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
>> TypeInformation}
>> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
>> import org.apache.flink.api.java.typeutils.TupleTypeInfo
>> import org.apache.flink.table.functions.{AggregateFunction,
>> FunctionContext}
>> import java.lang.{Iterable => JIterable}
>>
>>
>> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>>
>> class IntDiffSumFunction extends AggregateFunction[Int,
>> IntDiffSumAccumulator] {
>>
>> override def open(context: FunctionContext): Unit = {
>> // Flink1.7.2 这里没法获取到 RuntimeContext,没有办法初始化State
>> //getRuntimeContext.getState(desc)
>> val a = this.hashCode()
>> print(s"hashCode:$a")
>> super.open(context)
>> }
>>
>> override def createAccumulator(): IntDiffSumAccumulator = {
>> val acc = new IntDiffSumAccumulator()
>> acc.f0 = 0
>> acc.f1 = false
>> acc
>> }
>>
>> def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
>> accumulator.f0 += value
>> accumulator.f1 = true
>> }
>>
>> override def getValue(accumulator: IntDiffSumAccumulator): Int = {
>> if (accumulator.f1) {
>>
>> accumulator.f0
>> } else {
>> Int.MinValue
>> }
>> }
>>
>> def merge(acc: IntDiffSumAccumulator, its:
>> JIterable[IntDiffSumAccumulator]) = {
>> val iter = its.iterator()
>> while (true) {
>> val a = iter.next()
>> if (a.f1) {
>> acc.f0 += a.f0
>> acc.f1 = true
>> }
>> }
>> }
>>
>> def resetAccumulator(acc: IntDiffSumAccumulator) = {
>> acc.f0 = 0
>> acc.f1 = false
>> }
>>
>> override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
>> new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
>> BasicTypeInfo.BOOLEAN_TYPE_INFO)
>> }
>>
>>
>> From stevenchen
>> webchat 38798579
>>
>>
>>
Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
Posted by Dian Fu <di...@gmail.com>.
可以参考一下Flink代码里已有的例子:https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java <https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java>
> 在 2019年11月7日,下午7:06,Chennet Steven <St...@live.com> 写道:
>
> 在flink1.9的flink-table-common中找到DataView这个接口和子类ListView,MapView,但是没有看懂如何在自定义函数中使用,
> 能否给个example或者是test代码的链接啊?
>
> From stevenchen
> webchat 38798579
>
> ________________________________
> 发件人: wenlong.lwl <we...@gmail.com>
> 发送时间: Thursday, November 7, 2019 2:13:43 PM
> 收件人: user-zh@flink.apache.org <us...@flink.apache.org>
> 主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
>
> 可以试试1.9,引入了DataView的机制,可以在Acc中使用state了。
>
> On Thu, 7 Nov 2019 at 09:22, Chennet Steven <st...@live.com> wrote:
>
>> 尝试在Flink的自定义聚合函数中使用State,发现open函数中通过FunctionContext无法获取到RuntimeContext
>> 如何在聚合函数中使用State?
>>
>>
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
>> TypeInformation}
>> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
>> import org.apache.flink.api.java.typeutils.TupleTypeInfo
>> import org.apache.flink.table.functions.{AggregateFunction,
>> FunctionContext}
>> import java.lang.{Iterable => JIterable}
>>
>>
>> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>>
>> class IntDiffSumFunction extends AggregateFunction[Int,
>> IntDiffSumAccumulator] {
>>
>> override def open(context: FunctionContext): Unit = {
>> // Flink1.7.2 这里没法获取到 RuntimeContext,没有办法初始化State
>> //getRuntimeContext.getState(desc)
>> val a = this.hashCode()
>> print(s"hashCode:$a")
>> super.open(context)
>> }
>>
>> override def createAccumulator(): IntDiffSumAccumulator = {
>> val acc = new IntDiffSumAccumulator()
>> acc.f0 = 0
>> acc.f1 = false
>> acc
>> }
>>
>> def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
>> accumulator.f0 += value
>> accumulator.f1 = true
>> }
>>
>> override def getValue(accumulator: IntDiffSumAccumulator): Int = {
>> if (accumulator.f1) {
>>
>> accumulator.f0
>> } else {
>> Int.MinValue
>> }
>> }
>>
>> def merge(acc: IntDiffSumAccumulator, its:
>> JIterable[IntDiffSumAccumulator]) = {
>> val iter = its.iterator()
>> while (true) {
>> val a = iter.next()
>> if (a.f1) {
>> acc.f0 += a.f0
>> acc.f1 = true
>> }
>> }
>> }
>>
>> def resetAccumulator(acc: IntDiffSumAccumulator) = {
>> acc.f0 = 0
>> acc.f1 = false
>> }
>>
>> override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
>> new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
>> BasicTypeInfo.BOOLEAN_TYPE_INFO)
>> }
>>
>>
>> From stevenchen
>> webchat 38798579
>>
>>
>>
回复: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
Posted by Chennet Steven <st...@live.com>.
在flink1.9的flink-table-common中找到DataView这个接口和子类ListView,MapView,但是没有看懂如何在自定义函数中使用,
能否给个example或者是test代码的链接啊?
From stevenchen
webchat 38798579
________________________________
发件人: wenlong.lwl <we...@gmail.com>
发送时间: Thursday, November 7, 2019 2:13:43 PM
收件人: user-zh@flink.apache.org <us...@flink.apache.org>
主题: Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
可以试试1.9,引入了DataView的机制,可以在Acc中使用state了。
On Thu, 7 Nov 2019 at 09:22, Chennet Steven <st...@live.com> wrote:
> 尝试在Flink的自定义聚合函数中使用State,发现open函数中通过FunctionContext无法获取到RuntimeContext
> 如何在聚合函数中使用State?
>
>
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
> TypeInformation}
> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
> import org.apache.flink.api.java.typeutils.TupleTypeInfo
> import org.apache.flink.table.functions.{AggregateFunction,
> FunctionContext}
> import java.lang.{Iterable => JIterable}
>
>
> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>
> class IntDiffSumFunction extends AggregateFunction[Int,
> IntDiffSumAccumulator] {
>
> override def open(context: FunctionContext): Unit = {
> // Flink1.7.2 这里没法获取到 RuntimeContext,没有办法初始化State
> //getRuntimeContext.getState(desc)
> val a = this.hashCode()
> print(s"hashCode:$a")
> super.open(context)
> }
>
> override def createAccumulator(): IntDiffSumAccumulator = {
> val acc = new IntDiffSumAccumulator()
> acc.f0 = 0
> acc.f1 = false
> acc
> }
>
> def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
> accumulator.f0 += value
> accumulator.f1 = true
> }
>
> override def getValue(accumulator: IntDiffSumAccumulator): Int = {
> if (accumulator.f1) {
>
> accumulator.f0
> } else {
> Int.MinValue
> }
> }
>
> def merge(acc: IntDiffSumAccumulator, its:
> JIterable[IntDiffSumAccumulator]) = {
> val iter = its.iterator()
> while (true) {
> val a = iter.next()
> if (a.f1) {
> acc.f0 += a.f0
> acc.f1 = true
> }
> }
> }
>
> def resetAccumulator(acc: IntDiffSumAccumulator) = {
> acc.f0 = 0
> acc.f1 = false
> }
>
> override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
> new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
> BasicTypeInfo.BOOLEAN_TYPE_INFO)
> }
>
>
> From stevenchen
> webchat 38798579
>
>
>
Re: Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
Posted by "wenlong.lwl" <we...@gmail.com>.
可以试试1.9,引入了DataView的机制,可以在Acc中使用state了。
On Thu, 7 Nov 2019 at 09:22, Chennet Steven <st...@live.com> wrote:
> 尝试在Flink的自定义聚合函数中使用State,发现open函数中通过FunctionContext无法获取到RuntimeContext
> 如何在聚合函数中使用State?
>
>
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
> TypeInformation}
> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
> import org.apache.flink.api.java.typeutils.TupleTypeInfo
> import org.apache.flink.table.functions.{AggregateFunction,
> FunctionContext}
> import java.lang.{Iterable => JIterable}
>
>
> class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
>
> class IntDiffSumFunction extends AggregateFunction[Int,
> IntDiffSumAccumulator] {
>
> override def open(context: FunctionContext): Unit = {
> // Flink1.7.2 这里没法获取到 RuntimeContext,没有办法初始化State
> //getRuntimeContext.getState(desc)
> val a = this.hashCode()
> print(s"hashCode:$a")
> super.open(context)
> }
>
> override def createAccumulator(): IntDiffSumAccumulator = {
> val acc = new IntDiffSumAccumulator()
> acc.f0 = 0
> acc.f1 = false
> acc
> }
>
> def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
> accumulator.f0 += value
> accumulator.f1 = true
> }
>
> override def getValue(accumulator: IntDiffSumAccumulator): Int = {
> if (accumulator.f1) {
>
> accumulator.f0
> } else {
> Int.MinValue
> }
> }
>
> def merge(acc: IntDiffSumAccumulator, its:
> JIterable[IntDiffSumAccumulator]) = {
> val iter = its.iterator()
> while (true) {
> val a = iter.next()
> if (a.f1) {
> acc.f0 += a.f0
> acc.f1 = true
> }
> }
> }
>
> def resetAccumulator(acc: IntDiffSumAccumulator) = {
> acc.f0 = 0
> acc.f1 = false
> }
>
> override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
> new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO,
> BasicTypeInfo.BOOLEAN_TYPE_INFO)
> }
>
>
> From stevenchen
> webchat 38798579
>
>
>
Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态
Posted by Chennet Steven <st...@live.com>.
尝试在Flink的自定义聚合函数中使用State,发现open函数中通过FunctionContext无法获取到RuntimeContext
如何在聚合函数中使用State?
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.table.functions.{AggregateFunction, FunctionContext}
import java.lang.{Iterable => JIterable}
class IntDiffSumAccumulator extends JTuple2[Int, Boolean]
class IntDiffSumFunction extends AggregateFunction[Int, IntDiffSumAccumulator] {
override def open(context: FunctionContext): Unit = {
// Flink1.7.2 这里没法获取到 RuntimeContext,没有办法初始化State
//getRuntimeContext.getState(desc)
val a = this.hashCode()
print(s"hashCode:$a")
super.open(context)
}
override def createAccumulator(): IntDiffSumAccumulator = {
val acc = new IntDiffSumAccumulator()
acc.f0 = 0
acc.f1 = false
acc
}
def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {
accumulator.f0 += value
accumulator.f1 = true
}
override def getValue(accumulator: IntDiffSumAccumulator): Int = {
if (accumulator.f1) {
accumulator.f0
} else {
Int.MinValue
}
}
def merge(acc: IntDiffSumAccumulator, its: JIterable[IntDiffSumAccumulator]) = {
val iter = its.iterator()
while (true) {
val a = iter.next()
if (a.f1) {
acc.f0 += a.f0
acc.f1 = true
}
}
}
def resetAccumulator(acc: IntDiffSumAccumulator) = {
acc.f0 = 0
acc.f1 = false
}
override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =
new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO)
}
From stevenchen
webchat 38798579
答复: 为何会报"Window can only be defined over a time attribute column."??
Posted by Chennet Steven <St...@live.com>.
貌似问题变成这样子
程序流程如下:WM=watermark
source(没有WM)-->转换A-->TableA(没有WM)--->转换B--->TableB(没有WM)---->转换C(sql TUMBLE)-->TableC-->Sink
为了让转换C的Sql能够成功执行,如何在TableB上Assign一个Watermark??
From stevenchen
webchat 38798579
________________________________
发件人: Chennet Steven
发送时间: Friday, June 21, 2019 11:34:25 AM
收件人: user-zh@flink.apache.org
主题: 为何会报"Window can only be defined over a time attribute column."??
Flink 1.4.2
执行Sql SELECT Tag, SUM(PowerX) FROM TimeTable GROUP BY TUMBLE(UptMs, INTERVAL '1' HOUR),Tag
会报异常,但是感觉UptMs列类型是TimeStamp啊?
Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:69)
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:82)
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:87)
at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:317)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:506)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:385)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:251)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:210)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:197)
at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:257)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:665)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:730)
at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
at com.teld.bdp.tablewindow.TubleApp$.main(TubleApp.scala:56)
at com.teld.bdp.tablewindow.TubleApp.main(TubleApp.scala)
Disconnected from the target VM, address: '127.0.0.1:60155', transport: 'socket'
代码如下
package com.teld.bdp.tablewindow
import java.io.Serializable
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.api.java.tuple.Tuple3
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row
import org.apache.flink.table.api.TableEnvironment
// Flink 1.4.2
object TubleApp {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.createLocalEnvironment(1)
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tableEnv = TableEnvironment.getTableEnvironment(senv)
tableEnv.registerFunction("TOTIMESTAMP", new ConvertLongToTimeStamp())
// 2019-06-20 10:27:57 1560997677000
// 2019-06-20 11:27:57 1561001277000
// 2019-06-20 12:27:57 1561004877000
val source = senv.fromElements(
new Tuple3("a", 1, 1560997677000L),
new Tuple3("b", 1, 1560997677000L),
new Tuple3("a", 2, 1561001277000L),
new Tuple3("b", 2, 1561001277000L),
new Tuple3("a", 3, 1561004877000L),
new Tuple3("b", 3, 1561004877000L)
)
import org.apache.flink.table.api.scala._
tableEnv.registerDataStream("Temp", source, 'Tag, 'PowerX, 'UptTime)
//-----------------------------源Table-----------------------
val sourceTable = tableEnv.sqlQuery("select * from Temp")
// root
// |-- Tag: String
// |-- PowerX: Integer
// |-- UptTime: Long
sourceTable.printSchema()
tableEnv.registerTable("SourceTable", sourceTable)
//-----------------------------转换成TimeStamp-----------------------
val timeTable = tableEnv.sqlQuery("SELECT Tag, PowerX, UptTime, TOTIMESTAMP(UptTime) AS UptMs FROM SourceTable")
// root
// |-- Tag: String
// |-- PowerX: Integer
// |-- UptTime: Long
// |-- UptMs: Timestamp
timeTable.printSchema()
tableEnv.registerTable("TimeTable", timeTable)
tableEnv.toAppendStream[Row](timeTable).print()
//------------------------------agg-------------------------------------
val aggTable = tableEnv.sqlQuery("SELECT Tag, SUM(PowerX) FROM TimeTable GROUP BY TUMBLE(UptMs, INTERVAL '1' HOUR),Tag")
// root
// |-- Tag: String
// |-- EXPR$1: Integer
aggTable.printSchema()
// 为啥这下面这个聚合的table会抛异常啊?
// Exception in thread "main" org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
tableEnv.toRetractStream[Row](aggTable).print()
senv.execute("abc")
}
}
class ConvertLongToTimeStamp extends ScalarFunction with Serializable {
def eval(uptTime: Long): Long = uptTime
override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = Types.SQL_TIMESTAMP
}
From stevenchen
webchat 38798579