Posted to issues@hbase.apache.org by "Zephyr Guo (JIRA)" <ji...@apache.org> on 2015/07/01 08:04:05 UTC
[jira] [Work started] (HBASE-13997) ScannerCallableWithReplicas cause Infinitely blocking
[ https://issues.apache.org/jira/browse/HBASE-13997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Work on HBASE-13997 started by Zephyr Guo.
------------------------------------------
> ScannerCallableWithReplicas cause Infinitely blocking
> -----------------------------------------------------
>
> Key: HBASE-13997
> URL: https://issues.apache.org/jira/browse/HBASE-13997
> Project: HBase
> Issue Type: Bug
> Components: Client
> Affects Versions: 1.0.1.1
> Reporter: Zephyr Guo
> Assignee: Zephyr Guo
> Priority: Minor
>
> There appears to be a bug in the ScannerCallableWithReplicas.addCallsForOtherReplicas method: its return value counts one more call than it actually submits.
> {code:title=code in ScannerCallableWithReplicas.addCallsForOtherReplicas |borderStyle=solid}
> private int addCallsForOtherReplicas(
>     BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl, int min,
>     int max) {
>   if (scan.getConsistency() == Consistency.STRONG) {
>     return 0; // not scheduling on other replicas for strong consistency
>   }
>   for (int id = min; id <= max; id++) {
>     if (currentScannerCallable.getHRegionInfo().getReplicaId() == id) {
>       continue; // this was already scheduled earlier
>     }
>     ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
>     if (this.lastResult != null) {
>       s.getScan().setStartRow(this.lastResult.getRow());
>     }
>     outstandingCallables.add(s);
>     RetryingRPC retryingOnReplica = new RetryingRPC(s);
>     cs.submit(retryingOnReplica);
>   }
>   return max - min + 1; // bug? should be "max - min", because the "continue"
>                         // above always happens once
> }
> {code}
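> Because the returned count is one higher than the number of calls actually submitted, completed < submitted can remain true after every submitted call has finished, so the following code can block forever.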
> {code:title=code in ScannerCallableWithReplicas.call|borderStyle=solid}
> // submitted ends up larger than the number of calls actually made
> submitted += addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
> try {
>   // this loop is affected
>   while (completed < submitted) {
>     try {
>       Future<Pair<Result[], ScannerCallable>> f = cs.take();
>       Pair<Result[], ScannerCallable> r = f.get();
>       if (r != null && r.getSecond() != null) {
>         updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
>       }
>       return r == null ? null : r.getFirst(); // great we got an answer
>     } catch (ExecutionException e) {
>       // if not cancel or interrupt, wait until all RPC's are done
>       // one of the tasks failed. Save the exception for later.
>       if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
>       exceptions.add(e);
>       completed++;
>     }
>   }
> } catch (CancellationException e) {
>   throw new InterruptedIOException(e.getMessage());
> } catch (InterruptedException e) {
>   throw new InterruptedIOException(e.getMessage());
> } finally {
>   // We get there because we were interrupted or because one or more of the
>   // calls succeeded or failed. In all case, we stop all our tasks.
>   cs.cancelAll(true);
> }
> {code}
> If the calls to all replica region servers fail with ExecutionException, completed never reaches submitted and the loop blocks forever in cs.take().
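
The counting problem described in the report can be reproduced outside HBase with a small, self-contained sketch. Everything in it is illustrative rather than HBase code: the class name ReplicaCountSketch, the helper wouldBlock, and the actuallySubmitted counter are hypothetical, java.util.concurrent.ExecutorCompletionService stands in for BoundedCompletionService, and every task is made to fail so that only the ExecutionException branch runs. It also assumes that the call for the current replica was submitted and counted once before addCallsForOtherReplicas is reached, which is what the "already scheduled earlier" comment and the "submitted +=" in the quoted code suggest. A timed poll replaces cs.take() so that the sketch terminates instead of hanging.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ReplicaCountSketch {

    /** Number of tasks really handed to the completion service by the last call. */
    static int actuallySubmitted;

    /**
     * Same counting as the quoted method: skip the replica that is already
     * scheduled, submit a (failing) task for every other id, and report
     * max - min + 1 regardless of the skip.
     */
    static int addCallsForOtherReplicas(CompletionService<String> cs,
                                        int currentReplicaId, int min, int max) {
        actuallySubmitted = 0;
        for (int id = min; id <= max; id++) {
            if (id == currentReplicaId) {
                continue; // already scheduled earlier
            }
            final int replica = id;
            cs.submit(() -> {
                throw new IllegalStateException("replica " + replica + " failed");
            });
            actuallySubmitted++;
        }
        return max - min + 1;
    }

    /**
     * Runs a wait loop shaped like the quoted one against tasks that all fail.
     * Returns true when the loop is still waiting after every real task has
     * completed, i.e. the point where cs.take() would never return.
     */
    static boolean wouldBlock(int replicaCount, int currentReplicaId)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(replicaCount);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        try {
            // Assumption: the current replica's call was submitted and counted earlier.
            cs.submit(() -> {
                throw new IllegalStateException("current replica failed");
            });
            int submitted = 1;
            int completed = 0;
            submitted += addCallsForOtherReplicas(cs, currentReplicaId, 0, replicaCount - 1);
            while (completed < submitted) {
                // Timed poll instead of take() so the sketch can detect the stall.
                Future<String> f = cs.poll(500, TimeUnit.MILLISECONDS);
                if (f == null) {
                    return true; // no task left to complete, yet completed < submitted
                }
                try {
                    f.get();
                    return false; // a task succeeded; not reachable in this sketch
                } catch (ExecutionException e) {
                    completed++; // one of the tasks failed
                }
            }
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        boolean blocked = wouldBlock(3, 0);
        System.out.println("tasks submitted for other replicas: " + actuallySubmitted);
        System.out.println("loop would block: " + blocked);
    }
}
```

With three replicas and replica 0 as the current one, two tasks are submitted for the other replicas but the method reports three, so the loop expects four completions while only three tasks exist. Changing the return value to the number of tasks actually submitted (max - min in the reporter's scenario) lets the loop exit once all of them have failed.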
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)