You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ac...@apache.org on 2020/05/20 04:34:43 UTC
[phoenix] branch 4.x updated: PHOENIX-5899 Index writes and
verifications should contain information of underlying cause of failure
This is an automated email from the ASF dual-hosted git repository.
achouhan pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new dcc5cbc PHOENIX-5899 Index writes and verifications should contain information of underlying cause of failure
dcc5cbc is described below
commit dcc5cbc0836eaa5fdce9fa9c66c46d2cda0db100
Author: Abhishek Singh Chouhan <ab...@salesforce.com>
AuthorDate: Mon May 18 17:42:26 2020 -0700
PHOENIX-5899 Index writes and verifications should contain information of underlying cause of failure
---
.../coprocessor/IndexRebuildRegionScanner.java | 12 ++--
.../phoenix/coprocessor/IndexerRegionScanner.java | 14 ++--
.../phoenix/hbase/index/IndexRegionObserver.java | 2 +-
.../exception/MultiIndexWriteFailureException.java | 17 +++--
.../hbase/index/parallel/BaseTaskRunner.java | 10 ++-
.../phoenix/hbase/index/parallel/TaskRunner.java | 20 +++---
.../TrackingParallelWriterIndexCommitter.java | 47 ++++++++++---
.../java/org/apache/phoenix/util/ServerUtil.java | 11 +++
.../hbase/index/parallel/TestTaskRunner.java | 79 ++++++++++++++++++++++
9 files changed, 176 insertions(+), 36 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 9feb27f..6b4bab1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -855,20 +856,23 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner {
if (keys.size() > 0) {
addVerifyTask(keys, perTaskVerificationPhaseResult);
}
- List<Boolean> taskResultList = null;
+ Pair<List<Boolean>, List<Future<Boolean>>> resultsAndFutures = null;
try {
LOGGER.debug("Waiting on index verify tasks to complete...");
- taskResultList = this.pool.submitUninterruptible(tasks);
+ resultsAndFutures = this.pool.submitUninterruptible(tasks);
} catch (ExecutionException e) {
throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
} catch (EarlyExitFailure e) {
throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
}
- for (Boolean result : taskResultList) {
+ int index = 0;
+ for (Boolean result : resultsAndFutures.getFirst()) {
if (result == null) {
+ Throwable cause = ServerUtil.getExceptionFromFailedFuture(resultsAndFutures.getSecond().get(index));
// there was a failure
- throw new IOException(exceptionMessage);
+ throw new IOException(exceptionMessage, cause);
}
+ index++;
}
for (IndexToolVerificationResult.PhaseResult result : verificationPhaseResultList) {
verificationPhaseResult.add(result);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
index b493729..ad8924e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -48,7 +49,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.ValueGetter;
@@ -269,20 +270,23 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner {
if (keys.size() > 0) {
addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
}
- List<Boolean> taskResultList = null;
+ Pair<List<Boolean>, List<Future<Boolean>>> resultsAndFutures = null;
try {
LOGGER.debug("Waiting on index verify tasks to complete...");
- taskResultList = this.pool.submitUninterruptible(tasks);
+ resultsAndFutures = this.pool.submitUninterruptible(tasks);
} catch (ExecutionException e) {
throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
} catch (EarlyExitFailure e) {
throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
}
- for (Boolean result : taskResultList) {
+ int index = 0;
+ for (Boolean result : resultsAndFutures.getFirst()) {
if (result == null) {
+ Throwable cause = ServerUtil.getExceptionFromFailedFuture(resultsAndFutures.getSecond().get(index));
// there was a failure
- throw new IOException(exceptionMessage);
+ throw new IOException(exceptionMessage, cause);
}
+ index++;
}
for (IndexToolVerificationResult.PhaseResult result : verificationPhaseResultList) {
verificationPhaseResult.add(result);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 602aba2..a705528 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -1059,7 +1059,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
removePendingRows(context);
context.rowLocks.clear();
if (context.rebuild) {
- throw new IOException(String.format("%s for rebuild", e.getMessage()));
+ throw new IOException(String.format("%s for rebuild", e.getMessage()), e);
} else {
rethrowIndexingException(e);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java
index a14e8a5..472d027 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/exception/MultiIndexWriteFailureException.java
@@ -40,14 +40,21 @@ public class MultiIndexWriteFailureException extends IndexWriteException {
/**
* @param failures the tables to which the index write did not succeed
*/
- public MultiIndexWriteFailureException(List<HTableInterfaceReference> failures, boolean disableIndexOnFailure) {
- super(disableIndexOnFailure);
- this.failures = failures;
+ public MultiIndexWriteFailureException(List<HTableInterfaceReference> failures,
+ boolean disableIndexOnFailure) {
+ super(disableIndexOnFailure);
+ this.failures = failures;
+ }
+
+ public MultiIndexWriteFailureException(List<HTableInterfaceReference> failures,
+ boolean disableIndexOnFailure, Throwable cause) {
+ super(cause, disableIndexOnFailure);
+ this.failures = failures;
}
/**
- * This constructor used to rematerialize this exception when receiving
- * an rpc exception from the server
+ * This constructor used to rematerialize this exception when receiving an rpc exception from the
+ * server
* @param message detail message
*/
public MultiIndexWriteFailureException(String message) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java
index 145c95b..b765170 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java
@@ -18,12 +18,15 @@
package org.apache.phoenix.hbase.index.parallel;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +53,7 @@ public abstract class BaseTaskRunner implements TaskRunner {
}
@Override
- public <R> List<R> submit(TaskBatch<R> tasks) throws CancellationException, ExecutionException,
+ public <R> Pair<List<R>, List<Future<R>>> submit(TaskBatch<R> tasks) throws CancellationException, ExecutionException,
InterruptedException {
// submit each task to the pool and queue it up to be watched
List<ListenableFuture<R>> futures = new ArrayList<ListenableFuture<R>>(tasks.size());
@@ -63,7 +66,7 @@ public abstract class BaseTaskRunner implements TaskRunner {
// advantage of being (1) less code, and (2) supported as part of a library, it is just that
// little bit slower. If push comes to shove, we can revert back to the previous
// implementation, but for right now, this works just fine.
- return submitTasks(futures).get();
+ return Pair.newPair(submitTasks(futures).get(), Collections.unmodifiableList(((List<Future<R>>)(List<?>)futures)));
} catch (CancellationException e) {
// propagate the failure back out
logAndNotifyAbort(e, tasks);
@@ -90,7 +93,7 @@ public abstract class BaseTaskRunner implements TaskRunner {
protected abstract <R> ListenableFuture<List<R>> submitTasks(List<ListenableFuture<R>> futures);
@Override
- public <R> List<R> submitUninterruptible(TaskBatch<R> tasks) throws EarlyExitFailure,
+ public <R> Pair<List<R>, List<Future<R>>> submitUninterruptible(TaskBatch<R> tasks) throws EarlyExitFailure,
ExecutionException {
boolean interrupted = false;
try {
@@ -120,6 +123,7 @@ public abstract class BaseTaskRunner implements TaskRunner {
}
LOGGER.info("Shutting down task runner because " + why);
this.writerPool.shutdownNow();
+ this.stopped = true;
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java
index 003e18f..2581d52 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java
@@ -19,8 +19,10 @@ package org.apache.phoenix.hbase.index.parallel;
import java.util.List;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.util.Pair;
/**
*
@@ -35,13 +37,14 @@ public interface TaskRunner extends Stoppable {
* ignored) and interrupt any running tasks. It is up to the passed tasks to respect the interrupt
* notification
* @param tasks to run
- * @return the result from each task
+ * @return Pair containing ordered List of results from each task and an ordered immutable list of
+ * underlying futures which can be used for getting underlying exceptions
* @throws ExecutionException if any of the tasks fails. Wraps the underyling failure, which can
- * be retrieved via {@link ExecutionException#getCause()}.
+ * be retrieved via {@link ExecutionException#getCause()}.
* @throws InterruptedException if the current thread is interrupted while waiting for the batch
- * to complete
+ * to complete
*/
- public <R> List<R> submit(TaskBatch<R> tasks) throws
+ public <R> Pair<List<R>, List<Future<R>>> submit(TaskBatch<R> tasks) throws
ExecutionException, InterruptedException;
/**
@@ -49,12 +52,13 @@ public interface TaskRunner extends Stoppable {
* waiting for results, we ignore it and only stop is {@link #stop(String)} has been called. On
* return from the method, the interrupt status of the thread is restored.
* @param tasks to run
- * @return the result from each task
+ * @return Pair containing ordered List of results from each task and an ordered immutable list of
+ * underlying futures which can be used for getting underlying exceptions
* @throws EarlyExitFailure if there are still tasks to submit to the pool, but there is a stop
- * notification
+ * notification
* @throws ExecutionException if any of the tasks fails. Wraps the underyling failure, which can
- * be retrieved via {@link ExecutionException#getCause()}.
+ * be retrieved via {@link ExecutionException#getCause()}.
*/
- public <R> List<R> submitUninterruptible(TaskBatch<R> tasks) throws EarlyExitFailure,
+ public <R> Pair<List<R>, List<Future<R>>> submitUninterruptible(TaskBatch<R> tasks) throws EarlyExitFailure,
ExecutionException;
}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
index 7f85aee..7bbe3fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
@@ -18,6 +18,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
@@ -25,6 +26,7 @@ import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.hbase.index.CapturingAbortable;
import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
@@ -46,7 +48,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Multimap;
-
import static org.apache.phoenix.util.ServerUtil.wrapInDoNotRetryIOException;
/**
@@ -214,10 +215,10 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
});
}
- List<Boolean> results = null;
+ Pair<List<Boolean>, List<Future<Boolean>>> resultsAndFutures = null;
try {
LOGGER.debug("Waiting on index update tasks to complete...");
- results = this.pool.submitUninterruptible(tasks);
+ resultsAndFutures = this.pool.submitUninterruptible(tasks);
} catch (ExecutionException e) {
throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
} catch (EarlyExitFailure e) {
@@ -227,25 +228,33 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
// track the failures. We only ever access this on return from our calls, so no extra
// synchronization is needed. We could update all the failures as we find them, but that add a
// lot of locking overhead, and just doing the copy later is about as efficient.
- List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>();
+ List<HTableInterfaceReference> failedTables = new ArrayList<HTableInterfaceReference>();
+ List<Future<Boolean>> failedFutures = new ArrayList<>();
int index = 0;
- for (Boolean result : results) {
+ for (Boolean result : resultsAndFutures.getFirst()) {
// there was a failure
if (result == null) {
// we know which table failed by the index of the result
- failures.add(tables.get(index));
+ failedTables.add(tables.get(index));
+ failedFutures.add(resultsAndFutures.getSecond().get(index));
}
index++;
}
// if any of the tasks failed, then we need to propagate the failure
- if (failures.size() > 0) {
+ if (failedTables.size() > 0) {
+ Throwable cause = logFailedTasksAndGetCause(failedFutures, failedTables);
// make the list unmodifiable to avoid any more synchronization concerns
- MultiIndexWriteFailureException exception = new MultiIndexWriteFailureException(Collections.unmodifiableList(failures),
+ MultiIndexWriteFailureException exception = null;
+ // DisableIndexOnFailure flag is used by the old design. Setting the cause in MIWFE
+ // does not work for old design, so only do this for new design
+ if (disableIndexOnFailure) {
+ exception = new MultiIndexWriteFailureException(Collections.unmodifiableList(failedTables),
disableIndexOnFailure && PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env));
- if (disableIndexOnFailure)
throw exception;
- else {
+ } else {
+ exception = new MultiIndexWriteFailureException(Collections.unmodifiableList(failedTables),
+ disableIndexOnFailure && PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env), cause);
throw wrapInDoNotRetryIOException("At least one index write failed after retries", exception,
EnvironmentEdgeManager.currentTimeMillis());
}
@@ -253,6 +262,24 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
return;
}
+ private Throwable logFailedTasksAndGetCause(List<Future<Boolean>> failedFutures, // logs the exception of every future passed in and returns the first one seen
+ List<HTableInterfaceReference> failedTables) { // failedTables.get(i) labels the log line for the i-th element of failedFutures
+ int i = 0;
+ Throwable t = null; // stays null when no future.get() call throws
+ for (Future<Boolean> future : failedFutures) {
+ try {
+ future.get(); // called only to surface the task's exception; the returned value is discarded
+ } catch (InterruptedException | ExecutionException e) { // NOTE(review): the interrupt flag is not restored when InterruptedException is caught here
+ LOGGER.warn("Index Write failed for table " + failedTables.get(i), e);
+ if (t == null) {
+ t = e; // only the first exception is kept; it is the caught exception itself, not e.getCause()
+ }
+ }
+ i++;
+ }
+ return t;
+ }
+
@Override
public void stop(String why) {
LOGGER.info("Shutting down " + this.getClass().getSimpleName());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index 39986fb..f1075d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -450,4 +451,14 @@ public class ServerUtil {
}
+ public static <T> Throwable getExceptionFromFailedFuture(Future<T> f) { // returns whatever exception f.get() throws, or null when get() returns normally
+ Throwable t = null;
+ try {
+ f.get(); // blocks until the future completes; the result value is ignored
+ } catch (Exception e) { // NOTE(review): this also catches InterruptedException, and the interrupt flag is not restored
+ t = e; // the caught exception is returned as-is; it is not unwrapped via getCause()
+ }
+ return t;
+ }
+
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/parallel/TestTaskRunner.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/parallel/TestTaskRunner.java
new file mode 100644
index 0000000..af1e4b8
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/parallel/TestTaskRunner.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.parallel;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+
+public class TestTaskRunner { // checks that a task runner hands back the per-task futures next to the per-task results
+
+ @Test
+ public void testWaitForCompletionTaskRunner() throws Exception {
+ TaskRunner tr = new WaitForCompletionTaskRunner(Executors.newFixedThreadPool(4)); // NOTE(review): this pool is never shut down within the test
+ TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(4);
+ for (int i = 0; i < 4; i++) {
+ tasks.add(new EvenNumberFailingTask(i)); // tasks 0 and 2 throw, tasks 1 and 3 return TRUE
+ }
+ Pair<List<Boolean>, List<Future<Boolean>>> resultAndFutures =
+ tr.submitUninterruptible(tasks);
+ List<Boolean> results = resultAndFutures.getFirst();
+ List<Future<Boolean>> futures = resultAndFutures.getSecond();
+ for (int j = 0; j < 4; j++) { // results and futures are both indexed by task submission order
+ if (j % 2 == 0) {
+ assertNull(results.get(j)); // the test expects a null result slot for a task that threw
+ try {
+ futures.get(j).get();
+ fail("Should have received ExecutionException"); // fail() throws AssertionError, which the catch (Exception) below does not swallow
+ } catch (Exception e) {
+ assertTrue(e instanceof ExecutionException);
+ assertTrue(e.getCause().getMessage().equals("Even number task")); // the cause must be the IOException thrown by the task
+ }
+ } else {
+ assertTrue(results.get(j));
+ }
+ }
+ }
+
+ private static class EvenNumberFailingTask extends Task<Boolean> { // test task that fails when constructed with an even number
+ private int num;
+
+ public EvenNumberFailingTask(int i) {
+ this.num = i;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ if (num % 2 == 0) {
+ throw new IOException("Even number task"); // this message is asserted on by the test above
+ }
+ return Boolean.TRUE;
+ }
+
+ }
+
+}