You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2023/05/04 20:42:26 UTC

[accumulo] branch elasticity updated: Use empty location in getSplits call for eventual consistency scans (#3369)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 02a9b9be5b Use empty location in getSplits call for eventual consistency scans (#3369)
02a9b9be5b is described below

commit 02a9b9be5bd859a4c9378e40dbd7831f081d9e86
Author: Dave Marion <dl...@apache.org>
AuthorDate: Thu May 4 16:42:19 2023 -0400

    Use empty location in getSplits call for eventual consistency scans (#3369)
    
    
    Closes #3341
---
 .../hadoopImpl/mapred/AccumuloRecordReader.java    | 56 ++++++++++++++++++---
 .../hadoopImpl/mapreduce/AccumuloRecordReader.java | 57 +++++++++++++++++++---
 2 files changed, 97 insertions(+), 16 deletions(-)

diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
index 8a01a1bf7a..1ea193ffe7 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java
@@ -19,6 +19,9 @@
 package org.apache.accumulo.hadoopImpl.mapred;
 
 import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -31,6 +34,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -48,6 +52,8 @@ import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.ClientTabletCache;
+import org.apache.accumulo.core.clientImpl.ClientTabletCache.CachedTablet;
+import org.apache.accumulo.core.clientImpl.ClientTabletCache.LocationNeed;
 import org.apache.accumulo.core.clientImpl.OfflineScanner;
 import org.apache.accumulo.core.clientImpl.ScannerImpl;
 import org.apache.accumulo.core.data.Key;
@@ -57,6 +63,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.hadoopImpl.mapreduce.InputTableConfig;
 import org.apache.accumulo.hadoopImpl.mapreduce.SplitUtils;
 import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
@@ -348,14 +355,47 @@ public abstract class AccumuloRecordReader<K,V> implements RecordReader<K,V> {
             // tablets... so clear it
             tl.invalidateCache();
 
-            while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) {
-              context.requireNotDeleted(tableId);
-              context.requireNotOffline(tableId, tableName);
-              binnedRanges.clear();
-              log.warn("Unable to locate bins for specified ranges. Retrying.");
-              // sleep randomly between 100 and 200 ms
-              sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
-              tl.invalidateCache();
+            if (InputConfigurator.getConsistencyLevel(callingClass, job)
+                == ConsistencyLevel.IMMEDIATE) {
+              while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) {
+                context.requireNotDeleted(tableId);
+                context.requireNotOffline(tableId, tableName);
+                binnedRanges.clear();
+                log.warn("Unable to locate bins for specified ranges. Retrying.");
+                // sleep randomly between 100 and 200 ms
+                sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
+                tl.invalidateCache();
+              }
+            } else {
+              Map<String,Map<KeyExtent,List<Range>>> unhostedRanges = new HashMap<>();
+              unhostedRanges.put("", new HashMap<>());
+              BiConsumer<CachedTablet,Range> consumer = (ct, r) -> {
+                unhostedRanges.get("").computeIfAbsent(ct.getExtent(), k -> new ArrayList<>())
+                    .add(r);
+              };
+              List<Range> failures =
+                  tl.findTablets(context, ranges, consumer, LocationNeed.NOT_REQUIRED);
+
+              Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+                  .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5)
+                  .logInterval(3, MINUTES).createRetry();
+
+              while (!failures.isEmpty()) {
+
+                context.requireNotDeleted(tableId);
+
+                try {
+                  retry.waitForNextAttempt(log,
+                      String.format("locating tablets in table %s(%s) for %d ranges", tableName,
+                          tableId, ranges.size()));
+                } catch (InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+                unhostedRanges.get("").clear();
+                tl.invalidateCache();
+                failures = tl.findTablets(context, ranges, consumer, LocationNeed.NOT_REQUIRED);
+              }
+              binnedRanges = unhostedRanges;
             }
           }
         } catch (InvalidTabletHostingRequestException | TableOfflineException
diff --git a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
index eff3906c5e..b4a90aa151 100644
--- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
+++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java
@@ -19,6 +19,9 @@
 package org.apache.accumulo.hadoopImpl.mapreduce;
 
 import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -31,6 +34,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
 
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -48,6 +52,8 @@ import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.ClientTabletCache;
+import org.apache.accumulo.core.clientImpl.ClientTabletCache.CachedTablet;
+import org.apache.accumulo.core.clientImpl.ClientTabletCache.LocationNeed;
 import org.apache.accumulo.core.clientImpl.OfflineScanner;
 import org.apache.accumulo.core.clientImpl.ScannerImpl;
 import org.apache.accumulo.core.data.Key;
@@ -57,6 +63,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.hadoopImpl.mapreduce.lib.InputConfigurator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -381,14 +388,48 @@ public abstract class AccumuloRecordReader<K,V> extends RecordReader<K,V> {
             // tables tablets... so clear it
             tl.invalidateCache();
 
-            while (!tl.binRanges(clientContext, ranges, binnedRanges).isEmpty()) {
-              clientContext.requireNotDeleted(tableId);
-              clientContext.requireNotOffline(tableId, tableName);
-              binnedRanges.clear();
-              log.warn("Unable to locate bins for specified ranges. Retrying.");
-              // sleep randomly between 100 and 200 ms
-              sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
-              tl.invalidateCache();
+            if (InputConfigurator.getConsistencyLevel(callingClass, context.getConfiguration())
+                == ConsistencyLevel.IMMEDIATE) {
+              while (!tl.binRanges(clientContext, ranges, binnedRanges).isEmpty()) {
+                clientContext.requireNotDeleted(tableId);
+                clientContext.requireNotOffline(tableId, tableName);
+                binnedRanges.clear();
+                log.warn("Unable to locate bins for specified ranges. Retrying.");
+                // sleep randomly between 100 and 200 ms
+                sleepUninterruptibly(100 + random.nextInt(100), TimeUnit.MILLISECONDS);
+                tl.invalidateCache();
+              }
+            } else {
+              Map<String,Map<KeyExtent,List<Range>>> unhostedRanges = new HashMap<>();
+              unhostedRanges.put("", new HashMap<>());
+              BiConsumer<CachedTablet,Range> consumer = (ct, r) -> {
+                unhostedRanges.get("").computeIfAbsent(ct.getExtent(), k -> new ArrayList<>())
+                    .add(r);
+              };
+              List<Range> failures =
+                  tl.findTablets(clientContext, ranges, consumer, LocationNeed.NOT_REQUIRED);
+
+              Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+                  .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5)
+                  .logInterval(3, MINUTES).createRetry();
+
+              while (!failures.isEmpty()) {
+
+                clientContext.requireNotDeleted(tableId);
+
+                try {
+                  retry.waitForNextAttempt(log,
+                      String.format("locating tablets in table %s(%s) for %d ranges", tableName,
+                          tableId, ranges.size()));
+                } catch (InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+                unhostedRanges.get("").clear();
+                tl.invalidateCache();
+                failures =
+                    tl.findTablets(clientContext, ranges, consumer, LocationNeed.NOT_REQUIRED);
+              }
+              binnedRanges = unhostedRanges;
             }
           }
         } catch (InvalidTabletHostingRequestException | TableOfflineException