You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/07/09 11:08:26 UTC
[flink] branch master updated: [FLINK-12730][runtime] Unify BitSet implementations in flink-runtime
This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 14c4b23 [FLINK-12730][runtime] Unify BitSet implementations in flink-runtime
14c4b23 is described below
commit 14c4b23ca8744f83864a8c91ff0c1b88af52e532
Author: liyafan82 <42...@users.noreply.github.com>
AuthorDate: Tue Jul 9 19:08:08 2019 +0800
[FLINK-12730][runtime] Unify BitSet implementations in flink-runtime
This closes #8613.
---
.../flink/runtime/operators/util/BitSet.java | 15 ++--
.../flink/runtime/operators/util/BloomFilter.java | 79 ----------------------
.../flink/runtime/operators/util/BitSetTest.java | 19 +++++-
.../runtime/operators/util/BloomFilterTest.java | 7 +-
4 files changed, 27 insertions(+), 93 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java
index b6e1e07..df13caf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java
@@ -32,7 +32,6 @@ public class BitSet {
// The BitSet bit size.
private int bitLength;
- private final int BYTE_POSITION_MASK = 0xfffffff8;
private final int BYTE_INDEX_MASK = 0x00000007;
public BitSet(int byteSize) {
@@ -58,7 +57,7 @@ public class BitSet {
public void set(int index) {
Preconditions.checkArgument(index < bitLength && index >= 0);
- int byteIndex = (index & BYTE_POSITION_MASK) >>> 3;
+ int byteIndex = index >>> 3;
byte current = memorySegment.get(offset + byteIndex);
current |= (1 << (index & BYTE_INDEX_MASK));
memorySegment.put(offset + byteIndex, current);
@@ -73,7 +72,7 @@ public class BitSet {
public boolean get(int index) {
Preconditions.checkArgument(index < bitLength && index >= 0);
- int byteIndex = (index & BYTE_POSITION_MASK) >>> 3;
+ int byteIndex = index >>> 3;
byte current = memorySegment.get(offset + byteIndex);
return (current & (1 << (index & BYTE_INDEX_MASK))) != 0;
}
@@ -89,8 +88,14 @@ public class BitSet {
* Clear the bit set.
*/
public void clear() {
- for (int i = 0; i < byteLength; i++) {
- memorySegment.put(offset + i, (byte) 0);
+ int index = 0;
+ while (index + 8 <= byteLength) {
+ memorySegment.putLong(offset + index, 0L);
+ index += 8;
+ }
+ while (index < byteLength) {
+ memorySegment.put(offset + index, (byte) 0);
+ index += 1;
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
index 5f09391..5abf5a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
@@ -143,83 +143,4 @@ public class BloomFilter {
output.append(bitSet);
return output.toString();
}
-
- /**
- * Bare metal bit set implementation. For performance reasons, this implementation does not check
- * for index bounds nor expand the bit set size if the specified index is greater than the size.
- */
- public class BitSet {
- private MemorySegment memorySegment;
- // MemorySegment byte array offset.
- private int offset;
- // MemorySegment byte size.
- private int length;
- private final int LONG_POSITION_MASK = 0xffffffc0;
-
- public BitSet(int byteSize) {
- checkArgument(byteSize > 0, "bits size should be greater than 0.");
- checkArgument(byteSize << 29 == 0, "bytes size should be integral multiple of long size(8 Bytes).");
- this.length = byteSize;
- }
-
- public void setMemorySegment(MemorySegment memorySegment, int offset) {
- this.memorySegment = memorySegment;
- this.offset = offset;
- }
-
- /**
- * Sets the bit at specified index.
- *
- * @param index - position
- */
- public void set(int index) {
- int longIndex = (index & LONG_POSITION_MASK) >>> 3;
- long current = memorySegment.getLong(offset + longIndex);
- current |= (1L << index);
- memorySegment.putLong(offset + longIndex, current);
- }
-
- /**
- * Returns true if the bit is set in the specified index.
- *
- * @param index - position
- * @return - value at the bit position
- */
- public boolean get(int index) {
- int longIndex = (index & LONG_POSITION_MASK) >>> 3;
- long current = memorySegment.getLong(offset + longIndex);
- return (current & (1L << index)) != 0;
- }
-
- /**
- * Number of bits
- */
- public int bitSize() {
- return length << 3;
- }
-
- public MemorySegment getMemorySegment() {
- return this.memorySegment;
- }
-
- /**
- * Clear the bit set.
- */
- public void clear() {
- long zeroValue = 0L;
- for (int i = 0; i < (length / 8); i++) {
- memorySegment.putLong(offset + i * 8, zeroValue);
- }
- }
-
- @Override
- public String toString() {
- StringBuilder output = new StringBuilder();
- output.append("BitSet:\n");
- output.append("\tMemorySegment:").append(memorySegment.size()).append("\n");
- output.append("\tOffset:").append(offset).append("\n");
- output.append("\tLength:").append(length).append("\n");
- return output.toString();
- }
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java
index ec8ae2b..dd1890f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java
@@ -21,16 +21,24 @@ import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+@RunWith(Parameterized.class)
public class BitSetTest {
private BitSet bitSet;
- int byteSize = 1024;
- MemorySegment memorySegment = MemorySegmentFactory.allocateUnpooledSegment(byteSize);
+ int byteSize;
+ MemorySegment memorySegment;
+
+ public BitSetTest(int byteSize) {
+ this.byteSize = byteSize;
+ memorySegment = MemorySegmentFactory.allocateUnpooledSegment(byteSize);
+ }
@Before
public void init() {
@@ -67,7 +75,7 @@ public class BitSetTest {
@Test
public void testSetValues() {
int bitSize = bitSet.bitSize();
- assertEquals(bitSize, 8 * 1024);
+ assertEquals(bitSize, 8 * byteSize);
for (int i = 0; i < bitSize; i++) {
assertFalse(bitSet.get(i));
if (i % 2 == 0) {
@@ -83,4 +91,9 @@ public class BitSetTest {
}
}
}
+
+ @Parameterized.Parameters(name = "byte size = {0}")
+ public static Object[] getByteSize() {
+ return new Integer[]{1000, 1024, 2019};
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
index 256d4bc..78d0d62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
@@ -63,12 +63,7 @@ public class BloomFilterTest {
public void testBloomFilterArguments4() {
new BloomFilter(1024, 0);
}
-
- @Test(expected = IllegalArgumentException.class)
- public void testBloomFilterArguments5() {
- new BloomFilter(1024, 21);
- }
-
+
@Test
public void testBloomNumBits() {
assertEquals(0, BloomFilter.optimalNumOfBits(0, 0));