Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/12/07 11:44:56 UTC

[GitHub] [accumulo] keith-turner commented on a diff in pull request #3091: Added code and tests to confirm ScanServers can scan offline tables

keith-turner commented on code in PR #3091:
URL: https://github.com/apache/accumulo/pull/3091#discussion_r1042082740


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTableExtentCache.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
+
+public class OfflineTableExtentCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OfflineTableExtentCache.class);
+
+  private final Cache<TableId,TreeMap<Range,List<KeyExtent>>> cache;
+  private final ClientContext ctx;
+
+  OfflineTableExtentCache(ClientContext ctx) {
+    this.ctx = ctx;
+    cache = Caffeine.newBuilder().scheduler(Scheduler.systemScheduler())
+        .expireAfterAccess(Duration.ofMinutes(10)).build();
+  }
+
+  private List<KeyExtent> loadExtentsForRange(TableId tid, Range r) {
+    LOG.trace("Loading extents for tid: {} and range: {}", tid, r);
+    TabletsMetadata tm =
+        ctx.getAmple().readTablets().forTable(tid)
+            .overlapping(r.getStartKey() == null ? null : r.getStartKey().getRow(),
+                r.isStartKeyInclusive(), r.getEndKey() == null ? null : r.getEndKey().getRow())
+            .build();
+    List<KeyExtent> result = new ArrayList<>();
+    for (TabletMetadata t : tm) {
+      LOG.trace("Evaluating: {}", t.getExtent());
+      if (t.getExtent().endRow() == null || r.getEndKey() == null || t.getExtent().endRow() != null
+          && r.getEndKey().compareTo(new Key(t.getExtent().endRow()), PartialKey.ROW) >= 0) {
+        LOG.trace("Adding: {}", t.getExtent());
+        result.add(t.getExtent());
+      }
+    }
+    LOG.trace("Loaded extents: {}", result);
+    return result;
+  }
+
+  public Map<Range,List<KeyExtent>> lookup(TableId tid, List<Range> ranges) {

Review Comment:
   Not completely sure, but I think this method could result in returning overlapping key extents for a range and/or missing extents.
   
   For overlapping tablets, could the following happen?
    * Range R1 is added to cache with extent E1.
    * Range R2 is added to the cache with extent E1b.  Range R2 does not overlap R1, and E1b is the result of E1 splitting.
    * A lookup is done for range R3 that overlaps R1 and R2.  Will it return extents E1 and E1b for R3?  
   
   For a missing tablet, could the following happen?
    * Assume a table has four contiguous tablets : E2,E3,E4,E5
    * A lookup for range R3 that only overlaps E2 adds R3->[E2] to the cache.
    * A lookup for range R4 that only overlaps E4 adds R4->[E4] to the cache.
    * A lookup for range R5 that only overlaps E5 adds R5->[E5] to the cache.
    * A lookup for range R6 arrives.  R6 overlaps R3, R4, and R5, and it also overlaps E2, E3, E4, and E5.  Would the cache lookup only return E2, E4, and E5 for R6?
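   
   To make the missing-tablet scenario concrete, here is a minimal sketch that models it with plain integers instead of Accumulo types.  Everything in it is an assumption made for illustration: the `Rng` and `Extent` classes, the row numbers, and the `fromCache` helper are hypothetical and are not the code in this PR.  It only shows that a union over overlapping cached ranges cannot see a tablet that no earlier lookup touched.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Accumulo code: rows are ints, a range is the closed
// interval [start, end], and an extent covers the rows in (prevEnd, end].
class MissingExtentSketch {

  static final class Rng {
    final int start;
    final int end;

    Rng(int start, int end) {
      this.start = start;
      this.end = end;
    }

    boolean overlaps(Rng o) {
      return start <= o.end && o.start <= end;
    }
  }

  static final class Extent {
    final String name;
    final int prevEnd;
    final int end;

    Extent(String name, int prevEnd, int end) {
      this.name = name;
      this.prevEnd = prevEnd;
      this.end = end;
    }

    boolean overlaps(Rng r) {
      return r.start <= end && r.end > prevEnd;
    }
  }

  // Four contiguous tablets, as in the scenario above.
  static List<Extent> table() {
    return List.of(new Extent("E2", 0, 10), new Extent("E3", 10, 20),
        new Extent("E4", 20, 30), new Extent("E5", 30, 40));
  }

  // Ground truth: the names of all tablets that the range touches.
  static List<String> fromMetadata(List<Extent> table, Rng r) {
    List<String> names = new ArrayList<>();
    for (Extent e : table) {
      if (e.overlaps(r)) {
        names.add(e.name);
      }
    }
    return names;
  }

  // Cache-only answer: union of what is stored under overlapping cached ranges.
  static List<String> fromCache(Map<Rng,List<String>> cache, Rng r) {
    List<String> names = new ArrayList<>();
    for (Map.Entry<Rng,List<String>> e : cache.entrySet()) {
      if (e.getKey().overlaps(r)) {
        names.addAll(e.getValue());
      }
    }
    return names;
  }

  // Populates the cache with R3, R4 and R5 from the scenario.
  static Map<Rng,List<String>> warmCache(List<Extent> table) {
    Map<Rng,List<String>> cache = new LinkedHashMap<>();
    for (Rng r : List.of(new Rng(1, 5), new Rng(21, 25), new Rng(31, 35))) {
      cache.put(r, fromMetadata(table, r));
    }
    return cache;
  }

  public static void main(String[] args) {
    List<Extent> table = table();
    Rng r6 = new Rng(1, 35);
    System.out.println("metadata: " + fromMetadata(table, r6));
    System.out.println("cache:    " + fromCache(warmCache(table), r6));
  }
}
```

   In this model the metadata answer for R6 is E2, E3, E4, E5 while the cache-only answer is E2, E4, E5, so E3 is lost unless the cache can tell that its entries do not cover the whole query range.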



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTableExtentCache.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
+
+public class OfflineTableExtentCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OfflineTableExtentCache.class);
+
+  private final Cache<TableId,TreeMap<Range,List<KeyExtent>>> cache;
+  private final ClientContext ctx;
+
+  OfflineTableExtentCache(ClientContext ctx) {
+    this.ctx = ctx;
+    cache = Caffeine.newBuilder().scheduler(Scheduler.systemScheduler())
+        .expireAfterAccess(Duration.ofMinutes(10)).build();
+  }
+
+  private List<KeyExtent> loadExtentsForRange(TableId tid, Range r) {
+    LOG.trace("Loading extents for tid: {} and range: {}", tid, r);
+    TabletsMetadata tm =
+        ctx.getAmple().readTablets().forTable(tid)
+            .overlapping(r.getStartKey() == null ? null : r.getStartKey().getRow(),
+                r.isStartKeyInclusive(), r.getEndKey() == null ? null : r.getEndKey().getRow())
+            .build();
+    List<KeyExtent> result = new ArrayList<>();
+    for (TabletMetadata t : tm) {
+      LOG.trace("Evaluating: {}", t.getExtent());
+      if (t.getExtent().endRow() == null || r.getEndKey() == null || t.getExtent().endRow() != null
+          && r.getEndKey().compareTo(new Key(t.getExtent().endRow()), PartialKey.ROW) >= 0) {
+        LOG.trace("Adding: {}", t.getExtent());
+        result.add(t.getExtent());
+      }
+    }
+    LOG.trace("Loaded extents: {}", result);
+    return result;
+  }
+
+  public Map<Range,List<KeyExtent>> lookup(TableId tid, List<Range> ranges) {
+    LOG.trace("Performing lookup for table: {}, range: {}", tid, ranges);
+    Map<Range,List<KeyExtent>> results = new TreeMap<>();
+    TreeMap<Range,List<KeyExtent>> extentMap = cache.get(tid, (t) -> {
+      return new TreeMap<Range,List<KeyExtent>>();
+    });
+
+    Collections.sort(ranges);
+    ranges.forEach(r -> {
+      // Find an existing range in the extentMap that overlaps the scan range
+      boolean foundRange = false;
+      Range rangeBefore = extentMap.floorKey(r);
+      if (rangeBefore != null) {
+        Map<Range,List<KeyExtent>> tm = extentMap.tailMap(rangeBefore);
+        for (Entry<Range,List<KeyExtent>> e : tm.entrySet()) {
+          if (e.getKey().overlaps(r)) {
+            // Ignore extents that don't contain the scan range
+            List<KeyExtent> extents = new ArrayList<>();
+            if (r.getStartKey() == null) {
+              extents.addAll(e.getValue());
+            } else {
+              e.getValue().forEach(extent -> {
+                Text row = r.isStartKeyInclusive() ? r.getStartKey().getRow()
+                    : r.getStartKey().followingKey(PartialKey.ROW).getRow();
+                if (extent.contains(row)) {
+                  extents.add(extent);
+                }
+              });
+            }
+            results.put(r, extents);
+            foundRange = true;
+            break;
+          }
+        }
+      }
+      if (!foundRange) {
+        List<KeyExtent> extents = loadExtentsForRange(tid, r);
+        if (extents != null) {
+          extentMap.put(r, extents);
+          results.put(r, extents);
+        }
+      }
+    });
+    // remove any overlapping from the cache
+    removeOverlapping(extentMap);
+    LOG.trace("Returning extents: {}", extentMap);
+    return results;
+  }
+
+  private void removeOverlapping(TreeMap<Range,List<KeyExtent>> map) {
+
+    Range idx = map.firstKey();
+    Set<Range> removals = new HashSet<>();
+    for (Range range : map.keySet()) {

Review Comment:
   This looks like an O(N) pass over every range cached for the table, and it will run on each cache lookup.
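   
   One possible way to avoid the full pass, sketched with plain integers rather than Accumulo `Range` objects: if the map is kept free of overlaps at insert time, only the neighbours of the new entry need to be inspected.  The class name, the `start -> end` representation, and the choice to evict (rather than merge) overlapping entries are all assumptions made for illustration, not what the PR does.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: each cached range is stored as start -> end (inclusive),
// and the map never holds two ranges that overlap each other.
class NeighborOverlapSketch {

  private final TreeMap<Integer,Integer> intervals = new TreeMap<>();

  // Inserts [start, end] after evicting only the entries that overlap it.
  // Returns how many entries were evicted.
  int put(int start, int end) {
    int evicted = 0;

    // Because existing entries do not overlap each other, at most one entry
    // that begins at or before 'start' can reach into the new range.
    Map.Entry<Integer,Integer> before = intervals.floorEntry(start);
    if (before != null && before.getValue() >= start) {
      intervals.remove(before.getKey());
      evicted++;
    }

    // Every remaining entry that begins inside [start, end] overlaps it.
    Map<Integer,Integer> inside = intervals.subMap(start, true, end, true);
    evicted += inside.size();
    inside.clear();

    intervals.put(start, end);
    return evicted;
  }

  // Copy of the current contents, for inspection.
  Map<Integer,Integer> snapshot() {
    return new TreeMap<>(intervals);
  }

  public static void main(String[] args) {
    NeighborOverlapSketch s = new NeighborOverlapSketch();
    s.put(0, 5);
    s.put(10, 15);
    s.put(20, 25);
    System.out.println("evicted: " + s.put(4, 12));
    System.out.println("remaining: " + s.snapshot());
  }
}
```

   The cost of an insert is then the tree lookup plus the number of entries actually evicted, instead of a scan over the whole map on every lookup.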



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTableExtentCache.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
+
+public class OfflineTableExtentCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OfflineTableExtentCache.class);
+
+  private final Cache<TableId,TreeMap<Range,List<KeyExtent>>> cache;

Review Comment:
   There could be concurrency issues if multiple threads are accessing the treemap in the cache.
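   
   As a rough illustration of one option, the sketch below swaps the per-table `TreeMap` for a `ConcurrentSkipListMap`.  It uses only JDK classes: the `ConcurrentHashMap` is a stand-in for the Caffeine cache, and the integer keys and string values are hypothetical placeholders for ranges and extents.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical stand-in for the per-table cache: tableId -> sorted map.
class ConcurrentExtentMapSketch {

  private final ConcurrentHashMap<String,ConcurrentNavigableMap<Integer,String>> cache =
      new ConcurrentHashMap<>();

  // Safe to call from many threads: both the outer and inner maps are concurrent.
  void record(String tableId, int startRow, String extent) {
    cache.computeIfAbsent(tableId, t -> new ConcurrentSkipListMap<>()).put(startRow, extent);
  }

  int size(String tableId) {
    ConcurrentNavigableMap<Integer,String> m = cache.get(tableId);
    return m == null ? 0 : m.size();
  }

  // Four writer threads insert 1000 distinct keys each into the same table.
  static int demo() {
    ConcurrentExtentMapSketch sketch = new ConcurrentExtentMapSketch();
    Thread[] writers = new Thread[4];
    for (int t = 0; t < writers.length; t++) {
      final int base = t * 1000;
      writers[t] = new Thread(() -> {
        for (int i = 0; i < 1000; i++) {
          sketch.record("t1", base + i, "extent-" + (base + i));
        }
      });
      writers[t].start();
    }
    for (Thread w : writers) {
      try {
        w.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return sketch.size("t1");
  }

  public static void main(String[] args) {
    System.out.println("entries after concurrent writes: " + demo());
  }
}
```

   A concurrent map only makes individual operations safe.  A sequence such as "find the floor key, then put, then remove overlapping entries" is still not atomic, so the lookup logic may need a lock per table if those steps must be seen together.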



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTableExtentCache.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
+
+public class OfflineTableExtentCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OfflineTableExtentCache.class);
+
+  private final Cache<TableId,TreeMap<Range,List<KeyExtent>>> cache;
+  private final ClientContext ctx;
+
+  OfflineTableExtentCache(ClientContext ctx) {
+    this.ctx = ctx;
+    cache = Caffeine.newBuilder().scheduler(Scheduler.systemScheduler())
+        .expireAfterAccess(Duration.ofMinutes(10)).build();
+  }
+
+  private List<KeyExtent> loadExtentsForRange(TableId tid, Range r) {
+    LOG.trace("Loading extents for tid: {} and range: {}", tid, r);
+    TabletsMetadata tm =
+        ctx.getAmple().readTablets().forTable(tid)
+            .overlapping(r.getStartKey() == null ? null : r.getStartKey().getRow(),
+                r.isStartKeyInclusive(), r.getEndKey() == null ? null : r.getEndKey().getRow())
+            .build();
+    List<KeyExtent> result = new ArrayList<>();
+    for (TabletMetadata t : tm) {
+      LOG.trace("Evaluating: {}", t.getExtent());
+      if (t.getExtent().endRow() == null || r.getEndKey() == null || t.getExtent().endRow() != null
+          && r.getEndKey().compareTo(new Key(t.getExtent().endRow()), PartialKey.ROW) >= 0) {
+        LOG.trace("Adding: {}", t.getExtent());
+        result.add(t.getExtent());
+      }
+    }
+    LOG.trace("Loaded extents: {}", result);
+    return result;
+  }
+
+  public Map<Range,List<KeyExtent>> lookup(TableId tid, List<Range> ranges) {
+    LOG.trace("Performing lookup for table: {}, range: {}", tid, ranges);
+    Map<Range,List<KeyExtent>> results = new TreeMap<>();
+    TreeMap<Range,List<KeyExtent>> extentMap = cache.get(tid, (t) -> {
+      return new TreeMap<Range,List<KeyExtent>>();
+    });
+
+    Collections.sort(ranges);
+    ranges.forEach(r -> {
+      // Find an existing range in the extentMap that overlaps the scan range
+      boolean foundRange = false;
+      Range rangeBefore = extentMap.floorKey(r);
+      if (rangeBefore != null) {
+        Map<Range,List<KeyExtent>> tm = extentMap.tailMap(rangeBefore);
+        for (Entry<Range,List<KeyExtent>> e : tm.entrySet()) {
+          if (e.getKey().overlaps(r)) {
+            // Ignore extents that don't contain the scan range
+            List<KeyExtent> extents = new ArrayList<>();
+            if (r.getStartKey() == null) {
+              extents.addAll(e.getValue());
+            } else {
+              e.getValue().forEach(extent -> {
+                Text row = r.isStartKeyInclusive() ? r.getStartKey().getRow()
+                    : r.getStartKey().followingKey(PartialKey.ROW).getRow();
+                if (extent.contains(row)) {

Review Comment:
   Will this select extents that fall completely within the query range?  For example, take a query range that is [b,r] and an extent that is (j,m].  The extent (j,m] will not contain the start row b.
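   
   The difference between the two checks can be sketched with plain strings as rows.  The helper names below are hypothetical and bounded (non-null) rows are assumed throughout.  Whether `KeyExtent` or `Range` already offers a suitable overlap check is not verified here.

```java
// Hypothetical helpers using String rows. An extent covers (prevEndRow, endRow]
// and a query range covers [startRow, stopRow]. All bounds are assumed non-null.
class ExtentOverlapSketch {

  // True when 'row' lies inside the extent (prevEndRow, endRow].
  static boolean contains(String prevEndRow, String endRow, String row) {
    return prevEndRow.compareTo(row) < 0 && row.compareTo(endRow) <= 0;
  }

  // True when the extent and the query range share at least one row.
  static boolean overlaps(String prevEndRow, String endRow, String startRow, String stopRow) {
    return prevEndRow.compareTo(stopRow) < 0 && startRow.compareTo(endRow) <= 0;
  }

  public static void main(String[] args) {
    // Extent (j,m] against the query range [b,r] from the comment above.
    System.out.println("contains start row: " + contains("j", "m", "b"));
    System.out.println("overlaps range:     " + overlaps("j", "m", "b", "r"));
  }
}
```

   For the extent (j,m] and the range [b,r], the containment test on the start row is false while the overlap test is true, which is the case this comment is asking about.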



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTableExtentCache.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
+
+public class OfflineTableExtentCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OfflineTableExtentCache.class);
+
+  private final Cache<TableId,TreeMap<Range,List<KeyExtent>>> cache;
+  private final ClientContext ctx;
+
+  OfflineTableExtentCache(ClientContext ctx) {
+    this.ctx = ctx;
+    cache = Caffeine.newBuilder().scheduler(Scheduler.systemScheduler())
+        .expireAfterAccess(Duration.ofMinutes(10)).build();
+  }
+
+  private List<KeyExtent> loadExtentsForRange(TableId tid, Range r) {
+    LOG.trace("Loading extents for tid: {} and range: {}", tid, r);
+    TabletsMetadata tm =
+        ctx.getAmple().readTablets().forTable(tid)
+            .overlapping(r.getStartKey() == null ? null : r.getStartKey().getRow(),
+                r.isStartKeyInclusive(), r.getEndKey() == null ? null : r.getEndKey().getRow())
+            .build();
+    List<KeyExtent> result = new ArrayList<>();
+    for (TabletMetadata t : tm) {
+      LOG.trace("Evaluating: {}", t.getExtent());
+      if (t.getExtent().endRow() == null || r.getEndKey() == null || t.getExtent().endRow() != null
+          && r.getEndKey().compareTo(new Key(t.getExtent().endRow()), PartialKey.ROW) >= 0) {
+        LOG.trace("Adding: {}", t.getExtent());
+        result.add(t.getExtent());
+      }
+    }
+    LOG.trace("Loaded extents: {}", result);
+    return result;
+  }
+
+  public Map<Range,List<KeyExtent>> lookup(TableId tid, List<Range> ranges) {
+    LOG.trace("Performing lookup for table: {}, range: {}", tid, ranges);
+    Map<Range,List<KeyExtent>> results = new TreeMap<>();
+    TreeMap<Range,List<KeyExtent>> extentMap = cache.get(tid, (t) -> {
+      return new TreeMap<Range,List<KeyExtent>>();
+    });
+
+    Collections.sort(ranges);
+    ranges.forEach(r -> {
+      // Find an existing range in the extentMap that overlaps the scan range
+      boolean foundRange = false;
+      Range rangeBefore = extentMap.floorKey(r);
+      if (rangeBefore != null) {
+        Map<Range,List<KeyExtent>> tm = extentMap.tailMap(rangeBefore);
+        for (Entry<Range,List<KeyExtent>> e : tm.entrySet()) {
+          if (e.getKey().overlaps(r)) {
+            // Ignore extents that don't contain the scan range
+            List<KeyExtent> extents = new ArrayList<>();
+            if (r.getStartKey() == null) {
+              extents.addAll(e.getValue());
+            } else {
+              e.getValue().forEach(extent -> {
+                Text row = r.isStartKeyInclusive() ? r.getStartKey().getRow()
+                    : r.getStartKey().followingKey(PartialKey.ROW).getRow();
+                if (extent.contains(row)) {
+                  extents.add(extent);
+                }
+              });
+            }
+            results.put(r, extents);
+            foundRange = true;
+            break;

Review Comment:
   I am getting a bit lost with the curly braces, but it seems like this takes only the first range in the cache that overlaps the query range.  If so, what happens when other ranges in the cache also overlap the query range?
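   
   A small sketch of the difference between stopping at the first overlapping entry and walking all of them, again with integers in place of Accumulo types.  The `Cached` class and the `collect` method are hypothetical.  The sketch also assumes the cached ranges do not overlap each other, which is what makes starting from `floorKey` sufficient.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: cache maps range start -> (range end, extent names).
class AllOverlapsSketch {

  static final class Cached {
    final int end;
    final List<String> extents;

    Cached(int end, List<String> extents) {
      this.end = end;
      this.extents = extents;
    }
  }

  // Collects extents from cached ranges overlapping [qStart, qEnd].
  // With firstOnly set, it stops after the first overlapping entry.
  static List<String> collect(TreeMap<Integer,Cached> cache, int qStart, int qEnd,
      boolean firstOnly) {
    List<String> out = new ArrayList<>();
    Integer from = cache.floorKey(qStart);
    Map<Integer,Cached> tail = from == null ? cache : cache.tailMap(from, true);
    for (Map.Entry<Integer,Cached> e : tail.entrySet()) {
      if (e.getKey() > qEnd) {
        break; // entries are sorted by start, so nothing later can overlap
      }
      if (e.getValue().end >= qStart) {
        out.addAll(e.getValue().extents);
        if (firstOnly) {
          break;
        }
      }
    }
    return out;
  }

  static TreeMap<Integer,Cached> demoCache() {
    TreeMap<Integer,Cached> cache = new TreeMap<>();
    cache.put(0, new Cached(5, List.of("A")));
    cache.put(10, new Cached(15, List.of("B")));
    cache.put(20, new Cached(25, List.of("C")));
    return cache;
  }

  public static void main(String[] args) {
    System.out.println("first only: " + collect(demoCache(), 3, 22, true));
    System.out.println("all:        " + collect(demoCache(), 3, 22, false));
  }
}
```

   For the query [3,22] the first-match variant yields only A, while walking every overlapping entry yields A, B, and C.  Even the second variant can still miss tablets that sit between cached ranges.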


