You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/12 13:21:25 UTC

[1/4] flink git commit: [hotfix][javadoc] Fix typo in StreamElement javadoc

Repository: flink
Updated Branches:
  refs/heads/master 9ef6796e8 -> beb11976f


[hotfix][javadoc] Fix typo in StreamElement javadoc

This closes #5152.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/72cd5921
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/72cd5921
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/72cd5921

Branch: refs/heads/master
Commit: 72cd5921684e6daac4a7dd7916698eeee98b56d5
Parents: 9ef6796
Author: Matrix42 <93...@qq.com>
Authored: Tue Dec 12 18:10:15 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Tue Dec 12 11:47:51 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/streaming/runtime/streamstatus/StreamStatus.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/72cd5921/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java
index 8e58340..3dee61c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.java
@@ -63,7 +63,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
  *         active. However, for watermarks, since there may be watermark generators that might produce watermarks
  *         anywhere in the middle of topologies regardless of whether there are input data at the operator, the current
  *         status of the task must be checked before forwarding watermarks emitted from
- *         an operator. It the status is actually idle, the watermark must be blocked.
+ *         an operator. If the status is actually idle, the watermark must be blocked.
  *
  *     <li>For downstream tasks with multiple input streams, the watermarks of input streams that are temporarily idle,
  *         or has resumed to be active but its watermark is behind the overall min watermark of the operator, should not


[2/4] flink git commit: [FLINK-8145][tests] fix various IOManagerAsync instances not being shut down

Posted by ch...@apache.org.
[FLINK-8145][tests] fix various IOManagerAsync instances not being shut down

This closes #5064.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30fc069a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30fc069a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30fc069a

Branch: refs/heads/master
Commit: 30fc069add77ae6783a87b6920c59e739903296f
Parents: 72cd592
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Nov 24 11:31:48 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Dec 12 11:50:00 2017 +0100

----------------------------------------------------------------------
 .../AsynchronousBufferFileWriterTest.java       |  7 ++
 .../BufferFileWriterFileSegmentReaderTest.java  |  6 ++
 .../iomanager/BufferFileWriterReaderTest.java   |  6 ++
 .../runtime/operators/hash/HashTableITCase.java | 11 +--
 .../hash/HashTablePerformanceComparison.java    |  6 +-
 .../runtime/operators/hash/HashTableTest.java   | 92 ++++++++++----------
 ...bstractSortMergeOuterJoinIteratorITCase.java |  3 -
 .../sort/FixedLengthRecordSorterTest.java       |  5 ++
 .../flink/test/manual/MassiveStringSorting.java | 24 ++++-
 .../test/manual/MassiveStringValueSorting.java  | 24 ++++-
 10 files changed, 117 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/30fc069a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
index 0397de5..40f3e32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.disk.iomanager;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestNotificationListener;
+
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -45,6 +47,11 @@ public class AsynchronousBufferFileWriterTest {
 
 	private AsynchronousBufferFileWriter writer;
 
+	@AfterClass
+	public static void shutdown() {
+		ioManager.shutdown();
+	}
+
 	@Before
 	public void setUp() throws IOException {
 		writer = new AsynchronousBufferFileWriter(ioManager.createChannel(), new RequestQueue<WriteRequest>());

http://git-wip-us.apache.org/repos/asf/flink/blob/30fc069a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
index 7fee0fd..0d554c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
 import org.apache.flink.runtime.util.event.NotificationListener;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -55,6 +56,11 @@ public class BufferFileWriterFileSegmentReaderTest {
 
 	private LinkedBlockingQueue<FileSegment> returnedFileSegments = new LinkedBlockingQueue<>();
 
+	@AfterClass
+	public static void shutdown() {
+		ioManager.shutdown();
+	}
+
 	@Before
 	public void setUpWriterAndReader() {
 		final FileIOChannel.ID channel = ioManager.createChannel();

http://git-wip-us.apache.org/repos/asf/flink/blob/30fc069a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
index 2da0f7e..31702f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -52,6 +53,11 @@ public class BufferFileWriterReaderTest {
 
 	private LinkedBlockingQueue<Buffer> returnedBuffers = new LinkedBlockingQueue<>();
 
+	@AfterClass
+	public static void shutdown() {
+		ioManager.shutdown();
+	}
+
 	@Before
 	public void setUpWriterAndReader() {
 		final FileIOChannel.ID channel = ioManager.createChannel();

http://git-wip-us.apache.org/repos/asf/flink/blob/30fc069a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index f3eac19..a94227c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -806,11 +806,8 @@ public class HashTableITCase extends TestLogger {
 			return;
 		}
 		
-		// create the I/O access for spilling
-		final IOManager ioManager = new IOManagerAsync();
-		
 		// ----------------------------------------------------------------------------------------
-		
+
 		final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
 			this.pairBuildSideAccesssor, this.pairProbeSideAccesssor, 
 			this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
@@ -907,9 +904,6 @@ public class HashTableITCase extends TestLogger {
 			return;
 		}
 		
-		// create the I/O access for spilling
-		IOManager ioManager = new IOManagerAsync();
-		
 		// create the map for validating the results
 		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
 		
@@ -1022,9 +1016,6 @@ public class HashTableITCase extends TestLogger {
 			return;
 		}
 		
-		// create the I/O access for spilling
-		IOManager ioManager = new IOManagerAsync();
-		
 		// create the map for validating the results
 		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/30fc069a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
index 10bac1f..f426a94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.operators.testutils.types.IntPairPairComparator;
 import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
 import org.apache.flink.util.MutableObjectIterator;
 
+import org.junit.AfterClass;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -54,8 +55,6 @@ public class HashTablePerformanceComparison {
 	
 	private final TypePairComparator<IntPair, IntPair> pairComparator = new IntPairPairComparator();
 	
-	private IOManager ioManager = new IOManagerAsync();
-	
 	@Test
 	public void testCompactingHashMapPerformance() {
 		
@@ -132,6 +131,7 @@ public class HashTablePerformanceComparison {
 	
 	@Test
 	public void testMutableHashMapPerformance() {
+		final IOManager ioManager = new IOManagerAsync();
 		try {
 			final int NUM_MEM_PAGES = SIZE * NUM_PAIRS / PAGE_SIZE;
 			
@@ -207,6 +207,8 @@ public class HashTablePerformanceComparison {
 		catch (Exception e) {
 			e.printStackTrace();
 			fail("Error: " + e.getMessage());
+		} finally {
+			ioManager.shutdown();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/30fc069a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
index 7c385fc..bcf620c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
@@ -205,54 +205,58 @@ public class HashTableTest {
 	public void testSpillingWhenBuildingTableWithoutOverflow() throws Exception {
 		final IOManager ioMan = new IOManagerAsync();
 
-		final TypeSerializer<byte[]> serializer = BytePrimitiveArraySerializer.INSTANCE;
-		final TypeComparator<byte[]> buildComparator = new BytePrimitiveArrayComparator(true);
-		final TypeComparator<byte[]> probeComparator = new BytePrimitiveArrayComparator(true);
+		try {
+			final TypeSerializer<byte[]> serializer = BytePrimitiveArraySerializer.INSTANCE;
+			final TypeComparator<byte[]> buildComparator = new BytePrimitiveArrayComparator(true);
+			final TypeComparator<byte[]> probeComparator = new BytePrimitiveArrayComparator(true);
 
-		@SuppressWarnings("unchecked")
-		final TypePairComparator<byte[], byte[]> pairComparator = new GenericPairComparator<>(
-			new BytePrimitiveArrayComparator(true), new BytePrimitiveArrayComparator(true));
-
-		final int pageSize = 128;
-		final int numSegments = 33;
-
-		List<MemorySegment> memory = getMemory(numSegments, pageSize);
-
-		MutableHashTable<byte[], byte[]> table = new MutableHashTable<byte[], byte[]>(
-			serializer,
-			serializer,
-			buildComparator,
-			probeComparator,
-			pairComparator,
-			memory,
-			ioMan,
-			1,
-			false);
-
-		int numElements = 9;
-
-		table.open(
-			new CombiningIterator<byte[]>(
-				new ByteArrayIterator(numElements, 128,(byte) 0),
-				new ByteArrayIterator(numElements, 128,(byte) 1)),
-			new CombiningIterator<byte[]>(
-				new ByteArrayIterator(1, 128,(byte) 0),
-				new ByteArrayIterator(1, 128,(byte) 1)));
-
-		while(table.nextRecord()) {
-			MutableObjectIterator<byte[]> iterator = table.getBuildSideIterator();
-
-			int counter = 0;
-
-			while(iterator.next() != null) {
-				counter++;
+			@SuppressWarnings("unchecked") final TypePairComparator<byte[], byte[]> pairComparator =
+				new GenericPairComparator<>(
+					new BytePrimitiveArrayComparator(true), new BytePrimitiveArrayComparator(true));
+
+			final int pageSize = 128;
+			final int numSegments = 33;
+
+			List<MemorySegment> memory = getMemory(numSegments, pageSize);
+
+			MutableHashTable<byte[], byte[]> table = new MutableHashTable<byte[], byte[]>(
+				serializer,
+				serializer,
+				buildComparator,
+				probeComparator,
+				pairComparator,
+				memory,
+				ioMan,
+				1,
+				false);
+
+			int numElements = 9;
+
+			table.open(
+				new CombiningIterator<byte[]>(
+					new ByteArrayIterator(numElements, 128, (byte) 0),
+					new ByteArrayIterator(numElements, 128, (byte) 1)),
+				new CombiningIterator<byte[]>(
+					new ByteArrayIterator(1, 128, (byte) 0),
+					new ByteArrayIterator(1, 128, (byte) 1)));
+
+			while (table.nextRecord()) {
+				MutableObjectIterator<byte[]> iterator = table.getBuildSideIterator();
+
+				int counter = 0;
+
+				while (iterator.next() != null) {
+					counter++;
+				}
+
+				// check that we retrieve all our elements
+				Assert.assertEquals(numElements, counter);
 			}
 
-			// check that we retrieve all our elements
-			Assert.assertEquals(numElements, counter);
+			table.close();
+		} finally {
+			ioMan.shutdown();
 		}
-
-		table.close();
 	}
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/30fc069a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
index 28bded2..94c0fd4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AbstractSortMergeOuterJoinIteratorITCase.java
@@ -300,9 +300,6 @@ public abstract class AbstractSortMergeOuterJoinIteratorITCase extends TestLogge
 
 		TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator = new GenericPairComparator<>(comparator1, comparator2);
 
-		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
-		this.ioManager = new IOManagerAsync();
-
 		final int DUPLICATE_KEY = 13;
 
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/30fc069a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
index 288e86d..bba713e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/FixedLengthRecordSorterTest.java
@@ -77,6 +77,11 @@ public class FixedLengthRecordSorterTest {
 		if (!this.memoryManager.verifyEmpty()) {
 			Assert.fail("Memory Leak: Some memory has not been returned to the memory manager.");
 		}
+
+		if (this.ioManager != null) {
+			ioManager.shutdown();
+			ioManager = null;
+		}
 		
 		if (this.memoryManager != null) {
 			this.memoryManager.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/30fc069a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
index c69e6fd..46e4485 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringSorting.java
@@ -82,10 +82,12 @@ public class MassiveStringSorting {
 			UnilateralSortMerger<String> sorter = null;
 			BufferedReader reader = null;
 			BufferedReader verifyReader = null;
+			MemoryManager mm = null;
+			IOManager ioMan = null;
 
 			try {
-				MemoryManager mm = new MemoryManager(1024 * 1024, 1);
-				IOManager ioMan = new IOManagerAsync();
+				mm = new MemoryManager(1024 * 1024, 1);
+				ioMan = new IOManagerAsync();
 
 				TypeSerializer<String> serializer = StringSerializer.INSTANCE;
 				TypeComparator<String> comparator = new StringComparator(true);
@@ -122,6 +124,12 @@ public class MassiveStringSorting {
 				if (sorter != null) {
 					sorter.close();
 				}
+				if (mm != null) {
+					mm.shutdown();
+				}
+				if (ioMan != null) {
+					ioMan.shutdown();
+				}
 			}
 		}
 		catch (Exception e) {
@@ -173,10 +181,12 @@ public class MassiveStringSorting {
 			UnilateralSortMerger<Tuple2<String, String[]>> sorter = null;
 			BufferedReader reader = null;
 			BufferedReader verifyReader = null;
+			MemoryManager mm = null;
+			IOManager ioMan = null;
 
 			try {
-				MemoryManager mm = new MemoryManager(1024 * 1024, 1);
-				IOManager ioMan = new IOManagerAsync();
+				mm = new MemoryManager(1024 * 1024, 1);
+				ioMan = new IOManagerAsync();
 
 				TupleTypeInfo<Tuple2<String, String[]>> typeInfo = (TupleTypeInfo<Tuple2<String, String[]>>)
 						TypeInfoParser.<Tuple2<String, String[]>>parse("Tuple2<String, String[]>");
@@ -243,6 +253,12 @@ public class MassiveStringSorting {
 				if (sorter != null) {
 					sorter.close();
 				}
+				if (mm != null) {
+					mm.shutdown();
+				}
+				if (ioMan != null) {
+					ioMan.shutdown();
+				}
 			}
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/30fc069a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
index 453aa14..1f72e4a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/MassiveStringValueSorting.java
@@ -83,10 +83,12 @@ public class MassiveStringValueSorting {
 			UnilateralSortMerger<StringValue> sorter = null;
 			BufferedReader reader = null;
 			BufferedReader verifyReader = null;
+			MemoryManager mm = null;
+			IOManager ioMan = null;
 
 			try {
-				MemoryManager mm = new MemoryManager(1024 * 1024, 1);
-				IOManager ioMan = new IOManagerAsync();
+				mm = new MemoryManager(1024 * 1024, 1);
+				ioMan = new IOManagerAsync();
 
 				TypeSerializer<StringValue> serializer = new CopyableValueSerializer<StringValue>(StringValue.class);
 				TypeComparator<StringValue> comparator = new CopyableValueComparator<StringValue>(true, StringValue.class);
@@ -124,6 +126,12 @@ public class MassiveStringValueSorting {
 				if (sorter != null) {
 					sorter.close();
 				}
+				if (mm != null) {
+					mm.shutdown();
+				}
+				if (ioMan != null) {
+					ioMan.shutdown();
+				}
 			}
 		}
 		catch (Exception e) {
@@ -177,10 +185,12 @@ public class MassiveStringValueSorting {
 			UnilateralSortMerger<Tuple2<StringValue, StringValue[]>> sorter = null;
 			BufferedReader reader = null;
 			BufferedReader verifyReader = null;
+			MemoryManager mm = null;
+			IOManager ioMan = null;
 
 			try {
-				MemoryManager mm = new MemoryManager(1024 * 1024, 1);
-				IOManager ioMan = new IOManagerAsync();
+				mm = new MemoryManager(1024 * 1024, 1);
+				ioMan = new IOManagerAsync();
 
 				TupleTypeInfo<Tuple2<StringValue, StringValue[]>> typeInfo = (TupleTypeInfo<Tuple2<StringValue, StringValue[]>>)
 						TypeInfoParser.<Tuple2<StringValue, StringValue[]>>parse("Tuple2<org.apache.flink.types.StringValue, org.apache.flink.types.StringValue[]>");
@@ -247,6 +257,12 @@ public class MassiveStringValueSorting {
 				if (sorter != null) {
 					sorter.close();
 				}
+				if (mm != null) {
+					mm.shutdown();
+				}
+				if (ioMan != null) {
+					ioMan.shutdown();
+				}
 			}
 		}
 		catch (Exception e) {


[4/4] flink git commit: [FLINK-8235][build] Spotbugs exclusion file path now absolute

Posted by ch...@apache.org.
[FLINK-8235][build] Spotbugs exclusion file path now absolute

This closes #5146.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/beb11976
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/beb11976
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/beb11976

Branch: refs/heads/master
Commit: beb11976fe63c20a5dc9f22ea713c05b4d5e9585
Parents: 784dbbe
Author: zentol <ch...@apache.org>
Authored: Mon Dec 11 11:44:57 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Dec 12 11:53:41 2017 +0100

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/beb11976/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d18a3a6..ec17818 100644
--- a/pom.xml
+++ b/pom.xml
@@ -616,7 +616,7 @@ under the License.
 							<threshold>Low</threshold>
 							<effort>default</effort>
 							<findbugsXmlOutputDirectory>${project.build.directory}/spotbugs</findbugsXmlOutputDirectory>
-							<excludeFilterFile>tools/maven/spotbugs-exclude.xml</excludeFilterFile>
+							<excludeFilterFile>${rootDir}/tools/maven/spotbugs-exclude.xml</excludeFilterFile>
 							<failOnError>true</failOnError>
 						</configuration>
 					</plugin>


[3/4] flink git commit: [FLINK-7692][metrics] Support user-defined variables

Posted by ch...@apache.org.
[FLINK-7692][metrics] Support user-defined variables

This closes #5115.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/784dbbee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/784dbbee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/784dbbee

Branch: refs/heads/master
Commit: 784dbbeeeb0736c29225b58ec01f1cf95234e881
Parents: 30fc069
Author: Tony Wei <to...@gmail.com>
Authored: Tue Nov 28 15:35:07 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Dec 12 11:52:40 2017 +0100

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      |  51 ++++++-
 .../org/apache/flink/metrics/MetricGroup.java   |  13 ++
 .../groups/UnregisteredMetricsGroup.java        |   5 +
 .../metrics/groups/AbstractMetricGroup.java     |  50 ++++++-
 .../metrics/groups/GenericKeyMetricGroup.java   |  52 +++++++
 .../metrics/groups/GenericValueMetricGroup.java |  57 ++++++++
 .../metrics/groups/ProxyMetricGroup.java        |   5 +
 .../checkpoint/CheckpointStatsTrackerTest.java  |   5 +
 .../runtime/metrics/groups/MetricGroupTest.java | 138 +++++++++++++++++++
 9 files changed, 368 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/784dbbee/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 7963e1a..e4b5161 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -424,14 +424,17 @@ class MyMapper extends RichMapFunction[Long,Long] {
 
 ## Scope
 
-Every metric is assigned an identifier under which it will be reported that is based on 3 components: the user-provided name when registering the metric, an optional user-defined scope and a system-provided scope.
+Every metric is assigned an identifier and a set of key-value pairs under which the metric will be reported.
+
+The identifier is based on 3 components: the user-defined name when registering the metric, an optional user-defined scope and a system-provided scope.
 For example, if `A.B` is the system scope, `C.D` the user scope and `E` the name, then the identifier for the metric will be `A.B.C.D.E`.
 
 You can configure which delimiter to use for the identifier (default: `.`) by setting the `metrics.scope.delimiter` key in `conf/flink-conf.yaml`.
 
 ### User Scope
 
-You can define a user scope by calling either `MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`.
+You can define a user scope by calling `MetricGroup#addGroup(String name)`, `MetricGroup#addGroup(int name)` or `MetricGroup#addGroup(String key, String value)`.
+These methods affect what `MetricGroup#getMetricIdentifier` and `MetricGroup#getScopeComponents` return.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -442,6 +445,11 @@ counter = getRuntimeContext()
   .addGroup("MyMetrics")
   .counter("myCounter");
 
+counter = getRuntimeContext()
+  .getMetricGroup()
+  .addGroup("MyMetricsKey", "MyMetricsValue")
+  .counter("myCounter");
+
 {% endhighlight %}
 </div>
 
@@ -453,6 +461,11 @@ counter = getRuntimeContext()
   .addGroup("MyMetrics")
   .counter("myCounter")
 
+counter = getRuntimeContext()
+  .getMetricGroup()
+  .addGroup("MyMetricsKey", "MyMetricsValue")
+  .counter("myCounter")
+
 {% endhighlight %}
 </div>
 
@@ -508,6 +521,40 @@ or by assigning unique names to jobs and operators.
 
 **Important:** For the Batch API, &lt;operator_id&gt; is always equal to &lt;task_id&gt;.
 
+### User Variables
+
+You can define a user variable by calling `MetricGroup#addGroup(String key, String value)`.
+This method affects what `MetricGroup#getMetricIdentifier`, `MetricGroup#getScopeComponents` and `MetricGroup#getAllVariables()` return.
+
+**Important:** User variables cannot be used in scope formats.
+
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+counter = getRuntimeContext()
+  .getMetricGroup()
+  .addGroup("MyMetricsKey", "MyMetricsValue")
+  .counter("myCounter");
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+counter = getRuntimeContext()
+  .getMetricGroup()
+  .addGroup("MyMetricsKey", "MyMetricsValue")
+  .counter("myCounter")
+
+{% endhighlight %}
+</div>
+
+</div>
+
 ## Reporter
 
 Metrics can be exposed to an external system by configuring one or several reporters in `conf/flink-conf.yaml`. These

http://git-wip-us.apache.org/repos/asf/flink/blob/784dbbee/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
index 39ab3b6..f1f1981 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
@@ -150,6 +150,19 @@ public interface MetricGroup {
 	 */
 	MetricGroup addGroup(String name);
 
+	/**
+	 * Creates a new key-value MetricGroup pair. The key group is added to this group's sub-groups, while the value group
+	 * is added to the key group's sub-groups. This method returns the value group.
+	 *
+	 * <p>The only difference between calling this method and {@code group.addGroup(key).addGroup(value)} is that
+	 * {@link #getAllVariables()} of the value group returns an additional {@code "<key>"="value"} pair.
+	 *
+	 * @param key name of the first group
+	 * @param value name of the second group
+	 * @return the second created group
+	 */
+	MetricGroup addGroup(String key, String value);
+
 	// ------------------------------------------------------------------------
 	// Scope
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/784dbbee/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
index ea11b43..e004124 100644
--- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
+++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
@@ -96,6 +96,11 @@ public class UnregisteredMetricsGroup implements MetricGroup {
 	}
 
 	@Override
+	public MetricGroup addGroup(String key, String value) {
+		return new UnregisteredMetricsGroup();
+	}
+
+	@Override
 	public String[] getScopeComponents() {
 		return new String[0];
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/784dbbee/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 66eace5..e6df3a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -113,10 +113,10 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 		if (variables == null) { // avoid synchronization for common case
 			synchronized (this) {
 				if (variables == null) {
-					if (parent != null) {
-						variables = parent.getAllVariables();
-					} else { // this case should only be true for mock groups
-						variables = new HashMap<>();
+					variables = new HashMap<>();
+					putVariables(variables);
+					if (parent != null) { // not true for Job-/TaskManagerMetricGroup and mocks
+						variables.putAll(parent.getAllVariables());
 					}
 				}
 			}
@@ -125,6 +125,14 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 	}
 
 	/**
+	 * Enters all variables specific to this {@link AbstractMetricGroup} and their associated values into the map.
+	 *
+	 * @param variables map to enter variables and their values into
+	 */
+	protected void putVariables(Map<String, String> variables) {
+	}
+
+	/**
 	 * Returns the logical scope of this group, for example
 	 * {@code "taskmanager.job.task"}.
 	 *
@@ -388,11 +396,20 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 
 	@Override
 	public MetricGroup addGroup(int name) {
-		return addGroup(String.valueOf(name));
+		return addGroup(String.valueOf(name), ChildType.GENERIC);
 	}
 
 	@Override
 	public MetricGroup addGroup(String name) {
+		return addGroup(name, ChildType.GENERIC);
+	}
+
+	@Override
+	public MetricGroup addGroup(String key, String value) {
+		return addGroup(key, ChildType.KEY).addGroup(value, ChildType.VALUE);
+	}
+
+	private AbstractMetricGroup<?> addGroup(String name, ChildType childType) {
 		synchronized (this) {
 			if (!closed) {
 				// adding a group with the same name as a metric creates problems in many reporters/dashboards
@@ -403,7 +420,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 							name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
 				}
 
-				AbstractMetricGroup newGroup = new GenericMetricGroup(registry, this, name);
+				AbstractMetricGroup newGroup = createChildGroup(name, childType);
 				AbstractMetricGroup prior = groups.put(name, newGroup);
 				if (prior == null) {
 					// no prior group with that name
@@ -422,4 +439,25 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 			}
 		}
 	}
+
+	protected GenericMetricGroup createChildGroup(String name, ChildType childType) {
+		switch (childType) {
+			case KEY: // child is the key half of a key/value pair
+				return new GenericKeyMetricGroup(registry, this, name);
+			default: // VALUE and GENERIC both yield a plain group in this base implementation
+				return new GenericMetricGroup(registry, this, name);
+		}
+	}
+
+	/**
+	 * Enum for indicating which child group should be created.
+	 * {@code KEY} is used to create {@link GenericKeyMetricGroup}.
+	 * {@code VALUE} is used to create {@link GenericValueMetricGroup}.
+	 * {@code GENERIC} is used to create {@link GenericMetricGroup}.
+	 */
+	protected enum ChildType {
+		KEY,
+		VALUE,
+		GENERIC
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/784dbbee/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericKeyMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericKeyMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericKeyMetricGroup.java
new file mode 100644
index 0000000..0911935
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericKeyMetricGroup.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+
+/**
+ * A {@link GenericMetricGroup} for representing the key part of a key-value metric group pair.
+ *
+ * @see GenericValueMetricGroup
+ * @see MetricGroup#addGroup(String, String)
+ */
+@Internal
+public class GenericKeyMetricGroup extends GenericMetricGroup {
+
+	GenericKeyMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) {
+		super(registry, parent, name);
+	}
+
+	@Override
+	public MetricGroup addGroup(String key, String value) {
+		return addGroup(key).addGroup(value); // two plain addGroup(String) calls instead of a key/value pair
+	}
+
+	@Override
+	protected GenericMetricGroup createChildGroup(String name, ChildType childType) {
+		switch (childType) {
+			case VALUE: // child is the value half of the pair whose key is this group
+				return new GenericValueMetricGroup(registry, this, name);
+			default: // every other child type yields a plain group
+				return new GenericMetricGroup(registry, this, name);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/784dbbee/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericValueMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericValueMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericValueMetricGroup.java
new file mode 100644
index 0000000..ef8e6e8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericValueMetricGroup.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+
+import java.util.Map;
+
+/**
+ * A {@link GenericMetricGroup} for representing the value part of a key-value metric group pair.
+ *
+ * @see GenericKeyMetricGroup
+ * @see MetricGroup#addGroup(String, String)
+ */
+@Internal
+public class GenericValueMetricGroup extends GenericMetricGroup {
+	private final String key; // unfiltered name of the parent key group; assigned once in the constructor
+	private final String value; // name of this group; stored as the value of the key's variable
+
+	GenericValueMetricGroup(MetricRegistry registry, GenericKeyMetricGroup parent, String value) {
+		super(registry, parent, value);
+		this.key = parent.getGroupName(name -> name); // identity filter: keep the key name as-is
+		this.value = value;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	protected void putVariables(Map<String, String> variables) {
+		variables.put(ScopeFormat.asVariable(this.key), value); // variable name is derived from the key
+	}
+
+	@Override
+	public String getLogicalScope(CharacterFilter filter, char delimiter) {
+		return parent.getLogicalScope(filter, delimiter); // delegate to the key group: the value is not part of the logical scope
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/784dbbee/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
index 2d49913..ea1ba41 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
@@ -103,6 +103,11 @@ public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup {
 	}
 
 	@Override
+	public final MetricGroup addGroup(String key, String value) {
+		return parentMetricGroup.addGroup(key, value);
+	}
+
+	@Override
 	public String[] getScopeComponents() {
 		return parentMetricGroup.getScopeComponents();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/784dbbee/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index a47b497..e7a1d7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -385,6 +385,11 @@ public class CheckpointStatsTrackerTest {
 			}
 
 			@Override
+			public MetricGroup addGroup(String key, String value) {
+				throw new UnsupportedOperationException("Not expected in this test");
+			}
+
+			@Override
 			public String[] getScopeComponents() {
 				throw new UnsupportedOperationException("Not expected in this test");
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/784dbbee/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 94760e6..0fced33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -27,7 +27,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.TestLogger;
@@ -39,6 +41,7 @@ import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -78,6 +81,141 @@ public class MetricGroupTest extends TestLogger {
 		assertTrue(subgroup1 == subgroup2);
 	}
 
+	/**
+	 * Verifies the basic behavior when defining user-defined variables.
+	 */
+	@Test
+	public void testUserDefinedVariable() {
+		MetricRegistry registry = new NoOpMetricRegistry();
+		GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
+
+		String key = "key";
+		String value = "value";
+		MetricGroup group = root.addGroup(key, value);
+
+		String variableValue = group.getAllVariables().get(ScopeFormat.asVariable("key"));
+		assertEquals(value, variableValue);
+
+		String identifier = group.getMetricIdentifier("metric");
+		assertTrue("Key is missing from metric identifier.", identifier.contains("key"));
+		assertTrue("Value is missing from metric identifier.", identifier.contains("value"));
+
+		String logicalScope = ((AbstractMetricGroup) group).getLogicalScope(new DummyCharacterFilter());
+		assertTrue("Key is missing from logical scope.", logicalScope.contains(key));
+		assertFalse("Value is present in logical scope.", logicalScope.contains(value));
+	}
+
+	/**
+	 * Verifies that calling {@link MetricGroup#addGroup(String, String)} on a {@link GenericKeyMetricGroup} goes
+	 * through the generic code path.
+	 */
+	@Test
+	public void testUserDefinedVariableOnKeyGroup() {
+		MetricRegistry registry = new NoOpMetricRegistry();
+		GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
+
+		String key1 = "key1";
+		String value1 = "value1";
+		root.addGroup(key1, value1);
+
+		String key2 = "key2";
+		String value2 = "value2";
+		MetricGroup group = root.addGroup(key1).addGroup(key2, value2);
+
+		String variableValue = group.getAllVariables().get(ScopeFormat.asVariable(key2)); // variables are keyed by the key, not the value
+		assertNull(variableValue);
+
+		String identifier = group.getMetricIdentifier("metric");
+		assertTrue("Key1 is missing from metric identifier.", identifier.contains("key1"));
+		assertTrue("Key2 is missing from metric identifier.", identifier.contains("key2"));
+		assertTrue("Value2 is missing from metric identifier.", identifier.contains("value2"));
+
+		String logicalScope = ((AbstractMetricGroup) group).getLogicalScope(new DummyCharacterFilter());
+		assertTrue("Key1 is missing from logical scope.", logicalScope.contains(key1));
+		assertTrue("Key2 is missing from logical scope.", logicalScope.contains(key2));
+		assertTrue("Value2 is missing from logical scope.", logicalScope.contains(value2));
+	}
+
+	/**
+	 * Verifies that calling {@link MetricGroup#addGroup(String, String)} when a generic group with the key name
+	 * already exists goes through the generic code path.
+	 */
+	@Test
+	public void testNameCollisionForKeyAfterGenericGroup() {
+		MetricRegistry registry = new NoOpMetricRegistry();
+		GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
+
+		String key = "key";
+		String value = "value";
+
+		root.addGroup(key);
+		MetricGroup group = root.addGroup(key, value);
+
+		String variableValue = group.getAllVariables().get(ScopeFormat.asVariable("key"));
+		assertNull(variableValue);
+
+		String identifier = group.getMetricIdentifier("metric");
+		assertTrue("Key is missing from metric identifier.", identifier.contains("key"));
+		assertTrue("Value is missing from metric identifier.", identifier.contains("value"));
+
+		String logicalScope = ((AbstractMetricGroup) group).getLogicalScope(new DummyCharacterFilter());
+		assertTrue("Key is missing from logical scope.", logicalScope.contains(key));
+		assertTrue("Value is missing from logical scope.", logicalScope.contains(value));
+	}
+
+	/**
+	 * Verifies that calling {@link MetricGroup#addGroup(String, String)} when generic groups with the key and value
+	 * names already exist goes through the generic code path.
+	 */
+	@Test
+	public void testNameCollisionForKeyAndValueAfterGenericGroup() {
+		MetricRegistry registry = new NoOpMetricRegistry();
+		GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
+
+		String key = "key";
+		String value = "value";
+
+		root.addGroup(key).addGroup(value);
+		MetricGroup group = root.addGroup(key, value);
+
+		String variableValue = group.getAllVariables().get(ScopeFormat.asVariable("key"));
+		assertNull(variableValue);
+
+		String identifier = group.getMetricIdentifier("metric");
+		assertTrue("Key is missing from metric identifier.", identifier.contains("key"));
+		assertTrue("Value is missing from metric identifier.", identifier.contains("value"));
+
+		String logicalScope = ((AbstractMetricGroup) group).getLogicalScope(new DummyCharacterFilter());
+		assertTrue("Key is missing from logical scope.", logicalScope.contains(key));
+		assertTrue("Value is missing from logical scope.", logicalScope.contains(value));
+	}
+
+	/**
+	 * Verifies that existing key/value groups are returned when calling {@link MetricGroup#addGroup(String)}.
+	 */
+	@Test
+	public void testNameCollisionAfterKeyValueGroup() {
+		MetricRegistry registry = new NoOpMetricRegistry();
+		GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
+
+		String key = "key";
+		String value = "value";
+
+		root.addGroup(key, value);
+		MetricGroup group = root.addGroup(key).addGroup(value);
+
+		String variableValue = group.getAllVariables().get(ScopeFormat.asVariable("key"));
+		assertEquals(value, variableValue);
+
+		String identifier = group.getMetricIdentifier("metric");
+		assertTrue("Key is missing from metric identifier.", identifier.contains("key"));
+		assertTrue("Value is missing from metric identifier.", identifier.contains("value"));
+
+		String logicalScope = ((AbstractMetricGroup) group).getLogicalScope(new DummyCharacterFilter());
+		assertTrue("Key is missing from logical scope.", logicalScope.contains(key));
+		assertFalse("Value is present in logical scope.", logicalScope.contains(value));
+	}
+
 	@Test
 	public void closedGroupDoesNotRegisterMetrics() {
 		GenericMetricGroup group = new GenericMetricGroup(