Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2019/09/20 21:00:42 UTC

[GitHub] [accumulo] keith-turner commented on a change in pull request #1367: Retry new Bulk import on merge. Fixes #471

URL: https://github.com/apache/accumulo/pull/1367#discussion_r326801456
 
 

 ##########
 File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
 ##########
 @@ -348,54 +348,59 @@ String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,Strin
       throws AccumuloSecurityException, TableExistsException, TableNotFoundException,
       AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
     Long opid = null;
-
-    try {
-      opid = beginFateOperation();
-      executeFateOperation(opid, op, args, opts, !wait);
-      if (!wait) {
-        opid = null;
-        return null;
-      }
-      return waitForFateOperation(opid);
-    } catch (ThriftSecurityException e) {
-      switch (e.getCode()) {
-        case TABLE_DOESNT_EXIST:
-          throw new TableNotFoundException(null, tableOrNamespaceName,
-              "Target table does not exist");
-        case NAMESPACE_DOESNT_EXIST:
-          throw new NamespaceNotFoundException(null, tableOrNamespaceName,
-              "Target namespace does not exist");
-        default:
-          String tableInfo = Tables.getPrintableTableInfoFromName(context, tableOrNamespaceName);
-          throw new AccumuloSecurityException(e.user, e.code, tableInfo, e);
-      }
-    } catch (ThriftTableOperationException e) {
-      switch (e.getType()) {
-        case EXISTS:
-          throw new TableExistsException(e);
-        case NOTFOUND:
-          throw new TableNotFoundException(e);
-        case NAMESPACE_EXISTS:
-          throw new NamespaceExistsException(e);
-        case NAMESPACE_NOTFOUND:
-          throw new NamespaceNotFoundException(e);
-        case OFFLINE:
-          throw new TableOfflineException(
-              Tables.getTableOfflineMsg(context, Tables.getTableId(context, tableOrNamespaceName)));
-        default:
-          throw new AccumuloException(e.description, e);
-      }
-    } catch (Exception e) {
-      throw new AccumuloException(e.getMessage(), e);
-    } finally {
-      Tables.clearCache(context);
-      // always finish table op, even when exception
-      if (opid != null)
-        try {
-          finishFateOperation(opid);
-        } catch (Exception e) {
-          log.warn("Exception thrown while finishing fate table operation", e);
+    // keep retrying bulk import if a concurrent merge happens. all other ops throw error or return
+    while (true) {
+      try {
+        opid = beginFateOperation();
+        executeFateOperation(opid, op, args, opts, !wait);
+        if (!wait) {
+          opid = null;
+          return null;
+        }
+        return waitForFateOperation(opid);
+      } catch (ThriftSecurityException e) {
+        switch (e.getCode()) {
+          case TABLE_DOESNT_EXIST:
+            throw new TableNotFoundException(null, tableOrNamespaceName,
+                "Target table does not exist");
+          case NAMESPACE_DOESNT_EXIST:
+            throw new NamespaceNotFoundException(null, tableOrNamespaceName,
+                "Target namespace does not exist");
+          default:
+            String tableInfo = Tables.getPrintableTableInfoFromName(context, tableOrNamespaceName);
+            throw new AccumuloSecurityException(e.user, e.code, tableInfo, e);
         }
+      } catch (ThriftTableOperationException e) {
+        switch (e.getType()) {
+          case EXISTS:
+            throw new TableExistsException(e);
+          case NOTFOUND:
+            throw new TableNotFoundException(e);
+          case NAMESPACE_EXISTS:
+            throw new NamespaceExistsException(e);
+          case NAMESPACE_NOTFOUND:
+            throw new NamespaceNotFoundException(e);
+          case OFFLINE:
+            throw new TableOfflineException(Tables.getTableOfflineMsg(context,
+                Tables.getTableId(context, tableOrNamespaceName)));
+          case BULK_CONCURRENT_MERGE:
+            log.info("Concurrent merge happened. Retrying Bulk Import to " + e.tableName);
+            break;
 
 Review comment:
   I think this is the wrong place to retry, because the load plan needs to be recomputed after a concurrent merge. Instead, this code could throw an internal (non-API) runtime exception, and the bulk import code could catch it.
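
   A minimal sketch of the suggestion above, not the actual Accumulo change. It assumes a hypothetical internal exception (`ConcurrentMergeException`) and a hypothetical helper (`loadWithRetry`) standing in for the bulk import code. Neither name comes from Accumulo, and the attempt limit is an added assumption. The point it illustrates is that the load plan is recomputed on every attempt, so a retry never reuses a plan built against tablet boundaries that a merge has since changed.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class BulkRetrySketch {

  /** Hypothetical internal (non-API) exception; the name is an assumption. */
  static final class ConcurrentMergeException extends RuntimeException {
    ConcurrentMergeException(String tableName) {
      super("Concurrent merge happened on table " + tableName);
    }
  }

  /**
   * Hypothetical stand-in for the bulk import code. The plan is recomputed at
   * the top of each attempt, then submitted. The submit step represents the
   * fate operation, which would throw ConcurrentMergeException instead of
   * retrying on its own. Returns the number of attempts used.
   */
  static <P> int loadWithRetry(Supplier<P> computeLoadPlan, Consumer<P> submit,
      int maxAttempts) {
    for (int attempt = 1;; attempt++) {
      // Recompute against the current tablet boundaries before every attempt.
      P plan = computeLoadPlan.get();
      try {
        submit.accept(plan);
        return attempt;
      } catch (ConcurrentMergeException e) {
        // Give up once the (assumed) attempt limit is reached.
        if (attempt >= maxAttempts) {
          throw e;
        }
      }
    }
  }

  public static void main(String[] args) {
    // Simulated plan: a version number that increases each time it is computed.
    AtomicInteger planVersion = new AtomicInteger();
    int attempts = BulkRetrySketch.<Integer>loadWithRetry(
        planVersion::incrementAndGet,
        plan -> {
          // Simulate a merge invalidating the first plan only.
          if (plan < 2) {
            throw new ConcurrentMergeException("t1");
          }
        },
        5);
    System.out.println("attempts=" + attempts);
  }
}
```

   With this shape, `doFateOperation` would not need a loop at all: its `BULK_CONCURRENT_MERGE` case would only translate the Thrift error into the internal exception, and all other operations would keep their existing throw-or-return behavior.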
