You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by gi...@apache.org on 2018/02/07 15:14:25 UTC
[31/42] hbase-site git commit: Published site at .
http://git-wip-us.apache.org/repos/asf/hbase-site/blob/6bdadd97/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.html
----------------------------------------------------------------------
diff --git a/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.html b/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.html
index b67ae11..cd0ff28 100644
--- a/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.html
+++ b/devapidocs/src-html/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.html
@@ -259,8 +259,8 @@
<span class="sourceLineNo">251</span><a name="line.251"></a>
<span class="sourceLineNo">252</span> @SuppressWarnings("unchecked")<a name="line.252"></a>
<span class="sourceLineNo">253</span> private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,<a name="line.253"></a>
-<span class="sourceLineNo">254</span> RegionResult regionResult, List<Action> failedActions) {<a name="line.254"></a>
-<span class="sourceLineNo">255</span> Object result = regionResult.result.get(action.getOriginalIndex());<a name="line.255"></a>
+<span class="sourceLineNo">254</span> RegionResult regionResult, List<Action> failedActions, Throwable regionException) {<a name="line.254"></a>
+<span class="sourceLineNo">255</span> Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);<a name="line.255"></a>
<span class="sourceLineNo">256</span> if (result == null) {<a name="line.256"></a>
<span class="sourceLineNo">257</span> LOG.error("Server " + serverName + " sent us neither result nor exception for row '"<a name="line.257"></a>
<span class="sourceLineNo">258</span> + Bytes.toStringBinary(action.getAction().getRow()) + "' of "<a name="line.258"></a>
@@ -287,164 +287,165 @@
<span class="sourceLineNo">279</span> List<Action> failedActions = new ArrayList<>();<a name="line.279"></a>
<span class="sourceLineNo">280</span> actionsByRegion.forEach((rn, regionReq) -> {<a name="line.280"></a>
<span class="sourceLineNo">281</span> RegionResult regionResult = resp.getResults().get(rn);<a name="line.281"></a>
-<span class="sourceLineNo">282</span> if (regionResult != null) {<a name="line.282"></a>
-<span class="sourceLineNo">283</span> regionReq.actions.forEach(<a name="line.283"></a>
-<span class="sourceLineNo">284</span> action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions));<a name="line.284"></a>
-<span class="sourceLineNo">285</span> } else {<a name="line.285"></a>
-<span class="sourceLineNo">286</span> Throwable t = resp.getException(rn);<a name="line.286"></a>
-<span class="sourceLineNo">287</span> Throwable error;<a name="line.287"></a>
-<span class="sourceLineNo">288</span> if (t == null) {<a name="line.288"></a>
-<span class="sourceLineNo">289</span> LOG.error(<a name="line.289"></a>
-<span class="sourceLineNo">290</span> "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));<a name="line.290"></a>
-<span class="sourceLineNo">291</span> error = new RuntimeException("Invalid response");<a name="line.291"></a>
-<span class="sourceLineNo">292</span> } else {<a name="line.292"></a>
-<span class="sourceLineNo">293</span> error = translateException(t);<a name="line.293"></a>
-<span class="sourceLineNo">294</span> logException(tries, () -> Stream.of(regionReq), error, serverName);<a name="line.294"></a>
-<span class="sourceLineNo">295</span> conn.getLocator().updateCachedLocation(regionReq.loc, error);<a name="line.295"></a>
-<span class="sourceLineNo">296</span> if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {<a name="line.296"></a>
-<span class="sourceLineNo">297</span> failAll(regionReq.actions.stream(), tries, error, serverName);<a name="line.297"></a>
-<span class="sourceLineNo">298</span> return;<a name="line.298"></a>
-<span class="sourceLineNo">299</span> }<a name="line.299"></a>
-<span class="sourceLineNo">300</span> addError(regionReq.actions, error, serverName);<a name="line.300"></a>
-<span class="sourceLineNo">301</span> failedActions.addAll(regionReq.actions);<a name="line.301"></a>
-<span class="sourceLineNo">302</span> }<a name="line.302"></a>
-<span class="sourceLineNo">303</span> }<a name="line.303"></a>
-<span class="sourceLineNo">304</span> });<a name="line.304"></a>
-<span class="sourceLineNo">305</span> if (!failedActions.isEmpty()) {<a name="line.305"></a>
-<span class="sourceLineNo">306</span> tryResubmit(failedActions.stream(), tries);<a name="line.306"></a>
-<span class="sourceLineNo">307</span> }<a name="line.307"></a>
-<span class="sourceLineNo">308</span> }<a name="line.308"></a>
-<span class="sourceLineNo">309</span><a name="line.309"></a>
-<span class="sourceLineNo">310</span> private void send(Map<ServerName, ServerRequest> actionsByServer, int tries) {<a name="line.310"></a>
-<span class="sourceLineNo">311</span> long remainingNs;<a name="line.311"></a>
-<span class="sourceLineNo">312</span> if (operationTimeoutNs > 0) {<a name="line.312"></a>
-<span class="sourceLineNo">313</span> remainingNs = remainingTimeNs();<a name="line.313"></a>
-<span class="sourceLineNo">314</span> if (remainingNs <= 0) {<a name="line.314"></a>
-<span class="sourceLineNo">315</span> failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream())<a name="line.315"></a>
-<span class="sourceLineNo">316</span> .flatMap(r -> r.actions.stream()),<a name="line.316"></a>
-<span class="sourceLineNo">317</span> tries);<a name="line.317"></a>
-<span class="sourceLineNo">318</span> return;<a name="line.318"></a>
-<span class="sourceLineNo">319</span> }<a name="line.319"></a>
-<span class="sourceLineNo">320</span> } else {<a name="line.320"></a>
-<span class="sourceLineNo">321</span> remainingNs = Long.MAX_VALUE;<a name="line.321"></a>
-<span class="sourceLineNo">322</span> }<a name="line.322"></a>
-<span class="sourceLineNo">323</span> actionsByServer.forEach((sn, serverReq) -> {<a name="line.323"></a>
-<span class="sourceLineNo">324</span> ClientService.Interface stub;<a name="line.324"></a>
-<span class="sourceLineNo">325</span> try {<a name="line.325"></a>
-<span class="sourceLineNo">326</span> stub = conn.getRegionServerStub(sn);<a name="line.326"></a>
-<span class="sourceLineNo">327</span> } catch (IOException e) {<a name="line.327"></a>
-<span class="sourceLineNo">328</span> onError(serverReq.actionsByRegion, tries, e, sn);<a name="line.328"></a>
-<span class="sourceLineNo">329</span> return;<a name="line.329"></a>
-<span class="sourceLineNo">330</span> }<a name="line.330"></a>
-<span class="sourceLineNo">331</span> ClientProtos.MultiRequest req;<a name="line.331"></a>
-<span class="sourceLineNo">332</span> List<CellScannable> cells = new ArrayList<>();<a name="line.332"></a>
-<span class="sourceLineNo">333</span> // Map from a created RegionAction to the original index for a RowMutations within<a name="line.333"></a>
-<span class="sourceLineNo">334</span> // the original list of actions. This will be used to process the results when there<a name="line.334"></a>
-<span class="sourceLineNo">335</span> // is RowMutations in the action list.<a name="line.335"></a>
-<span class="sourceLineNo">336</span> Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();<a name="line.336"></a>
-<span class="sourceLineNo">337</span> try {<a name="line.337"></a>
-<span class="sourceLineNo">338</span> req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);<a name="line.338"></a>
-<span class="sourceLineNo">339</span> } catch (IOException e) {<a name="line.339"></a>
-<span class="sourceLineNo">340</span> onError(serverReq.actionsByRegion, tries, e, sn);<a name="line.340"></a>
-<span class="sourceLineNo">341</span> return;<a name="line.341"></a>
-<span class="sourceLineNo">342</span> }<a name="line.342"></a>
-<span class="sourceLineNo">343</span> HBaseRpcController controller = conn.rpcControllerFactory.newController();<a name="line.343"></a>
-<span class="sourceLineNo">344</span> resetController(controller, Math.min(rpcTimeoutNs, remainingNs));<a name="line.344"></a>
-<span class="sourceLineNo">345</span> if (!cells.isEmpty()) {<a name="line.345"></a>
-<span class="sourceLineNo">346</span> controller.setCellScanner(createCellScanner(cells));<a name="line.346"></a>
-<span class="sourceLineNo">347</span> }<a name="line.347"></a>
-<span class="sourceLineNo">348</span> stub.multi(controller, req, resp -> {<a name="line.348"></a>
-<span class="sourceLineNo">349</span> if (controller.failed()) {<a name="line.349"></a>
-<span class="sourceLineNo">350</span> onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn);<a name="line.350"></a>
-<span class="sourceLineNo">351</span> } else {<a name="line.351"></a>
-<span class="sourceLineNo">352</span> try {<a name="line.352"></a>
-<span class="sourceLineNo">353</span> onComplete(serverReq.actionsByRegion, tries, sn, ResponseConverter.getResults(req,<a name="line.353"></a>
-<span class="sourceLineNo">354</span> rowMutationsIndexMap, resp, controller.cellScanner()));<a name="line.354"></a>
-<span class="sourceLineNo">355</span> } catch (Exception e) {<a name="line.355"></a>
-<span class="sourceLineNo">356</span> onError(serverReq.actionsByRegion, tries, e, sn);<a name="line.356"></a>
-<span class="sourceLineNo">357</span> return;<a name="line.357"></a>
-<span class="sourceLineNo">358</span> }<a name="line.358"></a>
-<span class="sourceLineNo">359</span> }<a name="line.359"></a>
-<span class="sourceLineNo">360</span> });<a name="line.360"></a>
-<span class="sourceLineNo">361</span> });<a name="line.361"></a>
-<span class="sourceLineNo">362</span> }<a name="line.362"></a>
-<span class="sourceLineNo">363</span><a name="line.363"></a>
-<span class="sourceLineNo">364</span> private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t,<a name="line.364"></a>
-<span class="sourceLineNo">365</span> ServerName serverName) {<a name="line.365"></a>
-<span class="sourceLineNo">366</span> Throwable error = translateException(t);<a name="line.366"></a>
-<span class="sourceLineNo">367</span> logException(tries, () -> actionsByRegion.values().stream(), error, serverName);<a name="line.367"></a>
-<span class="sourceLineNo">368</span> actionsByRegion<a name="line.368"></a>
-<span class="sourceLineNo">369</span> .forEach((rn, regionReq) -> conn.getLocator().updateCachedLocation(regionReq.loc, error));<a name="line.369"></a>
-<span class="sourceLineNo">370</span> if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {<a name="line.370"></a>
-<span class="sourceLineNo">371</span> failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,<a name="line.371"></a>
-<span class="sourceLineNo">372</span> serverName);<a name="line.372"></a>
-<span class="sourceLineNo">373</span> return;<a name="line.373"></a>
-<span class="sourceLineNo">374</span> }<a name="line.374"></a>
-<span class="sourceLineNo">375</span> List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())<a name="line.375"></a>
-<span class="sourceLineNo">376</span> .collect(Collectors.toList());<a name="line.376"></a>
-<span class="sourceLineNo">377</span> addError(copiedActions, error, serverName);<a name="line.377"></a>
-<span class="sourceLineNo">378</span> tryResubmit(copiedActions.stream(), tries);<a name="line.378"></a>
-<span class="sourceLineNo">379</span> }<a name="line.379"></a>
-<span class="sourceLineNo">380</span><a name="line.380"></a>
-<span class="sourceLineNo">381</span> private void tryResubmit(Stream<Action> actions, int tries) {<a name="line.381"></a>
-<span class="sourceLineNo">382</span> long delayNs;<a name="line.382"></a>
-<span class="sourceLineNo">383</span> if (operationTimeoutNs > 0) {<a name="line.383"></a>
-<span class="sourceLineNo">384</span> long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;<a name="line.384"></a>
-<span class="sourceLineNo">385</span> if (maxDelayNs <= 0) {<a name="line.385"></a>
-<span class="sourceLineNo">386</span> failAll(actions, tries);<a name="line.386"></a>
-<span class="sourceLineNo">387</span> return;<a name="line.387"></a>
-<span class="sourceLineNo">388</span> }<a name="line.388"></a>
-<span class="sourceLineNo">389</span> delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));<a name="line.389"></a>
-<span class="sourceLineNo">390</span> } else {<a name="line.390"></a>
-<span class="sourceLineNo">391</span> delayNs = getPauseTime(pauseNs, tries - 1);<a name="line.391"></a>
-<span class="sourceLineNo">392</span> }<a name="line.392"></a>
-<span class="sourceLineNo">393</span> retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);<a name="line.393"></a>
-<span class="sourceLineNo">394</span> }<a name="line.394"></a>
-<span class="sourceLineNo">395</span><a name="line.395"></a>
-<span class="sourceLineNo">396</span> private void groupAndSend(Stream<Action> actions, int tries) {<a name="line.396"></a>
-<span class="sourceLineNo">397</span> long locateTimeoutNs;<a name="line.397"></a>
-<span class="sourceLineNo">398</span> if (operationTimeoutNs > 0) {<a name="line.398"></a>
-<span class="sourceLineNo">399</span> locateTimeoutNs = remainingTimeNs();<a name="line.399"></a>
-<span class="sourceLineNo">400</span> if (locateTimeoutNs <= 0) {<a name="line.400"></a>
-<span class="sourceLineNo">401</span> failAll(actions, tries);<a name="line.401"></a>
-<span class="sourceLineNo">402</span> return;<a name="line.402"></a>
-<span class="sourceLineNo">403</span> }<a name="line.403"></a>
-<span class="sourceLineNo">404</span> } else {<a name="line.404"></a>
-<span class="sourceLineNo">405</span> locateTimeoutNs = -1L;<a name="line.405"></a>
-<span class="sourceLineNo">406</span> }<a name="line.406"></a>
-<span class="sourceLineNo">407</span> ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();<a name="line.407"></a>
-<span class="sourceLineNo">408</span> ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();<a name="line.408"></a>
-<span class="sourceLineNo">409</span> CompletableFuture.allOf(actions<a name="line.409"></a>
-<span class="sourceLineNo">410</span> .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),<a name="line.410"></a>
-<span class="sourceLineNo">411</span> RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {<a name="line.411"></a>
-<span class="sourceLineNo">412</span> if (error != null) {<a name="line.412"></a>
-<span class="sourceLineNo">413</span> error = translateException(error);<a name="line.413"></a>
-<span class="sourceLineNo">414</span> if (error instanceof DoNotRetryIOException) {<a name="line.414"></a>
-<span class="sourceLineNo">415</span> failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");<a name="line.415"></a>
-<span class="sourceLineNo">416</span> return;<a name="line.416"></a>
-<span class="sourceLineNo">417</span> }<a name="line.417"></a>
-<span class="sourceLineNo">418</span> addError(action, error, null);<a name="line.418"></a>
-<span class="sourceLineNo">419</span> locateFailed.add(action);<a name="line.419"></a>
-<span class="sourceLineNo">420</span> } else {<a name="line.420"></a>
-<span class="sourceLineNo">421</span> computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new)<a name="line.421"></a>
-<span class="sourceLineNo">422</span> .addAction(loc, action);<a name="line.422"></a>
-<span class="sourceLineNo">423</span> }<a name="line.423"></a>
-<span class="sourceLineNo">424</span> }))<a name="line.424"></a>
-<span class="sourceLineNo">425</span> .toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {<a name="line.425"></a>
-<span class="sourceLineNo">426</span> if (!actionsByServer.isEmpty()) {<a name="line.426"></a>
-<span class="sourceLineNo">427</span> send(actionsByServer, tries);<a name="line.427"></a>
-<span class="sourceLineNo">428</span> }<a name="line.428"></a>
-<span class="sourceLineNo">429</span> if (!locateFailed.isEmpty()) {<a name="line.429"></a>
-<span class="sourceLineNo">430</span> tryResubmit(locateFailed.stream(), tries);<a name="line.430"></a>
-<span class="sourceLineNo">431</span> }<a name="line.431"></a>
-<span class="sourceLineNo">432</span> });<a name="line.432"></a>
-<span class="sourceLineNo">433</span> }<a name="line.433"></a>
-<span class="sourceLineNo">434</span><a name="line.434"></a>
-<span class="sourceLineNo">435</span> public List<CompletableFuture<T>> call() {<a name="line.435"></a>
-<span class="sourceLineNo">436</span> groupAndSend(actions.stream(), 1);<a name="line.436"></a>
-<span class="sourceLineNo">437</span> return futures;<a name="line.437"></a>
-<span class="sourceLineNo">438</span> }<a name="line.438"></a>
-<span class="sourceLineNo">439</span>}<a name="line.439"></a>
+<span class="sourceLineNo">282</span> Throwable regionException = resp.getException(rn);<a name="line.282"></a>
+<span class="sourceLineNo">283</span> if (regionResult != null) {<a name="line.283"></a>
+<span class="sourceLineNo">284</span> regionReq.actions.forEach(<a name="line.284"></a>
+<span class="sourceLineNo">285</span> action -> onComplete(action, regionReq, tries, serverName, regionResult, failedActions,<a name="line.285"></a>
+<span class="sourceLineNo">286</span> regionException));<a name="line.286"></a>
+<span class="sourceLineNo">287</span> } else {<a name="line.287"></a>
+<span class="sourceLineNo">288</span> Throwable error;<a name="line.288"></a>
+<span class="sourceLineNo">289</span> if (regionException == null) {<a name="line.289"></a>
+<span class="sourceLineNo">290</span> LOG.error(<a name="line.290"></a>
+<span class="sourceLineNo">291</span> "Server sent us neither results nor exceptions for " + Bytes.toStringBinary(rn));<a name="line.291"></a>
+<span class="sourceLineNo">292</span> error = new RuntimeException("Invalid response");<a name="line.292"></a>
+<span class="sourceLineNo">293</span> } else {<a name="line.293"></a>
+<span class="sourceLineNo">294</span> error = translateException(regionException);<a name="line.294"></a>
+<span class="sourceLineNo">295</span> }<a name="line.295"></a>
+<span class="sourceLineNo">296</span> logException(tries, () -> Stream.of(regionReq), error, serverName);<a name="line.296"></a>
+<span class="sourceLineNo">297</span> conn.getLocator().updateCachedLocation(regionReq.loc, error);<a name="line.297"></a>
+<span class="sourceLineNo">298</span> if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {<a name="line.298"></a>
+<span class="sourceLineNo">299</span> failAll(regionReq.actions.stream(), tries, error, serverName);<a name="line.299"></a>
+<span class="sourceLineNo">300</span> return;<a name="line.300"></a>
+<span class="sourceLineNo">301</span> }<a name="line.301"></a>
+<span class="sourceLineNo">302</span> addError(regionReq.actions, error, serverName);<a name="line.302"></a>
+<span class="sourceLineNo">303</span> failedActions.addAll(regionReq.actions);<a name="line.303"></a>
+<span class="sourceLineNo">304</span> }<a name="line.304"></a>
+<span class="sourceLineNo">305</span> });<a name="line.305"></a>
+<span class="sourceLineNo">306</span> if (!failedActions.isEmpty()) {<a name="line.306"></a>
+<span class="sourceLineNo">307</span> tryResubmit(failedActions.stream(), tries);<a name="line.307"></a>
+<span class="sourceLineNo">308</span> }<a name="line.308"></a>
+<span class="sourceLineNo">309</span> }<a name="line.309"></a>
+<span class="sourceLineNo">310</span><a name="line.310"></a>
+<span class="sourceLineNo">311</span> private void send(Map<ServerName, ServerRequest> actionsByServer, int tries) {<a name="line.311"></a>
+<span class="sourceLineNo">312</span> long remainingNs;<a name="line.312"></a>
+<span class="sourceLineNo">313</span> if (operationTimeoutNs > 0) {<a name="line.313"></a>
+<span class="sourceLineNo">314</span> remainingNs = remainingTimeNs();<a name="line.314"></a>
+<span class="sourceLineNo">315</span> if (remainingNs <= 0) {<a name="line.315"></a>
+<span class="sourceLineNo">316</span> failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream())<a name="line.316"></a>
+<span class="sourceLineNo">317</span> .flatMap(r -> r.actions.stream()),<a name="line.317"></a>
+<span class="sourceLineNo">318</span> tries);<a name="line.318"></a>
+<span class="sourceLineNo">319</span> return;<a name="line.319"></a>
+<span class="sourceLineNo">320</span> }<a name="line.320"></a>
+<span class="sourceLineNo">321</span> } else {<a name="line.321"></a>
+<span class="sourceLineNo">322</span> remainingNs = Long.MAX_VALUE;<a name="line.322"></a>
+<span class="sourceLineNo">323</span> }<a name="line.323"></a>
+<span class="sourceLineNo">324</span> actionsByServer.forEach((sn, serverReq) -> {<a name="line.324"></a>
+<span class="sourceLineNo">325</span> ClientService.Interface stub;<a name="line.325"></a>
+<span class="sourceLineNo">326</span> try {<a name="line.326"></a>
+<span class="sourceLineNo">327</span> stub = conn.getRegionServerStub(sn);<a name="line.327"></a>
+<span class="sourceLineNo">328</span> } catch (IOException e) {<a name="line.328"></a>
+<span class="sourceLineNo">329</span> onError(serverReq.actionsByRegion, tries, e, sn);<a name="line.329"></a>
+<span class="sourceLineNo">330</span> return;<a name="line.330"></a>
+<span class="sourceLineNo">331</span> }<a name="line.331"></a>
+<span class="sourceLineNo">332</span> ClientProtos.MultiRequest req;<a name="line.332"></a>
+<span class="sourceLineNo">333</span> List<CellScannable> cells = new ArrayList<>();<a name="line.333"></a>
+<span class="sourceLineNo">334</span> // Map from a created RegionAction to the original index for a RowMutations within<a name="line.334"></a>
+<span class="sourceLineNo">335</span> // the original list of actions. This will be used to process the results when there<a name="line.335"></a>
+<span class="sourceLineNo">336</span> // is RowMutations in the action list.<a name="line.336"></a>
+<span class="sourceLineNo">337</span> Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();<a name="line.337"></a>
+<span class="sourceLineNo">338</span> try {<a name="line.338"></a>
+<span class="sourceLineNo">339</span> req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);<a name="line.339"></a>
+<span class="sourceLineNo">340</span> } catch (IOException e) {<a name="line.340"></a>
+<span class="sourceLineNo">341</span> onError(serverReq.actionsByRegion, tries, e, sn);<a name="line.341"></a>
+<span class="sourceLineNo">342</span> return;<a name="line.342"></a>
+<span class="sourceLineNo">343</span> }<a name="line.343"></a>
+<span class="sourceLineNo">344</span> HBaseRpcController controller = conn.rpcControllerFactory.newController();<a name="line.344"></a>
+<span class="sourceLineNo">345</span> resetController(controller, Math.min(rpcTimeoutNs, remainingNs));<a name="line.345"></a>
+<span class="sourceLineNo">346</span> if (!cells.isEmpty()) {<a name="line.346"></a>
+<span class="sourceLineNo">347</span> controller.setCellScanner(createCellScanner(cells));<a name="line.347"></a>
+<span class="sourceLineNo">348</span> }<a name="line.348"></a>
+<span class="sourceLineNo">349</span> stub.multi(controller, req, resp -> {<a name="line.349"></a>
+<span class="sourceLineNo">350</span> if (controller.failed()) {<a name="line.350"></a>
+<span class="sourceLineNo">351</span> onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn);<a name="line.351"></a>
+<span class="sourceLineNo">352</span> } else {<a name="line.352"></a>
+<span class="sourceLineNo">353</span> try {<a name="line.353"></a>
+<span class="sourceLineNo">354</span> onComplete(serverReq.actionsByRegion, tries, sn, ResponseConverter.getResults(req,<a name="line.354"></a>
+<span class="sourceLineNo">355</span> rowMutationsIndexMap, resp, controller.cellScanner()));<a name="line.355"></a>
+<span class="sourceLineNo">356</span> } catch (Exception e) {<a name="line.356"></a>
+<span class="sourceLineNo">357</span> onError(serverReq.actionsByRegion, tries, e, sn);<a name="line.357"></a>
+<span class="sourceLineNo">358</span> return;<a name="line.358"></a>
+<span class="sourceLineNo">359</span> }<a name="line.359"></a>
+<span class="sourceLineNo">360</span> }<a name="line.360"></a>
+<span class="sourceLineNo">361</span> });<a name="line.361"></a>
+<span class="sourceLineNo">362</span> });<a name="line.362"></a>
+<span class="sourceLineNo">363</span> }<a name="line.363"></a>
+<span class="sourceLineNo">364</span><a name="line.364"></a>
+<span class="sourceLineNo">365</span> private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t,<a name="line.365"></a>
+<span class="sourceLineNo">366</span> ServerName serverName) {<a name="line.366"></a>
+<span class="sourceLineNo">367</span> Throwable error = translateException(t);<a name="line.367"></a>
+<span class="sourceLineNo">368</span> logException(tries, () -> actionsByRegion.values().stream(), error, serverName);<a name="line.368"></a>
+<span class="sourceLineNo">369</span> actionsByRegion<a name="line.369"></a>
+<span class="sourceLineNo">370</span> .forEach((rn, regionReq) -> conn.getLocator().updateCachedLocation(regionReq.loc, error));<a name="line.370"></a>
+<span class="sourceLineNo">371</span> if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {<a name="line.371"></a>
+<span class="sourceLineNo">372</span> failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,<a name="line.372"></a>
+<span class="sourceLineNo">373</span> serverName);<a name="line.373"></a>
+<span class="sourceLineNo">374</span> return;<a name="line.374"></a>
+<span class="sourceLineNo">375</span> }<a name="line.375"></a>
+<span class="sourceLineNo">376</span> List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())<a name="line.376"></a>
+<span class="sourceLineNo">377</span> .collect(Collectors.toList());<a name="line.377"></a>
+<span class="sourceLineNo">378</span> addError(copiedActions, error, serverName);<a name="line.378"></a>
+<span class="sourceLineNo">379</span> tryResubmit(copiedActions.stream(), tries);<a name="line.379"></a>
+<span class="sourceLineNo">380</span> }<a name="line.380"></a>
+<span class="sourceLineNo">381</span><a name="line.381"></a>
+<span class="sourceLineNo">382</span> private void tryResubmit(Stream<Action> actions, int tries) {<a name="line.382"></a>
+<span class="sourceLineNo">383</span> long delayNs;<a name="line.383"></a>
+<span class="sourceLineNo">384</span> if (operationTimeoutNs > 0) {<a name="line.384"></a>
+<span class="sourceLineNo">385</span> long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;<a name="line.385"></a>
+<span class="sourceLineNo">386</span> if (maxDelayNs <= 0) {<a name="line.386"></a>
+<span class="sourceLineNo">387</span> failAll(actions, tries);<a name="line.387"></a>
+<span class="sourceLineNo">388</span> return;<a name="line.388"></a>
+<span class="sourceLineNo">389</span> }<a name="line.389"></a>
+<span class="sourceLineNo">390</span> delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));<a name="line.390"></a>
+<span class="sourceLineNo">391</span> } else {<a name="line.391"></a>
+<span class="sourceLineNo">392</span> delayNs = getPauseTime(pauseNs, tries - 1);<a name="line.392"></a>
+<span class="sourceLineNo">393</span> }<a name="line.393"></a>
+<span class="sourceLineNo">394</span> retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);<a name="line.394"></a>
+<span class="sourceLineNo">395</span> }<a name="line.395"></a>
+<span class="sourceLineNo">396</span><a name="line.396"></a>
+<span class="sourceLineNo">397</span> private void groupAndSend(Stream<Action> actions, int tries) {<a name="line.397"></a>
+<span class="sourceLineNo">398</span> long locateTimeoutNs;<a name="line.398"></a>
+<span class="sourceLineNo">399</span> if (operationTimeoutNs > 0) {<a name="line.399"></a>
+<span class="sourceLineNo">400</span> locateTimeoutNs = remainingTimeNs();<a name="line.400"></a>
+<span class="sourceLineNo">401</span> if (locateTimeoutNs <= 0) {<a name="line.401"></a>
+<span class="sourceLineNo">402</span> failAll(actions, tries);<a name="line.402"></a>
+<span class="sourceLineNo">403</span> return;<a name="line.403"></a>
+<span class="sourceLineNo">404</span> }<a name="line.404"></a>
+<span class="sourceLineNo">405</span> } else {<a name="line.405"></a>
+<span class="sourceLineNo">406</span> locateTimeoutNs = -1L;<a name="line.406"></a>
+<span class="sourceLineNo">407</span> }<a name="line.407"></a>
+<span class="sourceLineNo">408</span> ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();<a name="line.408"></a>
+<span class="sourceLineNo">409</span> ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();<a name="line.409"></a>
+<span class="sourceLineNo">410</span> CompletableFuture.allOf(actions<a name="line.410"></a>
+<span class="sourceLineNo">411</span> .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),<a name="line.411"></a>
+<span class="sourceLineNo">412</span> RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {<a name="line.412"></a>
+<span class="sourceLineNo">413</span> if (error != null) {<a name="line.413"></a>
+<span class="sourceLineNo">414</span> error = translateException(error);<a name="line.414"></a>
+<span class="sourceLineNo">415</span> if (error instanceof DoNotRetryIOException) {<a name="line.415"></a>
+<span class="sourceLineNo">416</span> failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");<a name="line.416"></a>
+<span class="sourceLineNo">417</span> return;<a name="line.417"></a>
+<span class="sourceLineNo">418</span> }<a name="line.418"></a>
+<span class="sourceLineNo">419</span> addError(action, error, null);<a name="line.419"></a>
+<span class="sourceLineNo">420</span> locateFailed.add(action);<a name="line.420"></a>
+<span class="sourceLineNo">421</span> } else {<a name="line.421"></a>
+<span class="sourceLineNo">422</span> computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new)<a name="line.422"></a>
+<span class="sourceLineNo">423</span> .addAction(loc, action);<a name="line.423"></a>
+<span class="sourceLineNo">424</span> }<a name="line.424"></a>
+<span class="sourceLineNo">425</span> }))<a name="line.425"></a>
+<span class="sourceLineNo">426</span> .toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {<a name="line.426"></a>
+<span class="sourceLineNo">427</span> if (!actionsByServer.isEmpty()) {<a name="line.427"></a>
+<span class="sourceLineNo">428</span> send(actionsByServer, tries);<a name="line.428"></a>
+<span class="sourceLineNo">429</span> }<a name="line.429"></a>
+<span class="sourceLineNo">430</span> if (!locateFailed.isEmpty()) {<a name="line.430"></a>
+<span class="sourceLineNo">431</span> tryResubmit(locateFailed.stream(), tries);<a name="line.431"></a>
+<span class="sourceLineNo">432</span> }<a name="line.432"></a>
+<span class="sourceLineNo">433</span> });<a name="line.433"></a>
+<span class="sourceLineNo">434</span> }<a name="line.434"></a>
+<span class="sourceLineNo">435</span><a name="line.435"></a>
+<span class="sourceLineNo">436</span> public List<CompletableFuture<T>> call() {<a name="line.436"></a>
+<span class="sourceLineNo">437</span> groupAndSend(actions.stream(), 1);<a name="line.437"></a>
+<span class="sourceLineNo">438</span> return futures;<a name="line.438"></a>
+<span class="sourceLineNo">439</span> }<a name="line.439"></a>
+<span class="sourceLineNo">440</span>}<a name="line.440"></a>