You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2019/10/10 18:55:19 UTC
[accumulo] branch 2.0 updated: Retry new Bulk import on merge.
Fixes #471 (#1367)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.0 by this push:
new aabe05c Retry new Bulk import on merge. Fixes #471 (#1367)
aabe05c is described below
commit aabe05c19ec39b30908a9093a1f5090c01cf3fc1
Author: Mike Miller <mm...@apache.org>
AuthorDate: Thu Oct 10 14:55:14 2019 -0400
Retry new Bulk import on merge. Fixes #471 (#1367)
* Created AccumuloBulkMergeException so the client can detect a concurrent merge and retry the bulk import, backing off up to a maxWait of 2 minutes
---
.../clientImpl/AccumuloBulkMergeException.java | 32 +++++++++++
.../core/clientImpl/TableOperationsImpl.java | 2 +
.../accumulo/core/clientImpl/bulk/BulkImport.java | 64 ++++++++++++++++++----
.../thrift/TableOperationExceptionType.java | 5 +-
core/src/main/thrift/client.thrift | 1 +
.../master/tableOps/bulkVer2/PrepBulkImport.java | 7 +--
.../tableOps/bulkVer2/PrepBulkImportTest.java | 6 +-
7 files changed, 97 insertions(+), 20 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloBulkMergeException.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloBulkMergeException.java
new file mode 100644
index 0000000..2a7527e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloBulkMergeException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import org.apache.accumulo.core.client.AccumuloException;
+
+/**
+ * Internal class indicating a concurrent merge occurred during the new bulk import.
+ */
+public class AccumuloBulkMergeException extends AccumuloException {
+
+ private static final String MSG = "Concurrent merge happened";
+
+ public AccumuloBulkMergeException(final Throwable cause) {
+ super(MSG, cause);
+ }
+
+}
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index fbd5f74..762b85c 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -382,6 +382,8 @@ public class TableOperationsImpl extends TableOperationsHelper {
case OFFLINE:
throw new TableOfflineException(
Tables.getTableOfflineMsg(context, Tables.getTableId(context, tableOrNamespaceName)));
+ case BULK_CONCURRENT_MERGE:
+ throw new AccumuloBulkMergeException(e);
default:
throw new AccumuloException(e.description, e);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
index 48a7f93..52860b7 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java
@@ -17,6 +17,8 @@
package org.apache.accumulo.core.clientImpl.bulk;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.groupingBy;
import java.io.FileNotFoundException;
@@ -41,6 +43,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.accumulo.core.Constants;
@@ -49,6 +52,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationArguments;
import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions;
+import org.apache.accumulo.core.clientImpl.AccumuloBulkMergeException;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.TableOperationsImpl;
import org.apache.accumulo.core.clientImpl.Tables;
@@ -72,6 +76,7 @@ import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.volume.VolumeConfiguration;
+import org.apache.accumulo.fate.util.Retry;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -125,21 +130,56 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti
Path srcPath = checkPath(fs, dir);
SortedMap<KeyExtent,Bulk.Files> mappings;
- if (plan == null) {
- mappings = computeMappingFromFiles(fs, tableId, srcPath);
- } else {
- mappings = computeMappingFromPlan(fs, tableId, srcPath);
- }
+ TableOperationsImpl tableOps = new TableOperationsImpl(context);
+ Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS)
+ .incrementBy(100, MILLISECONDS).maxWait(2, MINUTES).backOffFactor(1.5)
+ .logInterval(3, TimeUnit.MINUTES).createRetry();
+
+ // retry if a merge occurs
+ boolean shouldRetry = true;
+ while (shouldRetry) {
+ if (plan == null) {
+ mappings = computeMappingFromFiles(fs, tableId, srcPath);
+ } else {
+ mappings = computeMappingFromPlan(fs, tableId, srcPath);
+ }
- if (mappings.isEmpty())
- throw new IllegalArgumentException("Attempted to import zero files from " + srcPath);
+ if (mappings.isEmpty())
+ throw new IllegalArgumentException("Attempted to import zero files from " + srcPath);
- BulkSerialize.writeLoadMapping(mappings, srcPath.toString(), fs::create);
+ BulkSerialize.writeLoadMapping(mappings, srcPath.toString(), fs::create);
+
+ List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.canonical().getBytes(UTF_8)),
+ ByteBuffer.wrap(srcPath.toString().getBytes(UTF_8)),
+ ByteBuffer.wrap((setTime + "").getBytes(UTF_8)));
+ try {
+ tableOps.doBulkFateOperation(args, tableName);
+ shouldRetry = false;
+ } catch (AccumuloBulkMergeException ae) {
+ if (plan != null) {
+ checkPlanForSplits(ae);
+ }
+ try {
+ retry.waitForNextAttempt();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ log.info(ae.getMessage() + ". Retrying bulk import to " + tableName);
+ }
+ }
+ }
- List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.canonical().getBytes(UTF_8)),
- ByteBuffer.wrap(srcPath.toString().getBytes(UTF_8)),
- ByteBuffer.wrap((setTime + "").getBytes(UTF_8)));
- new TableOperationsImpl(context).doBulkFateOperation(args, tableName);
+ /**
+ * Check if table splits were specified in the plan when a concurrent merge occurred. If so, throw
+ * an error back to the user since retrying won't help. If not, return so the caller can retry.
+ */
+ private void checkPlanForSplits(AccumuloBulkMergeException abme) throws AccumuloException {
+ for (Destination des : plan.getDestinations()) {
+ if (des.getRangeType().equals(RangeType.TABLE)) {
+ throw new AccumuloException("The splits provided in Load Plan do not exist in " + tableName,
+ abme);
+ }
+ }
}
/**
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/thrift/TableOperationExceptionType.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/thrift/TableOperationExceptionType.java
index 7764e3a..bd57e1c 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/thrift/TableOperationExceptionType.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/thrift/TableOperationExceptionType.java
@@ -34,7 +34,8 @@ public enum TableOperationExceptionType implements org.apache.thrift.TEnum {
NAMESPACE_EXISTS(7),
NAMESPACE_NOTFOUND(8),
INVALID_NAME(9),
- BULK_BAD_LOAD_MAPPING(10);
+ BULK_BAD_LOAD_MAPPING(10),
+ BULK_CONCURRENT_MERGE(11);
private final int value;
@@ -78,6 +79,8 @@ public enum TableOperationExceptionType implements org.apache.thrift.TEnum {
return INVALID_NAME;
case 10:
return BULK_BAD_LOAD_MAPPING;
+ case 11:
+ return BULK_CONCURRENT_MERGE;
default:
return null;
}
diff --git a/core/src/main/thrift/client.thrift b/core/src/main/thrift/client.thrift
index 91aec2e..85c93b2 100644
--- a/core/src/main/thrift/client.thrift
+++ b/core/src/main/thrift/client.thrift
@@ -52,6 +52,7 @@ enum TableOperationExceptionType {
NAMESPACE_NOTFOUND
INVALID_NAME
BULK_BAD_LOAD_MAPPING
+ BULK_CONCURRENT_MERGE
}
enum ConfigurationType {
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
index b215774..2199e99 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImport.java
@@ -146,12 +146,9 @@ public class PrepBulkImport extends MasterRepo {
}
if (currRange != null || lmi.hasNext()) {
- // a merge happened between the time the mapping was generated and the table lock was
- // acquired
+ // merge happened after the mapping was generated and before the table lock was acquired
throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT,
- TableOperationExceptionType.OTHER, "Concurrent merge happened"); // TODO need to handle
- // this on the client
- // side
+ TableOperationExceptionType.BULK_CONCURRENT_MERGE, "Concurrent merge happened");
}
}
diff --git a/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java b/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java
index 0b6f8e7..80d1615 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/tableOps/bulkVer2/PrepBulkImportTest.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -103,7 +104,8 @@ public class PrepBulkImportTest {
.collect(Collectors.joining(","));
}
- public void runExceptionTest(List<KeyExtent> loadRanges, List<KeyExtent> tabletRanges) {
+ public void runExceptionTest(List<KeyExtent> loadRanges, List<KeyExtent> tabletRanges)
+ throws AccumuloException {
try {
runTest(loadRanges, tabletRanges);
fail("expected " + toRangeStrings(loadRanges) + " to fail against "
@@ -144,7 +146,7 @@ public class PrepBulkImportTest {
}
@Test
- public void testException() {
+ public void testException() throws Exception {
for (List<KeyExtent> loadRanges : powerSet(nke(null, "b"), nke("b", "m"), nke("m", "r"),
nke("r", "v"), nke("v", null))) {