You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2017/04/09 15:35:04 UTC

hbase git commit: HBASE-17872 The MSLABImpl generates invalid cells when unsafe is not available

Repository: hbase
Updated Branches:
  refs/heads/master 59e8b8e2b -> df96d328f


HBASE-17872 The MSLABImpl generates invalid cells when unsafe is not available


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/df96d328
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/df96d328
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/df96d328

Branch: refs/heads/master
Commit: df96d328fb9fa11f04f84607e9a23f254f513202
Parents: 59e8b8e
Author: CHIA-PING TSAI <ch...@gmail.com>
Authored: Sat Apr 8 17:37:37 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Sun Apr 9 23:28:34 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/util/ByteBufferUtils.java      |  30 ++--
 .../hadoop/hbase/util/TestByteBufferUtils.java  | 165 ++++++++++++++++++-
 .../hbase/util/TestFromClientSide3WoUnsafe.java |  43 +++++
 3 files changed, 213 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/df96d328/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index ff4c843..34a4e02 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -43,15 +43,14 @@ import sun.nio.ch.DirectBuffer;
 @SuppressWarnings("restriction")
 @InterfaceAudience.Public
 public final class ByteBufferUtils {
-
   // "Compressed integer" serialization helper constants.
   public final static int VALUE_MASK = 0x7f;
   public final static int NEXT_BIT_SHIFT = 7;
   public final static int NEXT_BIT_MASK = 1 << 7;
   @VisibleForTesting
-  static boolean UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable();
+  final static boolean UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable();
   @VisibleForTesting
-  static boolean UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
+  final static boolean UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
 
   private ByteBufferUtils() {
   }
@@ -404,12 +403,11 @@ public final class ByteBufferUtils {
     } else if (UNSAFE_AVAIL) {
       UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length);
     } else {
-      int outOldPos = out.position();
-      out.position(destinationOffset);
+      ByteBuffer outDup = out.duplicate();
+      outDup.position(destinationOffset);
       ByteBuffer inDup = in.duplicate();
       inDup.position(sourceOffset).limit(sourceOffset + length);
-      out.put(inDup);
-      out.position(outOldPos);
+      outDup.put(inDup);
     }
     return destinationOffset + length;
   }
@@ -990,7 +988,7 @@ public final class ByteBufferUtils {
 
   /**
    * Copies bytes from given array's offset to length part into the given buffer. Puts the bytes
-   * to buffer's given position.
+   * to buffer's given position. This doesn't affect the position of the buffer.
    * @param out
    * @param in
    * @param inOffset
@@ -1003,16 +1001,15 @@ public final class ByteBufferUtils {
     } else if (UNSAFE_AVAIL) {
       UnsafeAccess.copy(in, inOffset, out, outOffset, length);
     } else {
-      int oldPos = out.position();
-      out.position(outOffset);
-      out.put(in, inOffset, length);
-      out.position(oldPos);
+      ByteBuffer outDup = out.duplicate();
+      outDup.position(outOffset);
+      outDup.put(in, inOffset, length);
     }
   }
 
   /**
    * Copies specified number of bytes from given offset of 'in' ByteBuffer to
-   * the array.
+   * the array. This doesn't affect the position of the buffer.
    * @param out
    * @param in
    * @param sourceOffset
@@ -1026,10 +1023,9 @@ public final class ByteBufferUtils {
     } else if (UNSAFE_AVAIL) {
       UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length);
     } else {
-      int oldPos = in.position();
-      in.position(sourceOffset);
-      in.get(out, destinationOffset, length);
-      in.position(oldPos);
+      ByteBuffer inDup = in.duplicate();
+      inDup.position(sourceOffset);
+      inDup.get(out, destinationOffset, length);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/df96d328/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
index 053fb24..ee03c7b 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
@@ -27,14 +27,22 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
@@ -50,13 +58,13 @@ import org.junit.runners.Parameterized;
 @Category({MiscTests.class, SmallTests.class})
 @RunWith(Parameterized.class)
 public class TestByteBufferUtils {
-
+  private static final String UNSAFE_AVAIL_NAME = "UNSAFE_AVAIL";
+  private static final String UNSAFE_UNALIGNED_NAME = "UNSAFE_UNALIGNED";
   private byte[] array;
 
   @AfterClass
   public static void afterClass() throws Exception {
-    ByteBufferUtils.UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable();
-    ByteBufferUtils.UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
+    detectAvailabilityOfUnsafe();
   }
 
   @Parameterized.Parameters
@@ -69,15 +77,50 @@ public class TestByteBufferUtils {
     return paramList;
   }
 
-  public TestByteBufferUtils(boolean useUnsafeIfPossible) {
+  private static void setUnsafe(String fieldName, boolean value) throws Exception {
+    Field field = ByteBufferUtils.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    modifiersField.setAccessible(true);
+    int oldModifiers = field.getModifiers();
+    modifiersField.setInt(field, oldModifiers & ~Modifier.FINAL);
+    try {
+      field.set(null, value);
+    } finally {
+      modifiersField.setInt(field, oldModifiers);
+    }
+  }
+
+  static void disableUnsafe() throws Exception {
+    if (ByteBufferUtils.UNSAFE_AVAIL) {
+      setUnsafe(UNSAFE_AVAIL_NAME, false);
+    }
+    if (ByteBufferUtils.UNSAFE_UNALIGNED) {
+      setUnsafe(UNSAFE_UNALIGNED_NAME, false);
+    }
+    assertFalse(ByteBufferUtils.UNSAFE_AVAIL);
+    assertFalse(ByteBufferUtils.UNSAFE_UNALIGNED);
+  }
+
+  static void detectAvailabilityOfUnsafe() throws Exception {
+    if (ByteBufferUtils.UNSAFE_AVAIL != UnsafeAvailChecker.isAvailable()) {
+      setUnsafe(UNSAFE_AVAIL_NAME, UnsafeAvailChecker.isAvailable());
+    }
+    if (ByteBufferUtils.UNSAFE_UNALIGNED != UnsafeAvailChecker.unaligned()) {
+      setUnsafe(UNSAFE_UNALIGNED_NAME, UnsafeAvailChecker.unaligned());
+    }
+    assertEquals(ByteBufferUtils.UNSAFE_AVAIL, UnsafeAvailChecker.isAvailable());
+    assertEquals(ByteBufferUtils.UNSAFE_UNALIGNED, UnsafeAvailChecker.unaligned());
+  }
+
+  public TestByteBufferUtils(boolean useUnsafeIfPossible) throws Exception {
     if (useUnsafeIfPossible) {
-      ByteBufferUtils.UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable();
-      ByteBufferUtils.UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
+      detectAvailabilityOfUnsafe();
     } else {
-      ByteBufferUtils.UNSAFE_AVAIL = false;
-      ByteBufferUtils.UNSAFE_UNALIGNED = false;
+      disableUnsafe();
     }
   }
+
   /**
    * Create an array with sample data.
    */
@@ -388,6 +431,111 @@ public class TestByteBufferUtils {
     assertEquals(i, buffer.getInt());
   }
 
+  private void testCopyFromSrcToDestWithThreads(Object input, Object output,
+    List<Integer> lengthes, List<Integer> offsets) throws InterruptedException {
+    assertTrue((input instanceof ByteBuffer) || (input instanceof byte[]));
+    assertTrue((output instanceof ByteBuffer) || (output instanceof byte[]));
+    assertEquals(lengthes.size(), offsets.size());
+
+    final int threads = lengthes.size();
+    CountDownLatch latch = new CountDownLatch(1);
+    List<Runnable> exes = new ArrayList<>(threads);
+    int oldInputPos = (input instanceof ByteBuffer) ? ((ByteBuffer) input).position() : 0;
+    int oldOutputPos = (output instanceof ByteBuffer) ? ((ByteBuffer) output).position() : 0;
+    for (int i = 0; i != threads; ++i) {
+      int offset = offsets.get(i);
+      int length = lengthes.get(i);
+      exes.add(() -> {
+        try {
+          latch.await();
+          if (input instanceof ByteBuffer && output instanceof byte[]) {
+            ByteBufferUtils.copyFromBufferToArray((byte[]) output,
+                (ByteBuffer) input, offset, offset, length);
+          }
+          if (input instanceof byte[] && output instanceof ByteBuffer) {
+            ByteBufferUtils.copyFromArrayToBuffer((ByteBuffer) output,
+                offset, (byte[]) input, offset, length);
+          }
+          if (input instanceof ByteBuffer && output instanceof ByteBuffer) {
+            ByteBufferUtils.copyFromBufferToBuffer((ByteBuffer) input,
+                (ByteBuffer) output, offset, offset, length);
+          }
+        } catch (InterruptedException ex) {
+          throw new RuntimeException(ex);
+        }
+      });
+    }
+    ExecutorService service = Executors.newFixedThreadPool(threads);
+    exes.forEach(service::execute);
+    latch.countDown();
+    service.shutdown();
+    assertTrue(service.awaitTermination(5, TimeUnit.SECONDS));
+    if (input instanceof ByteBuffer) {
+      assertEquals(oldInputPos, ((ByteBuffer) input).position());
+    }
+    if (output instanceof ByteBuffer) {
+      assertEquals(oldOutputPos, ((ByteBuffer) output).position());
+    }
+    String inputString = (input instanceof ByteBuffer) ?
+      Bytes.toString(Bytes.toBytes((ByteBuffer) input)) : Bytes.toString((byte[]) input);
+    String outputString = (output instanceof ByteBuffer) ?
+      Bytes.toString(Bytes.toBytes((ByteBuffer) output)) : Bytes.toString((byte[]) output);
+    assertEquals(inputString, outputString);
+  }
+
+  @Test
+  public void testCopyFromSrcToDestWithThreads() throws InterruptedException {
+    List<byte[]> words = Arrays.asList(
+      Bytes.toBytes("with"),
+      Bytes.toBytes("great"),
+      Bytes.toBytes("power"),
+      Bytes.toBytes("comes"),
+      Bytes.toBytes("great"),
+      Bytes.toBytes("responsibility")
+    );
+    List<Integer> lengthes = words.stream().map(v -> v.length).collect(Collectors.toList());
+    List<Integer> offsets = new ArrayList<>(words.size());
+    for (int i = 0; i != words.size(); ++i) {
+      offsets.add(words.subList(0, i).stream().mapToInt(v -> v.length).sum());
+    }
+
+    int totalSize = words.stream().mapToInt(v -> v.length).sum();
+    byte[] fullContent = new byte[totalSize];
+    int offset = 0;
+    for (byte[] w : words) {
+      offset = Bytes.putBytes(fullContent, offset, w, 0, w.length);
+    }
+
+    // test copyFromBufferToArray
+    for (ByteBuffer input : Arrays.asList(
+            ByteBuffer.allocateDirect(totalSize),
+            ByteBuffer.allocate(totalSize))) {
+      words.forEach(input::put);
+      byte[] output = new byte[totalSize];
+      testCopyFromSrcToDestWithThreads(input, output, lengthes, offsets);
+    }
+
+    // test copyFromArrayToBuffer
+    for (ByteBuffer output : Arrays.asList(
+            ByteBuffer.allocateDirect(totalSize),
+            ByteBuffer.allocate(totalSize))) {
+      byte[] input = fullContent;
+      testCopyFromSrcToDestWithThreads(input, output, lengthes, offsets);
+    }
+
+    // test copyFromBufferToBuffer
+    for (ByteBuffer input : Arrays.asList(
+            ByteBuffer.allocateDirect(totalSize),
+            ByteBuffer.allocate(totalSize))) {
+      words.forEach(input::put);
+      for (ByteBuffer output : Arrays.asList(
+            ByteBuffer.allocateDirect(totalSize),
+            ByteBuffer.allocate(totalSize))) {
+        testCopyFromSrcToDestWithThreads(input, output, lengthes, offsets);
+      }
+    }
+  }
+
   @Test
   public void testCopyFromBufferToArray() {
     ByteBuffer buffer = ByteBuffer.allocate(15);
@@ -492,4 +640,5 @@ public class TestByteBufferUtils {
       bb[i] = b;
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/df96d328/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFromClientSide3WoUnsafe.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFromClientSide3WoUnsafe.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFromClientSide3WoUnsafe.java
new file mode 100644
index 0000000..c04e76b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFromClientSide3WoUnsafe.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.client.TestFromClientSide3;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({LargeTests.class, ClientTests.class})
+public class TestFromClientSide3WoUnsafe extends TestFromClientSide3 {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestByteBufferUtils.disableUnsafe();
+    TestFromClientSide3.setUpBeforeClass();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TestFromClientSide3.tearDownAfterClass();
+    TestByteBufferUtils.detectAvailabilityOfUnsafe();
+  }
+}