You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2013/11/16 15:54:04 UTC
svn commit: r1542519 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/util/
test/java/org/apache/hadoop/hbase/regionserver/
Author: tedyu
Date: Sat Nov 16 14:54:04 2013
New Revision: 1542519
URL: http://svn.apache.org/r1542519
Log:
HBASE-9949 Fix the race condition between Compaction and StoreScanner.init
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1542519&r1=1542518&r2=1542519&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Sat Nov 16 14:54:04 2013
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.regionserver.StoreScanner.StoreScannerCompactionRace;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -77,6 +78,8 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.InjectionEvent;
+import org.apache.hadoop.hbase.util.InjectionHandler;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -1421,6 +1424,8 @@ public class HStore implements Store {
// scenario that could have happened if continue to hold the lock.
notifyChangedReadersObservers();
// At this point the store will use new files for all scanners.
+ InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] {
+ StoreScannerCompactionRace.BEFORE_SEEK.ordinal()});
// let the archive util decide if we should archive or delete the files
LOG.debug("Removing store files after compaction...");
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1542519&r1=1542518&r2=1542519&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sat Nov 16 14:54:04 2013
@@ -38,10 +38,11 @@ import org.apache.hadoop.hbase.client.Is
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.InjectionEvent;
+import org.apache.hadoop.hbase.util.InjectionHandler;
/**
* Scanner scans both the memstore and the Store. Coalesce KeyValue stream
@@ -100,6 +101,13 @@ public class StoreScanner extends NonLaz
private final long readPt;
+ // used by the injection framework to test race between StoreScanner construction and compaction
+ enum StoreScannerCompactionRace {
+ BEFORE_SEEK,
+ AFTER_SEEK,
+ COMPACT_COMPLETE
+ }
+
/** An internal constructor. */
protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
final NavigableSet<byte[]> columns, long ttl, int minVersions, long readPt) {
@@ -155,6 +163,8 @@ public class StoreScanner extends NonLaz
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
oldestUnexpiredTS);
+ this.store.addChangedReaderObserver(this);
+
// Pass columns to try to filter out unnecessary StoreFiles.
List<KeyValueScanner> scanners = getScannersNoCompaction();
@@ -162,6 +172,8 @@ public class StoreScanner extends NonLaz
// key does not exist, then to the start of the next matching Row).
// Always check bloom filter to optimize the top row seek for delete
// family marker.
+ InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] {
+ StoreScannerCompactionRace.BEFORE_SEEK.ordinal()});
if (explicitColumnQuery && lazySeekEnabledGlobally) {
for (KeyValueScanner scanner : scanners) {
scanner.requestSeek(matcher.getStartKey(), false, true);
@@ -184,8 +196,8 @@ public class StoreScanner extends NonLaz
// Combine all seeked scanners with a heap
heap = new KeyValueHeap(scanners, store.getComparator());
-
- this.store.addChangedReaderObserver(this);
+ InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] {
+ StoreScannerCompactionRace.AFTER_SEEK.ordinal()});
}
/**
@@ -235,9 +247,13 @@ public class StoreScanner extends NonLaz
earliestPutTs, oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow);
}
+ this.store.addChangedReaderObserver(this);
+
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
scanners = selectScannersFrom(scanners);
+ InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] {
+ StoreScannerCompactionRace.BEFORE_SEEK.ordinal()});
// Seek all scanners to the initial key
if (!isParallelSeekEnabled) {
for (KeyValueScanner scanner : scanners) {
@@ -249,6 +265,8 @@ public class StoreScanner extends NonLaz
// Combine all seeked scanners with a heap
heap = new KeyValueHeap(scanners, store.getComparator());
+ InjectionHandler.processEvent(InjectionEvent.STORESCANNER_COMPACTION_RACE, new Object[] {
+ StoreScannerCompactionRace.AFTER_SEEK.ordinal()});
}
/** Constructor for testing. */
@@ -280,6 +298,10 @@ public class StoreScanner extends NonLaz
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS);
+ // In unit tests, the store could be null
+ if (this.store != null) {
+ this.store.addChangedReaderObserver(this);
+ }
// Seek all scanners to the initial key
if (!isParallelSeekEnabled) {
for (KeyValueScanner scanner : scanners) {
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java?rev=1542519&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionEvent.java Sat Nov 16 14:54:04 2013
@@ -0,0 +1,32 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+/**
+ * Enumeration of all injection events.
+ * When defining new events, please PREFIX the name
+ * with the supervised class.
+ *
+ * Please see InjectionHandler.
+ */
+public enum InjectionEvent {
+ // Injection into Store.java
+ STORESCANNER_COMPACTION_RACE
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java?rev=1542519&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/InjectionHandler.java Sat Nov 16 14:54:04 2013
@@ -0,0 +1,171 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The InjectionHandler is an object provided to a class,
+ * which can perform custom actions for JUnit testing.
+ * JUnit test can implement custom version of the handler.
+ * For example, let's say we want to supervise FSImage object:
+ *
+ * <code>
+ * // JUnit test code
+ * class MyInjectionHandler extends InjectionHandler {
+ * protected void _processEvent(InjectionEvent event,
+ * Object... args) {
+ * if (event == InjectionEvent.MY_EVENT) {
+ * LOG.info("Handling my event for fsImage: "
+ * + args[0].toString());
+ * }
+ * }
+ * }
+ *
+ * public void testMyEvent() {
+ * InjectionHandler ih = new MyInjectionHandler();
+ * InjectionHandler.set(ih);
+ * ...
+ *
+ * InjectionHandler.clear();
+ * }
+ *
+ * // supervised code example
+ *
+ * class FSImage {
+ *
+ * private doSomething() {
+ * ...
+ * if (condition1 && InjectionHandler.trueCondition(MY_EVENT1) {
+ * ...
+ * }
+ * if (condition2 || condition3
+ * || InjectionHandler.falseCondition(MY_EVENT1) {
+ * ...
+ * }
+ * ...
+ * InjectionHandler.processEvent(MY_EVENT2, this)
+ * ...
+ * try {
+ * read();
+ * InjectionHandler.processEventIO(MY_EVENT3, this, object);
+ * // might throw an exception when testing
+ * catch (IOEXception) {
+ * LOG.info("Exception")
+ * }
+ * ...
+ * }
+ * ...
+ * }
+ * </code>
+ *
+ * Each unit test should use a unique event type.
+ * The types can be defined by adding them to
+ * InjectionEvent class.
+ *
+ * methods:
+ *
+ * // simulate actions
+ * void processEvent()
+ * // simulate exceptions
+ * void processEventIO() throws IOException
+ *
+ * // simulate conditions
+ * boolean trueCondition()
+ * boolean falseCondition()
+ *
+ * The class implementing InjectionHandler must
+ * override respective protected methods
+ * _processEvent()
+ * _processEventIO()
+ * _trueCondition()
+ * _falseCondition()
+ */
+public class InjectionHandler {
+
+ private static final Log LOG = LogFactory.getLog(InjectionHandler.class);
+
+ // the only handler to which everyone reports
+ private static InjectionHandler handler = new InjectionHandler();
+
+ // can not be instantiated outside, unless a testcase extends it
+ protected InjectionHandler() {}
+
+ // METHODS FOR PRODUCTION CODE
+
+ protected void _processEvent(InjectionEvent event, Object... args) {
+ // by default do nothing
+ }
+
+ protected void _processEventIO(InjectionEvent event, Object... args) throws IOException{
+ // by default do nothing
+ }
+
+ protected boolean _trueCondition(InjectionEvent event, Object... args) {
+ return true; // neutral in conjunction
+ }
+
+ protected boolean _falseCondition(InjectionEvent event, Object... args) {
+ return false; // neutral in alternative
+ }
+
+ ////////////////////////////////////////////////////////////
+
+ /**
+ * Set to the empty/production implementation.
+ */
+ public static void clear() {
+ handler = new InjectionHandler();
+ }
+
+ /**
+ * Set custom implementation of the handler.
+ */
+ public static void set(InjectionHandler custom) {
+ LOG.warn("WARNING: SETTING INJECTION HANDLER" +
+ " - THIS SHOULD NOT BE USED IN PRODUCTION !!!");
+ handler = custom;
+ }
+
+ /*
+ * Static methods for reporting to the handler
+ */
+
+ public static void processEvent(InjectionEvent event, Object... args) {
+ handler._processEvent(event, args);
+ }
+
+ public static void processEventIO(InjectionEvent event, Object... args)
+ throws IOException {
+ handler._processEventIO(event, args);
+ }
+
+ public static boolean trueCondition(InjectionEvent event, Object... args) {
+ return handler._trueCondition(event, args);
+ }
+
+ public static boolean falseCondition(InjectionEvent event, Object... args) {
+ return handler._falseCondition(event, args);
+ }
+}
+
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java?rev=1542519&r1=1542518&r2=1542519&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java Sat Nov 16 14:54:04 2013
@@ -27,18 +27,28 @@ import java.util.Arrays;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import junit.framework.TestCase;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.StoreScanner.StoreScannerCompactionRace;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.InjectionEvent;
+import org.apache.hadoop.hbase.util.InjectionHandler;
+import org.apache.hadoop.hbase.util.Threads;
import org.junit.experimental.categories.Category;
// Can't be small as it plays with EnvironmentEdgeManager
@@ -501,6 +511,110 @@ public class TestStoreScanner extends Te
assertEquals(false, scanner.next(results));
}
+ private class StoreScannerCompactionRaceCondition extends InjectionHandler {
+ final Store store;
+ Boolean beforeSeek = false;
+ Boolean afterSeek = false;
+ Boolean compactionComplete = false;
+ final int waitTime;
+ boolean doneSeeking = false;
+ public Future<Void> f;
+ StoreScannerCompactionRaceCondition(Store s, int waitTime) {
+ this.store = s;
+ this.waitTime = waitTime;
+ }
+
+ protected void _processEvent(InjectionEvent event, Object... args) {
+ if (event == InjectionEvent.STORESCANNER_COMPACTION_RACE) {
+ // To prevent other scanners which are not supposed to be tested from taking this code path.
+ if ((args instanceof Object[]) && (args.length == 1)
+ && (args[0] instanceof Integer)) {
+ StoreScannerCompactionRace sscr = StoreScannerCompactionRace.values()[(Integer)args[0]];
+ switch (sscr) {
+ case BEFORE_SEEK :
+ // Inside StoreScanner ctor before seek.
+ synchronized (beforeSeek) {
+ if (!beforeSeek) {
+ beforeSeek = true;
+ f = Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ StoreScanner.enableLazySeekGlobally(false);
+ ((HStore)store).compactRecentForTestingAssumingDefaultPolicy(
+ store.getStorefiles().size() / 2);
+ StoreScanner.enableLazySeekGlobally(true);
+ return null;
+ }
+ });
+ Threads.sleep(waitTime);
+ }
+ }
+ break;
+ case AFTER_SEEK:
+ // Inside StoreScanner ctor after seek.
+ synchronized (afterSeek) {
+ if (!afterSeek) {
+ afterSeek = true;
+ this.doneSeeking = true;
+ }
+ }
+ break;
+ case COMPACT_COMPLETE:
+ // Inside HStore.completeCompaction
+ synchronized (compactionComplete) {
+ if (!compactionComplete) {
+ compactionComplete = true;
+ assertTrue(doneSeeking);
+ }
+ }
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /*
+ * Verifies that there is no race condition between StoreScanner construction and compaction.
+ * This is done through 3 injection points:
+ * 1. before seek operation in StoreScanner ctor
+ * 2. after seek operation in StoreScanner ctor
+ * 3. after compaction completion
+ */
+ public void testCompactionRaceCondition() throws Exception {
+ HBaseTestingUtility util = new HBaseTestingUtility();
+ util.startMiniCluster(1);
+ byte[] t = Bytes.toBytes("tbl"), cf = Bytes.toBytes("cf");
+ HTable table = util.createTable(t, cf);
+ util.loadTable(table, cf);
+ util.flush();
+ util.loadTable(table, cf);
+ util.flush();
+ List<HRegion> regions = util.getHBaseCluster().getRegions(t);
+ assertTrue(regions.size() == 1);
+ HRegion r = regions.get(0);
+ Store s = r.getStore(cf);
+
+ // Setup the injection handler.
+ StoreScannerCompactionRaceCondition ih =
+ new StoreScannerCompactionRaceCondition(s, 5);
+ InjectionHandler.set(ih);
+
+ // Create a StoreScanner
+ TreeSet<byte[]> set = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ set.add(cf);
+ Scan scanSpec = new Scan();
+ scanSpec.setStartRow(Bytes.toBytes("hjfsd"));
+ scanSpec.setStartRow(Bytes.toBytes("zjfsd"));
+ KeyValueScanner scanner = s.getScanner(scanSpec, set, s.getSmallestReadPoint());
+ ih.f.get();
+
+ // Clear injection handling and shutdown the minicluster.
+ InjectionHandler.clear();
+ scanner.close();
+ util.shutdownMiniCluster();
+ }
+
public void testDeleteMarkerLongevity() throws Exception {
try {
final long now = System.currentTimeMillis();