You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/03/10 22:10:13 UTC
[33/50] [abbrv] hbase git commit: HBASE-17712 Remove/Simplify the
logic of RegionScannerImpl.handleFileNotFound
HBASE-17712 Remove/Simplify the logic of RegionScannerImpl.handleFileNotFound
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/58c76192
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/58c76192
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/58c76192
Branch: refs/heads/hbase-12439
Commit: 58c76192bdbf1f4863c1c87d165c2e3b9674d4ad
Parents: c42a066
Author: zhangduo <zh...@apache.org>
Authored: Mon Mar 6 21:00:50 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Mar 8 14:39:29 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 185 +++++++------------
.../hbase/regionserver/HRegionServer.java | 5 +
.../regionserver/RegionServerServices.java | 10 +
.../hbase/regionserver/RegionUnassigner.java | 68 +++++++
.../hbase/regionserver/wal/AsyncFSWAL.java | 12 +-
.../hadoop/hbase/MockRegionServerServices.java | 4 +
.../hadoop/hbase/master/MockRegionServer.java | 4 +
.../TestCompactionInDeadRegionServer.java | 164 ++++++++++++++++
.../TestCorruptedRegionStoreFile.java | 28 ++-
9 files changed, 357 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index be01220..29773c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -202,6 +202,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final String HREGION_MVCC_PRE_ASSIGN = "hbase.hregion.mvcc.preassign";
public static final boolean DEFAULT_HREGION_MVCC_PRE_ASSIGN = true;
+ public static final String HREGION_UNASSIGN_FOR_FNFE = "hbase.hregion.unassign.for.fnfe";
+ public static final boolean DEFAULT_HREGION_UNASSIGN_FOR_FNFE = true;
+
/**
* This is the global default value for durability. All tables/mutations not
* defining a durability or using USE_DEFAULT will default to this value.
@@ -238,9 +241,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected volatile long lastReplayedOpenRegionSeqId = -1L;
protected volatile long lastReplayedCompactionSeqId = -1L;
- // collects Map(s) of Store to sequence Id when handleFileNotFound() is involved
- protected List<Map> storeSeqIds = new ArrayList<>();
-
//////////////////////////////////////////////////////////////////////////////
// Members
//////////////////////////////////////////////////////////////////////////////
@@ -642,6 +642,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(
Bytes.BYTES_COMPARATOR);
+ // whether to unassign region if we hit FNFE
+ private final RegionUnassigner regionUnassigner;
/**
* HRegion constructor. This constructor should only be used for testing and
* extensions. Instances of HRegion should be instantiated with the
@@ -800,6 +802,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
false :
conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE,
HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE);
+ boolean unassignForFNFE =
+ conf.getBoolean(HREGION_UNASSIGN_FOR_FNFE, DEFAULT_HREGION_UNASSIGN_FOR_FNFE);
+ if (unassignForFNFE) {
+ this.regionUnassigner = new RegionUnassigner(rsServices, fs.getRegionInfo());
+ } else {
+ this.regionUnassigner = null;
+ }
}
void setHTableSpecificConf() {
@@ -1536,7 +1545,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
status.setStatus("Running coprocessor pre-close hooks");
this.coprocessorHost.preClose(abort);
}
-
status.setStatus("Disabling compacts and flushes for region");
boolean canFlush = true;
synchronized (writestate) {
@@ -1697,7 +1705,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// stop the Compacted hfile discharger
if (this.compactedFileDischarger != null) this.compactedFileDischarger.cancel(true);
-
status.markComplete("Closed");
LOG.info("Closed " + this);
return result;
@@ -5070,15 +5077,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
+ justification = "Notify is about post replay. Intentional")
@Override
public boolean refreshStoreFiles() throws IOException {
- return refreshStoreFiles(false);
- }
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
- justification="Notify is about post replay. Intentional")
- protected boolean refreshStoreFiles(boolean force) throws IOException {
- if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+ if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
return false; // if primary nothing to do
}
@@ -5110,7 +5113,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// see whether we can drop the memstore or the snapshot
if (storeSeqId > maxSeqIdBefore) {
-
if (writestate.flushing) {
// only drop memstore snapshots if they are smaller than last flush for the store
if (this.prepareFlushResult.flushOpSeqId <= storeSeqId) {
@@ -5150,17 +5152,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
if (!map.isEmpty()) {
- if (!force) {
- for (Map.Entry<Store, Long> entry : map.entrySet()) {
- // Drop the memstore contents if they are now smaller than the latest seen flushed file
- totalFreedDataSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey())
- .getDataSize();
- }
- } else {
- synchronized (storeSeqIds) {
- // don't try to acquire write lock of updatesLock now
- storeSeqIds.add(map);
- }
+ for (Map.Entry<Store, Long> entry : map.entrySet()) {
+ // Drop the memstore contents if they are now smaller than the latest seen flushed file
+ totalFreedDataSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey())
+ .getDataSize();
}
}
// C. Finally notify anyone waiting on memstore to clear:
@@ -5844,12 +5839,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
- KeyValueScanner scanner;
- try {
- scanner = store.getScanner(scan, entry.getValue(), this.readPt);
- } catch (FileNotFoundException e) {
- throw handleFileNotFound(e);
- }
+ KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
instantiatedScanners.add(scanner);
if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
|| this.filter.isFamilyEssential(entry.getKey())) {
@@ -5873,8 +5863,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+ private void handleFileNotFound(Throwable fnfe) {
+ // A store file is gone: log it and, if enabled, unassign the region so it gets reopened.
+ // regionUnassigner is null when hbase.hregion.unassign.for.fnfe is false. See HBASE-17712.
+ LOG.warn("A store file got lost", fnfe);
+ if (regionUnassigner != null) {
+ regionUnassigner.unassign();
+ }
+ }
+
private IOException handleException(List<KeyValueScanner> instantiatedScanners,
Throwable t) {
+ if (t instanceof FileNotFoundException) {
+ handleFileNotFound(t);
+ }
// remove scaner read point before throw the exception
scannerReadPoints.remove(this);
if (storeHeap != null) {
@@ -5957,14 +5959,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throw new UnknownScannerException("Scanner was closed");
}
boolean moreValues = false;
- if (outResults.isEmpty()) {
- // Usually outResults is empty. This is true when next is called
- // to handle scan or get operation.
- moreValues = nextInternal(outResults, scannerContext);
- } else {
- List<Cell> tmpList = new ArrayList<>();
- moreValues = nextInternal(tmpList, scannerContext);
- outResults.addAll(tmpList);
+ try {
+ if (outResults.isEmpty()) {
+ // Usually outResults is empty. This is true when next is called
+ // to handle scan or get operation.
+ moreValues = nextInternal(outResults, scannerContext);
+ } else {
+ List<Cell> tmpList = new ArrayList<Cell>();
+ moreValues = nextInternal(tmpList, scannerContext);
+ outResults.addAll(tmpList);
+ }
+ } catch (FileNotFoundException e) {
+ handleFileNotFound(e);
+ throw e;
}
// If the size limit was reached it means a partial Result is being returned. Returning a
@@ -6014,33 +6021,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean tmpKeepProgress = scannerContext.getKeepProgress();
// Scanning between column families and thus the scope is between cells
LimitScope limitScope = LimitScope.BETWEEN_CELLS;
- try {
- do {
- // We want to maintain any progress that is made towards the limits while scanning across
- // different column families. To do this, we toggle the keep progress flag on during calls
- // to the StoreScanner to ensure that any progress made thus far is not wiped away.
- scannerContext.setKeepProgress(true);
- heap.next(results, scannerContext);
- scannerContext.setKeepProgress(tmpKeepProgress);
-
- nextKv = heap.peek();
- moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
- if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
- if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
- return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
- } else if (scannerContext.checkSizeLimit(limitScope)) {
- ScannerContext.NextState state =
- moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED;
- return scannerContext.setScannerState(state).hasMoreValues();
- } else if (scannerContext.checkTimeLimit(limitScope)) {
- ScannerContext.NextState state =
- moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED;
- return scannerContext.setScannerState(state).hasMoreValues();
- }
- } while (moreCellsInRow);
- } catch (FileNotFoundException e) {
- throw handleFileNotFound(e);
- }
+ do {
+ // We want to maintain any progress that is made towards the limits while scanning across
+ // different column families. To do this, we toggle the keep progress flag on during calls
+ // to the StoreScanner to ensure that any progress made thus far is not wiped away.
+ scannerContext.setKeepProgress(true);
+ heap.next(results, scannerContext);
+ scannerContext.setKeepProgress(tmpKeepProgress);
+
+ nextKv = heap.peek();
+ moreCellsInRow = moreCellsInRow(nextKv, currentRowCell);
+ if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext);
+ if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) {
+ return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
+ } else if (scannerContext.checkSizeLimit(limitScope)) {
+ ScannerContext.NextState state =
+ moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
+ return scannerContext.setScannerState(state).hasMoreValues();
+ } else if (scannerContext.checkTimeLimit(limitScope)) {
+ ScannerContext.NextState state =
+ moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
+ return scannerContext.setScannerState(state).hasMoreValues();
+ }
+ } while (moreCellsInRow);
return nextKv != null;
}
@@ -6389,35 +6392,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
result = this.joinedHeap.requestSeek(kv, true, true) || result;
}
} catch (FileNotFoundException e) {
- throw handleFileNotFound(e);
+ handleFileNotFound(e);
+ throw e;
} finally {
closeRegionOperation();
}
return result;
}
- private IOException handleFileNotFound(FileNotFoundException fnfe) throws IOException {
- // tries to refresh the store files, otherwise shutdown the RS.
- // TODO: add support for abort() of a single region and trigger reassignment.
- try {
- region.refreshStoreFiles(true);
- return new IOException("unable to read store file");
- } catch (IOException e) {
- String msg = "a store file got lost: " + fnfe.getMessage();
- LOG.error("unable to refresh store files", e);
- abortRegionServer(msg);
- return new NotServingRegionException(
- getRegionInfo().getRegionNameAsString() + " is closing");
- }
- }
-
- private void abortRegionServer(String msg) throws IOException {
- if (rsServices instanceof HRegionServer) {
- ((HRegionServer)rsServices).abort(msg);
- }
- throw new UnsupportedOperationException("not able to abort RS after: " + msg);
- }
-
@Override
public void shipped() throws IOException {
if (storeHeap != null) {
@@ -7220,29 +7202,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
- // dropMemstoreContentsForSeqId() would acquire write lock of updatesLock
- // We perform this operation outside of the read lock of updatesLock to avoid dead lock
- // See HBASE-16304
- @SuppressWarnings("unchecked")
- private void dropMemstoreContents() throws IOException {
- MemstoreSize totalFreedSize = new MemstoreSize();
- while (!storeSeqIds.isEmpty()) {
- Map<Store, Long> map = null;
- synchronized (storeSeqIds) {
- if (storeSeqIds.isEmpty()) break;
- map = storeSeqIds.remove(storeSeqIds.size()-1);
- }
- for (Map.Entry<Store, Long> entry : map.entrySet()) {
- // Drop the memstore contents if they are now smaller than the latest seen flushed file
- totalFreedSize
- .incMemstoreSize(dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey()));
- }
- }
- if (totalFreedSize.getDataSize() > 0) {
- LOG.debug("Freed " + totalFreedSize.getDataSize() + " bytes from memstore");
- }
- }
-
@Override
public Result increment(Increment mutation, long nonceGroup, long nonce)
throws IOException {
@@ -7306,10 +7265,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
writeEntry = null;
} finally {
this.updatesLock.readLock().unlock();
- // For increment/append, a region scanner for doing a get operation could throw
- // FileNotFoundException. So we call dropMemstoreContents() in finally block
- // after releasing read lock
- dropMemstoreContents();
}
// If results is null, then client asked that we not return the calculated results.
return results != null && returnResults? Result.create(results): Result.EMPTY_RESULT;
http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index cbf6561..be4cca0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3651,4 +3651,12 @@ public class HRegionServer extends HasThread implements
return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator())
.regionLock(regionInfos, description, abort);
}
+
+ @Override
+ public void unassign(byte[] regionName) throws IOException {
+ // Connection#getAdmin() hands out a new Admin instance that the caller is responsible for closing.
+ try (org.apache.hadoop.hbase.client.Admin admin = clusterConnection.getAdmin()) {
+ admin.unassign(regionName, false);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index c92124c..3382263 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -273,4 +273,14 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
*/
EntityLock regionLock(List<HRegionInfo> regionInfos, String description,
Abortable abort) throws IOException;
+
+ /**
+ * Unassign the given region from the current regionserver and assign it randomly; it could still
+ * be assigned back to us. Used to reset the state of a region, e.g. after hitting a
+ * FileNotFoundException, so that the store file list gets refreshed. See HBASE-17712.
+ *
+ * @param regionName the name of the region to unassign
+ * @throws IOException if the unassign request fails
+ */
+ void unassign(byte[] regionName) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java
new file mode 100644
index 0000000..b347b4b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Used to unassign a region when we hit FNFE; at most one request is outstanding at a time.
+ */
+@InterfaceAudience.Private
+class RegionUnassigner {
+
+ private static final Log LOG = LogFactory.getLog(RegionUnassigner.class);
+
+ private final RegionServerServices rsServices;
+
+ private final HRegionInfo regionInfo;
+
+ private boolean unassigning = false;
+
+ RegionUnassigner(RegionServerServices rsServices, HRegionInfo regionInfo) {
+ this.rsServices = rsServices;
+ this.regionInfo = regionInfo;
+ }
+
+ synchronized void unassign() {
+ if (unassigning) {
+ return;
+ }
+ unassigning = true;
+ Thread t = new Thread(() -> {
+ LOG.info("Unassign " + regionInfo.getRegionNameAsString());
+ try {
+ rsServices.unassign(regionInfo.getRegionName());
+ } catch (IOException e) {
+ LOG.warn("Failed to unassign " + regionInfo.getRegionNameAsString(), e);
+ } finally {
+ // Done (successfully or not): allow a later FNFE to trigger a new request.
+ synchronized (RegionUnassigner.this) {
+ unassigning = false;
+ }
+ }
+ }, "Unassign-" + regionInfo);
+ // Daemon, so a slow or retrying unassign request never keeps the JVM from exiting.
+ t.setDaemon(true);
+ t.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index c3e96cf..69ca1c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -615,7 +615,15 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
break;
}
} else {
- throw e.unwrapRemoteException();
+ IOException ioe = e.unwrapRemoteException();
+ // this usually means master already think we are dead so let's fail all the pending
+ // syncs. The shutdown process of RS will wait for all regions to be closed before calling
+ // WAL.close so if we do not wake up the thread blocked by sync here it will cause dead
+ // lock.
+ if (e.getMessage().contains("Parent directory doesn't exist:")) {
+ syncFutures.forEach(f -> f.done(f.getTxid(), ioe));
+ }
+ throw ioe;
}
} catch (NameNodeException e) {
throw e;
@@ -696,6 +704,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
this.writer.close();
this.writer = null;
closeExecutor.shutdown();
+ IOException error = new IOException("WAL has been closed");
+ syncFutures.forEach(f -> f.done(f.getTxid(), error));
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 7740e66..81b3489 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -356,4 +356,8 @@ public class MockRegionServerServices implements RegionServerServices {
public SecureBulkLoadManager getSecureBulkLoadManager() {
return null;
}
+
+ @Override
+ public void unassign(byte[] regionName) throws IOException { // intentionally a no-op in this mock
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index a5fe952..b653e3f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -716,4 +716,8 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
public SecureBulkLoadManager getSecureBulkLoadManager() {
return null;
}
+
+ @Override
+ public void unassign(byte[] regionName) throws IOException { // intentionally a no-op in this mock
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java
new file mode 100644
index 0000000..ae81a4b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.YouAreDeadException;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Ensures that writing the compaction marker fails a compaction if the RS has already been declared
+ * dead. This cannot eliminate FNFE when scanning, but it does greatly reduce the possibility.
+ */
+@RunWith(Parameterized.class)
+@Category({ RegionServerTests.class, LargeTests.class })
+public class TestCompactionInDeadRegionServer {
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static final TableName TABLE_NAME = TableName.valueOf("test");
+
+ private static final byte[] CF = Bytes.toBytes("cf");
+
+ private static final byte[] CQ = Bytes.toBytes("cq");
+
+ public static final class IgnoreYouAreDeadRS extends HRegionServer {
+
+ public IgnoreYouAreDeadRS(Configuration conf) throws IOException, InterruptedException {
+ super(conf);
+ }
+
+ public IgnoreYouAreDeadRS(Configuration conf, CoordinatedStateManager csm) throws IOException {
+ super(conf, csm);
+ }
+
+ @Override
+ protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
+ throws IOException {
+ try {
+ super.tryRegionServerReport(reportStartTime, reportEndTime);
+ } catch (YouAreDeadException e) {
+ // ignore instead of aborting, so this RS keeps running after it has been declared dead
+ }
+ }
+ }
+
+ @Parameter
+ public Class<? extends WALProvider> walProvider;
+
+ @Parameters(name = "{index}: wal={0}")
+ public static List<Object[]> params() {
+ return Arrays.asList(new Object[] { FSHLogProvider.class },
+ new Object[] { AsyncFSWALProvider.class });
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, walProvider, WALProvider.class);
+ UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, 2000);
+ UTIL.getConfiguration().setClass(HConstants.REGION_SERVER_IMPL, IgnoreYouAreDeadRS.class,
+ HRegionServer.class);
+ UTIL.startMiniCluster(2);
+ Table table = UTIL.createTable(TABLE_NAME, CF);
+ for (int i = 0; i < 10; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ UTIL.getAdmin().flush(TABLE_NAME);
+ for (int i = 10; i < 20; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ UTIL.getAdmin().flush(TABLE_NAME);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void test() throws Exception {
+ HRegionServer rsToSuspend = UTIL.getRSForFirstRegionInTable(TABLE_NAME);
+ HRegion region = (HRegion) rsToSuspend.getOnlineRegions(TABLE_NAME).get(0);
+ ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher();
+ watcher.getRecoverableZooKeeper().delete(
+ ZKUtil.joinZNode(watcher.getZNodePaths().rsZNode, rsToSuspend.getServerName().toString()),
+ -1);
+ UTIL.waitFor(60000, 1000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ for (RegionServerThread thread : UTIL.getHBaseCluster().getRegionServerThreads()) {
+ HRegionServer rs = thread.getRegionServer();
+ if (rs != rsToSuspend) {
+ return !rs.getOnlineRegions(TABLE_NAME).isEmpty();
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "The region for " + TABLE_NAME + " is still on " + rsToSuspend.getServerName();
+ }
+ });
+ try {
+ region.compact(true);
+ fail("Should fail as our wal file has already been closed, " +
+ "and walDir has also been renamed");
+ } catch (Exception e) {
+ // expected: the WAL is closed and walDir renamed, so the compaction on the dead RS must fail
+ }
+ Table table = UTIL.getConnection().getTable(TABLE_NAME);
+ // should not hit FNFE, since the compaction on the dead RS did not go through
+ for (int i = 0; i < 20; i++) {
+ assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/58c76192/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
index cec5fc7..4264863 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
@@ -18,12 +18,14 @@
package org.apache.hadoop.hbase.regionserver;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -133,6 +135,23 @@ public class TestCorruptedRegionStoreFile {
}
}
+ private void removeStoreFile(FileSystem fs, Path tmpStoreFilePath) throws Exception {
+ try (FSDataInputStream input = fs.open(storeFiles.get(0))) { // opened before the move, used to probe
+ fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+ LOG.info("Move file to local");
+ evictHFileCache(storeFiles.get(0));
+ // make sure that all the replicas have been deleted on DNs: poll until a read throws FNFE.
+ for (;;) { // no explicit bound; relies on the @Test timeout
+ try {
+ input.read(0, new byte[1], 0, 1);
+ } catch (FileNotFoundException e) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ }
+ }
+
@Test(timeout=180000)
public void testLosingFileDuringScan() throws Exception {
assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
@@ -148,9 +167,7 @@ public class TestCorruptedRegionStoreFile {
public void beforeScanNext(Table table) throws Exception {
// move the path away (now the region is corrupted)
if (hasFile) {
- fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
- LOG.info("Move file to local");
- evictHFileCache(storeFiles.get(0));
+ removeStoreFile(fs, tmpStoreFilePath);
hasFile = false;
}
}
@@ -174,9 +191,7 @@ public class TestCorruptedRegionStoreFile {
public void beforeScan(Table table, Scan scan) throws Exception {
// move the path away (now the region is corrupted)
if (hasFile) {
- fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
- LOG.info("Move file to local");
- evictHFileCache(storeFiles.get(0));
+ removeStoreFile(fs, tmpStoreFilePath);
hasFile = false;
}
}
@@ -201,7 +216,6 @@ public class TestCorruptedRegionStoreFile {
HRegionServer rs = rst.getRegionServer();
rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
}
- Thread.sleep(6000);
}
private int fullScanAndCount(final TableName tableName) throws Exception {