You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@phoenix.apache.org by ac...@apache.org on 2020/05/20 04:34:43 UTC

[phoenix] branch 4.x updated: PHOENIX-5899 Index writes and verifications should contain information of underlying cause of failure

This is an automated email from the ASF dual-hosted git repository.

achouhan pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x by this push:
     new dcc5cbc  PHOENIX-5899 Index writes and verifications should contain information of underlying cause of failure
dcc5cbc is described below

commit dcc5cbc0836eaa5fdce9fa9c66c46d2cda0db100
Author: Abhishek Singh Chouhan <ab...@salesforce.com>
AuthorDate: Mon May 18 17:42:26 2020 -0700

    PHOENIX-5899 Index writes and verifications should contain information of underlying cause of failure
---
 .../coprocessor/IndexRebuildRegionScanner.java     | 12 ++--
 .../phoenix/coprocessor/IndexerRegionScanner.java  | 14 ++--
 .../phoenix/hbase/index/IndexRegionObserver.java   |  2 +-
 .../exception/MultiIndexWriteFailureException.java | 17 +++--
 .../hbase/index/parallel/BaseTaskRunner.java       | 10 ++-
 .../phoenix/hbase/index/parallel/TaskRunner.java   | 20 +++---
 .../TrackingParallelWriterIndexCommitter.java      | 47 ++++++++++---
 .../java/org/apache/phoenix/util/ServerUtil.java   | 11 +++
 .../hbase/index/parallel/TestTaskRunner.java       | 79 ++++++++++++++++++++++
 9 files changed, 176 insertions(+), 36 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 9feb27f..6b4bab1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -855,20 +856,23 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner {
         if (keys.size() > 0) {
             addVerifyTask(keys, perTaskVerificationPhaseResult);
         }
-        List<Boolean> taskResultList = null;
+        Pair<List<Boolean>, List<Future<Boolean>>> resultsAndFutures = null;
         try {
             LOGGER.debug("Waiting on index verify tasks to complete...");
-            taskResultList = this.pool.submitUninterruptible(tasks);
+            resultsAndFutures = this.pool.submitUninterruptible(tasks);
         } catch (ExecutionException e) {
             throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
         } catch (EarlyExitFailure e) {
             throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
         }
-        for (Boolean result : taskResultList) {
+        int index = 0;
+        for (Boolean result : resultsAndFutures.getFirst()) {
             if (result == null) {
+                Throwable cause = ServerUtil.getExceptionFromFailedFuture(resultsAndFutures.getSecond().get(index));
                 // there was a failure
-                throw new IOException(exceptionMessage);
+                throw new IOException(exceptionMessage, cause);
             }
+            index++;
         }
         for (IndexToolVerificationResult.PhaseResult result : verificationPhaseResultList) {
             verificationPhaseResult.add(result);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
index b493729..ad8924e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -48,7 +49,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.ValueGetter;
@@ -269,20 +270,23 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner {
         if (keys.size() > 0) {
             addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
         }
-        List<Boolean> taskResultList = null;
+        Pair<List<Boolean>, List<Future<Boolean>>> resultsAndFutures = null;
         try {
             LOGGER.debug("Waiting on index verify tasks to complete...");
-            taskResultList = this.pool.submitUninterruptible(tasks);
+            resultsAndFutures = this.pool.submitUninterruptible(tasks);
         } catch (ExecutionException e) {
             throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
         } catch (EarlyExitFailure e) {
             throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
         }
-        for (Boolean result : taskResultList) {
+        int index = 0;
+        for (Boolean result : resultsAndFutures.getFirst()) {
             if (result == null) {
+                Throwable cause = ServerUtil.getExceptionFromFailedFuture(resultsAndFutures.getSecond().get(index));
                 // there was a failure
-                throw new IOException(exceptionMessage);
+                throw new IOException(exceptionMessage, cause);
             }
+            index++;
         }
         for (IndexToolVerificationResult.PhaseResult result : verificationPhaseResultList) {
             verificationPhaseResult.add(result);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 602aba2..a705528 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -1059,7 +1059,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
           removePendingRows(context);
           context.rowLocks.clear();
           if (context.rebuild) {
-              throw new IOException(String.format("%s for rebuild", e.getMessage()));
+              throw new IOException(String.format("%s for rebuild", e.getMessage()), e);
           } else {
               rethrowIndexingException(e);
           }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java
index a14e8a5..472d027 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java
@@ -40,14 +40,21 @@ public class MultiIndexWriteFailureException extends IndexWriteException {
   /**
    * @param failures the tables to which the index write did not succeed
    */
-  public MultiIndexWriteFailureException(List<HTableInterfaceReference> failures, boolean disableIndexOnFailure) {
-    super(disableIndexOnFailure);
-    this.failures = failures;
+  public MultiIndexWriteFailureException(List<HTableInterfaceReference> failures,
+          boolean disableIndexOnFailure) {
+      super(disableIndexOnFailure);
+      this.failures = failures;
+  }
+
+  public MultiIndexWriteFailureException(List<HTableInterfaceReference> failures,
+          boolean disableIndexOnFailure, Throwable cause) {
+      super(cause, disableIndexOnFailure);
+      this.failures = failures;
   }
 
   /**
-   * This constructor used to rematerialize this exception when receiving
-   * an rpc exception from the server
+   * This constructor used to rematerialize this exception when receiving an rpc exception from the
+   * server
    * @param message detail message
    */
   public MultiIndexWriteFailureException(String message) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java
index 145c95b..b765170 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java
@@ -18,12 +18,15 @@
 package org.apache.phoenix.hbase.index.parallel;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +53,7 @@ public abstract class BaseTaskRunner implements TaskRunner {
   }
 
   @Override
-  public <R> List<R> submit(TaskBatch<R> tasks) throws CancellationException, ExecutionException,
+  public <R> Pair<List<R>, List<Future<R>>> submit(TaskBatch<R> tasks) throws CancellationException, ExecutionException,
       InterruptedException {
     // submit each task to the pool and queue it up to be watched
     List<ListenableFuture<R>> futures = new ArrayList<ListenableFuture<R>>(tasks.size());
@@ -63,7 +66,7 @@ public abstract class BaseTaskRunner implements TaskRunner {
       // advantage of being (1) less code, and (2) supported as part of a library, it is just that
       // little bit slower. If push comes to shove, we can revert back to the previous
       // implementation, but for right now, this works just fine.
-      return submitTasks(futures).get();
+      return Pair.newPair(submitTasks(futures).get(), Collections.unmodifiableList(((List<Future<R>>)(List<?>)futures)));
     } catch (CancellationException e) {
       // propagate the failure back out
       logAndNotifyAbort(e, tasks);
@@ -90,7 +93,7 @@ public abstract class BaseTaskRunner implements TaskRunner {
   protected abstract <R> ListenableFuture<List<R>> submitTasks(List<ListenableFuture<R>> futures);
 
   @Override
-  public <R> List<R> submitUninterruptible(TaskBatch<R> tasks) throws EarlyExitFailure,
+  public <R> Pair<List<R>, List<Future<R>>> submitUninterruptible(TaskBatch<R> tasks) throws EarlyExitFailure,
       ExecutionException {
     boolean interrupted = false;
     try {
@@ -120,6 +123,7 @@ public abstract class BaseTaskRunner implements TaskRunner {
     }
     LOGGER.info("Shutting down task runner because " + why);
     this.writerPool.shutdownNow();
+    this.stopped = true;
   }
 
   @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java
index 003e18f..2581d52 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java
@@ -19,8 +19,10 @@ package org.apache.phoenix.hbase.index.parallel;
 
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  *
@@ -35,13 +37,14 @@ public interface TaskRunner extends Stoppable {
    * ignored) and interrupt any running tasks. It is up to the passed tasks to respect the interrupt
    * notification
    * @param tasks to run
-   * @return the result from each task
+   * @return Pair containing ordered List of results from each task and an ordered immutable list of
+   *         underlying futures which can be used for getting underlying exceptions
    * @throws ExecutionException if any of the tasks fails. Wraps the underyling failure, which can
-   *           be retrieved via {@link ExecutionException#getCause()}.
+   *             be retrieved via {@link ExecutionException#getCause()}.
    * @throws InterruptedException if the current thread is interrupted while waiting for the batch
-   *           to complete
+   *             to complete
    */
-  public <R> List<R> submit(TaskBatch<R> tasks) throws
+  public <R> Pair<List<R>, List<Future<R>>> submit(TaskBatch<R> tasks) throws
       ExecutionException, InterruptedException;
 
   /**
@@ -49,12 +52,13 @@ public interface TaskRunner extends Stoppable {
    * waiting for results, we ignore it and only stop is {@link #stop(String)} has been called. On
    * return from the method, the interrupt status of the thread is restored.
    * @param tasks to run
-   * @return the result from each task
+   * @return Pair containing ordered List of results from each task and an ordered immutable list of
+   *         underlying futures which can be used for getting underlying exceptions
    * @throws EarlyExitFailure if there are still tasks to submit to the pool, but there is a stop
-   *           notification
+   *             notification
    * @throws ExecutionException if any of the tasks fails. Wraps the underyling failure, which can
-   *           be retrieved via {@link ExecutionException#getCause()}.
+   *             be retrieved via {@link ExecutionException#getCause()}.
    */
-  public <R> List<R> submitUninterruptible(TaskBatch<R> tasks) throws EarlyExitFailure,
+  public <R> Pair<List<R>, List<Future<R>>> submitUninterruptible(TaskBatch<R> tasks) throws EarlyExitFailure,
       ExecutionException;
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
index 7f85aee..7bbe3fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
@@ -18,6 +18,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
@@ -25,6 +26,7 @@ import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.CapturingAbortable;
 import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
@@ -46,7 +48,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Multimap;
-
 import static org.apache.phoenix.util.ServerUtil.wrapInDoNotRetryIOException;
 
 /**
@@ -214,10 +215,10 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
             });
         }
 
-        List<Boolean> results = null;
+        Pair<List<Boolean>, List<Future<Boolean>>> resultsAndFutures = null;
         try {
             LOGGER.debug("Waiting on index update tasks to complete...");
-            results = this.pool.submitUninterruptible(tasks);
+            resultsAndFutures = this.pool.submitUninterruptible(tasks);
         } catch (ExecutionException e) {
             throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
         } catch (EarlyExitFailure e) {
@@ -227,25 +228,33 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
         // track the failures. We only ever access this on return from our calls, so no extra
         // synchronization is needed. We could update all the failures as we find them, but that add a
         // lot of locking overhead, and just doing the copy later is about as efficient.
-        List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>();
+        List<HTableInterfaceReference> failedTables = new ArrayList<HTableInterfaceReference>();
+        List<Future<Boolean>> failedFutures = new ArrayList<>();
         int index = 0;
-        for (Boolean result : results) {
+        for (Boolean result : resultsAndFutures.getFirst()) {
             // there was a failure
             if (result == null) {
                 // we know which table failed by the index of the result
-                failures.add(tables.get(index));
+                failedTables.add(tables.get(index));
+                failedFutures.add(resultsAndFutures.getSecond().get(index));
             }
             index++;
         }
 
         // if any of the tasks failed, then we need to propagate the failure
-        if (failures.size() > 0) {
+        if (failedTables.size() > 0) {
+            Throwable cause = logFailedTasksAndGetCause(failedFutures, failedTables);
             // make the list unmodifiable to avoid any more synchronization concerns
-            MultiIndexWriteFailureException exception = new MultiIndexWriteFailureException(Collections.unmodifiableList(failures),
+            MultiIndexWriteFailureException exception = null;
+            // DisableIndexOnFailure flag is used by the old design. Setting the cause in MIWFE
+            // does not work for old design, so only do this for new design
+            if (disableIndexOnFailure) {
+                exception = new MultiIndexWriteFailureException(Collections.unmodifiableList(failedTables),
                     disableIndexOnFailure && PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env));
-            if (disableIndexOnFailure)
                 throw exception;
-            else {
+            } else {
+                exception = new MultiIndexWriteFailureException(Collections.unmodifiableList(failedTables),
+                    disableIndexOnFailure && PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env), cause);
                 throw wrapInDoNotRetryIOException("At least one index write failed after retries", exception,
                         EnvironmentEdgeManager.currentTimeMillis());
             }
@@ -253,6 +262,24 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
         return;
     }
 
+    private Throwable logFailedTasksAndGetCause(List<Future<Boolean>> failedFutures,
+            List<HTableInterfaceReference> failedTables) {
+        int i = 0;
+        Throwable t = null;
+        for (Future<Boolean> future : failedFutures) {
+            try {
+                future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                LOGGER.warn("Index Write failed for table " + failedTables.get(i), e);
+                if (t == null) {
+                    t = e;
+                }
+            }
+            i++;
+        }
+        return t;
+    }
+
     @Override
     public void stop(String why) {
         LOGGER.info("Shutting down " + this.getClass().getSimpleName());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 39986fb..f1075d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -450,4 +451,14 @@ public class ServerUtil {
 
     }
 
+    public static <T> Throwable getExceptionFromFailedFuture(Future<T> f) {
+        Throwable t = null;
+        try {
+            f.get();
+        } catch (Exception e) {
+            t = e;
+        }
+        return t;
+    }
+
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/parallel/TestTaskRunner.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/parallel/TestTaskRunner.java
new file mode 100644
index 0000000..af1e4b8
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/parallel/TestTaskRunner.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.parallel;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+
+public class TestTaskRunner {
+
+    @Test
+    public void testWaitForCompletionTaskRunner() throws Exception {
+        TaskRunner tr = new WaitForCompletionTaskRunner(Executors.newFixedThreadPool(4));
+        TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(4);
+        for (int i = 0; i < 4; i++) {
+            tasks.add(new EvenNumberFailingTask(i));
+        }
+        Pair<List<Boolean>, List<Future<Boolean>>> resultAndFutures =
+                tr.submitUninterruptible(tasks);
+        List<Boolean> results = resultAndFutures.getFirst();
+        List<Future<Boolean>> futures = resultAndFutures.getSecond();
+        for (int j = 0; j < 4; j++) {
+            if (j % 2 == 0) {
+                assertNull(results.get(j));
+                try {
+                    futures.get(j).get();
+                    fail("Should have received ExecutionException");
+                } catch (Exception e) {
+                    assertTrue(e instanceof ExecutionException);
+                    assertTrue(e.getCause().getMessage().equals("Even number task"));
+                }
+            } else {
+                assertTrue(results.get(j));
+            }
+        }
+    }
+
+    private static class EvenNumberFailingTask extends Task<Boolean> {
+        private int num;
+
+        public EvenNumberFailingTask(int i) {
+            this.num = i;
+        }
+
+        @Override
+        public Boolean call() throws Exception {
+            if (num % 2 == 0) {
+                throw new IOException("Even number task");
+            }
+            return Boolean.TRUE;
+        }
+
+    }
+
+}