You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 李佟 <li...@iie.ac.cn> on 2019/12/12 01:55:16 UTC
Flink1.9.1的SQL向前不兼容的问题
近期进行Flink升级,将原来的程序从老的集群(1.8.0运行正常)迁移到新的集群(1.9.1)中。在部署程序的时候发现在1.9.1的集群中,原来运行正常的Flink SQL的程序无法执行,异常如下:
org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:90)
跟踪到反编译的scala文件并设置断点,发现下图中的红框部分没有执行,直接跳过。
功能很简单,就是在某个时间窗对数值求和。测试用例如下:
package org.flowmatrix.isp.traffic.accounting.test;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.types.Row;
import org.junit.Test;
import javax.annotation.Nullable;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
 * Test case for a tumbling-window aggregation expressed in Flink SQL.
 *
 * <p>The query sums {@code Data} per {@code Username} over 5-second tumbling windows
 * keyed on the {@code UserActionTime} rowtime attribute, and writes the result to a CSV sink.
 */
public class TestSql {

    /**
     * Builds the source, runs the windowed SUM query and inserts the result into a CSV sink.
     *
     * <p>Any failure while planning or executing the job is rethrown so that the test
     * actually fails instead of only printing the error.
     */
    @Test
    public void testAccountingSql() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        try {
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            SimpleTableSource source = new SimpleTableSource();
            // NOTE(review): this is the code path for which the thread reports
            // "Window can only be defined over a time attribute column" on Flink 1.9.1.
            // Registering the source under a name (registerTableSource) and querying it by
            // that name may be worth trying as a workaround -- unverified.
            Table t = tableEnv.fromTableSource(source);
            String interval = "5"; // window size in seconds
            System.out.println("source schema is " + source.getTableSchema());
            Table sqlResult = tableEnv.sqlQuery("SELECT " +
                    " TUMBLE_START(UserActionTime, INTERVAL '" + interval + "' SECOND) as rowTime, " +
                    " Username," +
                    " SUM(Data) as Data " +
                    " FROM " + t +
                    " GROUP BY TUMBLE(UserActionTime, INTERVAL '" + interval + "' SECOND),Username");

            String[] fieldNames = {"rowTime", "Username", "Data"};
            TypeInformation<?>[] fieldTypes = {
                    TypeInformation.of(Timestamp.class),
                    TypeInformation.of(String.class),
                    TypeInformation.of(Long.class)};
            TableSink<Row> sink1 = new CsvTableSink("/tmp/data.log", ",")
                    .configure(fieldNames, fieldTypes);
            tableEnv.registerTableSink("EsSinkTable", sink1);
            System.out.println("sql result schema is " + sqlResult.getSchema());
            tableEnv.sqlUpdate("insert into EsSinkTable select " +
                    "rowTime,Username,Data from " + sqlResult + "");
            env.execute("test");
        } catch (Exception e) {
            // Propagate with the original cause; swallowing it here would let the test pass
            // even though the job never ran.
            throw new AssertionError("Flink SQL test job failed: " + e, e);
        }
    }

    /**
     * In-memory table source producing rows of (Username, Data, UserActionTime) and
     * declaring {@code UserActionTime} as the rowtime attribute.
     */
    public static class SimpleTableSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

        /** Position of the UserActionTime column inside each generated row. */
        private static final int USER_ACTION_TIME_INDEX = 2;

        /** Number of rows generated; consecutive rows are one second apart. */
        private static final int ROW_COUNT = 10000;

        /**
         * Returns the generated rows as a stream typed with {@link #getReturnType()} and with
         * timestamps/watermarks assigned from the UserActionTime column.
         */
        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
            // Pass the row type explicitly so the stream's type matches getReturnType().
            return env.fromCollection(generateData(), getReturnType())
                    .assignTimestampsAndWatermarks(new UserActionTimeWatermarks());
        }

        /** Declares the three columns exposed to the Table API. */
        @Override
        public TableSchema getTableSchema() {
            return TableSchema.builder()
                    .field("Username", Types.STRING())
                    .field("Data", Types.LONG())
                    .field("UserActionTime", Types.SQL_TIMESTAMP())
                    .build();
        }

        /** Row type of the produced stream; same names and types as the table schema. */
        @Override
        public TypeInformation<Row> getReturnType() {
            String[] names = {"Username", "Data", "UserActionTime"};
            TypeInformation<?>[] types = {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()};
            return Types.ROW(names, types);
        }

        /** Marks UserActionTime as rowtime, taken from the existing field, ascending. */
        @Override
        public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
            return Collections.singletonList(new RowtimeAttributeDescriptor(
                    "UserActionTime",
                    new ExistingField("UserActionTime"),
                    new AscendingTimestamps()));
        }

        /** Generates {@link #ROW_COUNT} rows, one per second, starting 10000 seconds ago. */
        private static List<Row> generateData() {
            List<Row> rows = new ArrayList<>(ROW_COUNT);
            long startTime = System.currentTimeMillis() / 1000 - 10000;
            for (int i = 0; i < ROW_COUNT; i++) {
                rows.add(buildRecord(startTime, i));
            }
            return rows;
        }

        /**
         * Builds one row whose values match the declared column types.
         *
         * @param startTime epoch seconds of the first row
         * @param i         offset in seconds from {@code startTime}
         */
        private static Row buildRecord(long startTime, int i) {
            Row row = new Row(3);
            row.setField(0, "fox"); // Username
            // Data is declared LONG, so store a Long rather than the Double from Math.random().
            row.setField(1, (long) (Math.random() * 1000)); // Data
            // UserActionTime is declared SQL_TIMESTAMP; convert epoch seconds to a Timestamp.
            row.setField(USER_ACTION_TIME_INDEX, new Timestamp((startTime + i) * 1000L)); // UserActionTime
            return row;
        }

        /**
         * Extracts the event time from the UserActionTime column and emits a watermark whenever
         * the event time has advanced by at least one second since the last emitted watermark.
         *
         * <p>Declared as a static nested class so it does not hold a reference to the
         * enclosing table source instance.
         */
        private static class UserActionTimeWatermarks implements AssignerWithPunctuatedWatermarks<Row> {
            private static final long WATERMARK_PERIOD_MILLIS = 1000;

            private long lastWatermarkMillis = -1;

            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(Row lastElement, long extractedTimestamp) {
                if (extractedTimestamp - lastWatermarkMillis >= WATERMARK_PERIOD_MILLIS) {
                    lastWatermarkMillis = extractedTimestamp;
                    return new Watermark(extractedTimestamp);
                }
                return null;
            }

            @Override
            public long extractTimestamp(Row element, long previousElementTimestamp) {
                // Field 0 is the String username; the event time lives in the timestamp column.
                return ((Timestamp) element.getField(USER_ACTION_TIME_INDEX)).getTime();
            }
        }
    }
}
Re: Flink1.9.1的SQL向前不兼容的问题
Posted by Kurt Young <yk...@gmail.com>.
Hi,
建议你翻译成英文然后到jira里建个issue。
Best,
Kurt
On Thu, Dec 12, 2019 at 11:39 PM 李佟 <li...@iie.ac.cn> wrote:
> 近期进行Flink升级,将原来的程序从老的集群(1.8.0运行正常)迁移到新的集群(1.9.1)中。在部署程序的时候发现在1.9.1的集群中,原来运行正常的Flink
> SQL的程序无法执行,异常如下:
>
>
> org.apache.flink.table.api.ValidationException: *Window can only be
> defined over a time attribute column.*
> at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
>
> at
> org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:90)
>
>
>
> 跟踪到反编译的scala文件并设置断点,发现下图中的红框部分没有执行,直接跳过。
>
>
> 功能很简单,就是在某个时间窗对数值求和。测试用例如下:
>
>
> package org.flowmatrix.isp.traffic.accounting.test;
>
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.scala.typeutils.Types;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
> org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
> import org.apache.flink.streaming.api.watermark.Watermark;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.sinks.CsvTableSink;
> import org.apache.flink.table.sinks.TableSink;
> import org.apache.flink.table.sources.DefinedRowtimeAttributes;
> import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
> import org.apache.flink.table.sources.StreamTableSource;
> import org.apache.flink.table.sources.tsextractors.ExistingField;
> import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
> import org.apache.flink.types.Row;
> import org.junit.Test;
>
> import javax.annotation.Nullable;
> import java.sql.Timestamp;
> import java.util.ArrayList;
> import java.util.Collections;
> import java.util.List;
>
> public class TestSql {
> @Test
> public void testAccountingSql() {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.setParallelism(1);
>
> try {
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env);
>
> SimpleTableSource source = new SimpleTableSource();
> Table t = tableEnv.fromTableSource(source);
>
> String interval = "5"; //5 second
> System.out.println("source schema is " +
> source.getTableSchema());
>
> Table sqlResult = tableEnv.sqlQuery("SELECT " +
> " TUMBLE_START(UserActionTime, INTERVAL '" + interval
> + "' SECOND) as rowTime, " +
> " Username," +
> " SUM(Data) as Data " +
> " FROM " + t +
> " GROUP BY TUMBLE(UserActionTime, INTERVAL '" +
> interval + "' SECOND),Username");
>
>
> String[] fieldNames = {
> "rowTime",
> "Username", "Data"};
> TypeInformation[] fieldTypes = {
> TypeInformation.of(Timestamp.class),
> TypeInformation.of(String.class),
> TypeInformation.of(Long.class)};
>
> TableSink sink1 = new CsvTableSink("/tmp/data.log", ",");
> sink1 = sink1.configure(fieldNames, fieldTypes);
> tableEnv.registerTableSink("EsSinkTable", sink1);
> System.out.println("sql result schema is " +
> sqlResult.getSchema());
>
> tableEnv.sqlUpdate("insert into EsSinkTable select " +
> "rowTime,Username,Data from " + sqlResult + "");
>
> env.execute("test");
> } catch (Exception e) {
> e.printStackTrace();
> System.err.println("start program error. FlowMatrix
> --zookeeper <zookeeperAdress> --config <configpath>" +
> " --name <jobName> --interval <intervalInMinute>
> --indexName <indexName>");
> System.err.println(e.toString());
> return;
> }
> }
>
> public static class SimpleTableSource implements
> StreamTableSource<Row>, DefinedRowtimeAttributes {
> @Override
> public DataStream<Row> getDataStream(StreamExecutionEnvironment
> env) {
> return
> env.fromCollection(genertateData()).assignTimestampsAndWatermarks(new
> AssignerWithPunctuatedWatermarks<Row>() {
> private long lastWaterMarkMillSecond = -1;
> private long waterMarkPeriodMillSecond = 1000;
> @Nullable
> @Override
> public Watermark checkAndGetNextWatermark(Row lastElement,
> long extractedTimestamp) {
> if(extractedTimestamp - lastWaterMarkMillSecond >=
> waterMarkPeriodMillSecond){
> lastWaterMarkMillSecond = extractedTimestamp;
> return new Watermark(extractedTimestamp);
> }
> return null;
> }
>
> @Override
> public long extractTimestamp(Row element, long
> previousElementTimestamp) {
> return ((Long)element.getField(0))*1000;
> }
> });
> }
>
> @Override
> public TableSchema getTableSchema() {
> TableSchema schema = TableSchema.builder()
> .field("Username", Types.STRING())
> .field("Data", Types.LONG())
> .field("UserActionTime", Types.SQL_TIMESTAMP())
> .build();
> return schema;
> }
>
> @Override
> public TypeInformation<Row> getReturnType() {
> String[] names = new String[]{"Username", "Data",
> "UserActionTime"};
> TypeInformation[] types =
> new TypeInformation[]{Types.STRING(), Types.LONG(),
> Types.SQL_TIMESTAMP()};
> return Types.ROW(names, types);
> }
>
>
> @Override
> public List<RowtimeAttributeDescriptor>
> getRowtimeAttributeDescriptors() {
> RowtimeAttributeDescriptor rowtimeAttrDescr = new
> RowtimeAttributeDescriptor(
> "UserActionTime",
> new ExistingField("UserActionTime"),
> new AscendingTimestamps());
> List<RowtimeAttributeDescriptor> listRowtimeAttrDescr =
> Collections.singletonList(rowtimeAttrDescr);
> return listRowtimeAttrDescr;
> }
>
>
> private static List<Row> genertateData() {
> List<Row> rows = new ArrayList<>();
> long startTime = System.currentTimeMillis() / 1000 - 10000;
> for (int i = 0; i < 10000; i++) {
> rows.add(buildRecord(startTime, i));
> }
> return rows;
> }
>
> private static Row buildRecord(long startTime, int i) {
> Row row = new Row(3);
> row.setField(0, "fox"); //Username
> row.setField(1, Math.random()); //Data
> row.setField(2, startTime + i); //UserActionTime
> return row;
> }
> }
> }
>
>
>