You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2024/02/08 23:43:24 UTC

(accumulo) branch elasticity updated: Fixes GC WAL bug (#4196)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 8b8a49f573 Fixes GC WAL bug (#4196)
8b8a49f573 is described below

commit 8b8a49f5730e2960593f6650216a1419e8157c51
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Feb 8 18:43:18 2024 -0500

    Fixes GC WAL bug (#4196)
    
    This fixes a GC bug that only exists in the elasticity branch. When
    garbage collecting write ahead logs the metadata table should be
    inspected to look for tablets with WALs and tablets assigned to dead
    tablet servers.  The GC code was only looking for tablets with WALs.
    This commit makes it also look for tablets assigned to dead
    tservers.
    
    This bug was noticed because CleanWalIT was sometimes failing.
    Sometimes the GC was collecting a WAL after the metadata table recovered
    but before user tablets could be assigned the WAL.  This caused the user
    tablets to silently lose data.  The test caught this data loss.
    
    In the course of fixing this bug a change was made to how RowFilter was
    used and a pre-existing bug with RowFilter was found and fixed.  This
    bug fix will be back ported separately.
---
 .../accumulo/core/iterators/user/GcWalsFilter.java | 101 +++++++++++++++++++++
 .../core/iterators/user/HasWalsFilter.java         |  45 ---------
 .../core/iterators/user/TabletMetadataFilter.java  |   5 +
 .../core/metadata/schema/TabletsMetadata.java      |   2 +
 .../accumulo/gc/GarbageCollectWriteAheadLogs.java  |  14 +--
 .../gc/GarbageCollectWriteAheadLogsTest.java       |   9 +-
 .../test/functional/AmpleConditionalWriterIT.java  |  44 ++++++++-
 7 files changed, 161 insertions(+), 59 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/GcWalsFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/GcWalsFilter.java
new file mode 100644
index 0000000000..a30302ce59
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/GcWalsFilter.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.iterators.user;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.TabletState;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+/**
+ * A filter used by the Accumulo GC to find tablets that either have walogs or are assigned to a
+ * dead tablet server. The live tservers are passed to init as a comma separated option.
+ */
+
+// ELASTICITY_TODO Move TabletMetadataFilter and its subclasses out of public API. It uses internal
+// types that are not user facing.
+public class GcWalsFilter extends TabletMetadataFilter {
+
+  private static final String LIVE_TSERVER_OPT = "liveTservers";
+  private static final Set<ColumnType> COLUMNS =
+      Sets.immutableEnumSet(ColumnType.LOGS, ColumnType.LOCATION, ColumnType.SUSPEND);
+
+  // set only by the constructor that takes live tservers, null after the no-arg constructor
+  private Map<String,String> options = null;
+
+  // built by init from the live tservers option
+  private Predicate<TabletMetadata> filter = null;
+
+  public GcWalsFilter() {}
+
+  public GcWalsFilter(Set<TServerInstance> liveTservers) {
+    String lts = liveTservers.stream().map(TServerInstance::toString).map(tsi -> {
+      // a comma inside an entry would corrupt the comma separated encoding
+      Preconditions.checkArgument(!tsi.contains(","), "%s", tsi);
+      return tsi;
+    }).collect(Collectors.joining(","));
+    this.options = Map.of(LIVE_TSERVER_OPT, lts);
+  }
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
+      IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+    String encoded = options.get(LIVE_TSERVER_OPT);
+    // fail with a descriptive message instead of a bare NullPointerException
+    Preconditions.checkArgument(encoded != null, "Missing option %s", LIVE_TSERVER_OPT);
+    // splitting a blank string would yield a single entry that is not a tserver
+    Set<TServerInstance> liveTservers = encoded.isBlank() ? Set.of()
+        : Arrays.stream(encoded.split(",")).map(TServerInstance::new)
+            .collect(Collectors.toUnmodifiableSet());
+    // accept tablets that have walogs or that TabletState reports as assigned to a dead server
+    filter = tm -> !tm.getLogs().isEmpty()
+        || TabletState.compute(tm, liveTservers) == TabletState.ASSIGNED_TO_DEAD_SERVER;
+  }
+
+  @Override
+  public Set<ColumnType> getColumns() {
+    return COLUMNS;
+  }
+
+  @Override
+  protected Predicate<TabletMetadata> acceptTablet() {
+    return filter;
+  }
+
+  @Override
+  public Map<String,String> getServerSideOptions() {
+    Preconditions.checkState(options != null);
+    return options;
+  }
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/HasWalsFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/HasWalsFilter.java
deleted file mode 100644
index 0563a7c3dc..0000000000
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/HasWalsFilter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.iterators.user;
-
-import java.util.Set;
-import java.util.function.Predicate;
-
-import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-
-import com.google.common.collect.Sets;
-
-public class HasWalsFilter extends TabletMetadataFilter {
-
-  private static final Set<TabletMetadata.ColumnType> COLUMNS =
-      Sets.immutableEnumSet(TabletMetadata.ColumnType.LOGS);
-
-  private final static Predicate<TabletMetadata> HAS_WALS =
-      tabletMetadata -> !tabletMetadata.getLogs().isEmpty();
-
-  @Override
-  public Set<TabletMetadata.ColumnType> getColumns() {
-    return COLUMNS;
-  }
-
-  @Override
-  protected Predicate<TabletMetadata> acceptTablet() {
-    return HAS_WALS;
-  }
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/iterators/user/TabletMetadataFilter.java b/core/src/main/java/org/apache/accumulo/core/iterators/user/TabletMetadataFilter.java
index 13163aebea..2381b7e475 100644
--- a/core/src/main/java/org/apache/accumulo/core/iterators/user/TabletMetadataFilter.java
+++ b/core/src/main/java/org/apache/accumulo/core/iterators/user/TabletMetadataFilter.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.core.iterators.user;
 
 import java.util.EnumSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.function.Predicate;
 
@@ -40,4 +41,8 @@ public abstract class TabletMetadataFilter extends RowFilter {
   public abstract Set<TabletMetadata.ColumnType> getColumns();
 
   protected abstract Predicate<TabletMetadata> acceptTablet();
+
+  public Map<String,String> getServerSideOptions() {
+    return Map.of();
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
index 481661450d..c47e9973dd 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java
@@ -189,6 +189,7 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
 
             for (TabletMetadataFilter tmf : tabletMetadataFilters) {
               IteratorSetting iterSetting = new IteratorSetting(iteratorPriority, tmf.getClass());
+              iterSetting.addOptions(tmf.getServerSideOptions());
               scanner.addScanIterator(iterSetting);
               iteratorPriority++;
             }
@@ -270,6 +271,7 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable
           for (TabletMetadataFilter tmf : tabletMetadataFilters) {
             iteratorPriority++;
             IteratorSetting iterSetting = new IteratorSetting(iteratorPriority, tmf.getClass());
+            iterSetting.addOptions(tmf.getServerSideOptions());
             scanner.addScanIterator(iterSetting);
           }
         }
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 161e6c5754..d5fb04606a 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -39,7 +39,7 @@ import java.util.stream.Stream;
 
 import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.gc.thrift.GcCycleStats;
-import org.apache.accumulo.core.iterators.user.HasWalsFilter;
+import org.apache.accumulo.core.iterators.user.GcWalsFilter;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.TabletState;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
@@ -96,13 +96,15 @@ public class GarbageCollectWriteAheadLogs {
   }
 
   @VisibleForTesting
-  Stream<TabletMetadata> createStore() {
+  Stream<TabletMetadata> createStore(Set<TServerInstance> liveTServers) {
+    GcWalsFilter walsFilter = new GcWalsFilter(liveTServers);
+
     TabletsMetadata root = context.getAmple().readTablets().forLevel(DataLevel.ROOT)
-        .filter(new HasWalsFilter()).fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build();
+        .filter(walsFilter).fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build();
     TabletsMetadata metadata = context.getAmple().readTablets().forLevel(DataLevel.METADATA)
-        .filter(new HasWalsFilter()).fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build();
+        .filter(walsFilter).fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build();
     TabletsMetadata user = context.getAmple().readTablets().forLevel(DataLevel.USER)
-        .filter(new HasWalsFilter()).fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build();
+        .filter(walsFilter).fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).build();
     return Streams.concat(root.stream(), metadata.stream(), user.stream()).onClose(() -> {
       root.close();
       metadata.close();
@@ -281,7 +283,7 @@ public class GarbageCollectWriteAheadLogs {
     }
 
     // remove any entries if there's a log reference (recovery hasn't finished)
-    try (Stream<TabletMetadata> store = createStore()) {
+    try (Stream<TabletMetadata> store = createStore(liveServers)) {
       store.forEach(tabletMetadata -> {
         // Tablet is still assigned to a dead server. Manager has moved markers and reassigned it
         // Easiest to just ignore all the WALs for the dead server.
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index abfd09430d..7f4ec5173c 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Stream;
 
@@ -111,7 +112,7 @@ public class GarbageCollectWriteAheadLogsTest {
       }
 
       @Override
-      Stream<TabletMetadata> createStore() {
+      Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) {
         return tabletOnServer1List;
       }
     };
@@ -150,7 +151,7 @@ public class GarbageCollectWriteAheadLogsTest {
       }
 
       @Override
-      Stream<TabletMetadata> createStore() {
+      Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) {
         return tabletOnServer1List;
       }
     };
@@ -194,7 +195,7 @@ public class GarbageCollectWriteAheadLogsTest {
       }
 
       @Override
-      Stream<TabletMetadata> createStore() {
+      Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) {
         return tabletOnServer1List;
       }
     };
@@ -232,7 +233,7 @@ public class GarbageCollectWriteAheadLogsTest {
       }
 
       @Override
-      Stream<TabletMetadata> createStore() {
+      Stream<TabletMetadata> createStore(Set<TServerInstance> liveTservers) {
         return tabletOnServer2List;
       }
     };
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
index 4ece846fd5..df5841228d 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java
@@ -67,8 +67,8 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.FateTxId;
+import org.apache.accumulo.core.iterators.user.GcWalsFilter;
 import org.apache.accumulo.core.iterators.user.HasCurrentFilter;
-import org.apache.accumulo.core.iterators.user.HasWalsFilter;
 import org.apache.accumulo.core.iterators.user.TabletMetadataFilter;
 import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.metadata.RootTable;
@@ -939,7 +939,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
         assertEquals(Status.ACCEPTED, results.get(ke).getStatus());
       }
       // check that applying a combination of filters returns only tablets that meet the criteria
-      testFilterApplied(context, Set.of(new TestTabletMetadataFilter(), new HasWalsFilter()),
+      testFilterApplied(context, Set.of(new TestTabletMetadataFilter(), new GcWalsFilter(Set.of())),
           tabletsWithWalCompactFlush, "Combination of filters did not return the expected tablets");
 
       TServerInstance serverInstance = new TServerInstance(server, 1L);
@@ -961,7 +961,8 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
       Set<KeyExtent> expected = Sets.intersection(tabletsWithWalCompactFlush, tabletsWithLocation);
       assertFalse(expected.isEmpty());
       testFilterApplied(context,
-          Set.of(new HasCurrentFilter(), new HasWalsFilter(), new TestTabletMetadataFilter()),
+          Set.of(new HasCurrentFilter(), new GcWalsFilter(Set.of()),
+              new TestTabletMetadataFilter()),
           expected, "Combination of filters did not return the expected tablets");
     }
 
@@ -1015,7 +1016,7 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
     public void walFilter() {
       ServerContext context = cluster.getServerContext();
       ConditionalTabletsMutatorImpl ctmi = new ConditionalTabletsMutatorImpl(context);
-      Set<TabletMetadataFilter> filter = Set.of(new HasWalsFilter());
+      Set<TabletMetadataFilter> filter = Set.of(new GcWalsFilter(Set.of()));
 
       // make sure we read all tablets on table initially with no filters
       testFilterApplied(context, Set.of(), Set.of(e1, e2, e3, e4),
@@ -1052,6 +1053,41 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness {
 
       // test that now only the tablet with a wal is returned when using filter()
       testFilterApplied(context, filter, Set.of(e2), "Only tablets with wals should be returned");
+
+      var ts1 = new TServerInstance("localhost:9997", 5000L);
+      var ts2 = new TServerInstance("localhost:9997", 6000L);
+
+      ctmi = new ConditionalTabletsMutatorImpl(context);
+      ctmi.mutateTablet(e1).requireAbsentOperation().requireAbsentLocation()
+          .putLocation(Location.future(ts1)).submit(tabletMetadata -> false);
+      ctmi.mutateTablet(e2).requireAbsentOperation().requireAbsentLocation()
+          .putLocation(Location.current(ts1)).submit(tabletMetadata -> false);
+      ctmi.mutateTablet(e3).requireAbsentOperation().requireAbsentLocation()
+          .putLocation(Location.future(ts2)).submit(tabletMetadata -> false);
+      ctmi.mutateTablet(e4).requireAbsentOperation().requireAbsentLocation()
+          .putLocation(Location.current(ts2)).submit(tabletMetadata -> false);
+      results = ctmi.process();
+      assertEquals(Status.ACCEPTED, results.get(e1).getStatus());
+      assertEquals(Status.ACCEPTED, results.get(e2).getStatus());
+      assertEquals(Status.ACCEPTED, results.get(e3).getStatus());
+      assertEquals(Status.ACCEPTED, results.get(e4).getStatus());
+
+      testFilterApplied(context, filter, Set.of(e1, e2, e3, e4),
+          "All tablets should appear to be assigned to dead tservers and be returned");
+
+      // add ts1 to live tservers set and make ts2 look like a dead tserver
+      filter = Set.of(new GcWalsFilter(Set.of(ts1)));
+      testFilterApplied(context, filter, Set.of(e2, e3, e4),
+          "Tablets assigned to ts2 or with a wal should be returned");
+
+      // add ts2 to live tservers set and make ts1 look like a dead tserver
+      filter = Set.of(new GcWalsFilter(Set.of(ts2)));
+      testFilterApplied(context, filter, Set.of(e1, e2),
+          "Tablets assigned to ts1 or with a wal should be returned");
+
+      // add ts1 and ts2 to live tserver set, so nothing should look dead
+      filter = Set.of(new GcWalsFilter(Set.of(ts1, ts2)));
+      testFilterApplied(context, filter, Set.of(e2), "Only tablets with a wal should be returned");
     }
 
     @Test