Posted to commits@accumulo.apache.org by mm...@apache.org on 2019/10/10 18:55:19 UTC

[accumulo] branch 2.0 updated: Retry new Bulk import on merge. Fixes #471 (#1367)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new aabe05c  Retry new Bulk import on merge. Fixes #471 (#1367)
aabe05c is described below

commit aabe05c19ec39b30908a9093a1f5090c01cf3fc1
Author: Mike Miller <mm...@apache.org>
AuthorDate: Thu Oct 10 14:55:14 2019 -0400

    Retry new Bulk import on merge. Fixes #471 (#1367)
    
    * Created AccumuloBulkMergeException for handling concurrent merges, using Retry with a maxWait of up to 2 minutes
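
For readers skimming the diff below: the Retry built in BulkImport backs off geometrically, so the wait between attempts grows from 100 ms toward the 2-minute cap. The following is a minimal, self-contained sketch of that schedule, assuming the back-off factor multiplies the current wait on each attempt; it is an illustration of the configured parameters, not the fate Retry implementation:

    // Illustrative only: approximates the wait schedule of
    // Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
    //     .incrementBy(100, MILLISECONDS).maxWait(2, MINUTES).backOffFactor(1.5)
    public class RetryScheduleSketch {
      public static void main(String[] args) {
        long waitMs = 100;                     // retryAfter(100, MILLISECONDS)
        final long maxWaitMs = 2 * 60 * 1000;  // maxWait(2, MINUTES)
        for (int attempt = 1; attempt <= 25; attempt++) {
          System.out.printf("attempt %d: wait %d ms%n", attempt, waitMs);
          // grow the wait geometrically, never past the cap
          waitMs = Math.min((long) (waitMs * 1.5), maxWaitMs); // backOffFactor(1.5)
        }
      }
    }
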
---
 .../clientImpl/AccumuloBulkMergeException.java     | 32 +++++++++++
 .../core/clientImpl/TableOperationsImpl.java       |  2 +
 .../accumulo/core/clientImpl/bulk/BulkImport.java  | 64 ++++++++++++++++++----
 .../thrift/TableOperationExceptionType.java        |  5 +-
 core/src/main/thrift/client.thrift                 |  1 +
 .../master/tableOps/bulkVer2/PrepBulkImport.java   |  7 +--
 .../tableOps/bulkVer2/PrepBulkImportTest.java      |  6 +-
 7 files changed, 97 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloBulkMergeException.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloBulkMergeException.java
new file mode 100644
index 0000000..2a7527e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloBulkMergeException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import org.apache.accumulo.core.client.AccumuloException;
+
+/**
+ * Internal exception indicating that a concurrent merge occurred during the new bulk import.
+ */
+public class AccumuloBulkMergeException extends AccumuloException {
+
+  private static final String MSG = "Concurrent merge happened";
+
+  public AccumuloBulkMergeException(final Throwable cause) {
+    super(MSG, cause);
+  }
+
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index fbd5f74..762b85c 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -382,6 +382,8 @@ public class TableOperationsImpl extends TableOperationsHelper {
         case OFFLINE:
           throw new TableOfflineException(
               Tables.getTableOfflineMsg(context, Tables.getTableId(context, tableOrNamespaceName)));
+        case BULK_CONCURRENT_MERGE:
+          throw new AccumuloBulkMergeException(e);
         default:
           throw new AccumuloException(e.description, e);
       }
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
index 48a7f93..52860b7 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
@@ -17,6 +17,8 @@
 package org.apache.accumulo.core.clientImpl.bulk;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.stream.Collectors.groupingBy;
 
 import java.io.FileNotFoundException;
@@ -41,6 +43,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import org.apache.accumulo.core.Constants;
@@ -49,6 +52,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationArguments;
 import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions;
+import org.apache.accumulo.core.clientImpl.AccumuloBulkMergeException;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
 import org.apache.accumulo.core.clientImpl.Tables;
@@ -72,6 +76,7 @@ import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.spi.crypto.CryptoService;
 import org.apache.accumulo.core.volume.VolumeConfiguration;
+import org.apache.accumulo.fate.util.Retry;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -125,21 +130,56 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
     Path srcPath = checkPath(fs, dir);
 
     SortedMap<KeyExtent,Bulk.Files> mappings;
-    if (plan == null) {
-      mappings = computeMappingFromFiles(fs, tableId, srcPath);
-    } else {
-      mappings = computeMappingFromPlan(fs, tableId, srcPath);
-    }
+    TableOperationsImpl tableOps = new TableOperationsImpl(context);
+    Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+        .incrementBy(100, MILLISECONDS).maxWait(2, MINUTES).backOffFactor(1.5)
+        .logInterval(3, TimeUnit.MINUTES).createRetry();
+
+    // retry if a merge occurs
+    boolean shouldRetry = true;
+    while (shouldRetry) {
+      if (plan == null) {
+        mappings = computeMappingFromFiles(fs, tableId, srcPath);
+      } else {
+        mappings = computeMappingFromPlan(fs, tableId, srcPath);
+      }
 
-    if (mappings.isEmpty())
-      throw new IllegalArgumentException("Attempted to import zero files from " + srcPath);
+      if (mappings.isEmpty())
+        throw new IllegalArgumentException("Attempted to import zero files from " + srcPath);
 
-    BulkSerialize.writeLoadMapping(mappings, srcPath.toString(), fs::create);
+      BulkSerialize.writeLoadMapping(mappings, srcPath.toString(), fs::create);
+
+      List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.canonical().getBytes(UTF_8)),
+          ByteBuffer.wrap(srcPath.toString().getBytes(UTF_8)),
+          ByteBuffer.wrap((setTime + "").getBytes(UTF_8)));
+      try {
+        tableOps.doBulkFateOperation(args, tableName);
+        shouldRetry = false;
+      } catch (AccumuloBulkMergeException ae) {
+        if (plan != null) {
+          checkPlanForSplits(ae);
+        }
+        try {
+          retry.waitForNextAttempt();
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+        log.info(ae.getMessage() + ". Retrying bulk import to " + tableName);
+      }
+    }
+  }
 
-    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.canonical().getBytes(UTF_8)),
-        ByteBuffer.wrap(srcPath.toString().getBytes(UTF_8)),
-        ByteBuffer.wrap((setTime + "").getBytes(UTF_8)));
-    new TableOperationsImpl(context).doBulkFateOperation(args, tableName);
+  /**
+   * Check whether splits were specified in the load plan when a concurrent merge occurred. If so,
+   * throw the error back to the user, since retrying won't help. Otherwise the caller retries.
+   */
+  private void checkPlanForSplits(AccumuloBulkMergeException abme) throws AccumuloException {
+    for (Destination des : plan.getDestinations()) {
+      if (des.getRangeType().equals(RangeType.TABLE)) {
+        throw new AccumuloException("The splits provided in Load Plan do not exist in " + tableName,
+            abme);
+      }
+    }
   }
 
   /**
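
A design note on the catch block above: retrying only makes sense when the load plan's ranges are not pinned to tablet boundaries. A merge deletes tablet boundaries, so a plan with RangeType.TABLE destinations can never succeed on retry, which is what checkPlanForSplits enforces. A hedged sketch of that rule, reusing only the LoadPlan accessors seen in the diff (illustrative, not the committed code):

    import org.apache.accumulo.core.data.LoadPlan;
    import org.apache.accumulo.core.data.LoadPlan.RangeType;

    public class RetrySafetySketch {
      // Safe to retry only if every destination uses the file's own range
      // (RangeType.FILE); RangeType.TABLE names tablet boundaries that a
      // concurrent merge may have removed.
      static boolean safeToRetry(LoadPlan plan) {
        return plan.getDestinations().stream()
            .allMatch(dest -> dest.getRangeType() != RangeType.TABLE);
      }
    }
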
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/thrift/TableOperationExceptionType.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/thrift/TableOperationExceptionType.java
index 7764e3a..bd57e1c 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/thrift/TableOperationExceptionType.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/thrift/TableOperationExceptionType.java
@@ -34,7 +34,8 @@ public enum TableOperationExceptionType implements org.apache.thrift.TEnum {
   NAMESPACE_EXISTS(7),
   NAMESPACE_NOTFOUND(8),
   INVALID_NAME(9),
-  BULK_BAD_LOAD_MAPPING(10);
+  BULK_BAD_LOAD_MAPPING(10),
+  BULK_CONCURRENT_MERGE(11);
 
   private final int value;
 
@@ -78,6 +79,8 @@ public enum TableOperationExceptionType implements org.apache.thrift.TEnum {
         return INVALID_NAME;
       case 10:
         return BULK_BAD_LOAD_MAPPING;
+      case 11:
+        return BULK_CONCURRENT_MERGE;
       default:
         return null;
     }
diff --git a/core/src/main/thrift/client.thrift b/core/src/main/thrift/client.thrift
index 91aec2e..85c93b2 100644
--- a/core/src/main/thrift/client.thrift
+++ b/core/src/main/thrift/client.thrift
@@ -52,6 +52,7 @@ enum TableOperationExceptionType {
   NAMESPACE_NOTFOUND
   INVALID_NAME
   BULK_BAD_LOAD_MAPPING
+  BULK_CONCURRENT_MERGE
 }
 
 enum ConfigurationType {
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
index b215774..2199e99 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
@@ -146,12 +146,9 @@ public class PrepBulkImport extends MasterRepo {
     }
 
     if (currRange != null || lmi.hasNext()) {
-      // a merge happened between the time the mapping was generated and the table lock was
-      // acquired
+      // merge happened after the mapping was generated and before the table lock was acquired
       throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT,
-          TableOperationExceptionType.OTHER, "Concurrent merge happened"); // TODO need to handle
-                                                                           // this on the client
-                                                                           // side
+          TableOperationExceptionType.BULK_CONCURRENT_MERGE, "Concurrent merge happened");
     }
   }
 
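To make the server-side check above concrete, here is a small standalone illustration (an assumed simplification, not the PrepBulkImport code): the load mapping was computed against the old tablets, so after a merge some mapped extents no longer line up with any current tablet, and that leftover is what trips the BULK_CONCURRENT_MERGE error.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class MergeDetectionSketch {
      public static void main(String[] args) {
        // Mapping computed before the merge, against tablets (-inf,b] (b,m] (m,+inf)
        List<String> mappedExtents = Arrays.asList("(-inf,b]", "(b,m]", "(m,+inf)");
        // Tablets after a concurrent merge collapsed (-inf,b] and (b,m]
        Set<String> currentTablets = new HashSet<>(Arrays.asList("(-inf,m]", "(m,+inf)"));

        // Any mapped extent with no matching current tablet means a merge happened
        boolean concurrentMerge = !currentTablets.containsAll(mappedExtents);
        System.out.println(concurrentMerge
            ? "leftover extents -> BULK_CONCURRENT_MERGE"
            : "mapping still matches tablets");
      }
    }
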
diff --git a/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java b/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java
index 0b6f8e7..80d1615 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -103,7 +104,8 @@ public class PrepBulkImportTest {
         .collect(Collectors.joining(","));
   }
 
-  public void runExceptionTest(List<KeyExtent> loadRanges, List<KeyExtent> tabletRanges) {
+  public void runExceptionTest(List<KeyExtent> loadRanges, List<KeyExtent> tabletRanges)
+      throws AccumuloException {
     try {
       runTest(loadRanges, tabletRanges);
       fail("expected " + toRangeStrings(loadRanges) + " to fail against "
@@ -144,7 +146,7 @@ public class PrepBulkImportTest {
   }
 
   @Test
-  public void testException() {
+  public void testException() throws Exception {
     for (List<KeyExtent> loadRanges : powerSet(nke(null, "b"), nke("b", "m"), nke("m", "r"),
         nke("r", "v"), nke("v", null))) {