You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2017/01/01 15:58:55 UTC
hbase git commit: HBASE-17373 Reverse the order of snapshot creation
in the CompactingMemStore (Eshcar Hillel)
Repository: hbase
Updated Branches:
refs/heads/HBASE-17081 85d4947dc -> 1d235b9aa
HBASE-17373 Reverse the order of snapshot creation in the CompactingMemStore (Eshcar Hillel)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1d235b9a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1d235b9a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1d235b9a
Branch: refs/heads/HBASE-17081
Commit: 1d235b9aafc67cc3df236b7e0ff3251d162078d6
Parents: 85d4947
Author: tedyu <yu...@gmail.com>
Authored: Sun Jan 1 07:58:48 2017 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Sun Jan 1 07:58:48 2017 -0800
----------------------------------------------------------------------
.../hbase/regionserver/CompactingMemStore.java | 24 ++++---
.../hbase/regionserver/CompactionPipeline.java | 75 ++++++++------------
.../regionserver/VersionedSegmentsList.java | 5 +-
.../client/TestAsyncTableGetMultiThreaded.java | 22 ++++--
...ableGetMultiThreadedWithBasicCompaction.java | 35 +++++++++
...ableGetMultiThreadedWithEagerCompaction.java | 35 +++++++++
6 files changed, 135 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/1d235b9a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 1cd30dd..5c31122 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -399,18 +399,26 @@ public class CompactingMemStore extends AbstractMemStore {
}
private void pushTailToSnapshot() {
- ImmutableSegment tail = pipeline.pullTail();
- if (!tail.isEmpty()) {
- this.snapshot = tail;
- }
+ VersionedSegmentsList segments = pipeline.getVersionedTail();
+ pushToSnapshot(segments.getStoreSegments());
+ pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now
}
private void pushPipelineToSnapshot() {
- List<ImmutableSegment> segments = pipeline.drain();
- if (!segments.isEmpty()) {
- this.snapshot =
- SegmentFactory.instance().createCompositeImmutableSegment(getComparator(),segments);
+ VersionedSegmentsList segments = pipeline.getVersionedList();
+ pushToSnapshot(segments.getStoreSegments());
+ pipeline.swap(segments,null,false); // do not close segments as they are in snapshot now
+ }
+
+ private void pushToSnapshot(List<ImmutableSegment> segments) {
+ if(segments.isEmpty()) return;
+ if(segments.size() == 1 && !segments.get(0).isEmpty()) {
+ this.snapshot = segments.get(0);
+ return;
}
+ // else create composite snapshot
+ this.snapshot =
+ SegmentFactory.instance().createCompositeImmutableSegment(getComparator(),segments);
}
private RegionServicesForStores getRegionServices() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/1d235b9a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 2fd2a14..a8afef8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -52,9 +52,6 @@ public class CompactionPipeline {
private LinkedList<ImmutableSegment> pipeline;
private long version;
- private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance()
- .createImmutableSegment((CellComparator) null);
-
public CompactionPipeline(RegionServicesForStores region) {
this.region = region;
this.pipeline = new LinkedList<ImmutableSegment>();
@@ -69,44 +66,33 @@ public class CompactionPipeline {
}
}
- public ImmutableSegment pullTail() {
+ public VersionedSegmentsList getVersionedList() {
synchronized (pipeline){
- if(pipeline.isEmpty()) {
- return EMPTY_MEM_STORE_SEGMENT;
- }
- return removeLast();
+ List<ImmutableSegment> segmentList = new ArrayList<>(pipeline);
+ return new VersionedSegmentsList(segmentList, version);
}
}
- public List<ImmutableSegment> drain() {
- int drainSize = pipeline.size();
- List<ImmutableSegment> result = new ArrayList<ImmutableSegment>(drainSize);
+ public VersionedSegmentsList getVersionedTail() {
synchronized (pipeline){
- version++;
- for(int i=0; i<drainSize; i++) {
- ImmutableSegment segment = this.pipeline.removeFirst();
- result.add(i,segment);
+ List<ImmutableSegment> segmentList = new ArrayList<>();
+ if(!pipeline.isEmpty()) {
+ segmentList.add(0, pipeline.getLast());
}
- return result;
- }
- }
-
- public VersionedSegmentsList getVersionedList() {
- synchronized (pipeline){
- LinkedList<ImmutableSegment> segmentList = new LinkedList<ImmutableSegment>(pipeline);
- VersionedSegmentsList res = new VersionedSegmentsList(segmentList, version);
- return res;
+ return new VersionedSegmentsList(segmentList, version);
}
}
/**
- * Swaps the versioned list at the tail of the pipeline with the new compacted segment.
- * Swapping only if there were no changes to the suffix of the list while it was compacted.
- * @param versionedList tail of the pipeline that was compacted
- * @param segment new compacted segment
+ * Swaps the versioned list at the tail of the pipeline with a new segment.
+ * Swaps only if there were no changes to the suffix of the list since the versioned list was
+ * created.
+ * @param versionedList suffix of the pipeline to be replaced; can be the tail or the whole pipeline
+ * @param segment new segment to replace the suffix. Can be null if the suffix just needs to be
+ * removed.
* @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out
* During index merge op this will be false and for compaction it will be true.
- * @return true iff swapped tail with new compacted segment
+ * @return true iff swapped tail with new segment
*/
public boolean swap(
VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) {
@@ -120,26 +106,32 @@ public class CompactionPipeline {
}
suffix = versionedList.getStoreSegments();
if (LOG.isDebugEnabled()) {
- LOG.debug("Swapping pipeline suffix with compacted item. "
+ int count = 0;
+ if(segment != null) {
+ segment.getCellsCount();
+ }
+ LOG.debug("Swapping pipeline suffix. "
+ "Just before the swap the number of segments in pipeline is:"
+ versionedList.getStoreSegments().size()
- + ", and the number of cells in new segment is:" + segment.getCellsCount());
+ + ", and the number of cells in new segment is:" + count);
}
- swapSuffix(suffix,segment, closeSuffix);
+ swapSuffix(suffix, segment, closeSuffix);
}
- if (region != null) {
+ if (closeSuffix && region != null) {
// update the global memstore size counter
long suffixDataSize = getSegmentsKeySize(suffix);
- long newDataSize = segment.keySize();
+ long newDataSize = 0;
+ if(segment != null) newDataSize = segment.keySize();
long dataSizeDelta = suffixDataSize - newDataSize;
long suffixHeapOverhead = getSegmentsHeapOverhead(suffix);
- long newHeapOverhead = segment.heapOverhead();
+ long newHeapOverhead = 0;
+ if(segment != null) newHeapOverhead = segment.heapOverhead();
long heapOverheadDelta = suffixHeapOverhead - newHeapOverhead;
region.addMemstoreSize(new MemstoreSize(-dataSizeDelta, -heapOverheadDelta));
if (LOG.isDebugEnabled()) {
- LOG.debug("Suffix data size: " + suffixDataSize + " compacted item data size: "
+ LOG.debug("Suffix data size: " + suffixDataSize + " new segment data size: "
+ newDataSize + ". Suffix heap overhead: " + suffixHeapOverhead
- + " compacted item heap overhead: " + newHeapOverhead);
+ + " new segment heap overhead: " + newHeapOverhead);
}
}
return true;
@@ -207,7 +199,7 @@ public class CompactionPipeline {
public List<Segment> getSegments() {
synchronized (pipeline){
- return new LinkedList<Segment>(pipeline);
+ return new LinkedList<>(pipeline);
}
}
@@ -260,12 +252,7 @@ public class CompactionPipeline {
}
}
pipeline.removeAll(suffix);
- pipeline.addLast(segment);
- }
-
- private ImmutableSegment removeLast() {
- version++;
- return pipeline.removeLast();
+ if(segment != null) pipeline.addLast(segment);
}
private boolean addFirst(ImmutableSegment segment) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/1d235b9a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
index 01160bf..ab751f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
@@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -36,10 +35,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
@InterfaceAudience.Private
public class VersionedSegmentsList {
- private final LinkedList<ImmutableSegment> storeSegments;
+ private final List<ImmutableSegment> storeSegments;
private final long version;
- public VersionedSegmentsList(LinkedList<ImmutableSegment> storeSegments, long version) {
+ public VersionedSegmentsList(List<ImmutableSegment> storeSegments, long version) {
this.storeSegments = storeSegments;
this.version = version;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1d235b9a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
index da8141b..82fe3cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
@@ -33,17 +33,18 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -75,11 +76,18 @@ public class TestAsyncTableGetMultiThreaded {
@BeforeClass
public static void setUp() throws Exception {
+ setUp(HColumnDescriptor.MemoryCompaction.NONE);
+ }
+
+ protected static void setUp(HColumnDescriptor.MemoryCompaction memoryCompaction) throws Exception {
TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
+ TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+ String.valueOf(memoryCompaction));
+
TEST_UTIL.startMiniCluster(5);
SPLIT_KEYS = new byte[8][];
for (int i = 111; i < 999; i += 111) {
@@ -103,11 +111,13 @@ public class TestAsyncTableGetMultiThreaded {
private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
while (!stop.get()) {
- int i = ThreadLocalRandom.current().nextInt(COUNT);
- assertEquals(i,
- Bytes.toInt(
- CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get()
- .getValue(FAMILY, QUALIFIER)));
+ for (int i = 0; i < COUNT; i++) {
+ assertEquals(i,
+ Bytes.toInt(
+ CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i))))
+ .get()
+ .getValue(FAMILY, QUALIFIER)));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1d235b9a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java
new file mode 100644
index 0000000..3243175
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java
@@ -0,0 +1,35 @@
+package org.apache.hadoop.hbase.client;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncTableGetMultiThreadedWithBasicCompaction extends
+ TestAsyncTableGetMultiThreaded {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ setUp(HColumnDescriptor.MemoryCompaction.BASIC);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1d235b9a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java
new file mode 100644
index 0000000..863ec1f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java
@@ -0,0 +1,35 @@
+package org.apache.hadoop.hbase.client;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncTableGetMultiThreadedWithEagerCompaction extends
+ TestAsyncTableGetMultiThreaded {
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ setUp(HColumnDescriptor.MemoryCompaction.EAGER);
+ }
+
+}