You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/02/27 00:09:30 UTC
[09/37] hbase git commit: HBASE-15016 Services a Store needs from a
Region
HBASE-15016 Services a Store needs from a Region
Signed-off-by: stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/876a6ab7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/876a6ab7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/876a6ab7
Branch: refs/heads/hbase-12439
Commit: 876a6ab73ecff71b9b4010a532272474ea241daf
Parents: 28cd48b
Author: eshcar <es...@yahoo-inc.com>
Authored: Wed Feb 24 09:56:25 2016 +0200
Committer: stack <st...@apache.org>
Committed: Wed Feb 24 07:07:07 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/util/ClassSize.java | 4 +
.../hbase/regionserver/DefaultMemStore.java | 4 +
.../hadoop/hbase/regionserver/HMobStore.java | 3 +
.../hadoop/hbase/regionserver/HRegion.java | 93 +++++++++++++++-----
.../hadoop/hbase/regionserver/HStore.java | 4 +
.../hadoop/hbase/regionserver/MemStore.java | 7 ++
.../hadoop/hbase/regionserver/Region.java | 12 +--
.../regionserver/RegionServicesForStores.java | 53 +++++++++++
.../apache/hadoop/hbase/regionserver/Store.java | 8 ++
.../org/apache/hadoop/hbase/TestIOFencing.java | 10 ++-
10 files changed, 165 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/876a6ab7/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
index 77acf9b..fdd0fae 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java
@@ -110,6 +110,8 @@ public class ClassSize {
/** Overhead for CellSkipListSet */
public static final int CELL_SKIPLIST_SET;
+ /**
+ * Overhead for the store services object (presumably RegionServicesForStores, which HRegion
+ * adds to its deep overhead under this name).
+ * NOTE(review): the initializer sums OBJECT + REFERENCE + ATOMIC_LONG, while
+ * RegionServicesForStores only declares a single HRegion reference field -- confirm that the
+ * ATOMIC_LONG term is intended.
+ */
+ public static final int STORE_SERVICES;
+
/* Are we running on jdk7? */
private static final boolean JDK7;
static {
@@ -193,6 +195,8 @@ public class ClassSize {
TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2);
CELL_SKIPLIST_SET = align(OBJECT + REFERENCE);
+
+ STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/876a6ab7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 82d40b6..92bb7b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -162,6 +162,10 @@ public class DefaultMemStore extends AbstractMemStore {
return;
}
+ /**
+ * Post-flush hook; this implementation performs no action.
+ */
+ @Override
+ public void finalizeFlush() {
+ }
+
/**
* Code to help figure if our approximation of object heap sizes is close
* enough. See hbase-900. Fills memstores then waits so user can heap
http://git-wip-us.apache.org/repos/asf/hbase/blob/876a6ab7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index d666db5..7b44338 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -511,6 +511,9 @@ public class HMobStore extends HStore {
}
}
+ /**
+ * Post-flush hook. Delegates to {@link HStore#finalizeFlush()} so that the memstore's own
+ * finalizeFlush() still runs for MOB stores; an empty override here would silently skip it.
+ */
+ @Override
+ public void finalizeFlush() {
+ super.finalizeFlush();
+ }
+
public void updateCellsCountCompactedToMob(long count) {
cellsCountCompactedToMob += count;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/876a6ab7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 0d5a71e..b70a4c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -17,6 +17,20 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.TextFormat;
+
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -181,20 +195,6 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Closeables;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import com.google.protobuf.TextFormat;
-
@SuppressWarnings("deprecation")
@InterfaceAudience.Private
public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
@@ -258,6 +258,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
private final AtomicLong memstoreSize = new AtomicLong(0);
+ private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this);
// Debug possible data loss due to WAL off
final Counter numMutationsWithoutWAL = new Counter();
@@ -999,6 +1000,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return false;
}
+ /**
+ * Acquires the write lock of {@code updatesLock}.
+ * NOTE(review): presumably this stalls updates that take the corresponding read lock, and the
+ * caller is expected to release it through {@link #unblockUpdates()} -- confirm against callers.
+ */
+ public void blockUpdates() {
+ this.updatesLock.writeLock().lock();
+ }
+
+ /**
+ * Releases the write lock of {@code updatesLock}; the counterpart of {@link #blockUpdates()}.
+ */
+ public void unblockUpdates() {
+ this.updatesLock.writeLock().unlock();
+ }
+
@Override
public HDFSBlocksDistribution getHDFSBlocksDistribution() {
HDFSBlocksDistribution hdfsBlocksDistribution =
@@ -1116,6 +1125,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return memstoreSize.get();
}
+ /** @return the services object this region exposes to its stores */
+ @Override
+ public RegionServicesForStores getRegionServicesForStores() {
+ return regionServicesForStores;
+ }
+
@Override
public long getNumMutationsWithoutWAL() {
return numMutationsWithoutWAL.get();
@@ -2035,7 +2048,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Should the store be flushed because it is old enough.
* <p>
* Every FlushPolicy should call this to determine whether a store is old enough to flush (except
- * that you always flush all stores). Otherwise the {@link #shouldFlush()} method will always
+ * that you always flush all stores). Otherwise the method will always
* returns true which will make a lot of flush requests.
*/
boolean shouldFlushStore(Store store) {
@@ -2477,6 +2490,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
// If we get to here, the HStores have been written.
+ for(Store storeToFlush :storesToFlush) {
+ storeToFlush.finalizeFlush();
+ }
if (wal != null) {
wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
}
@@ -2883,9 +2899,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
long addedSize = doMiniBatchMutate(batchOp);
long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
- if (isFlushSize(newSize)) {
- requestFlush();
- }
+ requestFlushIfNeeded(newSize);
}
} finally {
closeRegionOperation(op);
@@ -3762,6 +3776,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+ private void requestFlushIfNeeded(long memstoreTotalSize) throws RegionTooBusyException {
+ if(memstoreTotalSize > this.getMemstoreFlushSize()) {
+ requestFlush();
+ }
+ }
+
private void requestFlush() {
if (this.rsServices == null) {
return;
@@ -5170,7 +5190,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
long c = count.decrementAndGet();
if (c <= 0) {
synchronized (lock) {
- if (count.get() <= 0 ){
+ if (count.get() <= 0){
usable.set(false);
RowLockContext removed = lockedRows.remove(row);
assert removed == this: "we should never remove a different context";
@@ -5978,7 +5998,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected boolean isStopRow(Cell currentRowCell) {
return currentRowCell == null
- || (stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length) >= isScan);
+ || (stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0, stopRow
+ .length) >= isScan);
}
@Override
@@ -6860,8 +6881,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
processor.postProcess(this, walEdit, success);
} finally {
closeRegionOperation();
- if (!mutations.isEmpty() && isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) {
- requestFlush();
+ if (!mutations.isEmpty()) {
+ long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
+ requestFlushIfNeeded(newSize);
}
}
}
@@ -7290,7 +7312,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Do a specific Get on passed <code>columnFamily</code> and column qualifiers.
* @param mutation Mutation we are doing this Get for.
- * @param columnFamily Which column family on row (TODO: Go all Gets in one go)
+ * @param store Which column family on row (TODO: Go all Gets in one go)
* @param coordinates Cells from <code>mutation</code> used as coordinates applied to Get.
* @return Return list of Cells found.
*/
@@ -7340,7 +7362,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
- 45 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+ 46 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(14 * Bytes.SIZEOF_LONG) +
5 * Bytes.SIZEOF_BOOLEAN);
@@ -7365,6 +7387,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
MultiVersionConcurrencyControl.FIXED_SIZE // mvcc
+ ClassSize.TREEMAP // maxSeqIdInStores
+ 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress
+ + ClassSize.STORE_SERVICES // store services
;
@Override
@@ -7847,4 +7870,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
public long getMemstoreFlushSize() {
return this.memstoreFlushSize;
}
+
+ /**
+ * Debugging aid for tests. Throws a RuntimeException describing this region (region info, each
+ * store's family name and memstore size, and the region memstore size) when the region name
+ * starts with the given prefix; otherwise returns normally.
+ * @param title text placed at the start of the exception message
+ * @param regionName prefix matched against this region's name
+ */
+ void throwException(String title, String regionName) {
+ // Check first so that the message is only assembled when it is actually going to be thrown.
+ if (!getRegionInfo().getRegionNameAsString().startsWith(regionName)) {
+ return;
+ }
+ // A method-local buffer does not need StringBuffer's synchronization.
+ StringBuilder buf = new StringBuilder();
+ buf.append(title).append(", ");
+ buf.append(getRegionInfo().toString());
+ buf.append(getRegionInfo().isMetaRegion() ? " meta region " : " ");
+ buf.append(getRegionInfo().isMetaTable() ? " meta table " : " ");
+ buf.append("stores: ");
+ for (Store s : getStores()) {
+ buf.append(s.getFamily().getNameAsString());
+ buf.append(" size: ");
+ buf.append(s.getMemStoreSize());
+ buf.append(" ");
+ }
+ buf.append("end-of-stores");
+ buf.append(", memstore size ");
+ buf.append(getMemstoreSize());
+ throw new RuntimeException(buf.toString());
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/876a6ab7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 5cc3fc9..22f99e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2448,6 +2448,10 @@ public class HStore implements Store {
}
}
+ /**
+ * Post-flush hook: forwards the notification to this store's memstore.
+ */
+ @Override
+ public void finalizeFlush() {
+ this.memstore.finalizeFlush();
+ }
+
private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Clearing the compacted file " + filesToRemove + " from this store");
http://git-wip-us.apache.org/repos/asf/hbase/blob/876a6ab7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index a10ccd9..6bb7081 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -144,4 +144,11 @@ public interface MemStore extends HeapSize {
* @return Total memory occupied by this MemStore.
*/
long size();
+
+ /**
+ * Called once it is clear that the flush to disk has completed.
+ * The memstore may perform any post-flush actions at this point.
+ * One example is updating the WAL with a sequence number that is known only at the store level.
+ */
+ void finalizeFlush();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/876a6ab7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 976bddb..9b1f82a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
@@ -49,11 +53,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-
/**
* Regions store data for a certain region of a table. It stores all columns
* for each row. A given table consists of one or more Regions.
@@ -200,6 +199,9 @@ public interface Region extends ConfigurationObserver {
/** @return memstore size for this region, in bytes */
long getMemstoreSize();
+ /** @return the store services of this region, giving stores access to region-level services */
+ RegionServicesForStores getRegionServicesForStores();
+
/** @return the number of mutations processed bypassing the WAL */
long getNumMutationsWithoutWAL();
http://git-wip-us.apache.org/repos/asf/hbase/blob/876a6ab7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
new file mode 100644
index 0000000..d3c35b3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Services a Store needs from a Region.
+ * <p>
+ * RegionServicesForStores is the interface through which a memstore accesses services at the
+ * region level. For example, when using alternative memory formats or due to compaction, the
+ * memstore needs to occasionally take a lock and update size counters at the region level.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RegionServicesForStores {
+
+ /** The region every call is forwarded to. */
+ private final HRegion hostRegion;
+
+ /**
+ * @param region the region whose services are exposed to its stores
+ */
+ public RegionServicesForStores(HRegion region) {
+ hostRegion = region;
+ }
+
+ /** Forwards to {@link HRegion#blockUpdates()}. */
+ public void blockUpdates() {
+ hostRegion.blockUpdates();
+ }
+
+ /** Forwards to {@link HRegion#unblockUpdates()}. */
+ public void unblockUpdates() {
+ hostRegion.unblockUpdates();
+ }
+
+ /**
+ * Forwards to the region's {@code addAndGetGlobalMemstoreSize(long)}.
+ * @param size the value passed through to the region
+ * @return the value returned by the region
+ */
+ public long addAndGetGlobalMemstoreSize(long size) {
+ return hostRegion.addAndGetGlobalMemstoreSize(size);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/876a6ab7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 09e0254..c167535 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -515,4 +515,12 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
* Closes and archives the compacted files under this store
*/
void closeAndArchiveCompactedFiles() throws IOException;
+
+ /**
+ * Called once it is clear that the flush to disk has completed.
+ * The store may perform any post-flush actions at this point.
+ * One example is updating the WAL with a sequence number that is known only at the store level.
+ */
+ void finalizeFlush();
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/876a6ab7/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 35a7403..3aae5d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -17,14 +17,12 @@
*/
package org.apache.hadoop.hbase;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -54,7 +52,8 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* Test for the case where a regionserver going down has enough cycles to do damage to regions
@@ -206,6 +205,9 @@ public class TestIOFencing {
}
super.completeCompaction(compactedFiles);
}
+
+ // No-op override of the post-flush hook for this test class.
+ @Override public void finalizeFlush() {
+ }
}
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();