Posted to dev@flink.apache.org by Jigar Gajjar <ji...@gmail.com> on 2021/10/09 15:40:48 UTC
Custom Sink Object attribute issue
Hello Devs,
Here is my custom sink code.
```java
// FlinkNeptuneSink.java
import java.io.IOException;
import java.net.http.HttpClient;
import java.util.Map;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.jena.riot.JsonLDWriteContext; // assumed: Apache Jena's class
import org.eclipse.rdf4j.repository.RepositoryConnection;

public class FlinkNeptuneSink<IN> extends RichSinkFunction<IN> {
    static HttpClient client = HttpClient.newHttpClient();
    private static final long serialVersionUID = 1L;

    NeptuneClientFactory neptuneClientFactory;
    JsonLDWriteContext jsonLDWriteContext;
    String baseURI;
    Map contextJsonMap;
    String namespaceURI;

    public FlinkNeptuneSink(String protocol, String host, String port,
            String baseURI, Map contextJsonMap, String namespaceURI) {
        neptuneClientFactory = new NeptuneClientFactory(protocol, host, port);
        this.baseURI = baseURI;
        this.contextJsonMap = contextJsonMap;
        this.namespaceURI = namespaceURI;
    }

    @Override
    public void invoke(IN value, Context context) throws IOException {
        // neptuneClientFactory.getNeptuneClient() returns null here:
        // the repository attribute in neptuneClientFactory is null.
        try (RepositoryConnection conn =
                neptuneClientFactory.getNeptuneClient().getConnection()) {
            // write logic omitted
        }
    }
}

// NeptuneClientFactory.java
import java.io.Serializable;

import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.sparql.SPARQLRepository;

public class NeptuneClientFactory implements Serializable {
    private transient Repository repository;

    public NeptuneClientFactory(String protocol, String host, String port) {
        this.repository = createNeptuneClient(protocol, host, port);
    }

    public static Repository createNeptuneClient(String protocol, String host,
            String port) {
        String sparqlEndpoint = String.format("%s://%s:%s/sparql",
                protocol, host, port);
        Repository repo = new SPARQLRepository(sparqlEndpoint);
        repo.init();
        return repo;
    }

    public Repository getNeptuneClient() {
        return repository;
    }
}

// Job code (filtredNonEmptyP5 and contextJsonMap are defined elsewhere)
filtredNonEmptyP5.addSink(new FlinkNeptuneSink<>("https", "neptunehost",
        "8182", "https://localhost/entity", contextJsonMap,
        "https://localhost/namespaces/default"));
```
When invoke runs, the repository inside neptuneClientFactory is null. I'm not
sure why, because all the other attributes are set properly.
Is Flink initializing the sink's attributes from somewhere else?
When I debug, neptuneClientFactory is initialized properly while the sink is
being created, but by the time invoke is called the repository is null.
Please help.
--
Thanks
Jigar Gajjar
Re: Custom Sink Object attribute issue
Posted by Arvid Heise <ar...@apache.org>.
Hi Jigar,
I'm moving your user question to the user ML.
The best place to initialize transient fields is in
private void readObject(java.io.ObjectInputStream in)
throws IOException, ClassNotFoundException;
as described in [1]:
> Remember that transient fields will be initialized to their default values.
> You can provide a readObject method that restores transient fields to
> acceptable values, or alternatively, lazily initialize those fields first
> time they are used.
[1] Effective Java, Item 87: Consider using a custom serialized form;
https://ahdak.github.io/blog/effective-java-part-11/
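A minimal sketch of the readObject approach, assuming the factory keeps its connection settings (protocol, host, port) as ordinary serializable fields and rebuilds the client from them after deserialization. To keep it runnable with only the JDK, SparqlEndpoint is a hypothetical stand-in for rdf4j's Repository, and RoundTrip is a hypothetical helper that Java-serializes and deserializes an object, roughly what happens when the sink is shipped to the cluster. In the real factory, createClient() would call createNeptuneClient(protocol, host, port).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for rdf4j's Repository; like a real client, it is not Serializable.
class SparqlEndpoint {
    final String url;

    SparqlEndpoint(String url) {
        this.url = url;
    }
}

class NeptuneClientFactory implements Serializable {
    private static final long serialVersionUID = 1L;

    // Plain Strings are serializable, so the settings travel with the sink.
    private final String protocol;
    private final String host;
    private final String port;

    // Skipped by serialization; rebuilt in readObject.
    private transient SparqlEndpoint repository;

    NeptuneClientFactory(String protocol, String host, String port) {
        this.protocol = protocol;
        this.host = host;
        this.port = port;
        this.repository = createClient();
    }

    // In the real factory this would call createNeptuneClient(protocol, host, port).
    private SparqlEndpoint createClient() {
        return new SparqlEndpoint(String.format("%s://%s:%s/sparql", protocol, host, port));
    }

    // Invoked by Java deserialization; this class's constructor does not run on the copy.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();           // restores protocol, host and port
        this.repository = createClient(); // restores the transient field
    }

    SparqlEndpoint getNeptuneClient() {
        return repository;
    }
}

// Hypothetical helper: serialize and deserialize, roughly what happens when a sink is shipped.
final class RoundTrip {
    private RoundTrip() {
    }

    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With this design every deserialized copy of the factory builds its own client as soon as it is read, so getNeptuneClient() never returns null on the receiving side.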
On Mon, Oct 11, 2021 at 10:56 AM Till Rohrmann <tr...@apache.org> wrote:
> Hi Jigar,
>
> in order to run the Sink function on the Flink cluster, it will be
> serialized. Since you marked the repository as transient, it won't be
> shipped to the cluster. So if Repository is Serializable, you can ship it
> to the cluster. If not, then you need to reconstruct the Repository on the
> cluster (e.g. on the first invoke call or the open call on the
> RichSinkFunction).
>
> Cheers,
> Till
Re: Custom Sink Object attribute issue
Posted by Till Rohrmann <tr...@apache.org>.
Hi Jigar,
in order to run the Sink function on the Flink cluster, it will be
serialized. Since you marked the repository as transient, it won't be
shipped to the cluster. So if Repository is Serializable, you can ship it
to the cluster. If not, then you need to reconstruct the Repository on the
cluster (e.g. on the first invoke call or the open call on the
RichSinkFunction).
Cheers,
Till
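Rebuilding the Repository on the first invoke call, as suggested above, amounts to lazy initialization, the alternative also named in the Effective Java quote. A minimal sketch under stated assumptions: SparqlEndpoint is a hypothetical stand-in for rdf4j's Repository and RoundTrip is a hypothetical helper approximating how the sink is serialized and shipped, so the example runs with only the JDK. It also reproduces the original symptom: a client created before serialization does not survive it, because the field is transient.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for rdf4j's Repository; like a real client, it is not Serializable.
class SparqlEndpoint {
    final String url;

    SparqlEndpoint(String url) {
        this.url = url;
    }
}

class LazyNeptuneClientFactory implements Serializable {
    private static final long serialVersionUID = 1L;

    // Serializable configuration, shipped with the sink.
    private final String endpointUrl;

    // Skipped by serialization, so it is null on the deserialized copy.
    private transient SparqlEndpoint repository;

    LazyNeptuneClientFactory(String protocol, String host, String port) {
        this.endpointUrl = String.format("%s://%s:%s/sparql", protocol, host, port);
    }

    boolean hasClient() {
        return repository != null;
    }

    // Builds the client on first use, e.g. the first invoke() call on the cluster.
    // Not synchronized: assumes a single thread calls it.
    SparqlEndpoint getNeptuneClient() {
        if (repository == null) {
            repository = new SparqlEndpoint(endpointUrl);
        }
        return repository;
    }
}

// Hypothetical helper: serialize and deserialize, roughly what happens when a sink is shipped.
final class RoundTrip {
    private RoundTrip() {
    }

    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In the real FlinkNeptuneSink, the open() variant mentioned above would do the same work once per sink instance before any record arrives, which keeps the null check out of the per-record path.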