You are viewing a plain text version of this content. The hyperlink to its canonical version was not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/07/09 11:08:26 UTC

[flink] branch master updated: [FLINK-12730][runtime] Unify BitSet implementations in flink-runtime

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 14c4b23  [FLINK-12730][runtime] Unify BitSet implementations in flink-runtime
14c4b23 is described below

commit 14c4b23ca8744f83864a8c91ff0c1b88af52e532
Author: liyafan82 <42...@users.noreply.github.com>
AuthorDate: Tue Jul 9 19:08:08 2019 +0800

    [FLINK-12730][runtime] Unify BitSet implementations in flink-runtime
    
    This closes #8613.
---
 .../flink/runtime/operators/util/BitSet.java       | 15 ++--
 .../flink/runtime/operators/util/BloomFilter.java  | 79 ----------------------
 .../flink/runtime/operators/util/BitSetTest.java   | 19 +++++-
 .../runtime/operators/util/BloomFilterTest.java    |  7 +-
 4 files changed, 27 insertions(+), 93 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java
index b6e1e07..df13caf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java
@@ -32,7 +32,6 @@ public class BitSet {
 	// The BitSet bit size.
 	private int bitLength;
 
-	private final int BYTE_POSITION_MASK = 0xfffffff8;
 	private final int BYTE_INDEX_MASK = 0x00000007;
 
 	public BitSet(int byteSize) {
@@ -58,7 +57,7 @@ public class BitSet {
 	public void set(int index) {
 		Preconditions.checkArgument(index < bitLength && index >= 0);
 
-		int byteIndex = (index & BYTE_POSITION_MASK) >>> 3;
+		int byteIndex = index >>> 3;
 		byte current = memorySegment.get(offset + byteIndex);
 		current |= (1 << (index & BYTE_INDEX_MASK));
 		memorySegment.put(offset + byteIndex, current);
@@ -73,7 +72,7 @@ public class BitSet {
 	public boolean get(int index) {
 		Preconditions.checkArgument(index < bitLength && index >= 0);
 		
-		int byteIndex = (index & BYTE_POSITION_MASK) >>> 3;
+		int byteIndex = index >>> 3;
 		byte current = memorySegment.get(offset + byteIndex);
 		return (current & (1 << (index & BYTE_INDEX_MASK))) != 0;
 	}
@@ -89,8 +88,14 @@ public class BitSet {
 	 * Clear the bit set.
 	 */
 	public void clear() {
-		for (int i = 0; i < byteLength; i++) {
-			memorySegment.put(offset + i, (byte) 0);
+		int index = 0;
+		while (index + 8 <= byteLength) {
+			memorySegment.putLong(offset + index, 0L);
+			index += 8;
+		}
+		while (index < byteLength) {
+			memorySegment.put(offset + index, (byte) 0);
+			index += 1;
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
index 5f09391..5abf5a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BloomFilter.java
@@ -143,83 +143,4 @@ public class BloomFilter {
 		output.append(bitSet);
 		return output.toString();
 	}
-	
-	/**
-	 * Bare metal bit set implementation. For performance reasons, this implementation does not check
-	 * for index bounds nor expand the bit set size if the specified index is greater than the size.
-	 */
-	public class BitSet {
-		private MemorySegment memorySegment;
-		// MemorySegment byte array offset.
-		private int offset;
-		// MemorySegment byte size.
-		private int length;
-		private final int LONG_POSITION_MASK = 0xffffffc0;
-		
-		public BitSet(int byteSize) {
-			checkArgument(byteSize > 0, "bits size should be greater than 0.");
-			checkArgument(byteSize << 29 == 0, "bytes size should be integral multiple of long size(8 Bytes).");
-			this.length = byteSize;
-		}
-		
-		public void setMemorySegment(MemorySegment memorySegment, int offset) {
-			this.memorySegment = memorySegment;
-			this.offset = offset;
-		}
-		
-		/**
-		 * Sets the bit at specified index.
-		 *
-		 * @param index - position
-		 */
-		public void set(int index) {
-			int longIndex = (index & LONG_POSITION_MASK) >>> 3;
-			long current = memorySegment.getLong(offset + longIndex);
-			current |= (1L << index);
-			memorySegment.putLong(offset + longIndex, current);
-		}
-		
-		/**
-		 * Returns true if the bit is set in the specified index.
-		 *
-		 * @param index - position
-		 * @return - value at the bit position
-		 */
-		public boolean get(int index) {
-			int longIndex = (index & LONG_POSITION_MASK) >>> 3;
-			long current = memorySegment.getLong(offset + longIndex);
-			return (current & (1L << index)) != 0;
-		}
-		
-		/**
-		 * Number of bits
-		 */
-		public int bitSize() {
-			return length << 3;
-		}
-		
-		public MemorySegment getMemorySegment() {
-			return this.memorySegment;
-		}
-		
-		/**
-		 * Clear the bit set.
-		 */
-		public void clear() {
-			long zeroValue = 0L;
-			for (int i = 0; i < (length / 8); i++) {
-				memorySegment.putLong(offset + i * 8, zeroValue);
-			}
-		}
-		
-		@Override
-		public String toString() {
-			StringBuilder output = new StringBuilder();
-			output.append("BitSet:\n");
-			output.append("\tMemorySegment:").append(memorySegment.size()).append("\n");
-			output.append("\tOffset:").append(offset).append("\n");
-			output.append("\tLength:").append(length).append("\n");
-			return output.toString();
-		}
-	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java
index ec8ae2b..dd1890f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java
@@ -21,16 +21,24 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class BitSetTest {
 
 	private BitSet bitSet;
-	int byteSize = 1024;
-	MemorySegment memorySegment = MemorySegmentFactory.allocateUnpooledSegment(byteSize);
+	int byteSize;
+	MemorySegment memorySegment;
+
+	public BitSetTest(int byteSize) {
+		this.byteSize = byteSize;
+		memorySegment = MemorySegmentFactory.allocateUnpooledSegment(byteSize);
+	}
 
 	@Before
 	public void init() {
@@ -67,7 +75,7 @@ public class BitSetTest {
 	@Test
 	public void testSetValues() {
 		int bitSize = bitSet.bitSize();
-		assertEquals(bitSize, 8 * 1024);
+		assertEquals(bitSize, 8 * byteSize);
 		for (int i = 0; i < bitSize; i++) {
 			assertFalse(bitSet.get(i));
 			if (i % 2 == 0) {
@@ -83,4 +91,9 @@ public class BitSetTest {
 			}
 		}
 	}
+
+	@Parameterized.Parameters(name = "byte size = {0}")
+	public static Object[] getByteSize() {
+		return new Integer[]{1000, 1024, 2019};
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
index 256d4bc..78d0d62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
@@ -63,12 +63,7 @@ public class BloomFilterTest {
 	public void testBloomFilterArguments4() {
 		new BloomFilter(1024, 0);
 	}
-	
-	@Test(expected = IllegalArgumentException.class)
-	public void testBloomFilterArguments5() {
-		new BloomFilter(1024, 21);
-	}
-	
+
 	@Test
 	public void testBloomNumBits() {
 		assertEquals(0, BloomFilter.optimalNumOfBits(0, 0));