Posted to commits@hbase.apache.org by sy...@apache.org on 2017/01/04 07:39:50 UTC

[48/50] [abbrv] hbase git commit: HBASE-17373: Fixing bug in moving segments from compaction pipeline to snapshot

HBASE-17373: Fixing bug in moving segments from compaction pipeline to snapshot

Signed-off-by: Michael Stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/69ce5967
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/69ce5967
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/69ce5967

Branch: refs/heads/hbase-12439
Commit: 69ce5967fd3b8f33486239bcdd7e5e4a817691b9
Parents: c3d5f26
Author: eshcar <es...@yahoo-inc.com>
Authored: Tue Jan 3 15:28:10 2017 +0200
Committer: Michael Stack <st...@apache.org>
Committed: Tue Jan 3 19:13:52 2017 -0800

----------------------------------------------------------------------
 .../hbase/regionserver/CompactingMemStore.java  | 19 ++++--
 .../hbase/regionserver/CompactionPipeline.java  | 69 ++++++++++----------
 .../regionserver/VersionedSegmentsList.java     |  5 +-
 .../client/TestAsyncTableGetMultiThreaded.java  | 22 +++++--
 ...ableGetMultiThreadedWithBasicCompaction.java | 35 ++++++++++
 ...ableGetMultiThreadedWithEagerCompaction.java | 35 ++++++++++
 6 files changed, 137 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
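
As the diff below suggests, the flush path no longer pulls the pipeline tail
directly (the removed pullTail()/removeLast() pair); instead it takes a
versioned view of the tail and retires it through the same optimistic swap()
that in-memory compaction uses, with closeSuffix=false so the segment's memory
survives its move into the snapshot. A minimal, self-contained sketch of that
versioned-swap pattern follows; names and semantics are illustrative
simplifications, not HBase's actual classes.

    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;

    class VersionedPipeline<T> {
      private final LinkedList<T> pipeline = new LinkedList<>();
      private long version = 0;

      // Any mutation bumps the version and invalidates outstanding views.
      synchronized void pushHead(T segment) {
        pipeline.addFirst(segment);
        version++;
      }

      // Read-only view of the last (oldest) element, tagged with the current version.
      synchronized VersionedView<T> versionedTail() {
        List<T> tail = new ArrayList<>();
        if (!pipeline.isEmpty()) {
          tail.add(pipeline.getLast());
        }
        return new VersionedView<>(tail, version);
      }

      // Replace the viewed suffix with an optional new segment, but only if the
      // pipeline is unchanged since the view was taken (optimistic concurrency).
      synchronized boolean swap(VersionedView<T> view, T replacement) {
        if (view.version != version) {
          return false; // a concurrent modification won the race; caller re-reads
        }
        pipeline.removeAll(view.segments);
        if (replacement != null) {
          pipeline.addLast(replacement); // null means the suffix is simply removed
        }
        version++;
        return true;
      }

      static final class VersionedView<T> {
        final List<T> segments;
        final long version;
        VersionedView(List<T> segments, long version) {
          this.segments = segments;
          this.version = version;
        }
      }
    }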


http://git-wip-us.apache.org/repos/asf/hbase/blob/69ce5967/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index f8192a2..e1289f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -213,8 +213,10 @@ public class CompactingMemStore extends AbstractMemStore {
     }
   }
 
+  // The getSegments() method is used by tests only
+  @VisibleForTesting
   @Override
-  public List<Segment> getSegments() {
+  protected List<Segment> getSegments() {
     List<Segment> pipelineList = pipeline.getSegments();
     List<Segment> list = new ArrayList<Segment>(pipelineList.size() + 2);
     list.add(this.active);
@@ -266,6 +268,7 @@ public class CompactingMemStore extends AbstractMemStore {
     long order = pipelineList.size();
     // The list of elements in pipeline + the active element + the snapshot segment
     // TODO : This will change when the snapshot is made of more than one element
+    // The order is the Segment ordinal
     List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(pipelineList.size() + 2);
     list.add(this.active.getScanner(readPt, order + 1));
     for (Segment item : pipelineList) {
@@ -374,10 +377,18 @@ public class CompactingMemStore extends AbstractMemStore {
   }
 
   private void pushTailToSnapshot() {
-    ImmutableSegment tail = pipeline.pullTail();
-    if (!tail.isEmpty()) {
-      this.snapshot = tail;
+    VersionedSegmentsList segments = pipeline.getVersionedTail();
+    pushToSnapshot(segments.getStoreSegments());
+    pipeline.swap(segments, null, false); // do not close the segments; they now live in the snapshot
+  }
+
+  private void pushToSnapshot(List<ImmutableSegment> segments) {
+    if (segments.isEmpty()) return;
+    if (segments.size() == 1 && !segments.get(0).isEmpty()) {
+      this.snapshot = segments.get(0);
+      return;
     }
+    // TODO: else create a composite snapshot
   }
 
   private RegionServicesForStores getRegionServices() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/69ce5967/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 6676170..9d5df77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -45,18 +46,14 @@ public class CompactionPipeline {
   public final static long FIXED_OVERHEAD = ClassSize
       .align(ClassSize.OBJECT + (2 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
   public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + ClassSize.LINKEDLIST;
-  public final static long ENTRY_OVERHEAD = ClassSize.LINKEDLIST_ENTRY;
 
   private final RegionServicesForStores region;
   private LinkedList<ImmutableSegment> pipeline;
   private long version;
 
-  private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance()
-      .createImmutableSegment((CellComparator) null);
-
   public CompactionPipeline(RegionServicesForStores region) {
     this.region = region;
-    this.pipeline = new LinkedList<ImmutableSegment>();
+    this.pipeline = new LinkedList<>();
     this.version = 0;
   }
 
@@ -68,31 +65,33 @@ public class CompactionPipeline {
     }
   }
 
-  public ImmutableSegment pullTail() {
+  public VersionedSegmentsList getVersionedList() {
     synchronized (pipeline){
-      if(pipeline.isEmpty()) {
-        return EMPTY_MEM_STORE_SEGMENT;
-      }
-      return removeLast();
+      List<ImmutableSegment> segmentList = new ArrayList<>(pipeline);
+      return new VersionedSegmentsList(segmentList, version);
     }
   }
 
-  public VersionedSegmentsList getVersionedList() {
+  public VersionedSegmentsList getVersionedTail() {
     synchronized (pipeline){
-      LinkedList<ImmutableSegment> segmentList = new LinkedList<ImmutableSegment>(pipeline);
-      VersionedSegmentsList res = new VersionedSegmentsList(segmentList, version);
-      return res;
+      List<ImmutableSegment> segmentList = new ArrayList<>();
+      if (!pipeline.isEmpty()) {
+        segmentList.add(0, pipeline.getLast());
+      }
+      return new VersionedSegmentsList(segmentList, version);
     }
   }
 
   /**
-   * Swaps the versioned list at the tail of the pipeline with the new compacted segment.
-   * Swapping only if there were no changes to the suffix of the list while it was compacted.
-   * @param versionedList tail of the pipeline that was compacted
-   * @param segment new compacted segment
+   * Swaps the versioned list at the tail of the pipeline with a new segment.
+   * The swap is applied only if there were no changes to the suffix of the list since the
+   * versioned list was created.
+   * @param versionedList suffix of the pipeline to be replaced; either the tail or the whole pipeline
+   * @param segment new segment to replace the suffix. Can be null if the suffix just needs to be
+   *                removed.
   * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it out.
   *        During an index merge op this will be false; for compaction it will be true.
-   * @return true iff swapped tail with new compacted segment
+   * @return true iff the tail was swapped with the new segment
    */
   public boolean swap(
       VersionedSegmentsList versionedList, ImmutableSegment segment, boolean closeSuffix) {
@@ -106,26 +105,32 @@ public class CompactionPipeline {
       }
       suffix = versionedList.getStoreSegments();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Swapping pipeline suffix with compacted item. "
+        int count = 0;
+        if (segment != null) {
+          count = segment.getCellsCount();
+        }
+        LOG.debug("Swapping pipeline suffix. "
             + "Just before the swap the number of segments in pipeline is:"
             + versionedList.getStoreSegments().size()
-            + ", and the number of cells in new segment is:" + segment.getCellsCount());
+            + ", and the number of cells in new segment is:" + count);
       }
-      swapSuffix(suffix,segment, closeSuffix);
+      swapSuffix(suffix, segment, closeSuffix);
     }
-    if (region != null) {
+    if (closeSuffix && region != null) {
       // update the global memstore size counter
       long suffixDataSize = getSegmentsKeySize(suffix);
-      long newDataSize = segment.keySize();
+      long newDataSize = 0;
+      if (segment != null) newDataSize = segment.keySize();
       long dataSizeDelta = suffixDataSize - newDataSize;
       long suffixHeapOverhead = getSegmentsHeapOverhead(suffix);
-      long newHeapOverhead = segment.heapOverhead();
+      long newHeapOverhead = 0;
+      if (segment != null) newHeapOverhead = segment.heapOverhead();
       long heapOverheadDelta = suffixHeapOverhead - newHeapOverhead;
       region.addMemstoreSize(new MemstoreSize(-dataSizeDelta, -heapOverheadDelta));
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Suffix data size: " + suffixDataSize + " compacted item data size: "
+        LOG.debug("Suffix data size: " + suffixDataSize + " new segment data size: "
             + newDataSize + ". Suffix heap overhead: " + suffixHeapOverhead
-            + " compacted item heap overhead: " + newHeapOverhead);
+            + " new segment heap overhead: " + newHeapOverhead);
       }
     }
     return true;
@@ -193,8 +198,7 @@ public class CompactionPipeline {
 
   public List<Segment> getSegments() {
     synchronized (pipeline){
-      List<Segment> res = new LinkedList<Segment>(pipeline);
-      return res;
+      return new LinkedList<>(pipeline);
     }
   }
 
@@ -230,12 +234,7 @@ public class CompactionPipeline {
       }
     }
     pipeline.removeAll(suffix);
-    pipeline.addLast(segment);
-  }
-
-  private ImmutableSegment removeLast() {
-    version++;
-    return pipeline.removeLast();
+    if (segment != null) pipeline.addLast(segment);
   }
 
   private boolean addFirst(ImmutableSegment segment) {
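
The swap() contract documented above is optimistic: the caller works against a
versioned view of the suffix, and the swap is rejected if the pipeline changed
in between; a null segment means the suffix is simply removed, which is exactly
what the new snapshot path does. A hedged usage sketch against the toy
VersionedPipeline from the earlier note (String stands in for a segment):

    public class SwapDemo {
      public static void main(String[] args) {
        VersionedPipeline<String> pipeline = new VersionedPipeline<>();
        pipeline.pushHead("segment-A");

        // Take a versioned view of the tail, as the flush path now does.
        VersionedPipeline.VersionedView<String> tail = pipeline.versionedTail();

        // A concurrent in-memory flush pushes a new head, bumping the version.
        pipeline.pushHead("segment-B");

        // The stale view is rejected: nothing is removed, the caller re-reads.
        System.out.println(pipeline.swap(tail, null));                     // false

        // A fresh view succeeds; the null replacement just drops the tail.
        System.out.println(pipeline.swap(pipeline.versionedTail(), null)); // true
      }
    }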

http://git-wip-us.apache.org/repos/asf/hbase/blob/69ce5967/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
index 01160bf..ab751f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/VersionedSegmentsList.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -36,10 +35,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public class VersionedSegmentsList {
 
-  private final LinkedList<ImmutableSegment> storeSegments;
+  private final List<ImmutableSegment> storeSegments;
   private final long version;
 
-  public VersionedSegmentsList(LinkedList<ImmutableSegment> storeSegments, long version) {
+  public VersionedSegmentsList(List<ImmutableSegment> storeSegments, long version) {
     this.storeSegments = storeSegments;
     this.version = version;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/69ce5967/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
index da8141b..82fe3cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
@@ -33,17 +33,18 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -75,11 +76,18 @@ public class TestAsyncTableGetMultiThreaded {
 
   @BeforeClass
   public static void setUp() throws Exception {
+    setUp(HColumnDescriptor.MemoryCompaction.NONE);
+  }
+
+  protected static void setUp(HColumnDescriptor.MemoryCompaction memoryCompaction) throws Exception {
     TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
     TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
     TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
     TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
     TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
+    TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
+        String.valueOf(memoryCompaction));
+
     TEST_UTIL.startMiniCluster(5);
     SPLIT_KEYS = new byte[8][];
     for (int i = 111; i < 999; i += 111) {
@@ -103,11 +111,13 @@ public class TestAsyncTableGetMultiThreaded {
 
   private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
     while (!stop.get()) {
-      int i = ThreadLocalRandom.current().nextInt(COUNT);
-      assertEquals(i,
-        Bytes.toInt(
-          CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get()
-              .getValue(FAMILY, QUALIFIER)));
+      for (int i = 0; i < COUNT; i++) {
+        assertEquals(i,
+            Bytes.toInt(
+                CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i))))
+                    .get()
+                    .getValue(FAMILY, QUALIFIER)));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/69ce5967/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java
new file mode 100644
index 0000000..eb07875
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithBasicCompaction.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncTableGetMultiThreadedWithBasicCompaction extends
+    TestAsyncTableGetMultiThreaded {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    setUp(HColumnDescriptor.MemoryCompaction.BASIC);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/69ce5967/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java
new file mode 100644
index 0000000..6fe8045
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreadedWithEagerCompaction.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ LargeTests.class, ClientTests.class })
+public class TestAsyncTableGetMultiThreadedWithEagerCompaction extends
+    TestAsyncTableGetMultiThreaded {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    setUp(HColumnDescriptor.MemoryCompaction.EAGER);
+  }
+
+}