You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "Porritt, James" <Ja...@uk.mlp.com> on 2018/07/23 12:33:33 UTC

Trying to implement UpsertStreamTableSink in Java

I put this class together when trying to create my own upsertable table sink in Java:

public class MyTableSink implements UpsertStreamTableSink<Row> {
    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return null;
    }

    @Override
    public void setKeyFields(String[] keys) {
        System.out.println("setKeyFields" + keys);
    }

    @Override
    public void setIsAppendOnly(Boolean isAppendOnly) {

    }

    @Override
    public String[] getFieldNames() {
        return new String[0];
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return new TypeInformation[0];
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return null;
    }

    @Override
    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
        return new TupleTypeInfo<Tuple2<Boolean, Row>>();
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        dataStream.print();
    }
}

I try and link it to my StreamTable with:

mystreamtable.writeToSink(new MyTableSink());

For some reason though I'm getting the error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
Caused by: org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.
        at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:284)
        at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
        at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
        at alphagen_stats.KafkaAlphaGen.main(KafkaAlphaGen.java:265)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
        ... 12 more

What am I doing wrong?
######################################################################

The information contained in this communication is confidential and

intended only for the individual(s) named above. If you are not a named

addressee, please notify the sender immediately and delete this email

from your system and do not disclose the email or any part of it to any

person. The views expressed in this email are the views of the author

and do not necessarily represent the views of Millennium Capital Partners

LLP (MCP LLP) or any of its affiliates. Outgoing and incoming electronic

communications of MCP LLP and its affiliates, including telephone

communications, may be electronically archived and subject to review

and/or disclosure to someone other than the recipient. MCP LLP is

authorized and regulated by the Financial Conduct Authority. Millennium

Capital Partners LLP is a limited liability partnership registered in

England & Wales with number OC312897 and with its registered office at

50 Berkeley Street, London, W1J 8HD.

######################################################################

Re: Trying to implement UpsertStreamTableSink in Java

Posted by Timo Walther <tw...@apache.org>.
Hi James,

the method `Table.writeToSink()` calls `configure(String[] fieldNames, 
TypeInformation<?>[] fieldTypes)` internally. Since you return null, you 
are trying to register null instead of a table sink.

I hope this helps.

Regards,
Timo


Am 23.07.18 um 14:33 schrieb Porritt, James:
>
> I put this class together when trying to create my own upsertable 
> table sink in Java:
>
> public class MyTableSink implements UpsertStreamTableSink<Row> {
>
> @Override
>
> public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, 
> TypeInformation<?>[] fieldTypes) {
>
> return null;
>
> }
>
> @Override
>
> public void setKeyFields(String[] keys) {
>
> System.out.println("setKeyFields" + keys);
>
> }
>
> @Override
>
> public void setIsAppendOnly(Boolean isAppendOnly) {
>
> }
>
> @Override
>
> public String[] getFieldNames() {
>
> return new String[0];
>
> }
>
> @Override
>
> public TypeInformation<?>[] getFieldTypes() {
>
> return new TypeInformation[0];
>
> }
>
> @Override
>
> public TypeInformation<Row> getRecordType() {
>
> return null;
>
> }
>
> @Override
>
> public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
>
> return new TupleTypeInfo<Tuple2<Boolean, Row>>();
>
> }
>
> @Override
>
> public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
>
> dataStream.print();
>
> }
>
> }
>
> I try and link it to my StreamTable with:
>
> mystreamtable.writeToSink(new MyTableSink());
>
> For some reason though I’m getting the error:
>
> org.apache.flink.client.program.ProgramInvocationException: The main 
> method caused an error.
>
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
>
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:781)
>
> at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:275)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
>
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at javax.security.auth.Subject.doAs(Subject.java:422)
>
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
>
> Caused by: org.apache.flink.table.api.TableException: Stream Tables 
> can only be emitted by AppendStreamTableSink, RetractStreamTableSink, 
> or UpsertStreamTableSink.
>
> at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:284)
>
> at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
>
> at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
>
> at alphagen_stats.KafkaAlphaGen.main(KafkaAlphaGen.java:265)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
>    at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>
> ... 12 more
>
> What am I doing wrong?
>
> ######################################################################
> The information contained in this communication is confidential and
> intended only for the individual(s) named above. If you are not a named
> addressee, please notify the sender immediately and delete this email
> from your system and do not disclose the email or any part of it to any
> person. The views expressed in this email are the views of the author
> and do not necessarily represent the views of Millennium Capital Partners
> LLP (MCP LLP) or any of its affiliates. Outgoing and incoming electronic
> communications of MCP LLP and its affiliates, including telephone
> communications, may be electronically archived and subject to review
> and/or disclosure to someone other than the recipient. MCP LLP is
> authorized and regulated by the Financial Conduct Authority. Millennium
> Capital Partners LLP is a limited liability partnership registered in
> England & Wales with number OC312897 and with its registered office at
> 50 Berkeley Street, London, W1J 8HD.
> ######################################################################
>