Posted to user@beam.apache.org by Frank Wilson <fr...@memberoo.net> on 2016/05/11 10:26:16 UTC

Threads and ThreadLocal variables in Dataflow

I am trying to read AppEngine search index documents using Google Cloud
Dataflow. I tried raising this before on the google-cloud-sdk mailing list,
but I was told that Stack Overflow was a better place for discussing code.
Unfortunately, as I hope will become obvious, this is an awkward issue that
is unlikely to have a straightforward answer, and so would most likely be
marked as 'unconstructive' by Stack Overflow moderators. My hope is that this
list can give more information on what guarantees I can expect with regard
to threading from the pipeline runner.

I am reading the AppEngine search indexes through the remote API, which is
the only way to access the service via Java outside of AppEngine. One awkward
aspect of the remote API is that it forces you to install a proxy stub in
what seems to be some kind of thread-local variable. I include a simple
Reader and Source for a single search index at the end of this email.

The obvious issue is that 'installing' the remote API impacts the
reusability and composability of this source. I've made a somewhat futile
attempt to detect another remote API 'installation' via reflection, so that
no exception is thrown, and then I carry on hoping that whatever remote API
proxy is present is the right one!

I've since begun extending this code to read from several indexes at once,
and the problem just gets worse, since I now need to make remote API calls
to get the list of indexes in the createReader method of the source.

It seems to me that it would be nice to have some kind of worker 'new
thread hook' where thread-local variables can be set up. Does this exist?
This would let me decouple the remote API proxy 'installation' from the
source and reader themselves.

Thanks,

Frank

***************
Sample code
***************

class SearchIndexReader extends BoundedSource.BoundedReader<Document> {

    private Document current;
    private RemoteApiInstaller installer;
    private SearchIndexSource source;
    private Iterator<Document> documentIterator;
    boolean cleanupApiInstallation;

    public SearchIndexReader(SearchIndexSource source) {
        this.source = source;
        cleanupApiInstallation = false;
    }

    @Override
    public boolean start() throws IOException {
        RemoteApiOptions options = new RemoteApiOptions()
                .server("myProjectId.appspot.com", 443)
                .useApplicationDefaultCredential()
                .remoteApiPath("/_ah/remote_api/");

        installer = new RemoteApiInstaller();

        try { // XXX is accessing this private method justified?
            Method installed = installer.getClass().getDeclaredMethod("installed");

            installed.setAccessible(true);

            if (!(Boolean) installed.invoke(installer)) {
                installer.install(options);
                cleanupApiInstallation = true;
            }
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException(e);
        }

        GetRequest request = GetRequest.newBuilder().setReturningIdsOnly(true).build();
        Index index = getIndex();
        GetResponse<Document> rangeResponse = index.getRange(request);

        documentIterator = rangeResponse.iterator();

        return advance();
    }

    private Index getIndex() {
        IndexSpec remoteIndex = IndexSpec.newBuilder().setName("remoteIndex").build();
        return SearchServiceFactory.getSearchService().getIndex(remoteIndex);
    }

    @Override
    public boolean advance() throws IOException {
        if (documentIterator.hasNext()) {
            Document partialDoc = documentIterator.next();
            current = getIndex().get(partialDoc.getId());
            return true;
        }
        return false;
    }

    @Override
    public Document getCurrent() throws NoSuchElementException {
        return current;
    }

    @Override
    public void close() throws IOException {
        documentIterator = null;
        if (cleanupApiInstallation) {
            installer.uninstall();
        }
    }

    @Override
    public BoundedSource<Document> getCurrentSource() {
        return source;
    }
}

class SearchIndexSource extends BoundedSource<Document> {

    @Override
    public List<? extends BoundedSource<Document>> splitIntoBundles(
            long l, PipelineOptions pipelineOptions) throws Exception {
        return Collections.singletonList(this);
    }

    @Override
    public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception {
        throw new NotImplementedException();
    }

    @Override
    public boolean producesSortedKeys(PipelineOptions pipelineOptions) throws Exception {
        return false;
    }

    @Override
    public BoundedReader<Document> createReader(PipelineOptions pipelineOptions)
            throws IOException {
        return new SearchIndexReader(this);
    }

    @Override
    public void validate() {

    }

    @Override
    public Coder<Document> getDefaultOutputCoder() {
        return DocumentCoder.of();
    }
}

Re: Threads and ThreadLocal variables in Dataflow

Posted by Frank Wilson <fr...@memberoo.net>.
Hi Lukasz,

I started (later yesterday) using a ThreadLocal Facade object to
encapsulate the 'installation' of the remote api, but I like your idea of
using a single threaded executor, so I will give it a go.

Thanks,

Frank
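
A ThreadLocal facade of the kind mentioned above might look roughly like the
following sketch. This is not Frank's actual code: PerThreadInstall and its
methods are hypothetical names, and the supplier stands in for whatever
performs the remote API installation on the calling thread. It assumes
Java 8 or later for ThreadLocal.withInitial.

```java
import java.util.function.Supplier;

/**
 * Hypothetical facade: runs an "install" step at most once per thread,
 * the first time that thread asks for the handle.
 */
final class PerThreadInstall<T> {

    private final ThreadLocal<T> handle;

    PerThreadInstall(Supplier<T> installer) {
        // withInitial invokes the supplier lazily, once per thread,
        // on that thread's first call to get().
        this.handle = ThreadLocal.withInitial(installer);
    }

    /** Returns this thread's handle, installing it first if necessary. */
    T get() {
        return handle.get();
    }

    /** Drops this thread's handle; the next get() on this thread reinstalls. */
    void release() {
        handle.remove();
    }
}
```

A facade like this hides the installation from the reader, but it still
shares the thread with any other code that installs its own proxy, which is
the situation the single-threaded executor suggestion avoids.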

On 11 May 2016 at 17:19, Lukasz Cwik <lc...@google.com> wrote:

> RemoteApiInstaller seems to be unfriendly.
>
> Have you considered using a single threaded executor
> <https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newSingleThreadExecutor()> where
> you channel all the work you want to do via Callable/Future pairs?
> So in start() you would create this single threaded executor, and submit
> the first callable to invoke the RemoteApiInstaller.install(...).
> In getCurrent()/advance()/... you would channel all your work through the
> single threaded executor by giving it callables and blocking on the future
> to get the result.
> In close() you would submit a task to uninstall the RemoteApi and then you
> would shutdown the executor.
>
> This way you don't have to worry about other people's installations, can
> have multiple variants, and just need to invoke a callable to do work.
> Just remember to shut down the executor so that you don't leak threads.

Re: Threads and ThreadLocal variables in Dataflow

Posted by Frances Perry <fj...@google.com>.
Side note: For general programming model questions like this, this list is
great. But if you are using Cloud Dataflow for Java 1.x with the Cloud
Dataflow runner (aka. the recommended configuration for running at Google
currently), Stack Overflow (tag google-cloud-dataflow
<http://stackoverflow.com/questions/tagged/google-cloud-dataflow>) is
currently the best place for detailed support as we're still in the process
of bootstrapping the Apache Beam support on Google Cloud.

Re: Threads and ThreadLocal variables in Dataflow

Posted by Lukasz Cwik <lc...@google.com>.
RemoteApiInstaller seems to be unfriendly.

Have you considered using a single-threaded executor
<https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newSingleThreadExecutor()>
where you channel all the work you want to do via Callable/Future pairs?
So in start() you would create this single-threaded executor and submit
the first callable to invoke RemoteApiInstaller.install(...).
In getCurrent()/advance()/... you would channel all your work through the
single-threaded executor by giving it callables and blocking on the future
to get the result.
In close() you would submit a task to uninstall the RemoteApi and then you
would shut down the executor.

This way you don't have to worry about other people's installations, can
have multiple variants, and just need to invoke a callable to do work.
Just remember to shut down the executor so that you don't leak threads.
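
As a rough illustration of the approach described above (a sketch, not code
from this thread), the class below confines an install step, every
subsequent call, and the uninstall step to one dedicated thread.
FakeRemoteApi is a hypothetical stand-in for a thread-local installation
such as the remote API proxy, and ConfinedRemoteApi is an invented name; the
real RemoteApiInstaller calls would go where the stand-in is used. The
lambda syntax assumes Java 8 or later.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for an API that installs itself into a thread-local. */
final class FakeRemoteApi {
    private static final ThreadLocal<String> INSTALLED = new ThreadLocal<>();

    static void install(String server) {
        if (INSTALLED.get() != null) {
            throw new IllegalStateException("already installed on this thread");
        }
        INSTALLED.set(server);
    }

    static void uninstall() {
        INSTALLED.remove();
    }

    static String currentServer() {
        return INSTALLED.get();
    }
}

/** Runs install, all work, and uninstall on a single dedicated thread. */
final class ConfinedRemoteApi implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    ConfinedRemoteApi(final String server) {
        try {
            // The first task installs the API on the executor's thread.
            call(() -> { FakeRemoteApi.install(server); return null; });
        } catch (RuntimeException e) {
            executor.shutdown(); // do not leak the thread if install fails
            throw e;
        }
    }

    /** Submits work to the dedicated thread and blocks for the result. */
    <T> T call(Callable<T> work) {
        try {
            return executor.submit(work).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    @Override
    public void close() {
        try {
            call(() -> { FakeRemoteApi.uninstall(); return null; });
        } finally {
            executor.shutdown(); // always release the thread
        }
    }
}
```

In a reader, start() would construct the wrapper, advance() would route each
index lookup through call(...), and close() would close it. Because each
wrapper owns its own thread, two readers pointing at different servers would
not see each other's installation.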




On Wed, May 11, 2016 at 6:55 AM, Jean-Baptiste Onofré <jb...@nanthrax.net>
wrote:

> Hi Frank,
>
> Which runner do you use ?
>
> We are working on the InProcessPipelineRunner right now as it heavily
> creates threads.
>
> About your question, where do you put the AppEngine API call ? In your own
> ParDo ?
>
> Regards
> JB
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

Re: Threads and ThreadLocal variables in Dataflow

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Frank,

Which runner do you use?

We are working on the InProcessPipelineRunner right now, as it creates
threads heavily.

About your question: where do you put the AppEngine API call? In your
own ParDo?

Regards
JB

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com