Posted to commits@accumulo.apache.org by kt...@apache.org on 2023/11/30 21:13:56 UTC

(accumulo) branch 2.1 updated: fixes bug with bulk import RPC failure and retry (#4000)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 751a110efe fixes bug with bulk import RPC failure and retry (#4000)
751a110efe is described below

commit 751a110efe6ecf43c0aa6662160fa3766d881936
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Nov 30 16:13:51 2023 -0500

    fixes bug with bulk import RPC failure and retry (#4000)
    
    The bulk import code had a bug where, if an exception was thrown in a
    certain place in the tablet code, it would mark the bulk import as
    complete.  After the exception, other code would retry and see the bulk
    import marked as complete even though it was not.  This commit changes
    the behavior to only note success when there is no exception.  In
    addition, tests were added that recreate this bug.  These tests set a
    constraint on the metadata table that causes bulk import writes to fail.
---
 .../org/apache/accumulo/tserver/tablet/Tablet.java |  10 +-
 .../apache/accumulo/test/functional/BulkNewIT.java | 147 +++++++++++++++++++++
 .../apache/accumulo/test/functional/BulkOldIT.java |  51 +++++++
 3 files changed, 203 insertions(+), 5 deletions(-)
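
For context, the failure mode described in the commit message can be distilled
into a minimal, self-contained sketch. This is illustrative code only:
BulkImportSketch and importFiles() are stand-ins, not the actual Tablet or
DatafileManager API, and the finally block below stands in for wherever the
old code noted success even after an exception.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    class BulkImportSketch {
      private final Map<Long,List<String>> bulkImported = new HashMap<>();

      // stand-in for DatafileManager.importMapFiles(); simulates a failed
      // metadata write, like the constraint the new tests install
      void importFiles(long tid, Set<String> files) {
        throw new RuntimeException("metadata write failed");
      }

      // before the fix: success was noted in a cleanup path that ran even
      // when importFiles() threw, so a retry of the failed RPC saw the files
      // listed as imported and silently skipped them
      void importBuggy(long tid, Set<String> files) {
        try {
          importFiles(tid, files);
        } finally {
          bulkImported.computeIfAbsent(tid, k -> new ArrayList<>()).addAll(files);
        }
      }

      // after the fix: success is noted only when importFiles() returns
      // normally, so a retry re-attempts the import instead of skipping it
      void importFixed(long tid, Set<String> files) {
        importFiles(tid, files);
        synchronized (this) {
          bulkImported.computeIfAbsent(tid, k -> new ArrayList<>()).addAll(files);
        }
      }
    }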

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 175f3a1f27..3f7eebe95f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1790,6 +1790,11 @@ public class Tablet extends TabletBase {
       var storedTabletFile = getDatafileManager().importMapFiles(tid, entries, setTime);
       lastMapFileImportTime = System.currentTimeMillis();
 
+      synchronized (this) {
+        // only mark the bulk import a success if no exception was thrown
+        bulkImported.computeIfAbsent(tid, k -> new ArrayList<>()).addAll(fileMap.keySet());
+      }
+
       if (isSplitPossible()) {
         getTabletServer().executeSplit(this);
       } else {
@@ -1804,11 +1809,6 @@ public class Tablet extends TabletBase {
               "Likely bug in code, always expect to remove something.  Please open an Accumulo issue.");
         }
 
-        try {
-          bulkImported.computeIfAbsent(tid, k -> new ArrayList<>()).addAll(fileMap.keySet());
-        } catch (Exception ex) {
-          log.info(ex.toString(), ex);
-        }
         tabletServer.removeBulkImportState(files);
       }
     }
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index afa0afc677..a8aebd2e25 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@ -18,10 +18,13 @@
  */
 package org.apache.accumulo.test.functional;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -37,18 +40,23 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -56,20 +64,30 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.LoadPlan;
 import org.apache.accumulo.core.data.LoadPlan.RangeType;
+import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.constraints.Constraint;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVWriter;
 import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory;
 import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.minicluster.MemoryUnit;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.constraints.MetadataConstraints;
+import org.apache.accumulo.server.constraints.SystemEnvironment;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -77,6 +95,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -506,6 +525,43 @@ public class BulkNewIT extends SharedMiniClusterBase {
     }
   }
 
+  @Test
+  public void testExceptionInMetadataUpdate() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      // after setting this up, bulk imports should never succeed on a tablet server
+      setupBulkConstraint(getPrincipal(), c);
+
+      String dir = getDir("/testExceptionInMetadataUpdate-");
+
+      String h1 = writeData(dir + "/f1.", aconf, 0, 333);
+
+      var executor = Executors.newSingleThreadExecutor();
+      // With the constraint configured to make tservers throw an exception on bulk import, the
+      // bulk import should never succeed, so run the bulk import in another thread.
+      var future = executor.submit(() -> {
+        c.tableOperations().importDirectory(dir).to(tableName).load();
+        return null;
+      });
+
+      Thread.sleep(10000);
+
+      // the bulk import should not be done
+      assertFalse(future.isDone());
+
+      // remove the constraint, which should allow the bulk import running in the background
+      // to complete
+      removeBulkConstraint(getPrincipal(), c);
+
+      // wait for the future to complete and ensure it had no exceptions
+      future.get();
+
+      // verify the data was bulk imported
+      verifyData(c, tableName, 0, 333, false);
+      verifyMetadata(c, tableName, Map.of("null", Set.of(h1)));
+    }
+  }
+
   private void addSplits(AccumuloClient client, String tableName, String splitString)
       throws Exception {
     SortedSet<Text> splits = new TreeSet<>();
@@ -605,4 +661,95 @@ public class BulkNewIT extends SharedMiniClusterBase {
 
     return hash(filename);
   }
+
+  /**
+   * This constraint is used to simulate an error in the metadata write for a bulk import.
+   */
+  public static class NoBulkConstratint implements Constraint {
+
+    public static final String CANARY_VALUE = "a!p@a#c$h%e^&*()";
+    public static final short CANARY_CODE = 31234;
+
+    @Override
+    public String getViolationDescription(short violationCode) {
+      if (violationCode == 1) {
+        return "Bulk import files are not allowed in this test";
+      } else if (violationCode == CANARY_CODE) {
+        return "Check used to see if constraint is active";
+      }
+
+      return null;
+    }
+
+    @Override
+    public List<Short> check(Environment env, Mutation mutation) {
+      for (var colUpdate : mutation.getUpdates()) {
+        var fam = new Text(colUpdate.getColumnFamily());
+        if (fam.equals(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME)) {
+          var stf = new StoredTabletFile(new String(colUpdate.getColumnQualifier(), UTF_8));
+          if (stf.getFileName().startsWith("I")) {
+            return List.of((short) 1);
+          }
+        }
+
+        if (new String(colUpdate.getValue(), UTF_8).equals(CANARY_VALUE)) {
+          return List.of(CANARY_CODE);
+        }
+
+      }
+
+      return null;
+    }
+  }
+
+  static void setupBulkConstraint(String principal, AccumuloClient c)
+      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    // add a constraint to the metadata table that disallows adding bulk import files
+    c.securityOperations().grantTablePermission(principal, MetadataTable.NAME,
+        TablePermission.WRITE);
+    c.securityOperations().grantTablePermission(principal, MetadataTable.NAME,
+        TablePermission.ALTER_TABLE);
+
+    c.tableOperations().addConstraint(MetadataTable.NAME, NoBulkConstratint.class.getName());
+
+    var metaConstraints = new MetadataConstraints();
+    SystemEnvironment env = EasyMock.createMock(SystemEnvironment.class);
+    ServerContext context = EasyMock.createMock(ServerContext.class);
+    EasyMock.expect(env.getServerContext()).andReturn(context);
+    EasyMock.replay(env);
+
+    // wait for the constraint to be active on the metadata table
+    Wait.waitFor(() -> {
+      try (var bw = c.createBatchWriter(MetadataTable.NAME)) {
+        Mutation m = new Mutation("~garbage");
+        m.put("", "", NoBulkConstratint.CANARY_VALUE);
+        // This test assumes the metadata constraint check will not flag this mutation; the
+        // following assertion validates this assumption.
+        assertNull(metaConstraints.check(env, m));
+        bw.addMutation(m);
+        return false;
+      } catch (MutationsRejectedException e) {
+        return e.getConstraintViolationSummaries().stream()
+            .anyMatch(cvs -> cvs.violationCode == NoBulkConstratint.CANARY_CODE);
+      }
+    });
+
+    // delete the junk added to the metadata table
+    try (var bw = c.createBatchWriter(MetadataTable.NAME)) {
+      Mutation m = new Mutation("~garbage");
+      m.putDelete("", "");
+      bw.addMutation(m);
+    }
+  }
+
+  static void removeBulkConstraint(String principal, AccumuloClient c)
+      throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
+    int constraintNum = c.tableOperations().listConstraints(MetadataTable.NAME)
+        .get(NoBulkConstratint.class.getName());
+    c.tableOperations().removeConstraint(MetadataTable.NAME, constraintNum);
+    c.securityOperations().revokeTablePermission(principal, MetadataTable.NAME,
+        TablePermission.WRITE);
+    c.securityOperations().revokeTablePermission(principal, MetadataTable.NAME,
+        TablePermission.ALTER_TABLE);
+  }
 }
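
One detail of the test code above worth noting: adding a constraint does not
make it active immediately, since the configuration change propagates to
tablet servers asynchronously, which is why setupBulkConstraint() probes with
a canary mutation until the write is rejected. A generic form of that wait
pattern might look like the sketch below (ConstraintWait and its names are
illustrative, not an Accumulo API; the test itself uses the Wait.waitFor()
utility):

    import java.util.function.BooleanSupplier;

    final class ConstraintWait {
      // Poll a sentinel operation until it reports the new configuration is
      // live, e.g. a canary write being rejected with the expected code.
      static void waitUntilActive(BooleanSupplier canaryRejected, long timeoutMs)
          throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!canaryRejected.getAsBoolean()) {
          if (System.currentTimeMillis() > deadline) {
            throw new IllegalStateException("constraint never became active");
          }
          Thread.sleep(250); // propagation is asynchronous; retry after a pause
        }
      }
    }
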
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkOldIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkOldIT.java
index 093277a5dd..d8df93be07 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkOldIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkOldIT.java
@@ -18,6 +18,8 @@
  */
 package org.apache.accumulo.test.functional;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Iterator;
@@ -30,6 +32,7 @@ import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
@@ -60,6 +63,9 @@ public class BulkOldIT extends AccumuloClusterHarness {
   @Override
   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf) {
     cfg.setMemory(ServerType.TABLET_SERVER, 512, MemoryUnit.MEGABYTE);
+    // lowering this because testExceptionInMetadataUpdate() causes retries and the default
+    // retry count makes the test take far too long
+    cfg.setProperty(Property.TSERV_BULK_RETRY, "2");
   }
 
   // suppress importDirectory deprecated since this is the only test for legacy technique
@@ -107,6 +113,51 @@ public class BulkOldIT extends AccumuloClusterHarness {
 
   }
 
+  // test case where the metadata update throws an exception
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testExceptionInMetadataUpdate() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      // after setting this up, bulk imports should fail
+      BulkNewIT.setupBulkConstraint(getAdminPrincipal(), c);
+
+      String tableName = getUniqueNames(1)[0];
+
+      c.tableOperations().create(tableName);
+      Configuration conf = new Configuration();
+      AccumuloConfiguration aconf = getCluster().getServerContext().getConfiguration();
+      FileSystem fs = getCluster().getFileSystem();
+      String rootPath = cluster.getTemporaryPath().toString();
+
+      String dir = rootPath + "/bulk_test_diff_files_89723987592_" + getUniqueNames(1)[0];
+
+      fs.delete(new Path(dir), true);
+
+      writeData(conf, aconf, fs, dir, "f1", 0, 333);
+
+      String failDir = dir + "_failures";
+      Path failPath = new Path(failDir);
+      fs.delete(failPath, true);
+      fs.mkdirs(failPath);
+      fs.deleteOnExit(failPath);
+
+      // this should fail and it should copy the file to the fail dir
+      c.tableOperations().importDirectory(tableName, dir, failDir, false);
+
+      if (fs.listStatus(failPath).length < 1) {
+        throw new Exception("Expected files in failure directory");
+      }
+
+      try (var scanner = c.createScanner(tableName)) {
+        // verify the table is empty
+        assertEquals(0, scanner.stream().count());
+      }
+
+      BulkNewIT.removeBulkConstraint(getAdminPrincipal(), c);
+    }
+  }
+
   private void writeData(Configuration conf, AccumuloConfiguration aconf, FileSystem fs, String dir,
       String file, int start, int end) throws IOException, Exception {
     FileSKVWriter writer1 = FileOperations.getInstance().newWriterBuilder()