You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2017/04/17 20:55:06 UTC
[27/50] [abbrv] hbase git commit: HBASE-17872 The MSLABImpl generates
the invalid cells when unsafe is not available
HBASE-17872 The MSLABImpl generates the invalid cells when unsafe is not available
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/df96d328
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/df96d328
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/df96d328
Branch: refs/heads/hbase-12439
Commit: df96d328fb9fa11f04f84607e9a23f254f513202
Parents: 59e8b8e
Author: CHIA-PING TSAI <ch...@gmail.com>
Authored: Sat Apr 8 17:37:37 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Sun Apr 9 23:28:34 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/util/ByteBufferUtils.java | 30 ++--
.../hadoop/hbase/util/TestByteBufferUtils.java | 165 ++++++++++++++++++-
.../hbase/util/TestFromClientSide3WoUnsafe.java | 43 +++++
3 files changed, 213 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/df96d328/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
index ff4c843..34a4e02 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java
@@ -43,15 +43,14 @@ import sun.nio.ch.DirectBuffer;
@SuppressWarnings("restriction")
@InterfaceAudience.Public
public final class ByteBufferUtils {
-
// "Compressed integer" serialization helper constants.
public final static int VALUE_MASK = 0x7f;
public final static int NEXT_BIT_SHIFT = 7;
public final static int NEXT_BIT_MASK = 1 << 7;
@VisibleForTesting
- static boolean UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable();
+ final static boolean UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable();
@VisibleForTesting
- static boolean UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
+ final static boolean UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
private ByteBufferUtils() {
}
@@ -404,12 +403,11 @@ public final class ByteBufferUtils {
} else if (UNSAFE_AVAIL) {
UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length);
} else {
- int outOldPos = out.position();
- out.position(destinationOffset);
+ ByteBuffer outDup = out.duplicate();
+ outDup.position(destinationOffset);
ByteBuffer inDup = in.duplicate();
inDup.position(sourceOffset).limit(sourceOffset + length);
- out.put(inDup);
- out.position(outOldPos);
+ outDup.put(inDup);
}
return destinationOffset + length;
}
@@ -990,7 +988,7 @@ public final class ByteBufferUtils {
/**
* Copies bytes from given array's offset to length part into the given buffer. Puts the bytes
- * to buffer's given position.
+ * to buffer's given position. This doesn't affect the position of the buffer.
* @param out
* @param in
* @param inOffset
@@ -1003,16 +1001,15 @@ public final class ByteBufferUtils {
} else if (UNSAFE_AVAIL) {
UnsafeAccess.copy(in, inOffset, out, outOffset, length);
} else {
- int oldPos = out.position();
- out.position(outOffset);
- out.put(in, inOffset, length);
- out.position(oldPos);
+ ByteBuffer outDup = out.duplicate();
+ outDup.position(outOffset);
+ outDup.put(in, inOffset, length);
}
}
/**
* Copies specified number of bytes from given offset of 'in' ByteBuffer to
- * the array.
+ * the array. This doesn't affect the position of the buffer.
* @param out
* @param in
* @param sourceOffset
@@ -1026,10 +1023,9 @@ public final class ByteBufferUtils {
} else if (UNSAFE_AVAIL) {
UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length);
} else {
- int oldPos = in.position();
- in.position(sourceOffset);
- in.get(out, destinationOffset, length);
- in.position(oldPos);
+ ByteBuffer inDup = in.duplicate();
+ inDup.position(sourceOffset);
+ inDup.get(out, destinationOffset, length);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/df96d328/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
index 053fb24..ee03c7b 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java
@@ -27,14 +27,22 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.MiscTests;
@@ -50,13 +58,13 @@ import org.junit.runners.Parameterized;
@Category({MiscTests.class, SmallTests.class})
@RunWith(Parameterized.class)
public class TestByteBufferUtils {
-
+ private static final String UNSAFE_AVAIL_NAME = "UNSAFE_AVAIL";
+ private static final String UNSAFE_UNALIGNED_NAME = "UNSAFE_UNALIGNED";
private byte[] array;
@AfterClass
public static void afterClass() throws Exception {
- ByteBufferUtils.UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable();
- ByteBufferUtils.UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
+ detectAvailabilityOfUnsafe();
}
@Parameterized.Parameters
@@ -69,15 +77,50 @@ public class TestByteBufferUtils {
return paramList;
}
- public TestByteBufferUtils(boolean useUnsafeIfPossible) {
+ private static void setUnsafe(String fieldName, boolean value) throws Exception {
+ Field field = ByteBufferUtils.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ int oldModifiers = field.getModifiers();
+ modifiersField.setInt(field, oldModifiers & ~Modifier.FINAL);
+ try {
+ field.set(null, value);
+ } finally {
+ modifiersField.setInt(field, oldModifiers);
+ }
+ }
+
+ static void disableUnsafe() throws Exception {
+ if (ByteBufferUtils.UNSAFE_AVAIL) {
+ setUnsafe(UNSAFE_AVAIL_NAME, false);
+ }
+ if (ByteBufferUtils.UNSAFE_UNALIGNED) {
+ setUnsafe(UNSAFE_UNALIGNED_NAME, false);
+ }
+ assertFalse(ByteBufferUtils.UNSAFE_AVAIL);
+ assertFalse(ByteBufferUtils.UNSAFE_UNALIGNED);
+ }
+
+ static void detectAvailabilityOfUnsafe() throws Exception {
+ if (ByteBufferUtils.UNSAFE_AVAIL != UnsafeAvailChecker.isAvailable()) {
+ setUnsafe(UNSAFE_AVAIL_NAME, UnsafeAvailChecker.isAvailable());
+ }
+ if (ByteBufferUtils.UNSAFE_UNALIGNED != UnsafeAvailChecker.unaligned()) {
+ setUnsafe(UNSAFE_UNALIGNED_NAME, UnsafeAvailChecker.unaligned());
+ }
+ assertEquals(ByteBufferUtils.UNSAFE_AVAIL, UnsafeAvailChecker.isAvailable());
+ assertEquals(ByteBufferUtils.UNSAFE_UNALIGNED, UnsafeAvailChecker.unaligned());
+ }
+
+ public TestByteBufferUtils(boolean useUnsafeIfPossible) throws Exception {
if (useUnsafeIfPossible) {
- ByteBufferUtils.UNSAFE_AVAIL = UnsafeAvailChecker.isAvailable();
- ByteBufferUtils.UNSAFE_UNALIGNED = UnsafeAvailChecker.unaligned();
+ detectAvailabilityOfUnsafe();
} else {
- ByteBufferUtils.UNSAFE_AVAIL = false;
- ByteBufferUtils.UNSAFE_UNALIGNED = false;
+ disableUnsafe();
}
}
+
/**
* Create an array with sample data.
*/
@@ -388,6 +431,111 @@ public class TestByteBufferUtils {
assertEquals(i, buffer.getInt());
}
+ private void testCopyFromSrcToDestWithThreads(Object input, Object output,
+ List<Integer> lengthes, List<Integer> offsets) throws InterruptedException {
+ assertTrue((input instanceof ByteBuffer) || (input instanceof byte[]));
+ assertTrue((output instanceof ByteBuffer) || (output instanceof byte[]));
+ assertEquals(lengthes.size(), offsets.size());
+
+ final int threads = lengthes.size();
+ CountDownLatch latch = new CountDownLatch(1);
+ List<Runnable> exes = new ArrayList<>(threads);
+ int oldInputPos = (input instanceof ByteBuffer) ? ((ByteBuffer) input).position() : 0;
+ int oldOutputPos = (output instanceof ByteBuffer) ? ((ByteBuffer) output).position() : 0;
+ for (int i = 0; i != threads; ++i) {
+ int offset = offsets.get(i);
+ int length = lengthes.get(i);
+ exes.add(() -> {
+ try {
+ latch.await();
+ if (input instanceof ByteBuffer && output instanceof byte[]) {
+ ByteBufferUtils.copyFromBufferToArray((byte[]) output,
+ (ByteBuffer) input, offset, offset, length);
+ }
+ if (input instanceof byte[] && output instanceof ByteBuffer) {
+ ByteBufferUtils.copyFromArrayToBuffer((ByteBuffer) output,
+ offset, (byte[]) input, offset, length);
+ }
+ if (input instanceof ByteBuffer && output instanceof ByteBuffer) {
+ ByteBufferUtils.copyFromBufferToBuffer((ByteBuffer) input,
+ (ByteBuffer) output, offset, offset, length);
+ }
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ });
+ }
+ ExecutorService service = Executors.newFixedThreadPool(threads);
+ exes.forEach(service::execute);
+ latch.countDown();
+ service.shutdown();
+ assertTrue(service.awaitTermination(5, TimeUnit.SECONDS));
+ if (input instanceof ByteBuffer) {
+ assertEquals(oldInputPos, ((ByteBuffer) input).position());
+ }
+ if (output instanceof ByteBuffer) {
+ assertEquals(oldOutputPos, ((ByteBuffer) output).position());
+ }
+ String inputString = (input instanceof ByteBuffer) ?
+ Bytes.toString(Bytes.toBytes((ByteBuffer) input)) : Bytes.toString((byte[]) input);
+ String outputString = (output instanceof ByteBuffer) ?
+ Bytes.toString(Bytes.toBytes((ByteBuffer) output)) : Bytes.toString((byte[]) output);
+ assertEquals(inputString, outputString);
+ }
+
+ @Test
+ public void testCopyFromSrcToDestWithThreads() throws InterruptedException {
+ List<byte[]> words = Arrays.asList(
+ Bytes.toBytes("with"),
+ Bytes.toBytes("great"),
+ Bytes.toBytes("power"),
+ Bytes.toBytes("comes"),
+ Bytes.toBytes("great"),
+ Bytes.toBytes("responsibility")
+ );
+ List<Integer> lengthes = words.stream().map(v -> v.length).collect(Collectors.toList());
+ List<Integer> offsets = new ArrayList<>(words.size());
+ for (int i = 0; i != words.size(); ++i) {
+ offsets.add(words.subList(0, i).stream().mapToInt(v -> v.length).sum());
+ }
+
+ int totalSize = words.stream().mapToInt(v -> v.length).sum();
+ byte[] fullContent = new byte[totalSize];
+ int offset = 0;
+ for (byte[] w : words) {
+ offset = Bytes.putBytes(fullContent, offset, w, 0, w.length);
+ }
+
+ // test copyFromBufferToArray
+ for (ByteBuffer input : Arrays.asList(
+ ByteBuffer.allocateDirect(totalSize),
+ ByteBuffer.allocate(totalSize))) {
+ words.forEach(input::put);
+ byte[] output = new byte[totalSize];
+ testCopyFromSrcToDestWithThreads(input, output, lengthes, offsets);
+ }
+
+ // test copyFromArrayToBuffer
+ for (ByteBuffer output : Arrays.asList(
+ ByteBuffer.allocateDirect(totalSize),
+ ByteBuffer.allocate(totalSize))) {
+ byte[] input = fullContent;
+ testCopyFromSrcToDestWithThreads(input, output, lengthes, offsets);
+ }
+
+ // test copyFromBufferToBuffer
+ for (ByteBuffer input : Arrays.asList(
+ ByteBuffer.allocateDirect(totalSize),
+ ByteBuffer.allocate(totalSize))) {
+ words.forEach(input::put);
+ for (ByteBuffer output : Arrays.asList(
+ ByteBuffer.allocateDirect(totalSize),
+ ByteBuffer.allocate(totalSize))) {
+ testCopyFromSrcToDestWithThreads(input, output, lengthes, offsets);
+ }
+ }
+ }
+
@Test
public void testCopyFromBufferToArray() {
ByteBuffer buffer = ByteBuffer.allocate(15);
@@ -492,4 +640,5 @@ public class TestByteBufferUtils {
bb[i] = b;
}
}
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/df96d328/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFromClientSide3WoUnsafe.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFromClientSide3WoUnsafe.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFromClientSide3WoUnsafe.java
new file mode 100644
index 0000000..c04e76b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFromClientSide3WoUnsafe.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.client.TestFromClientSide3;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({LargeTests.class, ClientTests.class})
+public class TestFromClientSide3WoUnsafe extends TestFromClientSide3 {
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TestByteBufferUtils.disableUnsafe();
+ TestFromClientSide3.setUpBeforeClass();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TestFromClientSide3.tearDownAfterClass();
+ TestByteBufferUtils.detectAvailabilityOfUnsafe();
+ }
+}