Posted to dev@flink.apache.org by Jigar Gajjar <ji...@gmail.com> on 2021/10/09 15:40:48 UTC

Custom Sink Object attribute issue

Hello Devs,


Here is my custom sink code.

`````````````````````````

public class FlinkNeptuneSink<IN> extends RichSinkFunction<IN> {
    static HttpClient client = HttpClient.newHttpClient();
    private static final long serialVersionUID = 1L;
    NeptuneClientFactory neptuneClientFactory;
    JsonLDWriteContext jsonLDWriteContext;
    String baseURI;
    Map contextJsonMap;
    String namespaceURI;

    public FlinkNeptuneSink(String protocol, String host, String port,
                            String baseURI, Map contextJsonMap, String namespaceURI) {
        neptuneClientFactory = new NeptuneClientFactory(protocol, host, port);

        this.baseURI = baseURI;
        this.contextJsonMap = contextJsonMap;
        this.namespaceURI = namespaceURI;
    }

    @Override
    public void invoke(IN value, Context context) throws IOException {
        // neptuneClientFactory.getNeptuneClient() returns null here
        // (the repository attribute in neptuneClientFactory is null)
        try (RepositoryConnection conn =
                 neptuneClientFactory.getNeptuneClient().getConnection()) {
        }
    }
}

public class NeptuneClientFactory implements Serializable {
    private transient Repository repository;

    public NeptuneClientFactory(String protocol, String host, String port) {
        this.repository = createNeptuneClient(protocol, host, port);
    }

    public static Repository createNeptuneClient(String protocol, String host, String port) {
        String sparqlEndpoint = String.format("%s://%s:%s/sparql", protocol, host, port);
        Repository repo = new SPARQLRepository(sparqlEndpoint);
        repo.init();
        return repo;
    }

    public Repository getNeptuneClient() {
        return repository;
    }
}

filtredNonEmptyP5.addSink(new FlinkNeptuneSink<>("https", "neptunehost",
        "8182", "https://localhost/entity", contextJsonMap,
        "https://localhost/namespaces/default"));

`````````

When invoke() is called, only the repository field of neptuneClientFactory
is null. I'm not sure why; all the other attributes are set properly.

Is Flink initializing the sink's attributes from somewhere else?
When I debug, neptuneClientFactory is initialized properly when the sink is
created, but by the time invoke() runs, the repository is null.

Please help.

-- 
Thanks
Jigar Gajjar

Re: Custom Sink Object attribute issue

Posted by Arvid Heise <ar...@apache.org>.
Hi Jigar,

I'm moving your user question to the user ML.

The best place to initialize transient fields is in

 private void readObject(java.io.ObjectInputStream in)
     throws IOException, ClassNotFoundException;

as described in [1]:

> Remember that transient fields will be initialized to their default values.
> You can provide a readObject method that restores transient fields to
> acceptable values, or alternatively, lazily initialize those fields first
> time they are used.

[1] Effective Java, Item 87: Consider using a custom serialized form.
https://ahdak.github.io/blog/effective-java-part-11/
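A minimal sketch of the readObject approach, assuming the factory is changed to keep protocol, host and port as ordinary serializable fields so that the transient client can be rebuilt after deserialization. To stay runnable without RDF4J, SparqlClient is a hypothetical stand-in for the Repository; in the real factory, createClient() would build and init() a SPARQLRepository as in the question. ReadObjectClientFactory and JavaSerialization are likewise illustrative names, not Flink or RDF4J APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a non-serializable client such as an RDF4J Repository.
class SparqlClient {
    final String endpoint;

    SparqlClient(String endpoint) {
        this.endpoint = endpoint;
    }
}

class ReadObjectClientFactory implements Serializable {
    private static final long serialVersionUID = 1L;

    // Plain fields: these are serialized and shipped together with the sink.
    private final String protocol;
    private final String host;
    private final String port;

    // Transient: skipped by serialization, so it is rebuilt in readObject.
    private transient SparqlClient client;

    ReadObjectClientFactory(String protocol, String host, String port) {
        this.protocol = protocol;
        this.host = host;
        this.port = port;
        this.client = createClient();
    }

    private SparqlClient createClient() {
        return new SparqlClient(String.format("%s://%s:%s/sparql", protocol, host, port));
    }

    // Invoked by Java serialization on the receiving side.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();        // restores protocol, host and port
        this.client = createClient();  // restores the transient field
    }

    SparqlClient getClient() {
        return client;
    }
}

class JavaSerialization {
    // Serializes and deserializes obj with plain Java serialization.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

One design caveat: Flink never closes anything created in readObject. If the client holds resources that need shutting down, creating it in open() and releasing it in close() fits the RichSinkFunction lifecycle better.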


On Mon, Oct 11, 2021 at 10:56 AM Till Rohrmann <tr...@apache.org> wrote:

> Hi Jigar,
>
> In order to run the sink function on the Flink cluster, Flink serializes
> it. Since you marked the repository as transient, it won't be shipped to
> the cluster. So if Repository is Serializable, you can ship it to the
> cluster. If not, then you need to reconstruct the Repository on the
> cluster (e.g. on the first invoke() call or in the open() method of the
> RichSinkFunction).
>
> Cheers,
> Till

Re: Custom Sink Object attribute issue

Posted by Till Rohrmann <tr...@apache.org>.
Hi Jigar,

In order to run the sink function on the Flink cluster, Flink serializes
it. Since you marked the repository as transient, it won't be shipped to
the cluster. So if Repository is Serializable, you can ship it to the
cluster. If not, then you need to reconstruct the Repository on the
cluster (e.g. on the first invoke() call or in the open() method of the
RichSinkFunction).
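The effect described above can be reproduced without Flink by round-tripping the factory through plain Java serialization, which is roughly what happens when the sink is shipped to the cluster. This sketch contrasts an eager, transient factory shaped like the one in the question with a variant that ships only the endpoint string and creates the client lazily on first use. HypotheticalClient (standing in for the RDF4J Repository), EagerFactory, LazyFactory and SerializationDemo are illustrative names, not part of Flink or RDF4J.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a non-serializable client such as an RDF4J Repository.
class HypotheticalClient {
    final String endpoint;

    HypotheticalClient(String endpoint) {
        this.endpoint = endpoint;
    }
}

// Shaped like the factory in the question: built eagerly, field marked transient.
class EagerFactory implements Serializable {
    private static final long serialVersionUID = 1L;
    private transient HypotheticalClient client;

    EagerFactory(String endpoint) {
        this.client = new HypotheticalClient(endpoint);
    }

    HypotheticalClient getClient() {
        return client; // null after deserialization: transient fields are not shipped
    }
}

// Ships only the endpoint and builds the client on first use, on whichever JVM runs it.
class LazyFactory implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String endpoint;
    private transient HypotheticalClient client;

    LazyFactory(String endpoint) {
        this.endpoint = endpoint;
    }

    HypotheticalClient getClient() {
        if (client == null) {
            client = new HypotheticalClient(endpoint);
        }
        return client;
    }
}

class SerializationDemo {
    // Serializes and deserializes obj with plain Java serialization.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In the sink itself, the same idea maps onto RichSinkFunction: keep only protocol, host and port as fields, create the repository in open() (or lazily on the first invoke() call), and release it in close(). The unsynchronized null check assumes invoke() is called from a single task thread per parallel instance.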

Cheers,
Till

On Mon, Oct 11, 2021 at 10:12 AM Jigar Gajjar <ji...@gmail.com>
wrote:

> When invoke() is called, only the repository field of neptuneClientFactory
> is null. I'm not sure why; all the other attributes are set properly.
>
> Is Flink initializing the sink's attributes from somewhere else?
> When I debug, neptuneClientFactory is initialized properly when the sink is
> created, but by the time invoke() runs, the repository is null.
>
> Please help.
>
> --
> Thanks
> Jigar Gajjar
>