You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/08/03 13:08:20 UTC

[jira] [Commented] (FLINK-4311) TableInputFormat fails when reused on next split

    [ https://issues.apache.org/jira/browse/FLINK-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15405893#comment-15405893 ] 

ASF GitHub Bot commented on FLINK-4311:
---------------------------------------

GitHub user nielsbasjes opened a pull request:

    https://github.com/apache/flink/pull/2330

    FLINK-4311 Fixed several problems in TableInputFormat

    Question: Do you guys want a unit test for this?
    In HBase itself I have done this in the past yet this required a large chunk of additional software to start and stop an HBase minicluster during the unit tests.
    I.e. pull in this thing: 
    https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java
    and then do something like this:
    https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestScanRowPrefix.java


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nielsbasjes/flink FLINK-4311

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2330
    
----
commit 5c3d53c810f8df6d5544685ef3f1004c46541daf
Author: Niels Basjes <nb...@bol.com>
Date:   2016-08-03T12:54:34Z

    [FLINK-4311] TableInputFormat can handle reuse for next input split

commit 8696f5e257c7434d62e662c4c97f4ede2da5411b
Author: Niels Basjes <nb...@bol.com>
Date:   2016-08-03T12:56:01Z

    [FLINK-4311] Cannot override a static member function.

----


> TableInputFormat fails when reused on next split
> ------------------------------------------------
>
>                 Key: FLINK-4311
>                 URL: https://issues.apache.org/jira/browse/FLINK-4311
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.0.3
>            Reporter: Niels Basjes
>            Assignee: Niels Basjes
>            Priority: Critical
>
> We have written a batch job that uses data from HBase by means of using the TableInputFormat.
> We have found that this class sometimes fails with this exception:
> {quote}
> java.lang.RuntimeException: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
> 	at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
> 	at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
> 	at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
> 	at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
> 	at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
> 	at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
> 	at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:152)
> 	at org.apache.flink.addons.hbase.TableInputFormat.open(TableInputFormat.java:47)
> 	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:147)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.RejectedExecutionException: Task org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture@4f4efe4b rejected from java.util.concurrent.ThreadPoolExecutor@7872d5c1[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1165]
> 	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
> 	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
> 	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
> 	at org.apache.hadoop.hbase.client.ResultBoundedCompletionService.submit(ResultBoundedCompletionService.java:142)
> 	at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.addCallsForCurrentReplica(ScannerCallableWithReplicas.java:269)
> 	at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:165)
> 	at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
> 	at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
> 	... 10 more
> {quote}
> As you can see the ThreadPoolExecutor was terminated at this point.
> We tracked it down to the fact that 
> # the configure method opens the table
> # the open method obtains the result scanner
> # the closes method closes the table.
> If a second split arrives on the same instance then the open method will fail because the table has already been closed.
> We also found that this error varies with the versions of HBase that are used. I have also seen this exception:
> {quote}
> Caused by: java.io.IOException: hconnection-0x19d37183 closed
> 	at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1146)
> 	at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
> 	... 37 more
> {quote}
> I found that in the [documentation of the InputFormat interface|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/io/InputFormat.html] is clearly states
> {quote}IMPORTANT NOTE: Input formats must be written such that an instance can be opened again after it was closed. That is due to the fact that the input format is used for potentially multiple splits. After a split is done, the format's close function is invoked and, if another split is available, the open function is invoked afterwards for the next split.{quote}
> It appears that this specific InputFormat has not been checked against this constraint.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)