You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by hzp <hz...@qq.com> on 2019/10/12 15:42:01 UTC
[flink sql] table in subquery join temporal table raise java.lang.NullPointerException
Hi all,
I'm using Flink SQL to join a temporal table in a subquery, but it raises java.lang.NullPointerException when executed.
Orders is a table source, and Rates is a temporal table
Here are my sqls:
// works
SELECT o_amount * r_amount AS amount
FROM Orders, LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency
// sql raise exception
SELECT o_amount * r_amount AS amount
FROM (SELECT * FROM Orders) as Orders, LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency
The error stack:
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.table.planner.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:167)
at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:98)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:212)
at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:77)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:281)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:276)
at cn.easyops.flink_sql.Test.main(Test.java:159)
Here is the complete test code that reproduces the problem. I hope someone can help — thanks!
package test.flinksql;
import java.util.Random;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
/**
 * Minimal reproduction for a planner NullPointerException: joining a subquery
 * against a temporal table function via {@code LATERAL TABLE (Rates(o_proctime))}.
 *
 * <p>The first (uncommented) query works; enabling the commented query, which
 * wraps {@code Orders} in a {@code SELECT *} subquery, triggers the NPE in
 * {@code LogicalCorrelateToJoinFromTemporalTableFunctionRule}.
 */
public class TemporalTableFunctionTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        sEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        // Blink planner in streaming mode — required for temporal table function joins.
        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance()
                        .useBlinkPlanner()
                        .inStreamingMode()
                        .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, bsSettings);

        // RatesHistory becomes the temporal table function "Rates",
        // versioned by r_proctime and keyed on r_currency.
        tEnv.registerTableSource("RatesHistory",
                new FooSource(new String[] {"r_currency", "r_amount", "r_proctime"}));
        Table ratesHistory = tEnv.sqlQuery("SELECT * FROM RatesHistory");
        TemporalTableFunction rates =
                ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
        tEnv.registerFunction("Rates", rates);

        tEnv.registerTableSource("Orders",
                new FooSource(new String[] {"o_currency", "o_amount", "o_proctime"}));
        tEnv.registerTableSink("OutSink", new SysoSink());

        // works
        Table prices = tEnv.sqlQuery(
                " SELECT \r\n" +
                " o_amount * r_amount AS amount \r\n" +
                " FROM Orders \r\n" +
                " , LATERAL TABLE (Rates(o_proctime)) \r\n" +
                " WHERE r_currency = o_currency ");

        // Raise NullPointerException
        //Table prices = tEnv.sqlQuery(
        //        " SELECT \r\n" +
        //        " o_amount * r_amount AS amount \r\n" +
        //        " FROM (SELECT * FROM Orders) as O \r\n" +
        //        " , LATERAL TABLE (Rates(o_proctime)) \r\n" +
        //        " WHERE r_currency = o_currency ");

        prices.insertInto("OutSink");
        sEnv.execute();
    }

    /**
     * Retract sink that prints every (isAccumulate, row) pair to stdout.
     * Field layout: a single LONG column named "out".
     */
    public static class SysoSink implements RetractStreamTableSink<Row> {
        @Override
        public String[] getFieldNames() {
            return new String[] {"out"};
        }

        @Override
        public TypeInformation<?>[] getFieldTypes() {
            return new TypeInformation[] {Types.LONG()};
        }

        @Override
        public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            // Stateless sink: ignore the requested schema and reuse this instance.
            return this;
        }

        @Override
        public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            // Deprecated entry point — delegate to the current API.
            consumeDataStream(dataStream);
        }

        @Override
        public TypeInformation<Row> getRecordType() {
            return Types.ROW(getFieldNames(), getFieldTypes());
        }

        @Override
        public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            return dataStream.addSink(new SysoSinkFunction<Tuple2<Boolean, Row>>());
        }
    }

    /** Sink function that simply println's each record. */
    @SuppressWarnings("serial")
    public static class SysoSinkFunction<T> implements SinkFunction<T> {
        @Override
        public void invoke(T value) throws Exception {
            System.out.println(value);
        }
    }

    /**
     * Unbounded test source emitting one (STRING, LONG, TIMESTAMP) row every
     * 100 ms, with the third field declared as the proctime attribute.
     * Column names are supplied by the caller so the same source can back
     * both RatesHistory and Orders.
     */
    public static class FooSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

        final String[] fieldNames;

        public FooSource(String[] fieldNames) {
            this.fieldNames = fieldNames;
        }

        @Override
        public TableSchema getTableSchema() {
            return new TableSchema(fieldNames, new TypeInformation[] {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
        }

        @Override
        public TypeInformation<Row> getReturnType() {
            return Types.ROW(fieldNames, new TypeInformation[] {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            return execEnv.addSource(new SourceFunction<Row>() {

                // FIX: the original looped on while(true) and cancel() only printed a
                // message, so the source could never be stopped. SourceFunction#cancel
                // must cause run() to return; a volatile flag provides that.
                private volatile boolean running = true;

                @Override
                public void run(SourceContext<Row> ctx) throws Exception {
                    Random random = new Random();
                    while (running) {
                        Row row = new Row(3);
                        // nextLong() % 3 may be negative or zero — fine for test data.
                        row.setField(0, "Euro" + random.nextLong() % 3);
                        row.setField(1, random.nextLong() % 200 );
                        row.setField(2, new java.sql.Timestamp(System.currentTimeMillis()));
                        ctx.collect(row);
                        Thread.sleep(100);
                    }
                }

                @Override
                public void cancel() {
                    System.out.println("cancelling ----------------------------------------------");
                    running = false;
                }
            }, getReturnType());
        }

        @Override
        public String getProctimeAttribute() {
            // The proctime column is always the third field by convention of this test.
            return fieldNames[2];
        }
    }
}
Re: [flink sql] table in subquery join temporal table raise java.lang.NullPointerException
Posted by Till Rohrmann <tr...@apache.org>.
Thanks for reporting this issue. I've pulled in Jark and Kurt who might
help you with this problem.
Cheers,
Till
On Sat, Oct 12, 2019 at 5:42 PM hzp <hz...@qq.com> wrote:
> Hi all,
>
> I'm using flink sql to join a temporal table in a subquery, but it raises
> java.lang.NullPointerException when execute.
>
> Orders is a table source, and Rates is a temporal table
>
> Here are my sqls:
> // works
> SELECT o_amount * r_amount AS amount
> FROM Orders, LATERAL TABLE (Rates(o_proctime))
> WHERE r_currency = o_currency
>
> // sql raise exception
> SELECT o_amount * r_amount AS amount
> FROM (SELECT * FROM Orders) as Orders, LATERAL TABLE (Rates(o_proctime))
> WHERE r_currency = o_currency
>
> The error stack:
> Exception in thread "main" java.lang.NullPointerException
> at
> org.apache.flink.table.planner.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:167)
> at
> org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:98)
> at
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
> at
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> at
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.immutable.Range.foreach(Range.scala:160)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166)
> at
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88)
> at
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:212)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:77)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:281)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:276)
> at cn.easyops.flink_sql.Test.main(Test.java:159)
>
>
> Here is the complete test code, hope anyone can help, thanks!
>
> package test.flinksql;
>
> import java.util.Random;
>
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSink;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.table.api.Types;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.TemporalTableFunction;
> import org.apache.flink.table.sinks.RetractStreamTableSink;
> import org.apache.flink.table.sinks.TableSink;
> import org.apache.flink.table.sources.DefinedProctimeAttribute;
> import org.apache.flink.table.sources.StreamTableSource;
> import org.apache.flink.types.Row;
>
> public class TemporalTableFunctionTest {
>
> public static void main(String[] args) throws Exception {
>
> StreamExecutionEnvironment sEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> sEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv,
> bsSettings);
>
> tEnv.registerTableSource("RatesHistory", new FooSource(new
> String[] {"r_currency", "r_amount", "r_proctime"}));
> Table ratesHistory = tEnv.sqlQuery("SELECT * FROM RatesHistory");
> TemporalTableFunction rates =
> ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
> tEnv.registerFunction("Rates", rates);
>
> tEnv.registerTableSource("Orders", new FooSource(new String[]
> {"o_currency", "o_amount", "o_proctime"}));
>
> tEnv.registerTableSink("OutSink", new SysoSink());
>
> // works
> Table prices = tEnv.sqlQuery(
> " SELECT \r\n" +
> " o_amount * r_amount AS amount \r\n" +
> " FROM Orders \r\n" +
> " , LATERAL TABLE (Rates(o_proctime)) \r\n" +
> " WHERE r_currency = o_currency ");
>
> // Raise NullPointerException
> //Table prices = tEnv.sqlQuery(
> // " SELECT \r\n" +
> // " o_amount * r_amount AS amount \r\n" +
> // " FROM (SELECT * FROM Orders) as O
> \r\n" +
> // " , LATERAL TABLE (Rates(o_proctime)) \r\n" +
> // " WHERE r_currency = o_currency ");
>
> prices.insertInto("OutSink");
>
> sEnv.execute();
> }
>
>
> public static class SysoSink implements RetractStreamTableSink<Row> {
> @Override
> public String[] getFieldNames() {
> return new String[] {"out"};
> }
> @Override
> public TypeInformation<?>[] getFieldTypes() {
> return new TypeInformation[] {Types.LONG()};
> }
> @Override
> public TableSink<Tuple2<Boolean, Row>> configure(String[]
> fieldNames, TypeInformation<?>[] fieldTypes) {
> return this;
> }
> @Override
> public void emitDataStream(DataStream<Tuple2<Boolean, Row>>
> dataStream) {
> consumeDataStream(dataStream);
> }
> @Override
> public TypeInformation<Row> getRecordType() {
> return Types.ROW(getFieldNames(), getFieldTypes());
> }
> @Override
> public DataStreamSink<Tuple2<Boolean, Row>>
> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
> return dataStream.addSink(new SysoSinkFunction<Tuple2<Boolean,
> Row>>());
> }
> }
>
> @SuppressWarnings("serial")
> public static class SysoSinkFunction<T> implements SinkFunction<T> {
> @Override
> public void invoke(T value) throws Exception {
> System.out.println(value);
> }
> }
>
> public static class FooSource implements StreamTableSource<Row>,
> DefinedProctimeAttribute {
>
> String[] fieldNames;
>
>
> public FooSource(String[] fieldNames) {
> this.fieldNames = fieldNames;
> }
>
> @Override
> public TableSchema getTableSchema() {
> return new TableSchema(fieldNames, new TypeInformation[]
> {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
> }
>
> @Override
> public TypeInformation<Row> getReturnType() {
> return Types.ROW(fieldNames, new TypeInformation[]
> {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
> }
>
> @Override
> public DataStream<Row> getDataStream(StreamExecutionEnvironment
> execEnv) {
> return execEnv.addSource(new SourceFunction<Row>() {
>
> @Override
> public void run(SourceContext<Row> ctx) throws Exception {
> Random random = new Random();
>
> while (true) {
>
> Row row = new Row(3);
> row.setField(0, "Euro" + random.nextLong() % 3);
> row.setField(1, random.nextLong() % 200 );
> row.setField(2, new
> java.sql.Timestamp(System.currentTimeMillis()));
> ctx.collect(row);
> Thread.sleep(100);
> }
>
> }
>
> @Override
> public void cancel() {
> System.out.println("cancelling
> ----------------------------------------------");
>
> }
> }, getReturnType());
> }
>
> @Override
> public String getProctimeAttribute() {
> return fieldNames[2];
> }
> }
>
> }
>
>