Posted to user@flink.apache.org by hzp <hz...@qq.com> on 2019/10/12 15:42:01 UTC

[flink sql] table in subquery join temporal table raises java.lang.NullPointerException

Hi all,

I'm using Flink SQL to join a temporal table inside a subquery, but executing it raises a java.lang.NullPointerException.

Orders is a table source, and Rates is a temporal table function.
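
For context, Rates is created from the RatesHistory table with createTemporalTableFunction (excerpted verbatim from the full test code below):

// Excerpt: how the temporal table function "Rates" is registered (see full test code below)
Table ratesHistory = tEnv.sqlQuery("SELECT * FROM RatesHistory");
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
tEnv.registerFunction("Rates", rates);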




Here are my SQL queries:

-- works
SELECT o_amount * r_amount AS amount
FROM Orders, LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency

-- raises the NullPointerException
SELECT o_amount * r_amount AS amount
FROM (SELECT * FROM Orders) as Orders, LATERAL TABLE (Rates(o_proctime))
WHERE r_currency = o_currency
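
For comparison, the same join can also be written with the Table API instead of SQL. This is only a sketch of the direct (non-subquery) form, assuming the registrations from the test code below; I have not verified how the Table API behaves with a derived table on the probe side:

// Sketch only: Table API equivalent of the working SQL query above.
// Assumes "Orders" and the function "Rates" are registered as in the test code below.
Table orders = tEnv.scan("Orders");
Table prices = orders
    .joinLateral("Rates(o_proctime)", "r_currency = o_currency")
    .select("o_amount * r_amount as amount");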




The error stack trace:
Exception in thread "main" java.lang.NullPointerException
	at org.apache.flink.table.planner.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:167)
	at org.apache.flink.table.planner.plan.rules.logical.LogicalCorrelateToJoinFromTemporalTableFunctionRule.onMatch(LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala:98)
	at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
	at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
	at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
	at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
	at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
	at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
	at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:166)
	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:88)
	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:212)
	at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:77)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:281)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:276)
	at cn.easyops.flink_sql.Test.main(Test.java:159)

Here is the complete test code; I hope someone can help. Thanks!

package test.flinksql;

import java.util.Random;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class TemporalTableFunctionTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        sEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        EnvironmentSettings bsSettings =
            EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, bsSettings);

        // Register the rate history source and expose it as the temporal table function "Rates".
        tEnv.registerTableSource("RatesHistory", new FooSource(new String[] {"r_currency", "r_amount", "r_proctime"}));
        Table ratesHistory = tEnv.sqlQuery("SELECT * FROM RatesHistory");
        TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
        tEnv.registerFunction("Rates", rates);

        tEnv.registerTableSource("Orders", new FooSource(new String[] {"o_currency", "o_amount", "o_proctime"}));

        tEnv.registerTableSink("OutSink", new SysoSink());

        // works
        Table prices = tEnv.sqlQuery(
                "  SELECT                                     \r\n" +
                "    o_amount * r_amount AS amount            \r\n" +
                "  FROM Orders                                \r\n" +
                "     , LATERAL TABLE (Rates(o_proctime))     \r\n" +
                "  WHERE r_currency = o_currency              ");

        // raises NullPointerException
        //Table prices = tEnv.sqlQuery(
        //        "  SELECT                                     \r\n" +
        //        "    o_amount * r_amount AS amount            \r\n" +
        //        "  FROM (SELECT * FROM Orders) AS O           \r\n" +
        //        "     , LATERAL TABLE (Rates(o_proctime))     \r\n" +
        //        "  WHERE r_currency = o_currency              ");

        prices.insertInto("OutSink");

        sEnv.execute();
    }

    /** A retract sink that prints every record to stdout. */
    public static class SysoSink implements RetractStreamTableSink<Row> {
        @Override
        public String[] getFieldNames() {
            return new String[] {"out"};
        }
        @Override
        public TypeInformation<?>[] getFieldTypes() {
            return new TypeInformation[] {Types.LONG()};
        }
        @Override
        public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            return this;
        }
        @Override
        public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            consumeDataStream(dataStream);
        }
        @Override
        public TypeInformation<Row> getRecordType() {
            return Types.ROW(getFieldNames(), getFieldTypes());
        }
        @Override
        public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            return dataStream.addSink(new SysoSinkFunction<Tuple2<Boolean, Row>>());
        }
    }

    @SuppressWarnings("serial")
    public static class SysoSinkFunction<T> implements SinkFunction<T> {
        @Override
        public void invoke(T value) throws Exception {
            System.out.println(value);
        }
    }

    /** A source emitting a random (currency, amount, proctime) row every 100 ms. */
    public static class FooSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

        String[] fieldNames;

        public FooSource(String[] fieldNames) {
            this.fieldNames = fieldNames;
        }

        @Override
        public TableSchema getTableSchema() {
            return new TableSchema(fieldNames, new TypeInformation[] {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
        }

        @Override
        public TypeInformation<Row> getReturnType() {
            return Types.ROW(fieldNames, new TypeInformation[] {Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            return execEnv.addSource(new SourceFunction<Row>() {

                @Override
                public void run(SourceContext<Row> ctx) throws Exception {
                    Random random = new Random();

                    while (true) {
                        Row row = new Row(3);
                        row.setField(0, "Euro" + random.nextLong() % 3);
                        row.setField(1, random.nextLong() % 200);
                        row.setField(2, new java.sql.Timestamp(System.currentTimeMillis()));
                        ctx.collect(row);
                        Thread.sleep(100);
                    }
                }

                @Override
                public void cancel() {
                    System.out.println("cancelling ----------------------------------------------");
                }
            }, getReturnType());
        }

        @Override
        public String getProctimeAttribute() {
            return fieldNames[2];
        }
    }
}
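
Note that, per the stack trace, the NPE is thrown while the planner optimizes the query (the top frames run under TableEnvironmentImpl.explain), so a minimal trigger (a sketch, assuming the registrations from main above) is simply:

// Sketch: explaining the failing query is enough to reproduce the NPE,
// since it fires during plan optimization rather than at runtime.
Table failing = tEnv.sqlQuery(
        "SELECT o_amount * r_amount AS amount " +
        "FROM (SELECT * FROM Orders) AS O, LATERAL TABLE (Rates(o_proctime)) " +
        "WHERE r_currency = o_currency");
System.out.println(tEnv.explain(failing)); // throws java.lang.NullPointerException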

Re: [flink sql] table in subquery join temporal table raises java.lang.NullPointerException

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for reporting this issue. I've pulled in Jark and Kurt, who might be
able to help you with this problem.

Cheers,
Till

On Sat, Oct 12, 2019 at 5:42 PM hzp <hz...@qq.com> wrote:

> Hi all,
>
> I'm using Flink SQL to join a temporal table inside a subquery, but
> executing it raises a java.lang.NullPointerException.
>
> [...]