You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2023/05/04 20:42:26 UTC
[accumulo] branch elasticity updated: Use empty location in getSplits call for eventual consistency scans (#3369)
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 02a9b9be5b Use empty location in getSplits call for eventual consistency scans (#3369)
02a9b9be5b is described below
commit 02a9b9be5bd859a4c9378e40dbd7831f081d9e86
Author: Dave Marion <dl...@apache.org>
AuthorDate: Thu May 4 16:42:19 2023 -0400
Use empty location in getSplits call for eventual consistency scans (#3369)
Closes #3341
---
.../hadoopImpl/mapred/AccumuloRecordReader.java | 56 ++++++++++++++++++---
.../hadoopImpl/mapreduce/AccumuloRecordReader.java | 57 +++++++++++++++++++---
2 files changed, 97 insertions(+), 16 deletions(-)
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
index 8a01a1bf7a..1ea193ffe7 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
@@ -19,6 +19,9 @@
package org.apache.accumulo.hadoopImpl.mapred;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.IOException;
import java.net.InetAddress;
@@ -31,6 +34,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
@@ -48,6 +52,8 @@ import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientTabletCache;
+import org.apache.accumulo.core.clientImpl.ClientTabletCache.CachedTablet;
+import org.apache.accumulo.core.clientImpl.ClientTabletCache.LocationNeed;
import org.apache.accumulo.core.clientImpl.OfflineScanner;
import org.apache.accumulo.core.clientImpl.ScannerImpl;
import org.apache.accumulo.core.data.Key;
@@ -57,6 +63,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.hadoopImpl.mapreduce.InputTableConfig;
import org.apache.accumulo.hadoopImpl.mapreduce.SplitUtils;
import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
@@ -348,14 +355,47 @@ public abstract class AccumuloRecordReader<K,V> implements RecordReader<K,V> {
// tablets... so clear it
tl.invalidateCache();
- while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) {
- context.requireNotDeleted(tableId);
- context.requireNotOffline(tableId, tableName);
- binnedRanges.clear();
- log.warn("Unable to locate bins for specified ranges. Retrying.");
- // sleep randomly between 100 and 200 ms
- sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
- tl.invalidateCache();
+ if (InputConfigurator.getConsistencyLevel(callingClass, job)
+ == ConsistencyLevel.IMMEDIATE) {
+ while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) {
+ context.requireNotDeleted(tableId);
+ context.requireNotOffline(tableId, tableName);
+ binnedRanges.clear();
+ log.warn("Unable to locate bins for specified ranges. Retrying.");
+ // sleep randomly between 100 and 200 ms
+ sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
+ tl.invalidateCache();
+ }
+ } else {
+ Map<String,Map<KeyExtent,List<Range>>> unhostedRanges = new HashMap<>();
+ unhostedRanges.put("", new HashMap<>());
+ BiConsumer<CachedTablet,Range> consumer = (ct, r) -> {
+ unhostedRanges.get("").computeIfAbsent(ct.getExtent(), k -> new ArrayList<>())
+ .add(r);
+ };
+ List<Range> failures =
+ tl.findTablets(context, ranges, consumer, LocationNeed.NOT_REQUIRED);
+
+ Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+ .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5)
+ .logInterval(3, MINUTES).createRetry();
+
+ while (!failures.isEmpty()) {
+
+ context.requireNotDeleted(tableId);
+
+ try {
+ retry.waitForNextAttempt(log,
+ String.format("locating tablets in table %s(%s) for %d ranges", tableName,
+ tableId, ranges.size()));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ unhostedRanges.get("").clear();
+ tl.invalidateCache();
+ failures = tl.findTablets(context, ranges, consumer, LocationNeed.NOT_REQUIRED);
+ }
+ binnedRanges = unhostedRanges;
}
}
} catch (InvalidTabletHostingRequestException | TableOfflineException
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
index eff3906c5e..b4a90aa151 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
@@ -19,6 +19,9 @@
package org.apache.accumulo.hadoopImpl.mapreduce;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.IOException;
import java.net.InetAddress;
@@ -31,6 +34,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
@@ -48,6 +52,8 @@ import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientTabletCache;
+import org.apache.accumulo.core.clientImpl.ClientTabletCache.CachedTablet;
+import org.apache.accumulo.core.clientImpl.ClientTabletCache.LocationNeed;
import org.apache.accumulo.core.clientImpl.OfflineScanner;
import org.apache.accumulo.core.clientImpl.ScannerImpl;
import org.apache.accumulo.core.data.Key;
@@ -57,6 +63,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
@@ -381,14 +388,48 @@ public abstract class AccumuloRecordReader<K,V> extends RecordReader<K,V> {
// tables tablets... so clear it
tl.invalidateCache();
- while (!tl.binRanges(clientContext, ranges, binnedRanges).isEmpty()) {
- clientContext.requireNotDeleted(tableId);
- clientContext.requireNotOffline(tableId, tableName);
- binnedRanges.clear();
- log.warn("Unable to locate bins for specified ranges. Retrying.");
- // sleep randomly between 100 and 200 ms
- sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
- tl.invalidateCache();
+ if (InputConfigurator.getConsistencyLevel(callingClass, context.getConfiguration())
+ == ConsistencyLevel.IMMEDIATE) {
+ while (!tl.binRanges(clientContext, ranges, binnedRanges).isEmpty()) {
+ clientContext.requireNotDeleted(tableId);
+ clientContext.requireNotOffline(tableId, tableName);
+ binnedRanges.clear();
+ log.warn("Unable to locate bins for specified ranges. Retrying.");
+ // sleep randomly between 100 and 200 ms
+ sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
+ tl.invalidateCache();
+ }
+ } else {
+ Map<String,Map<KeyExtent,List<Range>>> unhostedRanges = new HashMap<>();
+ unhostedRanges.put("", new HashMap<>());
+ BiConsumer<CachedTablet,Range> consumer = (ct, r) -> {
+ unhostedRanges.get("").computeIfAbsent(ct.getExtent(), k -> new ArrayList<>())
+ .add(r);
+ };
+ List<Range> failures =
+ tl.findTablets(clientContext, ranges, consumer, LocationNeed.NOT_REQUIRED);
+
+ Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+ .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5)
+ .logInterval(3, MINUTES).createRetry();
+
+ while (!failures.isEmpty()) {
+
+ clientContext.requireNotDeleted(tableId);
+
+ try {
+ retry.waitForNextAttempt(log,
+ String.format("locating tablets in table %s(%s) for %d ranges", tableName,
+ tableId, ranges.size()));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ unhostedRanges.get("").clear();
+ tl.invalidateCache();
+ failures =
+ tl.findTablets(clientContext, ranges, consumer, LocationNeed.NOT_REQUIRED);
+ }
+ binnedRanges = unhostedRanges;
}
}
} catch (InvalidTabletHostingRequestException | TableOfflineException