You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/07/05 13:56:04 UTC

[1/5] flink git commit: [FLINK-9593][cep] Unified After Match semantics with SQL MATCH_RECOGNIZE

Repository: flink
Updated Branches:
  refs/heads/master ce345e394 -> d934cb8fb


http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
index e6efd5e..4462d10 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
@@ -19,7 +19,9 @@
 package org.apache.flink.cep.nfa;
 
 import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
@@ -29,6 +31,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
@@ -216,14 +219,14 @@ public class AfterMatchSkipITCase extends TestLogger{
 		Event d1 = new Event(7, "d1", 0.0);
 		Event d2 = new Event(7, "d2", 0.0);
 
-		streamEvents.add(new StreamRecord<Event>(a1));
-		streamEvents.add(new StreamRecord<Event>(a2));
-		streamEvents.add(new StreamRecord<Event>(b1));
-		streamEvents.add(new StreamRecord<Event>(b2));
-		streamEvents.add(new StreamRecord<Event>(c1));
-		streamEvents.add(new StreamRecord<Event>(c2));
-		streamEvents.add(new StreamRecord<Event>(d1));
-		streamEvents.add(new StreamRecord<Event>(d2));
+		streamEvents.add(new StreamRecord<>(a1));
+		streamEvents.add(new StreamRecord<>(a2));
+		streamEvents.add(new StreamRecord<>(b1));
+		streamEvents.add(new StreamRecord<>(b2));
+		streamEvents.add(new StreamRecord<>(c1));
+		streamEvents.add(new StreamRecord<>(c2));
+		streamEvents.add(new StreamRecord<>(d1));
+		streamEvents.add(new StreamRecord<>(d2));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("a", AfterMatchSkipStrategy.skipPastLastEvent()).where(new SimpleCondition<Event>() {
 
@@ -245,7 +248,7 @@ public class AfterMatchSkipITCase extends TestLogger{
 			public boolean filter(Event value) throws Exception {
 				return value.getName().contains("c");
 			}
-		}).followedByAny("d").where(new SimpleCondition<Event>() {
+		}).followedBy("d").where(new SimpleCondition<Event>() {
 				@Override
 				public boolean filter(Event value) throws Exception {
 					return value.getName().contains("d");
@@ -255,15 +258,8 @@ public class AfterMatchSkipITCase extends TestLogger{
 
 		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
 
-		compareMaps(resultingPatterns, Lists.newArrayList(
-			Lists.newArrayList(a1, b1, c1, d1),
-			Lists.newArrayList(a1, b1, c2, d1),
-			Lists.newArrayList(a1, b2, c1, d1),
-			Lists.newArrayList(a1, b2, c2, d1),
-			Lists.newArrayList(a2, b1, c1, d1),
-			Lists.newArrayList(a2, b1, c2, d1),
-			Lists.newArrayList(a2, b2, c1, d1),
-			Lists.newArrayList(a2, b2, c2, d1)
+		compareMaps(resultingPatterns, Collections.singletonList(
+			Lists.newArrayList(a1, b1, c1, d1)
 		));
 	}
 
@@ -481,4 +477,279 @@ public class AfterMatchSkipITCase extends TestLogger{
 			Lists.newArrayList(a3, b4)
 		));
 	}
+
+	/** Example from docs. */
+	@Test
+	public void testSkipPastLastWithOneOrMoreAtBeginning() throws Exception {
+		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
+
+		Event a1 = new Event(1, "a1", 0.0);
+		Event a2 = new Event(2, "a2", 0.0);
+		Event a3 = new Event(3, "a3", 0.0);
+		Event b1 = new Event(4, "b1", 0.0);
+
+		streamEvents.add(new StreamRecord<>(a1));
+		streamEvents.add(new StreamRecord<>(a2));
+		streamEvents.add(new StreamRecord<>(a3));
+		streamEvents.add(new StreamRecord<>(b1));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("a", AfterMatchSkipStrategy.skipPastLastEvent()
+		).where(
+			new SimpleCondition<Event>() {
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().contains("a");
+				}
+			}
+		).oneOrMore().consecutive().greedy()
+			.next("b").where(new SimpleCondition<Event>() {
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().contains("b");
+				}
+			});
+		NFA<Event> nfa = compile(pattern, false);
+
+		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+
+		compareMaps(resultingPatterns, Collections.singletonList(
+			Lists.newArrayList(a1, a2, a3, b1)
+		));
+	}
+
+	/** Example from docs. */
+	@Test
+	public void testSkipToLastWithOneOrMoreAtBeginning() throws Exception {
+		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
+
+		Event a1 = new Event(1, "a1", 0.0);
+		Event a2 = new Event(2, "a2", 0.0);
+		Event a3 = new Event(3, "a3", 0.0);
+		Event b1 = new Event(4, "b1", 0.0);
+
+		streamEvents.add(new StreamRecord<>(a1));
+		streamEvents.add(new StreamRecord<>(a2));
+		streamEvents.add(new StreamRecord<>(a3));
+		streamEvents.add(new StreamRecord<>(b1));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("a", AfterMatchSkipStrategy.skipToLast("a")
+		).where(
+			new SimpleCondition<Event>() {
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().contains("a");
+				}
+			}
+		).oneOrMore().consecutive().greedy()
+			.next("b").where(new SimpleCondition<Event>() {
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().contains("b");
+				}
+			});
+		NFA<Event> nfa = compile(pattern, false);
+
+		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+
+		compareMaps(resultingPatterns, Lists.newArrayList(
+			Lists.newArrayList(a1, a2, a3, b1),
+			Lists.newArrayList(a3, b1)
+		));
+	}
+
+	/** Example from docs. */
+	@Test
+	public void testSkipToFirstWithOneOrMoreAtBeginning() throws Exception {
+		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
+
+		Event a1 = new Event(1, "a1", 0.0);
+		Event a2 = new Event(2, "a2", 0.0);
+		Event a3 = new Event(3, "a3", 0.0);
+		Event b1 = new Event(4, "b1", 0.0);
+
+		streamEvents.add(new StreamRecord<>(a1));
+		streamEvents.add(new StreamRecord<>(a2));
+		streamEvents.add(new StreamRecord<>(a3));
+		streamEvents.add(new StreamRecord<>(b1));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("a", AfterMatchSkipStrategy.skipToFirst("a")
+		).where(
+			new SimpleCondition<Event>() {
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().contains("a");
+				}
+			}
+		).oneOrMore().consecutive().greedy()
+			.next("b").where(new SimpleCondition<Event>() {
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().contains("b");
+				}
+			});
+		NFA<Event> nfa = compile(pattern, false);
+
+		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+
+		compareMaps(resultingPatterns, Lists.newArrayList(
+			Lists.newArrayList(a1, a2, a3, b1),
+			Lists.newArrayList(a2, a3, b1),
+			Lists.newArrayList(a3, b1)
+		));
+	}
+
+	/** Example from docs. */
+	@Test
+	public void testNoSkipWithOneOrMoreAtBeginning() throws Exception {
+		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
+
+		Event a1 = new Event(1, "a1", 0.0);
+		Event a2 = new Event(2, "a2", 0.0);
+		Event a3 = new Event(3, "a3", 0.0);
+		Event b1 = new Event(4, "b1", 0.0);
+
+		streamEvents.add(new StreamRecord<>(a1));
+		streamEvents.add(new StreamRecord<>(a2));
+		streamEvents.add(new StreamRecord<>(a3));
+		streamEvents.add(new StreamRecord<>(b1));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("a", AfterMatchSkipStrategy.noSkip()
+		).where(
+			new SimpleCondition<Event>() {
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().contains("a");
+				}
+			}
+		).oneOrMore().consecutive().greedy()
+			.next("b").where(new SimpleCondition<Event>() {
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().contains("b");
+				}
+			});
+		NFA<Event> nfa = compile(pattern, false);
+
+		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+
+		compareMaps(resultingPatterns, Lists.newArrayList(
+			Lists.newArrayList(a1, a2, a3, b1),
+			Lists.newArrayList(a2, a3, b1),
+			Lists.newArrayList(a3, b1)
+		));
+	}
+
+	/** Example from docs. */
+	@Test
+	public void testSkipToFirstDiscarding() throws Exception {
+		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
+
+		Event a = new Event(1, "a", 0.0);
+		Event b = new Event(2, "b", 0.0);
+		Event c1 = new Event(3, "c1", 0.0);
+		Event c2 = new Event(4, "c2", 0.0);
+		Event c3 = new Event(5, "c3", 0.0);
+		Event d = new Event(6, "d", 0.0);
+
+		streamEvents.add(new StreamRecord<>(a));
+		streamEvents.add(new StreamRecord<>(b));
+		streamEvents.add(new StreamRecord<>(c1));
+		streamEvents.add(new StreamRecord<>(c2));
+		streamEvents.add(new StreamRecord<>(c3));
+		streamEvents.add(new StreamRecord<>(d));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("a or c", AfterMatchSkipStrategy.skipToFirst("c*")
+		).where(
+			new SimpleCondition<Event>() {
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().contains("a") || value.getName().contains("c");
+				}
+			}
+		).followedBy("b or c").where(
+			new SimpleCondition<Event>() {
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().contains("b") || value.getName().contains("c");
+				}
+			}
+		).followedBy("c*").where(
+			new SimpleCondition<Event>() {
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().contains("c");
+				}
+			}
+		).oneOrMore().greedy()
+			.followedBy("d").where(new SimpleCondition<Event>() {
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().contains("d");
+				}
+			});
+		NFA<Event> nfa = compile(pattern, false);
+
+		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+
+		compareMaps(resultingPatterns, Lists.newArrayList(
+			Lists.newArrayList(a, b, c1, c2, c3, d),
+			Lists.newArrayList(c1, c2, c3, d)
+		));
+	}
+
+	@Test
+	public void testSkipBeforeOtherAlreadyCompleted() throws Exception {
+		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
+
+		Event a1 = new Event(1, "a1", 0.0);
+		Event c1 = new Event(2, "c1", 0.0);
+		Event a2 = new Event(3, "a2", 1.0);
+		Event c2 = new Event(4, "c2", 0.0);
+		Event b1 = new Event(5, "b1", 1.0);
+		Event b2 = new Event(6, "b2", 0.0);
+
+		streamEvents.add(new StreamRecord<>(a1));
+		streamEvents.add(new StreamRecord<>(c1));
+		streamEvents.add(new StreamRecord<>(a2));
+		streamEvents.add(new StreamRecord<>(c2));
+		streamEvents.add(new StreamRecord<>(b1));
+		streamEvents.add(new StreamRecord<>(b2));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("a", AfterMatchSkipStrategy.skipToFirst("c")
+		).where(
+			new SimpleCondition<Event>() {
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().contains("a");
+				}
+			}
+		).followedBy("c").where(new SimpleCondition<Event>() {
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().contains("c");
+			}
+		}).followedBy("b").where(new IterativeCondition<Event>() {
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+				return value.getName().contains("b") &&
+					ctx.getEventsForPattern("a").iterator().next().getPrice() == value.getPrice();
+			}
+		});
+		NFA<Event> nfa = compile(pattern, false);
+
+		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+
+		compareMaps(resultingPatterns, Lists.newArrayList(
+			Lists.newArrayList(a1, c1, b2),
+			Lists.newArrayList(a2, c2, b1)
+		));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
index a992f46..93116ff 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
@@ -1084,8 +1084,8 @@ public class GroupITCase extends TestLogger {
 			Lists.newArrayList(c, a1, b1, a2, b2, d)
 		));
 
-		assertEquals(1, nfaState.getComputationStates().size());
-		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
+		assertEquals(1, nfaState.getPartialMatches().size());
+		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index ae68d02..b615114 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -2342,8 +2342,8 @@ public class NFAITCase extends TestLogger {
 		//pruning element
 		nfa.advanceTime(sharedBuffer, nfaState, 10);
 
-		assertEquals(1, nfaState.getComputationStates().size());
-		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
+		assertEquals(1, nfaState.getPartialMatches().size());
+		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -2386,8 +2386,8 @@ public class NFAITCase extends TestLogger {
 		//pruning element
 		nfa.advanceTime(sharedBuffer, nfaState, 10);
 
-		assertEquals(1, nfaState.getComputationStates().size());
-		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
+		assertEquals(1, nfaState.getPartialMatches().size());
+		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -2432,8 +2432,8 @@ public class NFAITCase extends TestLogger {
 		//pruning element
 		nfa.advanceTime(sharedBuffer, nfaState, 10);
 
-		assertEquals(1, nfaState.getComputationStates().size());
-		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
+		assertEquals(1, nfaState.getPartialMatches().size());
+		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -2478,8 +2478,8 @@ public class NFAITCase extends TestLogger {
 		//pruning element
 		nfa.advanceTime(sharedBuffer, nfaState, 10);
 
-		assertEquals(1, nfaState.getComputationStates().size());
-		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
+		assertEquals(1, nfaState.getPartialMatches().size());
+		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
 	}
 
 	///////////////////////////////////////   Skip till next     /////////////////////////////

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
index 58ba224..805c142 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
@@ -19,6 +19,7 @@
 package org.apache.flink.cep.nfa;
 
 import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.utils.TestSharedBuffer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
index 88504e9..b7f1177 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
@@ -146,8 +146,8 @@ public void testClearingBuffer() throws Exception {
 	compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 		Lists.newArrayList(a1, b1, c1, d)
 	));
-	assertEquals(1, nfaState.getComputationStates().size());
-	assertEquals("a", nfaState.getComputationStates().peek().getCurrentStateName());
+	assertEquals(1, nfaState.getPartialMatches().size());
+	assertEquals("a", nfaState.getPartialMatches().peek().getCurrentStateName());
 }
 
 @Test
@@ -193,8 +193,8 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
 		Lists.newArrayList(a1, d1, d2),
 		Lists.newArrayList(a1, d1)
 	));
-	assertEquals(1, nfaState.getComputationStates().size());
-	assertEquals("a", nfaState.getComputationStates().peek().getCurrentStateName());
+	assertEquals(1, nfaState.getPartialMatches().size());
+	assertEquals("a", nfaState.getPartialMatches().peek().getCurrentStateName());
 }
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
index fb7f086..be0f28b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
@@ -100,8 +100,8 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1, breaking)
 		));
 
-		assertEquals(1, nfaState.getComputationStates().size());
-		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
+		assertEquals(1, nfaState.getPartialMatches().size());
+		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -151,8 +151,8 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, breaking),
 			Lists.newArrayList(startEvent, middleEvent1, breaking)
 		));
-		assertEquals(1, nfaState.getComputationStates().size());
-		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
+		assertEquals(1, nfaState.getPartialMatches().size());
+		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -200,8 +200,8 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
 			Lists.newArrayList(startEvent, middleEvent1, breaking)
 		));
-		assertEquals(1, nfaState.getComputationStates().size());
-		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
+		assertEquals(1, nfaState.getPartialMatches().size());
+		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -250,8 +250,8 @@ public class UntilConditionITCase {
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, breaking)
 		));
-		assertEquals(1, nfaState.getComputationStates().size());
-		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
+		assertEquals(1, nfaState.getPartialMatches().size());
+		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -300,8 +300,8 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1, breaking),
 			Lists.newArrayList(startEvent, breaking)
 		));
-		assertEquals(1, nfaState.getComputationStates().size());
-		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
+		assertEquals(1, nfaState.getPartialMatches().size());
+		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -352,8 +352,8 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1, breaking),
 			Lists.newArrayList(startEvent, breaking)
 		));
-		assertEquals(1, nfaState.getComputationStates().size());
-		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
+		assertEquals(1, nfaState.getPartialMatches().size());
+		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -402,8 +402,8 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1, breaking),
 			Lists.newArrayList(startEvent, breaking)
 		));
-		assertEquals(1, nfaState.getComputationStates().size());
-		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
+		assertEquals(1, nfaState.getPartialMatches().size());
+		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -534,8 +534,8 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1)
 		));
 
-		assertEquals(1, nfaState.getComputationStates().size());
-		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
+		assertEquals(1, nfaState.getPartialMatches().size());
+		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -588,8 +588,8 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1)
 		));
 
-		assertEquals(1, nfaState.getComputationStates().size());
-		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
+		assertEquals(1, nfaState.getPartialMatches().size());
+		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -643,7 +643,7 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent)
 		));
 
-		assertEquals(1, nfaState.getComputationStates().size());
-		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
+		assertEquals(1, nfaState.getPartialMatches().size());
+		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index 92f59a5..f39b174 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -21,11 +21,11 @@ package org.apache.flink.cep.nfa.compiler;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
 import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.pattern.MalformedPatternException;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
index 342c9ef..e2673a9 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
@@ -26,8 +26,6 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -107,33 +105,34 @@ public class SharedBufferTest extends TestLogger {
 		NodeId aLoop5 = sharedBuffer.put("a[]", eventIds[6], aLoop4, DeweyNumber.fromString("1.1"));
 		NodeId b3 = sharedBuffer.put("b", eventIds[7], aLoop5, DeweyNumber.fromString("1.1.0"));
 
-		Collection<Map<String, List<Event>>> patterns3 = sharedBuffer.extractPatterns(b3,
+		List<Map<String, List<EventId>>> patterns3 = sharedBuffer.extractPatterns(b3,
 			DeweyNumber.fromString("1.1.0"));
+		assertEquals(1L, patterns3.size());
+		assertEquals(expectedPattern3, sharedBuffer.materializeMatch(patterns3.get(0)));
 		sharedBuffer.releaseNode(b3);
-		Collection<Map<String, List<Event>>> patterns4 = sharedBuffer.extractPatterns(b3,
+
+		List<Map<String, List<EventId>>> patterns4 = sharedBuffer.extractPatterns(b3,
 			DeweyNumber.fromString("1.1.0"));
+		assertEquals(0L, patterns4.size());
+		assertTrue(patterns4.isEmpty());
 
-		Collection<Map<String, List<Event>>> patterns1 = sharedBuffer.extractPatterns(b1,
+		List<Map<String, List<EventId>>> patterns1 = sharedBuffer.extractPatterns(b1,
 			DeweyNumber.fromString("2.0.0"));
-		Collection<Map<String, List<Event>>> patterns2 = sharedBuffer.extractPatterns(b0,
+		assertEquals(1L, patterns1.size());
+		assertEquals(expectedPattern1, sharedBuffer.materializeMatch(patterns1.get(0)));
+
+		List<Map<String, List<EventId>>> patterns2 = sharedBuffer.extractPatterns(b0,
 			DeweyNumber.fromString("1.0.0"));
-		sharedBuffer.releaseNode(b0);
+		assertEquals(1L, patterns2.size());
+		assertEquals(expectedPattern2, sharedBuffer.materializeMatch(patterns2.get(0)));
 		sharedBuffer.releaseNode(b1);
+		sharedBuffer.releaseNode(b0);
 
 		for (EventId eventId : eventIds) {
 			sharedBuffer.releaseEvent(eventId);
 		}
 
-		assertEquals(1L, patterns3.size());
-		assertEquals(0L, patterns4.size());
-		assertEquals(1L, patterns1.size());
-		assertEquals(1L, patterns2.size());
-
 		assertTrue(sharedBuffer.isEmpty());
-		assertTrue(patterns4.isEmpty());
-		assertEquals(Collections.singletonList(expectedPattern1), patterns1);
-		assertEquals(Collections.singletonList(expectedPattern2), patterns2);
-		assertEquals(Collections.singletonList(expectedPattern3), patterns3);
 	}
 
 	@Test
@@ -200,8 +199,8 @@ public class SharedBufferTest extends TestLogger {
 		NodeId bb = sharedBuffer.put("bb", eventIds[3], aa, DeweyNumber.fromString("1.0.0.0"));
 		NodeId c = sharedBuffer.put("c", eventIds[4], bb, DeweyNumber.fromString("1.0.0.0.0"));
 
-		Collection<Map<String, List<Event>>> patternsResult = sharedBuffer.extractPatterns(c,
-			DeweyNumber.fromString("1.0.0.0.0"));
+		Map<String, List<Event>> patternsResult = sharedBuffer.materializeMatch(sharedBuffer.extractPatterns(c,
+			DeweyNumber.fromString("1.0.0.0.0")).get(0));
 
 		List<String> expectedOrder = new ArrayList<>();
 		expectedOrder.add("a");
@@ -214,7 +213,7 @@ public class SharedBufferTest extends TestLogger {
 			sharedBuffer.releaseEvent(eventId);
 		}
 
-		List<String> resultOrder = new ArrayList<>(patternsResult.iterator().next().keySet());
+		List<String> resultOrder = new ArrayList<>(patternsResult.keySet());
 		assertEquals(expectedOrder, resultOrder);
 	}
 


[4/5] flink git commit: [FLINK-6469][core] Configure Memory Sizes with units

Posted by dw...@apache.org.
[FLINK-6469][core] Configure Memory Sizes with units

This closes #5448


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d02167dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d02167dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d02167dc

Branch: refs/heads/master
Commit: d02167dc5eb6a6d8520955499e84dd4d06f06f6e
Parents: abd61cf
Author: yanghua <ya...@gmail.com>
Authored: Sat Feb 10 14:52:13 2018 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Thu Jul 5 15:54:54 2018 +0200

----------------------------------------------------------------------
 docs/_includes/generated/common_section.html    |  12 +-
 .../generated/job_manager_configuration.html    |   6 +-
 .../generated/task_manager_configuration.html   |  22 +--
 .../client/deployment/ClusterSpecification.java |   5 +-
 .../kafka/KafkaShortRetentionTestBase.java      |   2 +-
 .../connectors/kafka/KafkaTestBase.java         |   2 +-
 .../manualtests/ManualExactlyOnceTest.java      |   2 +-
 ...nualExactlyOnceWithStreamReshardingTest.java |   2 +-
 .../addons/hbase/HBaseConnectorITCase.java      |   2 +-
 .../flink/storm/api/FlinkLocalCluster.java      |   2 +-
 .../flink/configuration/JobManagerOptions.java  |  14 +-
 .../apache/flink/configuration/MemorySize.java  | 143 ++++++++++-----
 .../flink/configuration/TaskManagerOptions.java |  43 +++--
 .../flink/configuration/MemorySizeTest.java     |  17 ++
 flink-dist/src/main/flink-bin/bin/config.sh     | 180 +++++++++++++++++--
 flink-dist/src/main/flink-bin/bin/jobmanager.sh |  13 +-
 .../src/main/flink-bin/bin/taskmanager.sh       |  11 +-
 flink-dist/src/main/resources/flink-conf.yaml   |   4 +-
 flink-dist/src/test/bin/calcTMHeapSizeMB.sh     |   8 +
 flink-dist/src/test/bin/calcTMNetBufMem.sh      |   2 +
 ...kManagerHeapSizeCalculationJavaBashTest.java |  36 ++--
 .../gateway/local/LocalExecutorITCase.java      |   2 +-
 .../HAQueryableStateFsBackendITCase.java        |   2 +-
 .../HAQueryableStateRocksDBBackendITCase.java   |   2 +-
 .../NonHAQueryableStateFsBackendITCase.java     |   2 +-
 ...NonHAQueryableStateRocksDBBackendITCase.java |   2 +-
 .../runtime/webmonitor/WebFrontendITCase.java   |   2 +-
 .../taskexecutor/TaskManagerServices.java       |  26 ++-
 .../TaskManagerServicesConfiguration.java       |  25 ++-
 .../minicluster/LocalFlinkMiniCluster.scala     |   6 +-
 .../runtime/jobmanager/JobManagerTest.java      |   2 +-
 .../NetworkBufferCalculationTest.java           |   8 +-
 .../TaskManagerServicesConfigurationTest.java   |  16 +-
 .../taskexecutor/TaskManagerServicesTest.java   |  44 ++---
 ...cyTaskCancelAsyncProducerConsumerITCase.java |   2 +-
 .../TaskCancelAsyncProducerConsumerITCase.java  |   2 +-
 .../TaskManagerProcessReapingTestBase.java      |   2 +-
 .../taskmanager/TaskManagerStartupTest.java     |  11 +-
 .../runtime/taskmanager/TaskManagerTest.java    |   2 +-
 .../runtime/testutils/TaskManagerProcess.java   |   2 +-
 .../runtime/testingUtils/TestingUtils.scala     |   2 +-
 .../org/apache/flink/api/scala/FlinkShell.scala |  12 +-
 .../LegacyLocalStreamEnvironment.java           |   2 +-
 .../api/environment/LocalStreamEnvironment.java |   2 +-
 .../StreamNetworkBenchmarkEnvironment.java      |   5 +-
 .../flink/test/util/MiniClusterResource.java    |   2 +-
 .../apache/flink/test/util/TestBaseUtils.java   |   4 +-
 .../accumulators/AccumulatorErrorITCase.java    |   2 +-
 .../test/cancelling/CancelingTestBase.java      |   2 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   4 +-
 .../EventTimeAllWindowCheckpointingITCase.java  |   2 +-
 .../KeyedStateCheckpointingITCase.java          |   2 +-
 .../test/checkpointing/SavepointITCase.java     |   2 +-
 .../WindowCheckpointingITCase.java              |   2 +-
 .../test/classloading/ClassLoaderITCase.java    |   2 +-
 .../failing/JobSubmissionFailsITCase.java       |   2 +-
 .../manual/StreamingScalabilityAndLatency.java  |   2 +-
 .../test/misc/CustomSerializationITCase.java    |   2 +-
 ...SuccessAfterNetworkBuffersFailureITCase.java |   2 +-
 ...ctTaskManagerProcessFailureRecoveryTest.java |   2 +-
 ...agerHAProcessFailureBatchRecoveryITCase.java |   2 +-
 .../TaskManagerFailureRecoveryITCase.java       |   2 +-
 .../flink/test/runtime/IPv6HostnamesITCase.java |   2 +-
 .../test/streaming/runtime/TimestampITCase.java |   2 +-
 .../YARNSessionCapacitySchedulerITCase.java     |  30 ++--
 .../flink/yarn/YARNSessionFIFOITCase.java       |  12 +-
 .../flink/yarn/YarnConfigurationITCase.java     |   4 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |   4 +-
 .../apache/flink/yarn/YarnResourceManager.java  |   3 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   9 +-
 .../flink/yarn/FlinkYarnSessionCliTest.java     |  10 +-
 71 files changed, 579 insertions(+), 246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/docs/_includes/generated/common_section.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/common_section.html b/docs/_includes/generated/common_section.html
index 6ad3bef..29245c9 100644
--- a/docs/_includes/generated/common_section.html
+++ b/docs/_includes/generated/common_section.html
@@ -8,14 +8,14 @@
     </thead>
     <tbody>
         <tr>
-            <td><h5>jobmanager.heap.mb</h5></td>
-            <td style="word-wrap: break-word;">1024</td>
-            <td>JVM heap size (in megabytes) for the JobManager.</td>
+            <td><h5>jobmanager.heap.size</h5></td>
+            <td style="word-wrap: break-word;">"1024m"</td>
+            <td>JVM heap size for the JobManager.</td>
         </tr>
         <tr>
-            <td><h5>taskmanager.heap.mb</h5></td>
-            <td style="word-wrap: break-word;">1024</td>
-            <td>JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.</td>
+            <td><h5>taskmanager.heap.size</h5></td>
+            <td style="word-wrap: break-word;">"1024m"</td>
+            <td>JVM heap size for the TaskManagers, which are the parallel workers of the system. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.</td>
         </tr>
         <tr>
             <td><h5>parallelism.default</h5></td>

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/docs/_includes/generated/job_manager_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/job_manager_configuration.html b/docs/_includes/generated/job_manager_configuration.html
index db0ca53..0353874 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -23,9 +23,9 @@
             <td>The maximum number of prior execution attempts kept in history.</td>
         </tr>
         <tr>
-            <td><h5>jobmanager.heap.mb</h5></td>
-            <td style="word-wrap: break-word;">1024</td>
-            <td>JVM heap size (in megabytes) for the JobManager.</td>
+            <td><h5>jobmanager.heap.size</h5></td>
+            <td style="word-wrap: break-word;">"1024m"</td>
+            <td>JVM heap size for the JobManager.</td>
         </tr>
         <tr>
             <td><h5>jobmanager.resourcemanager.reconnect-interval</h5></td>

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/docs/_includes/generated/task_manager_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/task_manager_configuration.html b/docs/_includes/generated/task_manager_configuration.html
index fdb975f..e780f6e 100644
--- a/docs/_includes/generated/task_manager_configuration.html
+++ b/docs/_includes/generated/task_manager_configuration.html
@@ -53,9 +53,9 @@
             <td>Whether the quarantine monitor for task managers shall be started. The quarantine monitor shuts down the actor system if it detects that it has quarantined another actor system or if it has been quarantined by another actor system.</td>
         </tr>
         <tr>
-            <td><h5>taskmanager.heap.mb</h5></td>
-            <td style="word-wrap: break-word;">1024</td>
-            <td>JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.</td>
+            <td><h5>taskmanager.heap.size</h5></td>
+            <td style="word-wrap: break-word;">"1024m"</td>
+            <td>JVM heap size for the TaskManagers, which are the parallel workers of the system. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.</td>
         </tr>
         <tr>
             <td><h5>taskmanager.host</h5></td>
@@ -84,13 +84,13 @@
         </tr>
         <tr>
             <td><h5>taskmanager.memory.segment-size</h5></td>
-            <td style="word-wrap: break-word;">32768</td>
-            <td>Size of memory buffers used by the network stack and the memory manager (in bytes).</td>
+            <td style="word-wrap: break-word;">"32768"</td>
+            <td>Size of memory buffers used by the network stack and the memory manager.</td>
         </tr>
         <tr>
             <td><h5>taskmanager.memory.size</h5></td>
-            <td style="word-wrap: break-word;">-1</td>
-            <td>Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not set, a relative fraction will be allocated.</td>
+            <td style="word-wrap: break-word;">"0"</td>
+            <td>Amount of memory to be allocated by the task manager's memory manager. If not set, a relative fraction will be allocated.</td>
         </tr>
         <tr>
             <td><h5>taskmanager.network.detailed-metrics</h5></td>
@@ -114,13 +114,13 @@
         </tr>
         <tr>
             <td><h5>taskmanager.network.memory.max</h5></td>
-            <td style="word-wrap: break-word;">1073741824</td>
-            <td>Maximum memory size for network buffers (in bytes).</td>
+            <td style="word-wrap: break-word;">"1073741824"</td>
+            <td>Maximum memory size for network buffers.</td>
         </tr>
         <tr>
             <td><h5>taskmanager.network.memory.min</h5></td>
-            <td style="word-wrap: break-word;">67108864</td>
-            <td>Minimum memory size for network buffers (in bytes).</td>
+            <td style="word-wrap: break-word;">"67108864"</td>
+            <td>Minimum memory size for network buffers.</td>
         </tr>
         <tr>
             <td><h5>taskmanager.network.request-backoff.initial</h5></td>

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
index cf2ae4c..90de955 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.deployment;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 
 /**
@@ -67,8 +68,8 @@ public final class ClusterSpecification {
 	public static ClusterSpecification fromConfiguration(Configuration configuration) {
 		int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
 
-		int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
-		int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+		int jobManagerMemoryMb = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+		int taskManagerMemoryMb = MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
 
 		return new ClusterSpecificationBuilder()
 			.setMasterMemoryMB(jobManagerMemoryMb)

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 1e3e1fc..3fd6873 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -84,7 +84,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 
 	private static Configuration getConfiguration() {
 		Configuration flinkConfig = new Configuration();
-		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
+		flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 		return flinkConfig;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index ae77b6f..05307ac 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -134,7 +134,7 @@ public abstract class KafkaTestBase extends TestLogger {
 		Configuration flinkConfig = new Configuration();
 		flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
 		flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s");
-		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
+		flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		return flinkConfig;

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
index 40225fb..6600147 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -79,7 +79,7 @@ public class ManualExactlyOnceTest {
 		final Configuration flinkConfig = new Configuration();
 		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 		flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
-		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
+		flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 
 		LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index 34dcdc0..b221ae5 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -91,7 +91,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
 		final Configuration flinkConfig = new Configuration();
 		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 		flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
-		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16);
+		flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 
 		LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
index 1f64397..dd57fb3 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseConnectorITCase.java
@@ -363,7 +363,7 @@ public class HBaseConnectorITCase extends HBaseTestingClusterAutostarter {
 		public static void setAsContext() {
 			Configuration config = new Configuration();
 			// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
-			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB
+			config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
 			final LocalEnvironment le = new LocalEnvironment(config);
 
 			initializeContextEnvironment(new ExecutionEnvironmentFactory() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
index 6b0b503..655978f 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -88,7 +88,7 @@ public class FlinkLocalCluster {
 			Configuration configuration = new Configuration();
 			configuration.addAll(jobGraph.getJobConfiguration());
 
-			configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
+			configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
 			configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
 
 			this.flink = new LocalFlinkMiniCluster(configuration, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 1ea6919..f78ed9d 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -74,10 +74,20 @@ public class JobManagerOptions {
 			" leader from potentially multiple standby JobManagers.");
 
 	/**
-	 * JVM heap size (in megabytes) for the JobManager.
+	 * JVM heap size for the JobManager with memory size.
 	 */
 	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
-	public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY =
+	public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
+		key("jobmanager.heap.size")
+		.defaultValue("1024m")
+		.withDescription("JVM heap size for the JobManager.");
+
+	/**
+	 * JVM heap size (in megabytes) for the JobManager.
+	 * @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
+	 */
+	@Deprecated
+	public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
 		key("jobmanager.heap.mb")
 		.defaultValue(1024)
 		.withDescription("JVM heap size (in megabytes) for the JobManager.");

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java b/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java
index fba2a68..092c012 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MemorySize.java
@@ -22,6 +22,12 @@ import org.apache.flink.annotation.PublicEvolving;
 
 import java.util.Locale;
 
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.BYTES;
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.GIGA_BYTES;
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.KILO_BYTES;
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.TERA_BYTES;
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.hasUnit;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -33,34 +39,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * <p>The size can be parsed from a text expression. If the expression is a pure number,
  * the value will be interpreted as bytes.
  *
- * <p>To make larger values more compact, the common size suffixes are supported:
- *
- * <ul>
- *     <li>q or 1b or 1bytes (bytes)
- *     <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
- *     <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
- *     <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
- *     <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
- * </ul>
  */
 @PublicEvolving
 public class MemorySize implements java.io.Serializable {
 
 	private static final long serialVersionUID = 1L;
 
-	private static final String[] BYTES_UNITS = { "b", "bytes" };
-
-	private static final String[] KILO_BYTES_UNITS = { "k", "kb", "kibibytes" };
-
-	private static final String[] MEGA_BYTES_UNITS = { "m", "mb", "mebibytes" };
-
-	private static final String[] GIGA_BYTES_UNITS = { "g", "gb", "gibibytes" };
-
-	private static final String[] TERA_BYTES_UNITS = { "t", "tb", "tebibytes" };
-
-	private static final String ALL_UNITS = concatenateUnits(
-			BYTES_UNITS, KILO_BYTES_UNITS, MEGA_BYTES_UNITS, GIGA_BYTES_UNITS, TERA_BYTES_UNITS);
-
 	// ------------------------------------------------------------------------
 
 	/** The memory size, in bytes. */
@@ -95,8 +79,8 @@ public class MemorySize implements java.io.Serializable {
 	/**
 	 * Gets the memory size in Mebibytes (= 1024 Kibibytes).
 	 */
-	public long getMebiBytes() {
-		return bytes >> 20;
+	public int getMebiBytes() {
+		return (int) (bytes >> 20);
 	}
 
 	/**
@@ -137,7 +121,6 @@ public class MemorySize implements java.io.Serializable {
 
 	/**
 	 * Parses the given string as as MemorySize.
-	 * The supported expressions are listed under {@link MemorySize}.
 	 *
 	 * @param text The string to parse
 	 * @return The parsed MemorySize
@@ -149,6 +132,23 @@ public class MemorySize implements java.io.Serializable {
 	}
 
 	/**
+	 * Parses the given string with a default unit.
+	 *
+	 * @param text The string to parse.
+	 * @param defaultUnit specify the default unit.
+	 * @return The parsed MemorySize.
+	 *
+	 * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
+	 */
+	public static MemorySize parse(String text, MemoryUnit defaultUnit) throws IllegalArgumentException {
+		if (!hasUnit(text)) {
+			return parse(text + defaultUnit.getUnits()[0]);
+		}
+
+		return parse(text);
+	}
+
+	/**
 	 * Parses the given string as bytes.
 	 * The supported expressions are listed under {@link MemorySize}.
 	 *
@@ -192,24 +192,24 @@ public class MemorySize implements java.io.Serializable {
 			multiplier = 1L;
 		}
 		else {
-			if (matchesAny(unit, BYTES_UNITS)) {
+			if (matchesAny(unit, BYTES)) {
 				multiplier = 1L;
 			}
-			else if (matchesAny(unit, KILO_BYTES_UNITS)) {
+			else if (matchesAny(unit, KILO_BYTES)) {
 				multiplier = 1024L;
 			}
-			else if (matchesAny(unit, MEGA_BYTES_UNITS)) {
+			else if (matchesAny(unit, MEGA_BYTES)) {
 				multiplier = 1024L * 1024L;
 			}
-			else if (matchesAny(unit, GIGA_BYTES_UNITS)) {
+			else if (matchesAny(unit, GIGA_BYTES)) {
 				multiplier = 1024L * 1024L * 1024L;
 			}
-			else if (matchesAny(unit, TERA_BYTES_UNITS)) {
+			else if (matchesAny(unit, TERA_BYTES)) {
 				multiplier = 1024L * 1024L * 1024L * 1024L;
 			}
 			else {
 				throw new IllegalArgumentException("Memory size unit '" + unit +
-						"' does not match any of the recognized units: " + ALL_UNITS);
+						"' does not match any of the recognized units: " + MemoryUnit.getAllUnits());
 			}
 		}
 
@@ -224,8 +224,8 @@ public class MemorySize implements java.io.Serializable {
 		return result;
 	}
 
-	private static boolean matchesAny(String str, String[] variants) {
-		for (String s : variants) {
+	private static boolean matchesAny(String str, MemoryUnit unit) {
+		for (String s : unit.getUnits()) {
 			if (s.equals(str)) {
 				return true;
 			}
@@ -233,22 +233,79 @@ public class MemorySize implements java.io.Serializable {
 		return false;
 	}
 
-	private static String concatenateUnits(final String[]... allUnits) {
-		final StringBuilder builder = new StringBuilder(128);
+	/**
+	 *  Enum which defines memory unit, mostly used to parse value from configuration file.
+	 *
+	 * <p>To make larger values more compact, the common size suffixes are supported:
+	 *
+	 * <ul>
+	 *     <li>q or 1b or 1bytes (bytes)
+	 *     <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
+	 *     <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
+	 *     <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
+	 *     <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
+	 * </ul>
+	 *
+	 */
+	public enum MemoryUnit {
+
+		BYTES(new String[] { "b", "bytes" }),
+		KILO_BYTES(new String[] { "k", "kb", "kibibytes" }),
+		MEGA_BYTES(new String[] { "m", "mb", "mebibytes" }),
+		GIGA_BYTES(new String[] { "g", "gb", "gibibytes" }),
+		TERA_BYTES(new String[] { "t", "tb", "tebibytes" });
+
+		private String[] units;
+
+		MemoryUnit(String[] units) {
+			this.units = units;
+		}
+
+		public String[] getUnits() {
+			return units;
+		}
+
+		public static String getAllUnits() {
+			return concatenateUnits(BYTES.getUnits(), KILO_BYTES.getUnits(), MEGA_BYTES.getUnits(), GIGA_BYTES.getUnits(), TERA_BYTES.getUnits());
+		}
+
+		public static boolean hasUnit(String text) {
+			checkNotNull(text, "text");
+
+			final String trimmed = text.trim();
+			checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");
+
+			final int len = trimmed.length();
+			int pos = 0;
+
+			char current;
+			while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+				pos++;
+			}
+
+			final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
+
+			return unit.length() > 0;
+		}
+
+		private static String concatenateUnits(final String[]... allUnits) {
+			final StringBuilder builder = new StringBuilder(128);
+
+			for (String[] units : allUnits) {
+				builder.append('(');
 
-		for (String[] units : allUnits) {
-			builder.append('(');
+				for (String unit : units) {
+					builder.append(unit);
+					builder.append(" | ");
+				}
 
-			for (String unit : units) {
-				builder.append(unit);
-				builder.append(" | ");
+				builder.setLength(builder.length() - 3);
+				builder.append(") / ");
 			}
 
 			builder.setLength(builder.length() - 3);
-			builder.append(") / ");
+			return builder.toString();
 		}
 
-		builder.setLength(builder.length() - 3);
-		return builder.toString();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 7cdda22..2907f6b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -34,10 +34,23 @@ public class TaskManagerOptions {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * JVM heap size (in megabytes) for the TaskManagers.
+	 * JVM heap size for the TaskManagers with memory size.
 	 */
 	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
-	public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY =
+	public static final ConfigOption<String> TASK_MANAGER_HEAP_MEMORY =
+			key("taskmanager.heap.size")
+			.defaultValue("1024m")
+			.withDescription("JVM heap size for the TaskManagers, which are the parallel workers of" +
+					" the system. On YARN setups, this value is automatically configured to the size of the TaskManager's" +
+					" YARN container, minus a certain tolerance value.");
+
+	/**
+	 * JVM heap size (in megabytes) for the TaskManagers.
+	 *
+	 * @deprecated use {@link #TASK_MANAGER_HEAP_MEMORY}
+	 */
+	@Deprecated
+	public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY_MB =
 			key("taskmanager.heap.mb")
 			.defaultValue(1024)
 			.withDescription("JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of" +
@@ -179,19 +192,19 @@ public class TaskManagerOptions {
 	/**
 	 * Size of memory buffers used by the network stack and the memory manager (in bytes).
 	 */
-	public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
+	public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
 			key("taskmanager.memory.segment-size")
-			.defaultValue(32768)
-			.withDescription("Size of memory buffers used by the network stack and the memory manager (in bytes).");
+			.defaultValue("32768")
+			.withDescription("Size of memory buffers used by the network stack and the memory manager.");
 
 	/**
-	 * Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not
+	 * Amount of memory to be allocated by the task manager's memory manager. If not
 	 * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
 	 */
-	public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
+	public static final ConfigOption<String> MANAGED_MEMORY_SIZE =
 			key("taskmanager.memory.size")
-			.defaultValue(-1L)
-			.withDescription("Amount of memory to be allocated by the task manager's memory manager (in megabytes)." +
+			.defaultValue("0")
+			.withDescription("Amount of memory to be allocated by the task manager's memory manager." +
 				" If not set, a relative fraction will be allocated.");
 
 	/**
@@ -257,18 +270,18 @@ public class TaskManagerOptions {
 	/**
 	 * Minimum memory size for network buffers (in bytes).
 	 */
-	public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MIN =
+	public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MIN =
 			key("taskmanager.network.memory.min")
-			.defaultValue(64L << 20) // 64 MB
-			.withDescription("Minimum memory size for network buffers (in bytes).");
+			.defaultValue(String.valueOf(64L << 20)) // 64 MB
+			.withDescription("Minimum memory size for network buffers.");
 
 	/**
 	 * Maximum memory size for network buffers (in bytes).
 	 */
-	public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MAX =
+	public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MAX =
 			key("taskmanager.network.memory.max")
-			.defaultValue(1024L << 20) // 1 GB
-			.withDescription("Maximum memory size for network buffers (in bytes).");
+			.defaultValue(String.valueOf(1024L << 20)) // 1 GB
+			.withDescription("Maximum memory size for network buffers.");
 
 	/**
 	 * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-core/src/test/java/org/apache/flink/configuration/MemorySizeTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/MemorySizeTest.java b/flink-core/src/test/java/org/apache/flink/configuration/MemorySizeTest.java
index 3fbc58b..9cc1fe3 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/MemorySizeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/MemorySizeTest.java
@@ -24,7 +24,9 @@ import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 /**
@@ -203,4 +205,19 @@ public class MemorySizeTest {
 	public void testParseNumberTimeUnitOverflow() {
 		MemorySize.parseBytes("100000000000000 tb");
 	}
+
+	@Test
+	public void testParseWithDefaultUnit() {
+		assertEquals(7, MemorySize.parse("7", MEGA_BYTES).getMebiBytes());
+		assertNotEquals(7, MemorySize.parse("7340032", MEGA_BYTES));
+		assertEquals(7, MemorySize.parse("7m", MEGA_BYTES).getMebiBytes());
+		assertEquals(7168, MemorySize.parse("7", MEGA_BYTES).getKibiBytes());
+		assertEquals(7168, MemorySize.parse("7m", MEGA_BYTES).getKibiBytes());
+		assertEquals(7, MemorySize.parse("7 m", MEGA_BYTES).getMebiBytes());
+		assertEquals(7, MemorySize.parse("7mb", MEGA_BYTES).getMebiBytes());
+		assertEquals(7, MemorySize.parse("7 mb", MEGA_BYTES).getMebiBytes());
+		assertEquals(7, MemorySize.parse("7mebibytes", MEGA_BYTES).getMebiBytes());
+		assertEquals(7, MemorySize.parse("7 mebibytes", MEGA_BYTES).getMebiBytes());
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 46f06ef..82b27c1 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -101,8 +101,10 @@ DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters ru
 # CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml
 ########################################################################################################################
 
-KEY_JOBM_MEM_SIZE="jobmanager.heap.mb"
-KEY_TASKM_MEM_SIZE="taskmanager.heap.mb"
+KEY_JOBM_MEM_SIZE="jobmanager.heap.size"
+KEY_JOBM_MEM_MB="jobmanager.heap.mb"
+KEY_TASKM_MEM_SIZE="taskmanager.heap.size"
+KEY_TASKM_MEM_MB="taskmanager.heap.mb"
 KEY_TASKM_MEM_MANAGED_SIZE="taskmanager.memory.size"
 KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
 KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
@@ -129,6 +131,143 @@ KEY_ZK_HEAP_MB="zookeeper.heap.mb"
 KEY_FLINK_MODE="mode"
 
 ########################################################################################################################
+# MEMORY SIZE UNIT
+########################################################################################################################
+
+BYTES_UNITS=("b" "bytes")
+KILO_BYTES_UNITS=("k" "kb" "kibibytes")
+MEGA_BYTES_UNITS=("m" "mb" "mebibytes")
+GIGA_BYTES_UNITS=("g" "gb" "gibibytes")
+TERA_BYTES_UNITS=("t" "tb" "tebibytes")
+
+hasUnit() {
+    text=$1
+
+    trimmed=$(echo -e "${text}" | tr -d '[:space:]')
+
+    if [ -z "$trimmed" -o "$trimmed" == " " ]; then
+        echo "$trimmed is an empty- or whitespace-only string"
+	exit 1
+    fi
+
+    len=${#trimmed}
+    pos=0
+
+    while [ $pos -lt $len ]; do
+	current=${trimmed:pos:1}
+	if [[ ! $current < '0' ]] && [[ ! $current > '9' ]]; then
+	    let pos+=1
+	else
+	    break
+	fi
+    done
+
+    number=${trimmed:0:pos}
+
+    unit=${trimmed:$pos}
+    unit=$(echo -e "${unit}" | tr -d '[:space:]')
+    unit=$(echo -e "${unit}" | tr '[A-Z]' '[a-z]')
+
+    [[ ! -z "$unit" ]]
+}
+
+parseBytes() {
+    text=$1
+
+    trimmed=$(echo -e "${text}" | tr -d '[:space:]')
+
+    if [ -z "$trimmed" -o "$trimmed" == " " ]; then
+        echo "$trimmed is an empty- or whitespace-only string"
+	exit 1
+    fi
+
+    len=${#trimmed}
+    pos=0
+
+    while [ $pos -lt $len ]; do
+	current=${trimmed:pos:1}
+	if [[ ! $current < '0' ]] && [[ ! $current > '9' ]]; then
+	    let pos+=1
+	else
+	    break
+	fi
+    done
+
+    number=${trimmed:0:pos}
+
+    unit=${trimmed:$pos}
+    unit=$(echo -e "${unit}" | tr -d '[:space:]')
+    unit=$(echo -e "${unit}" | tr '[A-Z]' '[a-z]')
+
+    if [ -z "$number" ]; then
+        echo "text does not start with a number"
+        exit 1
+    fi
+
+    local multiplier
+    if [ -z "$unit" ]; then
+        multiplier=1
+    else
+        if matchesAny $unit "${BYTES_UNITS[*]}"; then
+            multiplier=1
+        elif matchesAny $unit "${KILO_BYTES_UNITS[*]}"; then
+                multiplier=1024
+        elif matchesAny $unit "${MEGA_BYTES_UNITS[*]}"; then
+                multiplier=`expr 1024 \* 1024`
+        elif matchesAny $unit "${GIGA_BYTES_UNITS[*]}"; then
+                multiplier=`expr 1024 \* 1024 \* 1024`
+        elif matchesAny $unit "${TERA_BYTES_UNITS[*]}"; then
+                multiplier=`expr 1024 \* 1024 \* 1024 \* 1024`
+        else
+            echo "[ERROR] Memory size unit $unit does not match any of the recognized units"
+            exit 1
+        fi
+    fi
+
+    ((result=$number * $multiplier))
+
+    if [ $[result / multiplier] != "$number" ]; then
+        echo "[ERROR] The value $text cannot be represented as a 64-bit number of bytes (numeric overflow)."
+        exit 1
+    fi
+
+    echo "$result"
+}
+
+matchesAny() {
+    str=$1
+    variants=$2
+
+    for s in ${variants[*]}; do
+        if [ $str == $s ]; then
+            return 0
+        fi
+    done
+
+    return 1
+}
+
+getKibiBytes() {
+    bytes=$1
+    echo "$(($bytes >>10))"
+}
+
+getMebiBytes() {
+    bytes=$1
+    echo "$(($bytes >> 20))"
+}
+
+getGibiBytes() {
+    bytes=$1
+    echo "$(($bytes >> 30))"
+}
+
+getTebiBytes() {
+    bytes=$1
+    echo "$(($bytes >> 40))"
+}
+
+########################################################################################################################
 # PATHS AND CONFIG
 ########################################################################################################################
 
@@ -216,14 +355,30 @@ if [ -z "${FLINK_JM_HEAP}" ]; then
     FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
 fi
 
+# Try to read the old config key if the new key does not exist
+if [ "${FLINK_JM_HEAP}" == 0 ]; then
+    FLINK_JM_HEAP_MB=$(readFromConfig ${KEY_JOBM_MEM_MB} 0 "${YAML_CONF}")
+fi
+
 # Define FLINK_TM_HEAP if it is not already set
 if [ -z "${FLINK_TM_HEAP}" ]; then
     FLINK_TM_HEAP=$(readFromConfig ${KEY_TASKM_MEM_SIZE} 0 "${YAML_CONF}")
 fi
 
+# Try to read the old config key if the new key does not exist
+if [ "${FLINK_TM_HEAP}" == 0 ]; then
+    FLINK_TM_HEAP_MB=$(readFromConfig ${KEY_TASKM_MEM_MB} 0 "${YAML_CONF}")
+fi
+
 # Define FLINK_TM_MEM_MANAGED_SIZE if it is not already set
 if [ -z "${FLINK_TM_MEM_MANAGED_SIZE}" ]; then
     FLINK_TM_MEM_MANAGED_SIZE=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_SIZE} 0 "${YAML_CONF}")
+
+    if hasUnit ${FLINK_TM_MEM_MANAGED_SIZE}; then
+        FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}))
+    else
+        FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}"m"))
+    fi
 fi
 
 # Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
@@ -250,19 +405,24 @@ fi
 # Define FLINK_TM_NET_BUF_MIN and FLINK_TM_NET_BUF_MAX if not already set (as a fallback)
 if [ -z "${FLINK_TM_NET_BUF_MIN}" -a -z "${FLINK_TM_NET_BUF_MAX}" ]; then
     FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_NR} -1 "${YAML_CONF}")
-    FLINK_TM_NET_BUF_MAX=${FLINK_TM_NET_BUF_MIN}
+    if [ $FLINK_TM_NET_BUF_MIN != -1 ]; then
+        FLINK_TM_NET_BUF_MIN=$(parseBytes ${FLINK_TM_NET_BUF_MIN})
+        FLINK_TM_NET_BUF_MAX=${FLINK_TM_NET_BUF_MIN}
+    fi
 fi
 
 # Define FLINK_TM_NET_BUF_MIN if it is not already set
 if [ -z "${FLINK_TM_NET_BUF_MIN}" -o "${FLINK_TM_NET_BUF_MIN}" = "-1" ]; then
     # default: 64MB = 67108864 bytes (same as the previous default with 2048 buffers of 32k each)
     FLINK_TM_NET_BUF_MIN=$(readFromConfig ${KEY_TASKM_NET_BUF_MIN} 67108864 "${YAML_CONF}")
+    FLINK_TM_NET_BUF_MIN=$(parseBytes ${FLINK_TM_NET_BUF_MIN})
 fi
 
 # Define FLINK_TM_NET_BUF_MAX if it is not already set
 if [ -z "${FLINK_TM_NET_BUF_MAX}" -o "${FLINK_TM_NET_BUF_MAX}" = "-1" ]; then
     # default: 1GB = 1073741824 bytes
     FLINK_TM_NET_BUF_MAX=$(readFromConfig ${KEY_TASKM_NET_BUF_MAX} 1073741824 "${YAML_CONF}")
+    FLINK_TM_NET_BUF_MAX=$(parseBytes ${FLINK_TM_NET_BUF_MAX})
 fi
 
 # Define FLIP if it is not already set
@@ -506,7 +666,7 @@ HAVE_AWK=
 # same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config)
 calculateNetworkBufferMemory() {
     local network_buffers_bytes
-    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+    if [ "${FLINK_TM_HEAP_MB}" -le "0" ]; then
         echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
         exit 1
     fi
@@ -541,13 +701,13 @@ calculateNetworkBufferMemory() {
             exit 1
         fi
 
-        network_buffers_bytes=`awk "BEGIN { x = ${FLINK_TM_HEAP} * 1048576 * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
+        network_buffers_bytes=`awk "BEGIN { x = ${FLINK_TM_HEAP_MB} * 1048576 * ${FLINK_TM_NET_BUF_FRACTION}; netbuf = x > ${FLINK_TM_NET_BUF_MAX} ? ${FLINK_TM_NET_BUF_MAX} : x < ${FLINK_TM_NET_BUF_MIN} ? ${FLINK_TM_NET_BUF_MIN} : x; printf \"%.0f\n\", netbuf }"`
     fi
 
     # recalculate the JVM heap memory by taking the network buffers into account
-    local tm_heap_size_bytes=$((${FLINK_TM_HEAP} << 20)) # megabytes to bytes
+    local tm_heap_size_bytes=$((${FLINK_TM_HEAP_MB} << 20)) # megabytes to bytes
     if [[ "${tm_heap_size_bytes}" -le "${network_buffers_bytes}" ]]; then
-        echo "[ERROR] Configured TaskManager memory size (${FLINK_TM_HEAP} MB, from '${KEY_TASKM_MEM_SIZE}') must be larger than the network buffer memory size (${network_buffers_bytes} bytes, from: '${KEY_TASKM_NET_BUF_FRACTION}', '${KEY_TASKM_NET_BUF_MIN}', '${KEY_TASKM_NET_BUF_MAX}', and '${KEY_TASKM_NET_BUF_NR}')."
+        echo "[ERROR] Configured TaskManager memory size (${FLINK_TM_HEAP_MB} MB, from '${KEY_TASKM_MEM_SIZE}') must be larger than the network buffer memory size (${network_buffers_bytes} bytes, from: '${KEY_TASKM_NET_BUF_FRACTION}', '${KEY_TASKM_NET_BUF_MIN}', '${KEY_TASKM_NET_BUF_MAX}', and '${KEY_TASKM_NET_BUF_NR}')."
         exit 1
     fi
 
@@ -556,21 +716,21 @@ calculateNetworkBufferMemory() {
 
 # same as org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)
 calculateTaskManagerHeapSizeMB() {
-    if [ "${FLINK_TM_HEAP}" -le "0" ]; then
+    if [ "${FLINK_TM_HEAP_MB}" -le "0" ]; then
         echo "Variable 'FLINK_TM_HEAP' not set (usually read from '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE})."
         exit 1
     fi
 
     local network_buffers_mb=$(($(calculateNetworkBufferMemory) >> 20)) # bytes to megabytes
     # network buffers are always off-heap and thus need to be deduced from the heap memory size
-    local tm_heap_size_mb=$((${FLINK_TM_HEAP} - network_buffers_mb))
+    local tm_heap_size_mb=$((${FLINK_TM_HEAP_MB} - network_buffers_mb))
 
     if useOffHeapMemory; then
 
         if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
             # We split up the total memory in heap and off-heap memory
             if [[ "${tm_heap_size_mb}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
-                echo "[ERROR] Remaining TaskManager memory size (${tm_heap_size_mb} MB, from: '${KEY_TASKM_MEM_SIZE}' (${FLINK_TM_HEAP} MB) minus network buffer memory size (${network_buffers_mb} MB, from: '${KEY_TASKM_NET_BUF_FRACTION}', '${KEY_TASKM_NET_BUF_MIN}', '${KEY_TASKM_NET_BUF_MAX}', and '${KEY_TASKM_NET_BUF_NR}')) must be larger than the managed memory size (${FLINK_TM_MEM_MANAGED_SIZE} MB, from: '${KEY_TASKM_MEM_MANAGED_SIZE}')."
+                echo "[ERROR] Remaining TaskManager memory size (${tm_heap_size_mb} MB, from: '${KEY_TASKM_MEM_SIZE}' (${FLINK_TM_HEAP_MB} MB) minus network buffer memory size (${network_buffers_mb} MB, from: '${KEY_TASKM_NET_BUF_FRACTION}', '${KEY_TASKM_NET_BUF_MIN}', '${KEY_TASKM_NET_BUF_MAX}', and '${KEY_TASKM_NET_BUF_NR}')) must be larger than the managed memory size (${FLINK_TM_MEM_MANAGED_SIZE} MB, from: '${KEY_TASKM_MEM_MANAGED_SIZE}')."
                 exit 1
             fi
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-dist/src/main/flink-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index f03c0a9..60ff450 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -41,13 +41,20 @@ if [[ "${FLINK_MODE}" == "new" ]]; then
 fi
 
 if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
-    if [[ ! ${FLINK_JM_HEAP} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP}" -lt "0" ]]; then
+    if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
+	    echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`"
+    else
+	    flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
+	    FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
+    fi
+
+    if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then
         echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
         exit 1
     fi
 
-    if [ "${FLINK_JM_HEAP}" -gt "0" ]; then
-        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP"m -Xmx"$FLINK_JM_HEAP"m"
+    if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
+        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m"
     fi
 
     # Add JobManager-specific JVM options

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index 6baa79a..771d53f 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -46,12 +46,19 @@ if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
         export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
     fi
 
-    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" -lt "0" ]]; then
+    if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
+	    echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`"
+    else
+	    flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
+	    FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
+    fi
+
+    if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
         echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
         exit 1
     fi
 
-    if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
+    if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then
 
         TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
         # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 04d3b93..62ed13d 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -39,12 +39,12 @@ jobmanager.rpc.port: 6123
 
 # The heap size for the JobManager JVM
 
-jobmanager.heap.mb: 1024
+jobmanager.heap.size: 1024m
 
 
 # The heap size for the TaskManager JVM
 
-taskmanager.heap.mb: 1024
+taskmanager.heap.size: 1024m
 
 
 # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/test/bin/calcTMHeapSizeMB.sh b/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
index d5b7742..a0a82b5 100755
--- a/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
+++ b/flink-dist/src/test/bin/calcTMHeapSizeMB.sh
@@ -39,4 +39,12 @@ fi
 FLINK_CONF_DIR=${bin}/../../main/resources
 . ${bin}/../../main/flink-bin/bin/config.sh > /dev/null
 
+FLINK_TM_HEAP_MB=$(getMebiBytes $(parseBytes ${FLINK_TM_HEAP}))
+
+if hasUnit ${FLINK_TM_MEM_MANAGED_SIZE}; then
+    FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}))
+else
+    FLINK_TM_MEM_MANAGED_SIZE=$(getMebiBytes $(parseBytes ${FLINK_TM_MEM_MANAGED_SIZE}"m"))
+fi
+
 calculateTaskManagerHeapSizeMB

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-dist/src/test/bin/calcTMNetBufMem.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/test/bin/calcTMNetBufMem.sh b/flink-dist/src/test/bin/calcTMNetBufMem.sh
index 355a978..ef56a75 100755
--- a/flink-dist/src/test/bin/calcTMNetBufMem.sh
+++ b/flink-dist/src/test/bin/calcTMNetBufMem.sh
@@ -36,4 +36,6 @@ fi
 FLINK_CONF_DIR=${bin}/../../main/resources
 . ${bin}/../../main/flink-bin/bin/config.sh > /dev/null
 
+FLINK_TM_HEAP_MB=$(getMebiBytes $(parseBytes ${FLINK_TM_HEAP}))
+
 calculateNetworkBufferMemory

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
----------------------------------------------------------------------
diff --git a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
index 7f8db8e..2b84e18 100644
--- a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
+++ b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.dist;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.util.OperatingSystem;
@@ -51,7 +52,7 @@ import static org.junit.Assert.assertThat;
 public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 
 	/** Key that is used by <tt>config.sh</tt>. */
-	private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
+	private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.size";
 
 	/**
 	 * Number of tests with random values.
@@ -73,7 +74,7 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 	 */
 	@Test
 	public void compareNetworkBufShellScriptWithJava() throws Exception {
-		int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+		int managedMemSize = Integer.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue());
 		float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
 
 		// manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
@@ -102,7 +103,7 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 	 */
 	@Test
 	public void compareHeapSizeShellScriptWithJava() throws Exception {
-		int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+		int managedMemSize = Integer.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue());
 		float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
 
 		// manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
@@ -157,10 +158,14 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, useOffHeap);
 
 		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac);
-		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, netBufMemMin);
-		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, netBufMemMax);
+		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(netBufMemMin));
+		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(netBufMemMax));
 
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemSizeMB);
+		if (managedMemSizeMB == 0) {
+			config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
+		} else {
+			config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemSizeMB + "m");
+		}
 		config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, managedMemFrac);
 
 		return config;
@@ -181,13 +186,13 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 		// note: we are testing with integers only here to avoid overly complicated checks for
 		// overflowing or negative Long values - this should be enough for any practical scenario
 		// though
-		long min = (long) TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue() + ran.nextInt(Integer.MAX_VALUE);
+		long min = MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes() + ran.nextInt(Integer.MAX_VALUE);
 		long max = ran.nextInt(Integer.MAX_VALUE) + min;
 
 		int javaMemMB = Math.max((int) (max >> 20), ran.nextInt(Integer.MAX_VALUE)) + 1;
 		boolean useOffHeap = ran.nextBoolean();
 
-		int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
+		int managedMemSize = Integer.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue());
 		float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
 
 		if (ran.nextBoolean()) {
@@ -224,10 +229,11 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 		long javaNetworkBufMem = TaskManagerServices.calculateNetworkBufferMemory(totalJavaMemorySizeMB << 20, config);
 
 		String[] command = {"src/test/bin/calcTMNetBufMem.sh",
-			String.valueOf(totalJavaMemorySizeMB),
+			totalJavaMemorySizeMB + "m",
 			String.valueOf(config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
-			String.valueOf(config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)),
-			String.valueOf(config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX))};
+			config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN),
+			config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)};
+
 		String scriptOutput = executeScript(command);
 
 		long absoluteTolerance = (long) (javaNetworkBufMem * tolerance);
@@ -261,12 +267,12 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 		long javaHeapSizeMB = TaskManagerServices.calculateHeapSizeMB(totalJavaMemorySizeMB, config);
 
 		String[] command = {"src/test/bin/calcTMHeapSizeMB.sh",
-			String.valueOf(totalJavaMemorySizeMB),
+			totalJavaMemorySizeMB + "m",
 			String.valueOf(config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)),
 			String.valueOf(config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
-			String.valueOf(config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)),
-			String.valueOf(config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)),
-			String.valueOf(config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE)),
+			config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN),
+			config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX),
+			config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE),
 			String.valueOf(config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION))};
 		String scriptOutput = executeScript(command);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index b78a6ac..9a1eb08 100644
--- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -87,7 +87,7 @@ public class LocalExecutorITCase extends TestLogger {
 
 	private static Configuration getConfig() {
 		Configuration config = new Configuration();
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
 		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
 		config.setBoolean(WebOptions.SUBMIT_ENABLE, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
index 63feeaf..ff84775 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java
@@ -95,7 +95,7 @@ public class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestB
 	private static Configuration getConfig() throws Exception {
 
 		Configuration config = new Configuration();
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
 		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index 702e712..828123e 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -95,7 +95,7 @@ public class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableState
 	private static Configuration getConfig() throws Exception {
 
 		Configuration config = new Configuration();
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
 		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
index b166002..edf0b62 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java
@@ -78,7 +78,7 @@ public class NonHAQueryableStateFsBackendITCase extends AbstractQueryableStateTe
 
 	private static Configuration getConfig() {
 		Configuration config = new Configuration();
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
 		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
 		config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index dedf8b1..b9fd453 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -78,7 +78,7 @@ public class NonHAQueryableStateRocksDBBackendITCase extends AbstractQueryableSt
 
 	private static Configuration getConfig() {
 		Configuration config = new Configuration();
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
 		config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
 		config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index 380ab96..fb8258a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -103,7 +103,7 @@ public class WebFrontendITCase extends TestLogger {
 		} catch (Exception e) {
 			throw new AssertionError("Could not setup test.", e);
 		}
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "12m");
 		config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
 		return config;

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ad19a57..0e98e44 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
@@ -61,6 +63,9 @@ import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
+import static org.apache.flink.util.MathUtils.checkedDownCast;
+
 /**
  * Container for {@link TaskExecutor} services such as the {@link MemoryManager}, {@link IOManager},
  * {@link NetworkEnvironment}. All services are exclusive to a single {@link TaskExecutor}.
@@ -482,14 +487,16 @@ public class TaskManagerServices {
 	public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) {
 		Preconditions.checkArgument(totalJavaMemorySize > 0);
 
-		int segmentSize = config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
+		int segmentSize =
+			checkedDownCast(MemorySize.parse(config.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());
 
 		final long networkBufBytes;
 		if (TaskManagerServicesConfiguration.hasNewNetworkBufConf(config)) {
 			// new configuration based on fractions of available memory with selectable min and max
 			float networkBufFraction = config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
-			long networkBufMin = config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN);
-			long networkBufMax = config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX);
+			long networkBufMin = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
+			long networkBufMax = MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
+
 
 			TaskManagerServicesConfiguration
 				.checkNetworkBufferConfig(segmentSize, networkBufFraction, networkBufMin, networkBufMax);
@@ -647,7 +654,18 @@ public class TaskManagerServices {
 		final long heapSizeMB;
 		if (useOffHeap) {
 
-			long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+			long offHeapSize;
+			String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
+			if (!config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
+				try {
+					offHeapSize = MemorySize.parse(config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
+				} catch (IllegalArgumentException e) {
+					throw new IllegalConfigurationException(
+						"Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
+				}
+			} else {
+				offHeapSize = Long.valueOf(managedMemorySizeDefaultVal);
+			}
 
 			if (offHeapSize <= 0) {
 				// calculate off-heap section via fraction

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index bf1494e..86bc46d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.QueryableStateOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
@@ -41,6 +42,8 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
 
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
+import static org.apache.flink.util.MathUtils.checkedDownCast;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -223,9 +226,21 @@ public class TaskManagerServicesConfiguration {
 				parseQueryableStateConfiguration(configuration);
 
 		// extract memory settings
-		long configuredMemory = configuration.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+		long configuredMemory;
+		String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
+		if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
+			try {
+				configuredMemory = MemorySize.parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
+			} catch (IllegalArgumentException e) {
+				throw new IllegalConfigurationException(
+					"Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
+			}
+		} else {
+			configuredMemory = Long.valueOf(managedMemorySizeDefaultVal);
+		}
+
 		checkConfigParameter(
-			configuredMemory == TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue() ||
+			configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) ||
 				configuredMemory > 0, configuredMemory,
 			TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
 			"MemoryManager needs at least one MB of memory. " +
@@ -294,7 +309,7 @@ public class TaskManagerServicesConfiguration {
 		checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(),
 			"Number of task slots must be at least one.");
 
-		final int pageSize = configuration.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
+		final int pageSize = checkedDownCast(MemorySize.parse(configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());
 
 		// check page size of for minimum size
 		checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
@@ -309,8 +324,8 @@ public class TaskManagerServicesConfiguration {
 		// network buffer memory fraction
 
 		float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
-		long networkBufMin = configuration.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN);
-		long networkBufMax = configuration.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX);
+		long networkBufMin = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
+		long networkBufMax = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
 		checkNetworkBufferConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax);
 
 		// fallback: number of network buffers

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index f62ef1b..fb57861 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -367,8 +367,8 @@ class LocalFlinkMiniCluster(
 
   def setMemory(config: Configuration): Unit = {
     // set this only if no memory was pre-configured
-    if (config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE) ==
-        TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) {
+    if (config.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(
+      TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue())) {
 
       val numTaskManager = config.getInteger(
         ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
@@ -387,7 +387,7 @@ class LocalFlinkMiniCluster(
       memorySize -= TaskManagerServices.calculateNetworkBufferMemory(memorySize, config)
       memorySize = (memorySize * memoryFraction).toLong
       memorySize >>= 20 // bytes to megabytes
-      config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memorySize)
+      config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, memorySize + "m")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 79e6d20..873a4f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -639,7 +639,7 @@ public class JobManagerTest extends TestLogger {
 				leaderId);
 
 		Configuration tmConfig = new Configuration();
-		tmConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+		tmConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 		tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
 
 		ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
index 7bb74ae..6374cf8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
@@ -27,6 +28,7 @@ import org.junit.Test;
 
 import java.net.InetAddress;
 
+import static org.apache.flink.util.MathUtils.checkedDownCast;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -42,13 +44,13 @@ public class NetworkBufferCalculationTest extends TestLogger {
 	public void calculateNetworkBufFromHeapSize() throws Exception {
 		TaskManagerServicesConfiguration tmConfig;
 
-		tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
+		tmConfig = getTmConfig(Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
 			TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
 			0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
 		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
 			TaskManagerServices.calculateNetworkBufferMemory(tmConfig, 900L << 20)); // 900MB
 
-		tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
+		tmConfig = getTmConfig(Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
 			TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
 			0.2f, 60L << 20, 1L << 30, MemoryType.HEAP);
 		assertEquals((200L << 20) + 3 /* slightly too many due to floating point imprecision */,
@@ -86,7 +88,7 @@ public class NetworkBufferCalculationTest extends TestLogger {
 			networkBufFraction,
 			networkBufMin,
 			networkBufMax,
-			TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(),
+			checkedDownCast(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes()),
 			null,
 			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
 			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
index 960a13d..01d4dd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfigurationTest.java
@@ -58,8 +58,8 @@ public class TaskManagerServicesConfigurationTest extends TestLogger {
 
 		// fully defined:
 		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
-		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1024);
-		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 2048);
+		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
+		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, "2048");
 
 		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
 
@@ -67,19 +67,19 @@ public class TaskManagerServicesConfigurationTest extends TestLogger {
 		config = new Configuration();
 		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
 		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
-		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1024);
+		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
 		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
 
 		config = new Configuration();
-		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1024);
+		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
 		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
 		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
 		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
 
 		config = new Configuration();
-		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1024);
+		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
 		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
-		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1024);
+		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
 		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config));
 	}
 
@@ -102,11 +102,11 @@ public class TaskManagerServicesConfigurationTest extends TestLogger {
 		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config1));
 
 		config1 = config.clone();
-		config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1024);
+		config1.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, "1024");
 		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config1));
 
 		config1 = config.clone();
-		config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1024);
+		config1.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, "1024");
 		assertTrue(TaskManagerServicesConfiguration.hasNewNetworkBufConf(config1));
 	}
 


[3/5] flink git commit: [FLINK-6469][core] Configure Memory Sizes with units

Posted by dw...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
index 77d6ec7..665b4bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.util.TestLogger;
 
@@ -46,13 +47,13 @@ public class TaskManagerServicesTest extends TestLogger {
 		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
 
 		// note: actual network buffer memory size is independent of the totalJavaMemorySize
-		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+		assertEquals(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(),
 			TaskManagerServices.calculateNetworkBufferMemory(10L << 20, config));
-		assertEquals(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().longValue(),
+		assertEquals(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(),
 			TaskManagerServices.calculateNetworkBufferMemory(64L << 20, config));
 
 		// test integer overflow in the memory size
-		int numBuffers = (int) ((2L << 32) / TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()); // 2^33
+		int numBuffers = (int) ((2L << 32) / MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes()); // 2^33
 		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, numBuffers);
 		assertEquals(2L << 32, TaskManagerServices.calculateNetworkBufferMemory(2L << 33, config));
 	}
@@ -69,8 +70,8 @@ public class TaskManagerServicesTest extends TestLogger {
 
 		// (1) defaults
 		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
-		final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
-		final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+		final Long defaultMin = MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes();
+		final Long defaultMax = MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue()).getBytes();
 		assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), defaultMin, defaultMax),
 			TaskManagerServices.calculateNetworkBufferMemory((64L << 20 + 1), config));
 		assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), defaultMin, defaultMax),
@@ -87,8 +88,9 @@ public class TaskManagerServicesTest extends TestLogger {
 	 */
 	private static void calculateNetworkBufNew(final Configuration config) {
 		// (2) fixed size memory
-		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 1L << 20); // 1MB
-		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 20); // 1MB
+		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20)); // 1MB
+		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(1L << 20)); // 1MB
+
 
 		// note: actual network buffer memory size is independent of the totalJavaMemorySize
 		assertEquals(1 << 20, TaskManagerServices.calculateNetworkBufferMemory(10L << 20, config));
@@ -101,11 +103,12 @@ public class TaskManagerServicesTest extends TestLogger {
 			float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
 			config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, frac);
 
-			long min = Math.max(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(), ran.nextLong());
-			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, min);
+			long min = Math.max(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(), ran.nextLong());
+			config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(min));
+
 
 			long max = Math.max(min, ran.nextLong());
-			config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, max);
+			config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(max));
 
 			long javaMem = Math.max(max + 1, ran.nextLong());
 
@@ -139,8 +142,9 @@ public class TaskManagerServicesTest extends TestLogger {
 		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1);
 
 		final Float defaultFrac = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.defaultValue();
-		final Long defaultMin = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue();
-		final Long defaultMax = TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue();
+		final Long defaultMin = MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes();
+		final Long defaultMax = MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.defaultValue()).getBytes();
+
 
 		// old + 1 new parameter = new:
 		Configuration config1 = config.clone();
@@ -151,16 +155,16 @@ public class TaskManagerServicesTest extends TestLogger {
 			TaskManagerServices.calculateNetworkBufferMemory((10L << 30), config1));
 
 		config1 = config.clone();
-		long newMin = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(); // smallest value possible
-		config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, newMin);
+		long newMin = MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes(); // smallest value possible
+		config1.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(newMin));
 		assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), newMin, defaultMax),
 			TaskManagerServices.calculateNetworkBufferMemory((10L << 20), config1));
 		assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), newMin, defaultMax),
 			TaskManagerServices.calculateNetworkBufferMemory((10L << 30), config1));
 
 		config1 = config.clone();
-		long newMax = Math.max(64L << 20 + 1, TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue());
-		config1.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, newMax);
+		long newMax = Math.max(64L << 20 + 1, MemorySize.parse(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.defaultValue()).getBytes());
+		config1.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(newMax));
 		assertEquals(enforceBounds((long) (defaultFrac * (10L << 20)), defaultMin, newMax),
 			TaskManagerServices.calculateNetworkBufferMemory((64L << 20 + 1), config1));
 		assertEquals(enforceBounds((long) (defaultFrac * (10L << 30)), defaultMin, newMax),
@@ -192,8 +196,8 @@ public class TaskManagerServicesTest extends TestLogger {
 	public void calculateHeapSizeMB() throws Exception {
 		Configuration config = new Configuration();
 		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
-		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, 64L << 20); // 64MB
-		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 30); // 1GB
+		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(64L << 20)); // 64MB
+		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(1L << 30)); // 1GB
 
 		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
 		assertEquals(900, TaskManagerServices.calculateHeapSizeMB(1000, config));
@@ -204,10 +208,10 @@ public class TaskManagerServicesTest extends TestLogger {
 
 		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
 		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 10); // 10MB
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "10m"); // 10MB
 		assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config));
 
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1); // use fraction of given memory
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0"); // use fraction of given memory
 		config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.1f); // 10%
 		assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, config));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java
index ee0bfda..f1f5e45 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java
@@ -83,7 +83,7 @@ public class LegacyTaskCancelAsyncProducerConsumerITCase extends TestLogger {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-			config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
+			config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
 			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
 
 			flink = new TestingCluster(config, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index fe7e661..561b81b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -75,7 +75,7 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 
 		// Cluster
 		Configuration config = new Configuration();
-		config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
+		config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
 		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
 
 		MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index fadebce..ae66a08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -247,7 +247,7 @@ public abstract class TaskManagerProcessReapingTestBase extends TestLogger {
 			Configuration cfg = new Configuration();
 			cfg.setString(JobManagerOptions.ADDRESS, "localhost");
 			cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
-			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+			cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 			cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 256);
 
 			final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index fce9620..158d381 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -147,7 +148,7 @@ public class TaskManagerStartupTest extends TestLogger {
 		try {
 			Configuration cfg = new Configuration();
 			cfg.setString(CoreOptions.TMP_DIRS, nonWritable.getAbsolutePath());
-			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+			cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 			cfg.setString(JobManagerOptions.ADDRESS, "localhost");
 			cfg.setInteger(JobManagerOptions.PORT, 21656);
 
@@ -195,7 +196,7 @@ public class TaskManagerStartupTest extends TestLogger {
 			cfg.setBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE, true);
 
 			// something invalid
-			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -42L);
+			cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "-42m");
 			try {
 				TaskManager.runTaskManager(
 					"localhost",
@@ -211,8 +212,8 @@ public class TaskManagerStartupTest extends TestLogger {
 
 			// something ridiculously high
 			final long memSize = (((long) Integer.MAX_VALUE - 1) *
-					TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()) >> 20;
-			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memSize);
+				MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes()) >> 20;
+			cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, memSize + "m");
 			try {
 				TaskManager.runTaskManager(
 					"localhost",
@@ -248,7 +249,7 @@ public class TaskManagerStartupTest extends TestLogger {
 			final Configuration cfg = new Configuration();
 			cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
 			cfg.setInteger(TaskManagerOptions.DATA_PORT, blocker.getLocalPort());
-			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 1L);
+			cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "1m");
 			ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(cfg);
 			TaskManager.startTaskManagerComponentsAndActor(
 				cfg,

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 977ab9e..2da2ba4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -1570,7 +1570,7 @@ public class TaskManagerTest extends TestLogger {
 			// set the memory segment to the smallest size possible, because we have to fill one
 			// memory buffer to trigger the schedule or update consumers message to the downstream
 			// operators
-			configuration.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
+			configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
 
 			final JobID jid = new JobID();
 			final JobVertexID vid = new JobVertexID();

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
index 36eb47f..b381f62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
@@ -103,7 +103,7 @@ public class TaskManagerProcess extends TestJvmProcess {
 				Configuration config = ParameterTool.fromArgs(args).getConfiguration();
 
 				if (!config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
-					config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+					config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 				}
 
 				if (!config.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index b89d2a6..9d791a9 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -262,7 +262,7 @@ object TestingUtils {
 
     val resultingConfiguration = new Configuration()
 
-    resultingConfiguration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 10L)
+    resultingConfiguration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "10m")
 
     resultingConfiguration.addAll(configuration)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index ae22d87..c04e845 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -52,11 +52,11 @@ object FlinkShell {
   /** YARN configuration object */
   case class YarnConfig(
     containers: Option[Int] = None,
-    jobManagerMemory: Option[Int] = None,
+    jobManagerMemory: Option[String] = None,
     name: Option[String] = None,
     queue: Option[String] = None,
     slots: Option[Int] = None,
-    taskManagerMemory: Option[Int] = None
+    taskManagerMemory: Option[String] = None
   )
 
   /** Buffered reader to substitute input in test */
@@ -99,10 +99,10 @@ object FlinkShell {
           (x, c) =>
             c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = Some(x))))
         } text "Number of YARN container to allocate (= Number of TaskManagers)",
-        opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
+        opt[String]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
           (x, c) =>
             c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x))))
-        } text "Memory for JobManager container [in MB]",
+        } text "Memory for JobManager container",
         opt[String]("name") abbr ("nm") action {
           (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name = Some(x))))
         } text "Set a custom name for the application on YARN",
@@ -112,10 +112,10 @@ object FlinkShell {
         opt[Int]("slots") abbr ("s") valueName ("<arg>") action {
           (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(slots = Some(x))))
         } text "Number of slots per TaskManager",
-        opt[Int]("taskManagerMemory") abbr ("tm") valueName ("<arg>") action {
+        opt[String]("taskManagerMemory") abbr ("tm") valueName ("<arg>") action {
           (x, c) =>
             c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x))))
-        } text "Memory per TaskManager container [in MB]",
+        } text "Memory per TaskManager container",
         opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar>") action {
           case (x, c) =>
             val xArray = x.split(":")

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LegacyLocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LegacyLocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LegacyLocalStreamEnvironment.java
index 8341ec4..ab276d9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LegacyLocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LegacyLocalStreamEnvironment.java
@@ -78,7 +78,7 @@ public class LegacyLocalStreamEnvironment extends LocalStreamEnvironment {
 		Configuration configuration = new Configuration();
 		configuration.addAll(jobGraph.getJobConfiguration());
 
-		configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
+		configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
 		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
 
 		// add (and override) the settings with what the user defined

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 6eec705..ec80973 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -94,7 +94,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
 		Configuration configuration = new Configuration();
 		configuration.addAll(jobGraph.getJobConfiguration());
-		configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
+		configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
 
 		// add (and override) the settings with what the user defined
 		configuration.addAll(this.configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 3e6dcf6..6b53488 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io.benchmark;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -59,6 +60,7 @@ import java.net.UnknownHostException;
 import java.util.Arrays;
 
 import static org.apache.flink.util.ExceptionUtils.suppressExceptions;
+import static org.apache.flink.util.MathUtils.checkedDownCast;
 
 /**
  * Context for network benchmarks executed by the external
@@ -66,7 +68,8 @@ import static org.apache.flink.util.ExceptionUtils.suppressExceptions;
  */
 public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
-	private static final int BUFFER_SIZE = TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue();
+	private static final int BUFFER_SIZE =
+		checkedDownCast(MemorySize.parse(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()).getBytes());
 
 	private static final int NUM_SLOTS_AND_THREADS = 1;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 94842a6..1835fc6 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -225,7 +225,7 @@ public class MiniClusterResource extends ExternalResource {
 		}
 
 		if (!configuration.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
-			configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, TestBaseUtils.TASK_MANAGER_MEMORY_SIZE);
+			configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, TestBaseUtils.TASK_MANAGER_MEMORY_SIZE);
 		}
 
 		// set rest port to 0 to avoid clashes with concurrent MiniClusters

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 10fe1f0..a0f52f2 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -95,7 +95,7 @@ public class TestBaseUtils extends TestLogger {
 
 	protected static final int MINIMUM_HEAP_SIZE_MB = 192;
 
-	protected static final long TASK_MANAGER_MEMORY_SIZE = 80;
+	protected static final String TASK_MANAGER_MEMORY_SIZE = "80m";
 
 	protected static final long DEFAULT_AKKA_ASK_TIMEOUT = 1000;
 
@@ -181,7 +181,7 @@ public class TestBaseUtils extends TestLogger {
 		}
 
 		if (!config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
-			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, TASK_MANAGER_MEMORY_SIZE);
+			config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, TASK_MANAGER_MEMORY_SIZE);
 		}
 
 		LocalFlinkMiniCluster cluster =  new LocalFlinkMiniCluster(config, singleActorSystem);

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index d186e36..1ac7531 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -58,7 +58,7 @@ public class AccumulatorErrorITCase extends TestLogger {
 
 	public static Configuration getConfiguration() {
 		Configuration config = new Configuration();
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "12m");
 		return config;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 5f7ca86..a560891 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -76,7 +76,7 @@ public abstract class CancelingTestBase extends TestLogger {
 		Configuration config = new Configuration();
 		config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
 		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-		config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
+		config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
 		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
 
 		return config;

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index a1e31e9..df74450 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -190,9 +190,9 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		final File haDir = temporaryFolder.newFolder();
 
 		Configuration config = new Configuration();
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
 		// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
-		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB
+		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
 		config.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");
 
 		if (zkServer != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index b82abb9..d05bafb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -75,7 +75,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
 		config.setString(AkkaOptions.LOOKUP_TIMEOUT, "60 s");
 		config.setString(AkkaOptions.ASK_TIMEOUT, "60 s");
 		return config;

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
index caa1791..989024c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java
@@ -88,7 +88,7 @@ public class KeyedStateCheckpointingITCase extends TestLogger {
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "12m");
 		return config;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index b66c745..7eb5b8f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -562,7 +562,7 @@ public class SavepointITCase extends TestLogger {
 
 		Configuration config = new Configuration();
 		config.addAll(jobGraph.getJobConfiguration());
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
 		final File checkpointDir = new File(tmpDir, "checkpoints");
 		final File savepointDir = new File(tmpDir, "savepoints");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index 1b93684..b6163e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -83,7 +83,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
 		return config;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index fed65b4..4851e54 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -115,7 +115,7 @@ public class ClassLoaderITCase extends TestLogger {
 				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
 
 		// required as we otherwise run out of memory
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
 
 		testCluster = new MiniCluster(
 			new MiniClusterConfiguration.Builder()

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index 5738eb7..66ffd5d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -60,7 +60,7 @@ public class JobSubmissionFailsITCase extends TestLogger {
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 		return config;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index 5aabd31..131b6a0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -49,7 +49,7 @@ public class StreamingScalabilityAndLatency {
 
 		try {
 			Configuration config = new Configuration();
-			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
+			config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
 			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 20000);
 
 			config.setInteger("taskmanager.net.server.numThreads", 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index ddc7dd8..07d146d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -59,7 +59,7 @@ public class CustomSerializationITCase extends TestLogger {
 
 	public static Configuration getConfiguration() {
 		Configuration config = new Configuration();
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 30L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "30m");
 		return config;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index c905071..ca6cb14 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -61,7 +61,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m");
 		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
 		return config;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 6764f6f..4144f46 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -404,7 +404,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 				Configuration cfg = new Configuration();
 				cfg.setString(JobManagerOptions.ADDRESS, "localhost");
 				cfg.setInteger(JobManagerOptions.PORT, jobManagerPort);
-				cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+				cfg.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 				cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
 				cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 				cfg.setString(AkkaOptions.ASK_TIMEOUT, "100 s");

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 50404a4..7beb927 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -267,7 +267,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 			jmProcess[0].startProcess();
 
 			// Task manager configuration
-			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+			config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
 			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
 			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 755b3d3..d99f195 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -78,7 +78,7 @@ public class TaskManagerFailureRecoveryITCase extends TestLogger {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
-			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
+			config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
 
 			config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "500 ms");
 			config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "20 s");

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index 5272581..003ea57 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -78,7 +78,7 @@ public class IPv6HostnamesITCase extends TestLogger {
 		Configuration config = new Configuration();
 		config.setString(JobManagerOptions.ADDRESS, addressString);
 		config.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, addressString);
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
 		return config;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 5d165bf..8793b1f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -87,7 +87,7 @@ public class TimestampITCase extends TestLogger {
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "12m");
 		return config;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 99ebcd1..1009fbb 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -107,8 +107,8 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		LOG.info("Starting testClientStartup()");
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
 						"-n", "1",
-						"-jm", "768",
-						"-tm", "1024", "-qu", "qa-team"},
+						"-jm", "768m",
+						"-tm", "1024m", "-qu", "qa-team"},
 				"Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0);
 		LOG.info("Finished testClientStartup()");
 	}
@@ -129,8 +129,8 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 				"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
 				"-yn", "1",
 				"-ys", "2", //test that the job is executed with a DOP of 2
-				"-yjm", "768",
-				"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
+				"-yjm", "768m",
+				"-ytm", "1024m", exampleJarLocation.getAbsolutePath()},
 			/* test succeeded after this string */
 			"Program execution finished",
 			/* prohibited strings: (to verify the parallelism) */
@@ -172,10 +172,10 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 				"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
 				"-yn", "1",
 				"-ys", "2", //test that the job is executed with a DOP of 2
-				"-yjm", "768",
-				"-ytm", String.valueOf(taskManagerMemoryMB),
+				"-yjm", "768m",
+				"-ytm", taskManagerMemoryMB + "m",
 				"-yD", "taskmanager.memory.off-heap=true",
-				"-yD", "taskmanager.memory.size=" + offHeapMemory,
+				"-yD", "taskmanager.memory.size=" + offHeapMemory + "m",
 				"-yD", "taskmanager.memory.preallocate=true", exampleJarLocation.getAbsolutePath()},
 			/* test succeeded after this string */
 			"Program execution finished",
@@ -195,8 +195,8 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		LOG.info("Starting testTaskManagerFailure()");
 		Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
 				"-n", "1",
-				"-jm", "768",
-				"-tm", "1024",
+				"-jm", "768m",
+				"-tm", "1024m",
 				"-s", "3", // set the slots 3 to check if the vCores are set properly!
 				"-nm", "customName",
 				"-Dfancy-configuration-value=veryFancy",
@@ -374,8 +374,8 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 			runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
 				"-t", flinkLibFolder.getAbsolutePath(),
 				"-n", "1",
-				"-jm", "768",
-				"-tm", "1024",
+				"-jm", "768m",
+				"-tm", "1024m",
 				"-qu", "doesntExist"}, "to unknown queue: doesntExist", null, RunTypes.YARN_SESSION, 1);
 		} catch (Exception e) {
 			assertTrue(ExceptionUtils.findThrowableWithMessage(e, "to unknown queue: doesntExist").isPresent());
@@ -401,8 +401,8 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 				"-yt", flinkLibFolder.getAbsolutePath(),
 				"-yn", "1",
 				"-ys", "2",
-				"-yjm", "768",
-				"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
+				"-yjm", "768m",
+				"-ytm", "1024m", exampleJarLocation.getAbsolutePath()},
 				/* test succeeded after this string */
 			"Program execution finished",
 			/* prohibited strings: (we want to see "DataSink (...) (2/2) switched to FINISHED") */
@@ -468,12 +468,12 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 				"-yj", flinkUberjar.getAbsolutePath(),
 				"-yt", flinkLibFolder.getAbsolutePath(),
 				"-yn", "1",
-				"-yjm", "768",
+				"-yjm", "768m",
 				// test if the cutoff is passed correctly (only useful when larger than the value
 				// of containerized.heap-cutoff-min (default: 600MB)
 				"-yD", "yarn.heap-cutoff-ratio=0.7",
 				"-yD", "yarn.tags=test-tag",
-				"-ytm", "1024",
+				"-ytm", "1024m",
 				"-ys", "2", // test requesting slots from YARN.
 				"-p", "2",
 				"--detached", job,

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index eee6867..bb479ae 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -108,10 +108,10 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		args.add("1");
 
 		args.add("-jm");
-		args.add("768");
+		args.add("768m");
 
 		args.add("-tm");
-		args.add("1024");
+		args.add("1024m");
 
 		if (SecureTestEnvironment.getTestKeytab() != null) {
 			args.add("-D" + SecurityOptions.KERBEROS_LOGIN_KEYTAB.key() + "=" + SecureTestEnvironment.getTestKeytab());
@@ -260,8 +260,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		LOG.info("Starting testResourceComputation()");
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
 				"-n", "5",
-				"-jm", "256",
-				"-tm", "1585"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
+				"-jm", "256m",
+				"-tm", "1585m"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
 		LOG.info("Finished testResourceComputation()");
 		checkForLogString("This YARN session requires 8437MB of memory in the cluster. There are currently only 8192MB available.");
 	}
@@ -288,8 +288,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		LOG.info("Starting testfullAlloc()");
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
 				"-n", "2",
-				"-jm", "256",
-				"-tm", "3840"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
+				"-jm", "256m",
+				"-tm", "3840m"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
 		LOG.info("Finished testfullAlloc()");
 		checkForLogString("There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" +
 				"After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index f770135..58710bc 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -87,8 +87,8 @@ public class YarnConfigurationITCase extends YarnTestBase {
 
 		// disable heap cutoff min
 		configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);
-		configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, (1L << 20));
-		configuration.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, (4L << 20));
+		configuration.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20));
+		configuration.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(4L << 20));
 
 		final YarnConfiguration yarnConfiguration = getYarnConfiguration();
 		final YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 65c9472..636ae16 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -859,9 +859,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			TaskManagerOptions.NUM_TASK_SLOTS,
 			clusterSpecification.getSlotsPerTaskManager());
 
-		configuration.setInteger(
+		configuration.setString(
 			TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY,
-			clusterSpecification.getTaskManagerMemoryMB());
+			clusterSpecification.getTaskManagerMemoryMB() + "m");
 
 		// Upload the flink configuration
 		// write out configuration file

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 572e6ba..9dd9922 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
@@ -162,7 +163,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 		this.webInterfaceUrl = webInterfaceUrl;
 		this.numberOfTaskSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
-		this.defaultTaskManagerMemoryMB = flinkConfig.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+		this.defaultTaskManagerMemoryMB = MemorySize.parse(flinkConfig.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
 		this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index ef840da..1ad1bcc 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -385,10 +386,10 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		}
 
 		// JobManager Memory
-		final int jobManagerMemoryMB = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
+		final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
 
 		// Task Managers memory
-		final int taskManagerMemoryMB = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+		final int taskManagerMemoryMB = MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
 
 		int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
 
@@ -499,11 +500,11 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
 		}
 
 		if (commandLine.hasOption(jmMemory.getOpt())) {
-			effectiveConfiguration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, Integer.parseInt(commandLine.getOptionValue(jmMemory.getOpt())));
+			effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()));
 		}
 
 		if (commandLine.hasOption(tmMemory.getOpt())) {
-			effectiveConfiguration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, Integer.parseInt(commandLine.getOptionValue(tmMemory.getOpt())));
+			effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(tmMemory.getOpt()));
 		}
 
 		if (commandLine.hasOption(slots.getOpt())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d02167dc/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index da4aa84..977da45 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -302,11 +302,11 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		final int taskManagerMemory = 7331;
 		final int slotsPerTaskManager = 30;
 
-		configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory);
-		configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, taskManagerMemory);
+		configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory + "m");
+		configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, taskManagerMemory + "m");
 		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);
 
-		final String[] args = {"-yjm", String.valueOf(jobManagerMemory), "-ytm", String.valueOf(taskManagerMemory), "-ys", String.valueOf(slotsPerTaskManager)};
+		final String[] args = {"-yjm", String.valueOf(jobManagerMemory) + "m", "-ytm", String.valueOf(taskManagerMemory) + "m", "-ys", String.valueOf(slotsPerTaskManager)};
 		final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
 			configuration,
 			tmp.getRoot().getAbsolutePath(),
@@ -330,9 +330,9 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 	public void testConfigurationClusterSpecification() throws Exception {
 		final Configuration configuration = new Configuration();
 		final int jobManagerMemory = 1337;
-		configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory);
+		configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemory + "m");
 		final int taskManagerMemory = 7331;
-		configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, taskManagerMemory);
+		configuration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, taskManagerMemory + "m");
 		final int slotsPerTaskManager = 42;
 		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);
 


[2/5] flink git commit: [FLINK-9593][cep] Unified After Match semantics with SQL MATCH_RECOGNIZE

Posted by dw...@apache.org.
[FLINK-9593][cep] Unified After Match semantics with SQL MATCH_RECOGNIZE

This closes #6171


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d934cb8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d934cb8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d934cb8f

Branch: refs/heads/master
Commit: d934cb8fba22855894e260e3e68533b599ac8e43
Parents: d02167d
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Thu Jun 14 17:10:05 2018 +0200
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Thu Jul 5 15:54:54 2018 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |  69 +++--
 .../flink/cep/scala/pattern/Pattern.scala       |   2 +-
 .../flink/cep/nfa/AfterMatchSkipStrategy.java   | 139 ---------
 .../apache/flink/cep/nfa/ComputationState.java  |  21 +-
 .../org/apache/flink/cep/nfa/DeweyNumber.java   |   4 +
 .../apache/flink/cep/nfa/MigrationUtils.java    |  10 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 216 ++++++-------
 .../java/org/apache/flink/cep/nfa/NFAState.java |  48 ++-
 .../flink/cep/nfa/NFAStateSerializer.java       |  98 ++++--
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |  16 +-
 .../nfa/aftermatch/AfterMatchSkipStrategy.java  | 155 ++++++++++
 .../cep/nfa/aftermatch/NoSkipStrategy.java      |  58 ++++
 .../nfa/aftermatch/SkipPastLastStrategy.java    |  65 ++++
 .../cep/nfa/aftermatch/SkipToFirstStrategy.java |  76 +++++
 .../cep/nfa/aftermatch/SkipToLastStrategy.java  |  76 +++++
 .../flink/cep/nfa/compiler/NFACompiler.java     |  10 +-
 .../flink/cep/nfa/sharedbuffer/EventId.java     |  11 +-
 .../cep/nfa/sharedbuffer/SharedBuffer.java      |  38 ++-
 .../AbstractKeyedCEPPatternOperator.java        |   2 +-
 .../flink/cep/operator/CEPOperatorUtils.java    |   2 +-
 .../cep/operator/FlatSelectCepOperator.java     |   2 +-
 .../operator/FlatSelectTimeoutCepOperator.java  |   2 +-
 .../flink/cep/operator/SelectCepOperator.java   |   2 +-
 .../cep/operator/SelectTimeoutCepOperator.java  |   2 +-
 .../apache/flink/cep/pattern/GroupPattern.java  |   2 +-
 .../org/apache/flink/cep/pattern/Pattern.java   |   2 +-
 .../java/org/apache/flink/cep/CEPITCase.java    |   2 +-
 .../flink/cep/nfa/AfterMatchSkipITCase.java     | 307 +++++++++++++++++--
 .../org/apache/flink/cep/nfa/GroupITCase.java   |   4 +-
 .../org/apache/flink/cep/nfa/NFAITCase.java     |  16 +-
 .../apache/flink/cep/nfa/NFATestUtilities.java  |   1 +
 .../apache/flink/cep/nfa/SameElementITCase.java |   8 +-
 .../flink/cep/nfa/UntilConditionITCase.java     |  40 +--
 .../flink/cep/nfa/compiler/NFACompilerTest.java |   2 +-
 .../cep/nfa/sharedbuffer/SharedBufferTest.java  |  37 ++-
 35 files changed, 1139 insertions(+), 406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 921718a..6723e71 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -1252,13 +1252,13 @@ pattern.within(Time.seconds(10))
 For a given pattern, the same event may be assigned to multiple successful matches. To control to how many matches an event will be assigned, you need to specify the skip strategy called `AfterMatchSkipStrategy`. There are four types of skip strategies, listed as follows:
 
 * <strong>*NO_SKIP*</strong>: Every possible match will be emitted.
-* <strong>*SKIP_PAST_LAST_EVENT*</strong>: Discards every partial match that contains event of the match.
-* <strong>*SKIP_TO_FIRST*</strong>: Discards every partial match that contains event of the match preceding the first of *PatternName*.
-* <strong>*SKIP_TO_LAST*</strong>: Discards every partial match that contains event of the match preceding the last of *PatternName*.
+* <strong>*SKIP_PAST_LAST_EVENT*</strong>: Discards every partial match that started after the match started but before it ended.
+* <strong>*SKIP_TO_FIRST*</strong>: Discards every partial match that started after the match started but before the first event of *PatternName* occurred.
+* <strong>*SKIP_TO_LAST*</strong>: Discards every partial match that started after the match started but before the last event of *PatternName* occurred.
 
 Notice that when using *SKIP_TO_FIRST* and *SKIP_TO_LAST* skip strategy, a valid *PatternName* should also be specified.
 
-For example, for a given pattern `a b{2}` and a data stream `ab1, ab2, ab3, ab4, ab5, ab6`, the differences between these four skip strategies are as follows:
+For example, for a given pattern `b+ c` and a data stream `b1 b2 b3 c`, the differences between these four skip strategies are as follows:
 
 <table class="table table-bordered">
     <tr>
@@ -1269,38 +1269,65 @@ For example, for a given pattern `a b{2}` and a data stream `ab1, ab2, ab3, ab4,
     <tr>
         <td><strong>NO_SKIP</strong></td>
         <td>
-            <code>ab1 ab2 ab3</code><br>
-            <code>ab2 ab3 ab4</code><br>
-            <code>ab3 ab4 ab5</code><br>
-            <code>ab4 ab5 ab6</code><br>
+            <code>b1 b2 b3 c</code><br>
+            <code>b2 b3 c</code><br>
+            <code>b3 c</code><br>
         </td>
-        <td>After found matching <code>ab1 ab2 ab3</code>, the match process will not discard any result.</td>
+        <td>After found matching <code>b1 b2 b3 c</code>, the match process will not discard any result.</td>
     </tr>
     <tr>
         <td><strong>SKIP_PAST_LAST_EVENT</strong></td>
         <td>
-            <code>ab1 ab2 ab3</code><br>
-            <code>ab4 ab5 ab6</code><br>
+            <code>b1 b2 b3 c</code><br>
         </td>
-        <td>After found matching <code>ab1 ab2 ab3</code>, the match process will discard all started partial matches.</td>
+        <td>After found matching <code>b1 b2 b3 c</code>, the match process will discard all started partial matches.</td>
     </tr>
     <tr>
-        <td><strong>SKIP_TO_FIRST</strong>[<code>b</code>]</td>
+        <td><strong>SKIP_TO_FIRST</strong>[<code>b*</code>]</td>
         <td>
-            <code>ab1 ab2 ab3</code><br>
-            <code>ab2 ab3 ab4</code><br>
-            <code>ab3 ab4 ab5</code><br>
-            <code>ab4 ab5 ab6</code><br>
+            <code>b1 b2 b3 c</code><br>
+            <code>b2 b3 c</code><br>
+            <code>b3 c</code><br>
         </td>
-        <td>After found matching <code>ab1 ab2 ab3</code>, the match process will discard all partial matches containing <code>ab1</code>, which is the only event that comes before the first <code>b</code>.</td>
+        <td>After found matching <code>b1 b2 b3 c</code>, the match process will try to discard all partial matches started before <code>b1</code>, but there are no such matches. Therefore nothing will be discarded.</td>
     </tr>
     <tr>
         <td><strong>SKIP_TO_LAST</strong>[<code>b</code>]</td>
         <td>
-            <code>ab1 ab2 ab3</code><br>
-            <code>ab3 ab4 ab5</code><br>
+            <code>b1 b2 b3 c</code><br>
+            <code>b3 c</code><br>
         </td>
-        <td>After found matching <code>ab1 ab2 ab3</code>, the match process will discard all partial matches containing <code>ab1</code> and <code>ab2</code>, which are events that comes before the last <code>b</code>.</td>
+        <td>After found matching <code>b1 b2 b3 c</code>, the match process will try to discard all partial matches started before <code>b3</code>. There is one such match <code>b2 b3 c</code></td>
+    </tr>
+</table>
+
+Have a look also at another example to better see the difference between NO_SKIP and SKIP_TO_FIRST:
+Pattern: `(a | c) (b | c) c+.greedy d` and sequence: `a b c1 c2 c3 d` Then the results will be:
+
+
+<table class="table table-bordered">
+    <tr>
+        <th class="text-left" style="width: 25%">Skip Strategy</th>
+        <th class="text-center" style="width: 25%">Result</th>
+        <th class="text-center"> Description</th>
+    </tr>
+    <tr>
+        <td><strong>NO_SKIP</strong></td>
+        <td>
+            <code>a b c1 c2 c3 d</code><br>
+            <code>b c1 c2 c3 d</code><br>
+            <code>c1 c2 c3 d</code><br>
+            <code>c2 c3 d</code><br>
+        </td>
+        <td>After found matching <code>a b c1 c2 c3 d</code>, the match process will not discard any result.</td>
+    </tr>
+    <tr>
+        <td><strong>SKIP_TO_FIRST</strong>[<code>b*</code>]</td>
+        <td>
+            <code>a b c1 c2 c3 d</code><br>
+            <code>c1 c2 c3 d</code><br>
+        </td>
+        <td>After found matching <code>a b c1 c2 c3 d</code>, the match process will try to discard all partial matches started before <code>c1</code>. There is one such match <code>b c1 c2 c3 d</code>.</td>
     </tr>
 </table>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 42a95e8..c2d2788 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.cep.scala.pattern
 
 import org.apache.flink.cep
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
 import org.apache.flink.cep.pattern.conditions.IterativeCondition.{Context => JContext}
 import org.apache.flink.cep.pattern.conditions.{IterativeCondition, SimpleCondition}
 import org.apache.flink.cep.pattern.{MalformedPatternException, Quantifier, Pattern => JPattern}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
deleted file mode 100644
index dcda441..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.nfa;
-
-import java.io.Serializable;
-
-
-/**
- * Indicate the skip strategy after a match process.
- *
- * <p>For more info on possible skip strategies see {@link SkipStrategy}.
- */
-public class AfterMatchSkipStrategy implements Serializable {
-
-	private static final long serialVersionUID = -4048930333619068531L;
-	// default strategy
-	private SkipStrategy strategy = SkipStrategy.NO_SKIP;
-
-	// pattern name to skip to
-	private String patternName = null;
-
-	/**
-	 * Discards every partial match that contains event of the match preceding the first of *PatternName*.
-	 * @param patternName the pattern name to skip to
-	 * @return the created AfterMatchSkipStrategy
-	 */
-	public static AfterMatchSkipStrategy skipToFirst(String patternName) {
-		return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_FIRST, patternName);
-	}
-
-	/**
-	 * Discards every partial match that contains event of the match preceding the last of *PatternName*.
-	 * @param patternName the pattern name to skip to
-	 * @return the created AfterMatchSkipStrategy
-	 */
-	public static AfterMatchSkipStrategy skipToLast(String patternName) {
-		return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_LAST, patternName);
-	}
-
-	/**
-	 * Discards every partial match that contains event of the match.
-	 * @return the created AfterMatchSkipStrategy
-	 */
-	public static AfterMatchSkipStrategy skipPastLastEvent() {
-		return new AfterMatchSkipStrategy(SkipStrategy.SKIP_PAST_LAST_EVENT);
-	}
-
-	/**
-	 * Every possible match will be emitted.
-	 * @return the created AfterMatchSkipStrategy
-	 */
-	public static AfterMatchSkipStrategy noSkip() {
-		return new AfterMatchSkipStrategy(SkipStrategy.NO_SKIP);
-	}
-
-	private AfterMatchSkipStrategy(SkipStrategy strategy) {
-		this(strategy, null);
-	}
-
-	private AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) {
-		if (patternName == null && (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST)) {
-			throw new IllegalArgumentException("The patternName field can not be empty when SkipStrategy is " + strategy);
-		}
-		this.strategy = strategy;
-		this.patternName = patternName;
-	}
-
-	/**
-	 * Get the {@link SkipStrategy} enum.
-	 * @return the skip strategy
-	 */
-	public SkipStrategy getStrategy() {
-		return strategy;
-	}
-
-	/**
-	 * Get the referenced pattern name of this strategy.
-	 * @return the referenced pattern name.
-	 */
-	public String getPatternName() {
-		return patternName;
-	}
-
-	@Override
-	public String toString() {
-		switch (strategy) {
-			case NO_SKIP:
-			case SKIP_PAST_LAST_EVENT:
-				return "AfterMatchStrategy{" +
-					strategy +
-					"}";
-			case SKIP_TO_FIRST:
-			case SKIP_TO_LAST:
-				return "AfterMatchStrategy{" +
-					strategy + "[" +
-					patternName + "]" +
-					"}";
-		}
-		return super.toString();
-	}
-
-	/**
-	 * Skip Strategy Enum.
-	 */
-	public enum SkipStrategy{
-		/**
-		 * Every possible match will be emitted.
-		 */
-		NO_SKIP,
-		/**
-		 * Discards every partial match that contains event of the match.
-		 */
-		SKIP_PAST_LAST_EVENT,
-		/**
-		 * Discards every partial match that contains event of the match preceding the first of *PatternName*.
-		 */
-		SKIP_TO_FIRST,
-		/**
-		 * Discards every partial match that contains event of the match preceding the last of *PatternName*.
-		 */
-		SKIP_TO_LAST
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
index 65715a7..2edc7ee 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
 
 import javax.annotation.Nullable;
@@ -42,15 +43,24 @@ public class ComputationState {
 	@Nullable
 	private final NodeId previousBufferEntry;
 
+	@Nullable
+	private final EventId startEventID;
+
 	private ComputationState(
 			final String currentState,
 			@Nullable final NodeId previousBufferEntry,
 			final DeweyNumber version,
+			@Nullable final EventId startEventID,
 			final long startTimestamp) {
 		this.currentStateName = currentState;
 		this.version = version;
 		this.startTimestamp = startTimestamp;
 		this.previousBufferEntry = previousBufferEntry;
+		this.startEventID = startEventID;
+	}
+
+	public EventId getStartEventID() {
+		return startEventID;
 	}
 
 	public NodeId getPreviousBufferEntry() {
@@ -76,6 +86,7 @@ public class ComputationState {
 			return Objects.equals(currentStateName, other.currentStateName) &&
 				Objects.equals(version, other.version) &&
 				startTimestamp == other.startTimestamp &&
+				Objects.equals(startEventID, other.startEventID) &&
 				Objects.equals(previousBufferEntry, other.previousBufferEntry);
 		} else {
 			return false;
@@ -89,12 +100,13 @@ public class ComputationState {
 			", version=" + version +
 			", startTimestamp=" + startTimestamp +
 			", previousBufferEntry=" + previousBufferEntry +
+			", startEventID=" + startEventID +
 			'}';
 	}
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(currentStateName, version, startTimestamp, previousBufferEntry);
+		return Objects.hash(currentStateName, version, startTimestamp, startEventID, previousBufferEntry);
 	}
 
 	public static ComputationState createStartState(final String state) {
@@ -102,14 +114,15 @@ public class ComputationState {
 	}
 
 	public static ComputationState createStartState(final String state, final DeweyNumber version) {
-		return createState(state, null, version, -1L);
+		return createState(state, null, version, -1L, null);
 	}
 
 	public static ComputationState createState(
 			final String currentState,
 			final NodeId previousEntry,
 			final DeweyNumber version,
-			final long startTimestamp) {
-		return new ComputationState(currentState, previousEntry, version, startTimestamp);
+			final long startTimestamp,
+			final EventId startEventID) {
+		return new ComputationState(currentState, previousEntry, version, startEventID, startTimestamp);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index 34897fa..68e0eec 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -90,6 +90,10 @@ public class DeweyNumber implements Serializable {
 		}
 	}
 
+	public int getRun() {
+		return deweyNumber[0];
+	}
+
 	public int length() {
 		return deweyNumber.length;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java
index f1656a1..86d7fac 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/MigrationUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.EnumSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.util.Preconditions;
@@ -40,7 +41,7 @@ class MigrationUtils {
 	/**
 	 * Skips bytes corresponding to serialized states. In flink 1.6+ the states are no longer kept in state.
 	 */
-	static <T> void skipSerializedStates(DataInputView in) throws IOException {
+	static void skipSerializedStates(DataInputView in) throws IOException {
 		TypeSerializer<String> nameSerializer = StringSerializer.INSTANCE;
 		TypeSerializer<State.StateType> stateTypeSerializer = new EnumSerializer<>(State.StateType.class);
 		TypeSerializer<StateTransitionAction> actionSerializer = new EnumSerializer<>(StateTransitionAction.class);
@@ -73,7 +74,7 @@ class MigrationUtils {
 		}
 	}
 
-	private static <T> void skipCondition(DataInputView in) throws IOException, ClassNotFoundException {
+	private static void skipCondition(DataInputView in) throws IOException, ClassNotFoundException {
 		boolean hasCondition = in.readBoolean();
 		if (hasCondition) {
 			int length = in.readInt();
@@ -115,13 +116,16 @@ class MigrationUtils {
 			}
 
 			NodeId nodeId;
+			EventId startEventId;
 			if (prevState != null) {
 				nodeId = sharedBuffer.getNodeId(prevState, timestamp, counter, event);
+				startEventId = sharedBuffer.getStartEventId(version.getRun());
 			} else {
 				nodeId = null;
+				startEventId = null;
 			}
 
-			computationStates.add(ComputationState.createState(state, nodeId, version, startTimestamp));
+			computationStates.add(ComputationState.createState(state, nodeId, version, startTimestamp, startEventId));
 		}
 		return computationStates;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 276cde7..815b25a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
@@ -45,14 +46,12 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Queue;
-import java.util.Set;
 import java.util.Stack;
 
 import static org.apache.flink.cep.nfa.MigrationUtils.deserializeComputationStates;
@@ -237,18 +236,18 @@ public class NFA<T> {
 			final NFAState nfaState,
 			final long timestamp) throws Exception {
 
-		Queue<ComputationState> computationStates = nfaState.getComputationStates();
 		final Collection<Tuple2<Map<String, List<T>>, Long>> timeoutResult = new ArrayList<>();
+		final PriorityQueue<ComputationState> newPartialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
 
-		final int numberComputationStates = computationStates.size();
-		for (int i = 0; i < numberComputationStates; i++) {
-			ComputationState computationState = computationStates.poll();
-
+		Map<EventId, T> eventsCache = new HashMap<>();
+		for (ComputationState computationState : nfaState.getPartialMatches()) {
 			if (isStateTimedOut(computationState, timestamp)) {
 
 				if (handleTimeout) {
 					// extract the timed out event pattern
-					Map<String, List<T>> timedOutPattern = extractCurrentMatches(sharedBuffer, computationState);
+					Map<String, List<T>> timedOutPattern = sharedBuffer.materializeMatch(extractCurrentMatches(
+						sharedBuffer,
+						computationState), eventsCache);
 					timeoutResult.add(Tuple2.of(timedOutPattern, timestamp));
 				}
 
@@ -256,10 +255,12 @@ public class NFA<T> {
 
 				nfaState.setStateChanged();
 			} else {
-				computationStates.add(computationState);
+				newPartialMatches.add(computationState);
 			}
 		}
 
+		nfaState.setNewPartialMatches(newPartialMatches);
+
 		sharedBuffer.advanceTime(timestamp);
 
 		return timeoutResult;
@@ -275,15 +276,11 @@ public class NFA<T> {
 			final EventWrapper event,
 			final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
 
-		Queue<ComputationState> computationStates = nfaState.getComputationStates();
-
-		final int numberComputationStates = computationStates.size();
-		final Collection<Map<String, List<T>>> result = new ArrayList<>();
+		final PriorityQueue<ComputationState> newPartialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
+		final PriorityQueue<ComputationState> potentialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
 
 		// iterate over all current computations
-		for (int i = 0; i < numberComputationStates; i++) {
-			ComputationState computationState = computationStates.poll();
-
+		for (ComputationState computationState : nfaState.getPartialMatches()) {
 			final Collection<ComputationState> newComputationStates = computeNextStates(
 				sharedBuffer,
 				computationState,
@@ -303,12 +300,7 @@ public class NFA<T> {
 			for (final ComputationState newComputationState : newComputationStates) {
 
 				if (isFinalState(newComputationState)) {
-					// we've reached a final state and can thus retrieve the matching event sequence
-					Map<String, List<T>> matchedPattern = extractCurrentMatches(sharedBuffer, newComputationState);
-					result.add(matchedPattern);
-
-					// remove found patterns because they are no longer needed
-					sharedBuffer.releaseNode(newComputationState.getPreviousBufferEntry());
+					potentialMatches.add(newComputationState);
 				} else if (isStopState(newComputationState)) {
 					//reached stop state. release entry for the stop state
 					shouldDiscardPath = true;
@@ -326,81 +318,89 @@ public class NFA<T> {
 					sharedBuffer.releaseNode(state.getPreviousBufferEntry());
 				}
 			} else {
-				computationStates.addAll(statesToRetain);
+				newPartialMatches.addAll(statesToRetain);
 			}
 		}
 
-		discardComputationStatesAccordingToStrategy(
-			sharedBuffer, computationStates, result, afterMatchSkipStrategy);
+		if (!potentialMatches.isEmpty()) {
+			nfaState.setStateChanged();
+		}
+
+		List<Map<String, List<T>>> result = new ArrayList<>();
+		if (afterMatchSkipStrategy.isSkipStrategy()) {
+			processMatchesAccordingToSkipStrategy(sharedBuffer,
+				nfaState,
+				afterMatchSkipStrategy,
+				potentialMatches,
+				newPartialMatches,
+				result);
+		} else {
+			for (ComputationState match : potentialMatches) {
+				Map<EventId, T> eventsCache = new HashMap<>();
+				Map<String, List<T>> materializedMatch =
+					sharedBuffer.materializeMatch(
+						sharedBuffer.extractPatterns(
+							match.getPreviousBufferEntry(),
+							match.getVersion()).get(0),
+						eventsCache
+					);
+
+				result.add(materializedMatch);
+				sharedBuffer.releaseNode(match.getPreviousBufferEntry());
+			}
+		}
+
+		nfaState.setNewPartialMatches(newPartialMatches);
 
 		return result;
 	}
 
-	private void discardComputationStatesAccordingToStrategy(
-			final SharedBuffer<T> sharedBuffer,
-			final Queue<ComputationState> computationStates,
-			final Collection<Map<String, List<T>>> matchedResult,
-			final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
+	private void processMatchesAccordingToSkipStrategy(
+			SharedBuffer<T> sharedBuffer,
+			NFAState nfaState,
+			AfterMatchSkipStrategy afterMatchSkipStrategy,
+			PriorityQueue<ComputationState> potentialMatches,
+			PriorityQueue<ComputationState> partialMatches,
+			List<Map<String, List<T>>> result) throws Exception {
 
-		Set<T> discardEvents = new HashSet<>();
-		switch(afterMatchSkipStrategy.getStrategy()) {
-			case SKIP_TO_LAST:
-				for (Map<String, List<T>> resultMap: matchedResult) {
-					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
-						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
-							discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
-							break;
-						} else {
-							discardEvents.addAll(keyMatches.getValue());
-						}
-					}
-				}
-				break;
-			case SKIP_TO_FIRST:
-				for (Map<String, List<T>> resultMap: matchedResult) {
-					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
-						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
-							break;
-						} else {
-							discardEvents.addAll(keyMatches.getValue());
-						}
-					}
-				}
-				break;
-			case SKIP_PAST_LAST_EVENT:
-				for (Map<String, List<T>> resultMap: matchedResult) {
-					for (List<T> eventList: resultMap.values()) {
-						discardEvents.addAll(eventList);
-					}
-				}
-				break;
-		}
-		if (!discardEvents.isEmpty()) {
-			List<ComputationState> discardStates = new ArrayList<>();
-			for (ComputationState computationState : computationStates) {
-				boolean discard = false;
-				Map<String, List<T>> partialMatch = extractCurrentMatches(sharedBuffer, computationState);
-				for (List<T> list: partialMatch.values()) {
-					for (T e: list) {
-						if (discardEvents.contains(e)) {
-							// discard the computation state.
-							discard = true;
-							break;
-						}
-					}
-					if (discard) {
-						break;
-					}
-				}
-				if (discard) {
-					sharedBuffer.releaseNode(computationState.getPreviousBufferEntry());
-					discardStates.add(computationState);
-				}
+		nfaState.getCompletedMatches().addAll(potentialMatches);
+
+		ComputationState earliestMatch = nfaState.getCompletedMatches().peek();
+
+		if (earliestMatch != null) {
+
+			Map<EventId, T> eventsCache = new HashMap<>();
+			ComputationState earliestPartialMatch;
+			while (earliestMatch != null && ((earliestPartialMatch = partialMatches.peek()) == null ||
+				isEarlier(earliestMatch, earliestPartialMatch))) {
+
+				nfaState.setStateChanged();
+				nfaState.getCompletedMatches().poll();
+				List<Map<String, List<EventId>>> matchedResult =
+					sharedBuffer.extractPatterns(earliestMatch.getPreviousBufferEntry(), earliestMatch.getVersion());
+
+				afterMatchSkipStrategy.prune(
+					partialMatches,
+					matchedResult,
+					sharedBuffer);
+
+				afterMatchSkipStrategy.prune(
+					nfaState.getCompletedMatches(),
+					matchedResult,
+					sharedBuffer);
+
+				result.add(sharedBuffer.materializeMatch(matchedResult.get(0), eventsCache));
+				earliestMatch = nfaState.getCompletedMatches().peek();
 			}
-			computationStates.removeAll(discardStates);
+
+			nfaState.getPartialMatches().removeIf(pm -> pm.getStartEventID() != null && !partialMatches.contains(pm));
 		}
 	}
 
+	private boolean isEarlier(ComputationState earliestMatch, ComputationState earliestPartialMatch) {
+		return NFAState.COMPUTATION_STATE_COMPARATOR.compare(earliestMatch, earliestPartialMatch) <= 0;
+	}
+
 	private static <T> boolean isEquivalentState(final State<T> s1, final State<T> s2) {
 		return s1.getName().equals(s2.getName());
 	}
@@ -569,12 +569,13 @@ public class NFA<T> {
 						}
 
 						addComputationState(
-								sharedBuffer,
-								resultingComputationStates,
-								edge.getTargetState(),
-								computationState.getPreviousBufferEntry(),
-								version,
-								computationState.getStartTimestamp()
+							sharedBuffer,
+							resultingComputationStates,
+							edge.getTargetState(),
+							computationState.getPreviousBufferEntry(),
+							version,
+							computationState.getStartTimestamp(),
+							computationState.getStartEventID()
 						);
 					}
 				}
@@ -596,10 +597,13 @@ public class NFA<T> {
 						currentVersion);
 
 					final long startTimestamp;
+					final EventId startEventId;
 					if (isStartState(computationState)) {
 						startTimestamp = timestamp;
+						startEventId = event.getEventId();
 					} else {
 						startTimestamp = computationState.getStartTimestamp();
+						startEventId = computationState.getStartEventID();
 					}
 
 					addComputationState(
@@ -608,7 +612,8 @@ public class NFA<T> {
 							nextState,
 							newEntry,
 							nextVersion,
-							startTimestamp);
+							startTimestamp,
+							startEventId);
 
 					//check if newly created state is optional (have a PROCEED path to Final state)
 					final State<T> finalState = findFinalStateAfterProceed(context, nextState, event.getEvent());
@@ -619,7 +624,8 @@ public class NFA<T> {
 								finalState,
 								newEntry,
 								nextVersion,
-								startTimestamp);
+								startTimestamp,
+								startEventId);
 					}
 					break;
 			}
@@ -649,9 +655,10 @@ public class NFA<T> {
 			State<T> currentState,
 			NodeId previousEntry,
 			DeweyNumber version,
-			long startTimestamp) throws Exception {
+			long startTimestamp,
+			EventId startEventId) throws Exception {
 		ComputationState computationState = ComputationState.createState(
-				currentState.getName(), previousEntry, version, startTimestamp);
+				currentState.getName(), previousEntry, version, startTimestamp, startEventId);
 		computationStates.add(computationState);
 
 		sharedBuffer.lockNode(previousEntry);
@@ -745,14 +752,14 @@ public class NFA<T> {
 	 * @return Collection of event sequences which end in the given computation state
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
-	private Map<String, List<T>> extractCurrentMatches(
+	private Map<String, List<EventId>> extractCurrentMatches(
 			final SharedBuffer<T> sharedBuffer,
 			final ComputationState computationState) throws Exception {
 		if (computationState.getPreviousBufferEntry() == null) {
 			return new HashMap<>();
 		}
 
-		List<Map<String, List<T>>> paths = sharedBuffer.extractPatterns(
+		List<Map<String, List<EventId>>> paths = sharedBuffer.extractPatterns(
 				computationState.getPreviousBufferEntry(),
 				computationState.getVersion());
 
@@ -762,15 +769,7 @@ public class NFA<T> {
 		// for a given computation state, we cannot have more than one matching patterns.
 		Preconditions.checkState(paths.size() == 1);
 
-		Map<String, List<T>> result = new LinkedHashMap<>();
-		Map<String, List<T>> path = paths.get(0);
-		for (String key: path.keySet()) {
-			List<T> events = path.get(key);
-
-			List<T> values = result.computeIfAbsent(key, k -> new ArrayList<>(events.size()));
-			values.addAll(events);
-		}
-		return result;
+		return paths.get(0);
 	}
 
 	/**
@@ -809,7 +808,8 @@ public class NFA<T> {
 			// this is to avoid any overheads when using a simple, non-iterative condition.
 
 			if (matchedEvents == null) {
-				this.matchedEvents = nfa.extractCurrentMatches(sharedBuffer, computationState);
+				this.matchedEvents = sharedBuffer.materializeMatch(nfa.extractCurrentMatches(sharedBuffer,
+					computationState));
 			}
 
 			return new Iterable<T>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java
index a89c72e..dae7013 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAState.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.cep.nfa;
 
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Objects;
+import java.util.PriorityQueue;
 import java.util.Queue;
 
 /**
@@ -31,16 +34,33 @@ public class NFAState {
 	 * These are the "active" intermediate states that are waiting for new matching
 	 * events to transition to new valid states.
 	 */
-	private final Queue<ComputationState> computationStates;
+	private Queue<ComputationState> partialMatches;
+
+	private Queue<ComputationState> completedMatches;
 
 	/**
 	 * Flag indicating whether the matching status of the state machine has changed.
 	 */
 	private boolean stateChanged;
 
-	public NFAState(Queue<ComputationState> computationStates) {
-		this.computationStates = computationStates;
-		this.stateChanged = false;
+	public static final Comparator<ComputationState> COMPUTATION_STATE_COMPARATOR =
+		Comparator.<ComputationState>comparingLong(c ->
+				c.getStartEventID() != null ? c.getStartEventID().getTimestamp() : Long.MAX_VALUE)
+			.thenComparingInt(c ->
+				c.getStartEventID() != null ? c.getStartEventID().getId() : Integer.MAX_VALUE);
+
+	public NFAState(Iterable<ComputationState> states) {
+		this.partialMatches = new PriorityQueue<>(COMPUTATION_STATE_COMPARATOR);
+		for (ComputationState startingState : states) {
+			partialMatches.add(startingState);
+		}
+
+		this.completedMatches = new PriorityQueue<>(COMPUTATION_STATE_COMPARATOR);
+	}
+
+	public NFAState(Queue<ComputationState> partialMatches, Queue<ComputationState> completedMatches) {
+		this.partialMatches = partialMatches;
+		this.completedMatches = completedMatches;
 	}
 
 	/**
@@ -66,8 +86,16 @@ public class NFAState {
 		this.stateChanged = true;
 	}
 
-	public Queue<ComputationState> getComputationStates() {
-		return computationStates;
+	public Queue<ComputationState> getPartialMatches() {
+		return partialMatches;
+	}
+
+	public Queue<ComputationState> getCompletedMatches() {
+		return completedMatches;
+	}
+
+	public void setNewPartialMatches(PriorityQueue<ComputationState> newPartialMatches) {
+		this.partialMatches = newPartialMatches;
 	}
 
 	@Override
@@ -79,18 +107,20 @@ public class NFAState {
 			return false;
 		}
 		NFAState nfaState = (NFAState) o;
-		return 	Objects.equals(computationStates, nfaState.computationStates);
+		return Arrays.equals(partialMatches.toArray(), nfaState.partialMatches.toArray()) &&
+			Arrays.equals(completedMatches.toArray(), nfaState.completedMatches.toArray());
 	}
 
 	@Override
 	public int hashCode() {
-		return Objects.hash(computationStates, stateChanged);
+		return Objects.hash(partialMatches, completedMatches);
 	}
 
 	@Override
 	public String toString() {
 		return "NFAState{" +
-			"computationStates=" + computationStates +
+			"partialMatches=" + partialMatches +
+			", completedMatches=" + completedMatches +
 			", stateChanged=" + stateChanged +
 			'}';
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
index bac144d..05b6c91 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFAStateSerializer.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -31,7 +32,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.LinkedList;
+import java.util.PriorityQueue;
 import java.util.Queue;
 
 /**
@@ -89,43 +90,61 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 		return -1;
 	}
 
+	private static final StringSerializer STATE_NAME_SERIALIZER = StringSerializer.INSTANCE;
+	private static final LongSerializer TIMESTAMP_SERIALIZER = LongSerializer.INSTANCE;
+	private static final DeweyNumber.DeweyNumberSerializer VERSION_SERIALIZER = DeweyNumber.DeweyNumberSerializer.INSTANCE;
+	private static final NodeId.NodeIdSerializer NODE_ID_SERIALIZER = NodeId.NodeIdSerializer.INSTANCE;
+	private static final EventId.EventIdSerializer EVENT_ID_SERIALIZER = EventId.EventIdSerializer.INSTANCE;
+
 	@Override
 	public void serialize(NFAState record, DataOutputView target) throws IOException {
+		serializeComputationStates(record.getPartialMatches(), target);
+		serializeComputationStates(record.getCompletedMatches(), target);
+	}
 
-		target.writeInt(record.getComputationStates().size());
-
-		StringSerializer stateNameSerializer = StringSerializer.INSTANCE;
-		LongSerializer timestampSerializer = LongSerializer.INSTANCE;
-		DeweyNumber.DeweyNumberSerializer versionSerializer = DeweyNumber.DeweyNumberSerializer.INSTANCE;
-		NodeId.NodeIdSerializer nodeIdSerializer = NodeId.NodeIdSerializer.INSTANCE;
-
-		for (ComputationState computationState : record.getComputationStates()) {
-			stateNameSerializer.serialize(computationState.getCurrentStateName(), target);
-			nodeIdSerializer.serialize(computationState.getPreviousBufferEntry(), target);
-
-			versionSerializer.serialize(computationState.getVersion(), target);
-			timestampSerializer.serialize(computationState.getStartTimestamp(), target);
+	private void serializeComputationStates(Queue<ComputationState> states, DataOutputView target) throws IOException {
+		target.writeInt(states.size());
+		for (ComputationState computationState : states) {
+			STATE_NAME_SERIALIZER.serialize(computationState.getCurrentStateName(), target);
+			NODE_ID_SERIALIZER.serialize(computationState.getPreviousBufferEntry(), target);
+
+			VERSION_SERIALIZER.serialize(computationState.getVersion(), target);
+			TIMESTAMP_SERIALIZER.serialize(computationState.getStartTimestamp(), target);
+			if (computationState.getStartEventID() != null) {
+				target.writeByte(1);
+				EVENT_ID_SERIALIZER.serialize(computationState.getStartEventID(), target);
+			} else {
+				target.writeByte(0);
+			}
 		}
 	}
 
 	@Override
 	public NFAState deserialize(DataInputView source) throws IOException {
-		Queue<ComputationState> computationStates = new LinkedList<>();
-		StringSerializer stateNameSerializer = StringSerializer.INSTANCE;
-		LongSerializer timestampSerializer = LongSerializer.INSTANCE;
-		DeweyNumber.DeweyNumberSerializer versionSerializer = DeweyNumber.DeweyNumberSerializer.INSTANCE;
-		NodeId.NodeIdSerializer nodeIdSerializer = NodeId.NodeIdSerializer.INSTANCE;
+		PriorityQueue<ComputationState> partialMatches = deserializeComputationStates(source);
+		PriorityQueue<ComputationState> completedMatches = deserializeComputationStates(source);
+		return new NFAState(partialMatches, completedMatches);
+	}
+
+	private PriorityQueue<ComputationState> deserializeComputationStates(DataInputView source) throws IOException {
+		PriorityQueue<ComputationState> computationStates = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
 
 		int computationStateNo = source.readInt();
 		for (int i = 0; i < computationStateNo; i++) {
-			String state = stateNameSerializer.deserialize(source);
-			NodeId prevState = nodeIdSerializer.deserialize(source);
-			DeweyNumber version = versionSerializer.deserialize(source);
-			long startTimestamp = timestampSerializer.deserialize(source);
-
-			computationStates.add(ComputationState.createState(state, prevState, version, startTimestamp));
+			String state = STATE_NAME_SERIALIZER.deserialize(source);
+			NodeId prevState = NODE_ID_SERIALIZER.deserialize(source);
+			DeweyNumber version = VERSION_SERIALIZER.deserialize(source);
+			long startTimestamp = TIMESTAMP_SERIALIZER.deserialize(source);
+
+			byte isNull = source.readByte();
+			EventId startEventId = null;
+			if (isNull == 1) {
+				startEventId = EVENT_ID_SERIALIZER.deserialize(source);
+			}
+
+			computationStates.add(ComputationState.createState(state, prevState, version, startTimestamp, startEventId));
 		}
-		return new NFAState(computationStates);
+		return computationStates;
 	}
 
 	@Override
@@ -135,7 +154,32 @@ public class NFAStateSerializer extends TypeSerializerSingleton<NFAState> {
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		serialize(deserialize(source), target);
+		copyStates(source, target); // copy partial matches
+		copyStates(source, target); // copy completed matches
+	}
+
+	private void copyStates(DataInputView source, DataOutputView target) throws IOException {
+		int computationStateNo = source.readInt();
+		target.writeInt(computationStateNo);
+
+		for (int i = 0; i < computationStateNo; i++) {
+			String state = STATE_NAME_SERIALIZER.deserialize(source);
+			STATE_NAME_SERIALIZER.serialize(state, target);
+			NodeId prevState = NODE_ID_SERIALIZER.deserialize(source);
+			NODE_ID_SERIALIZER.serialize(prevState, target);
+			DeweyNumber version = VERSION_SERIALIZER.deserialize(source);
+			VERSION_SERIALIZER.serialize(version, target);
+			long startTimestamp = TIMESTAMP_SERIALIZER.deserialize(source);
+			TIMESTAMP_SERIALIZER.serialize(startTimestamp, target);
+
+			byte isNull = source.readByte();
+			target.writeByte(isNull);
+
+			if (isNull == 1) {
+				EventId startEventId = EVENT_ID_SERIALIZER.deserialize(source);
+				EVENT_ID_SERIALIZER.serialize(startEventId, target);
+			}
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index a4dbc00..6e5f9db 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -50,6 +50,8 @@ import java.util.stream.Collectors;
 public class SharedBuffer<V> {
 
 	private final Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext;
+	/** Run number (first block in DeweyNumber) -> EventId. */
+	private Map<Integer, EventId> starters;
 	private final Map<EventId, Lockable<V>> eventsBuffer;
 	private final Map<NodeId, Lockable<SharedBufferNode>> pages;
 
@@ -64,11 +66,13 @@ public class SharedBuffer<V> {
 	public SharedBuffer(
 			Map<EventId, Lockable<V>> eventsBuffer,
 			Map<NodeId, Lockable<SharedBufferNode>> pages,
-			Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext) {
+			Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext,
+			Map<Integer, EventId> starters) {
 
 		this.eventsBuffer = eventsBuffer;
 		this.pages = pages;
 		this.mappingContext = mappingContext;
+		this.starters = starters;
 	}
 
 	public NodeId getNodeId(String prevState, long timestamp, int counter, V event) {
@@ -76,6 +80,10 @@ public class SharedBuffer<V> {
 			new ValueTimeWrapper<>(event, timestamp, counter)));
 	}
 
+	public EventId getStartEventId(int run) {
+		return starters.get(run);
+	}
+
 	/**
 	 * Wrapper for a value-timestamp pair.
 	 *
@@ -284,6 +292,7 @@ public class SharedBuffer<V> {
 			// read the edges of the shared buffer entries
 			int totalEdges = source.readInt();
 
+			Map<Integer, EventId> starters = new HashMap<>();
 			for (int j = 0; j < totalEdges; j++) {
 				int sourceIdx = source.readInt();
 
@@ -297,11 +306,14 @@ public class SharedBuffer<V> {
 				Tuple2<NodeId, Lockable<SharedBufferNode>> targetEntry =
 					targetIdx < 0 ? Tuple2.of(null, null) : entries.get(targetIdx);
 				sourceEntry.f1.getElement().addEdge(new SharedBufferEdge(targetEntry.f0, version));
+				if (version.length() == 1) {
+					starters.put(version.getRun(), sourceEntry.f0.getEventId());
+				}
 			}
 
 			Map<NodeId, Lockable<SharedBufferNode>> entriesMap = entries.stream().collect(Collectors.toMap(e -> e.f0, e -> e.f1));
 
-			return new SharedBuffer<>(valuesWithIds, entriesMap, mappingContext);
+			return new SharedBuffer<>(valuesWithIds, entriesMap, mappingContext, starters);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
new file mode 100644
index 0000000..e0f399f
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.ComputationState;
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+
+/**
+ * Indicate the skip strategy after a match process.
+ */
+public abstract class AfterMatchSkipStrategy implements Serializable {
+
+	private static final long serialVersionUID = -4048930333619068531L;
+
+	/**
+	 * Discards every partial match that contains event of the match preceding the first of *PatternName*.
+	 *
+	 * @param patternName the pattern name to skip to
+	 * @return the created AfterMatchSkipStrategy
+	 */
+	public static AfterMatchSkipStrategy skipToFirst(String patternName) {
+		return new SkipToFirstStrategy(patternName);
+	}
+
+	/**
+	 * Discards every partial match that contains event of the match preceding the last of *PatternName*.
+	 *
+	 * @param patternName the pattern name to skip to
+	 * @return the created AfterMatchSkipStrategy
+	 */
+	public static AfterMatchSkipStrategy skipToLast(String patternName) {
+		return new SkipToLastStrategy(patternName);
+	}
+
+	/**
+	 * Discards every partial match that contains event of the match.
+	 *
+	 * @return the created AfterMatchSkipStrategy
+	 */
+	public static AfterMatchSkipStrategy skipPastLastEvent() {
+		return SkipPastLastStrategy.INSTANCE;
+	}
+
+	/**
+	 * Every possible match will be emitted.
+	 *
+	 * @return the created AfterMatchSkipStrategy
+	 */
+	public static AfterMatchSkipStrategy noSkip() {
+		return NoSkipStrategy.INSTANCE;
+	}
+
+	/**
+	 * Tells if the strategy may skip some matches.
+	 *
+	 * @return false if the strategy is NO_SKIP strategy
+	 */
+	public abstract boolean isSkipStrategy();
+
+	/**
+	 * Prunes matches/partial matches based on the chosen strategy.
+	 *
+	 * @param matchesToPrune current partial matches
+	 * @param matchedResult  already completed matches
+	 * @param sharedBuffer   corresponding shared buffer
+	 * @throws Exception thrown if could not access the state
+	 */
+	public void prune(
+			Collection<ComputationState> matchesToPrune,
+			Collection<Map<String, List<EventId>>> matchedResult,
+			SharedBuffer<?> sharedBuffer) throws Exception {
+
+		EventId pruningId = getPruningId(matchedResult);
+		if (pruningId != null) {
+			List<ComputationState> discardStates = new ArrayList<>();
+			for (ComputationState computationState : matchesToPrune) {
+				if (computationState.getStartEventID() != null &&
+					shouldPrune(computationState.getStartEventID(), pruningId)) {
+					sharedBuffer.releaseNode(computationState.getPreviousBufferEntry());
+					discardStates.add(computationState);
+				}
+			}
+			matchesToPrune.removeAll(discardStates);
+		}
+	}
+
+	/**
+	 * Tells if the partial/completed match starting at given id should be prunned by given pruningId.
+	 *
+	 * @param startEventID starting event id of a partial/completed match
+	 * @param pruningId   pruningId calculated by this strategy
+	 * @return true if the match should be pruned
+	 */
+	protected abstract boolean shouldPrune(EventId startEventID, EventId pruningId);
+
+	/**
+	 * Retrieves event id of the pruning element from the given match based on the strategy.
+	 *
+	 * @param match match corresponding to which should the pruning happen
+	 * @return pruning event id
+	 */
+	protected abstract EventId getPruningId(Collection<Map<String, List<EventId>>> match);
+
+	/**
+	 * Name of pattern that processing will be skipped to.
+	 */
+	public Optional<String> getPatternName() {
+		return Optional.empty();
+	}
+
+	static EventId max(EventId o1, EventId o2) {
+		if (o2 == null) {
+			return o1;
+		}
+
+		if (o1 == null) {
+			return o2;
+		}
+
+		if (o1.compareTo(o2) >= 0) {
+			return o1;
+		} else {
+			return o2;
+		}
+	}
+
+	/** Forbid further extending. */
+	AfterMatchSkipStrategy() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/NoSkipStrategy.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/NoSkipStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/NoSkipStrategy.java
new file mode 100644
index 0000000..2f6769d
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/NoSkipStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Every possible match will be emitted.
+ */
+public class NoSkipStrategy extends AfterMatchSkipStrategy {
+
+	private static final long serialVersionUID = -5843740153729531775L;
+
+	static final NoSkipStrategy INSTANCE = new NoSkipStrategy();
+
+	private NoSkipStrategy() {
+	}
+
+	@Override
+	public boolean isSkipStrategy() {
+		return false;
+	}
+
+	@Override
+	protected boolean shouldPrune(EventId startEventID, EventId pruningId) {
+		throw new IllegalStateException("This should never happen. Please file a bug.");
+	}
+
+	@Override
+	protected EventId getPruningId(Collection<Map<String, List<EventId>>> match) {
+		throw new IllegalStateException("This should never happen. Please file a bug.");
+	}
+
+	@Override
+	public String toString() {
+		return "NoSkipStrategy{}";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipPastLastStrategy.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipPastLastStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipPastLastStrategy.java
new file mode 100644
index 0000000..952d91a
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipPastLastStrategy.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Discards every partial match that contains event of the match.
+ */
+public class SkipPastLastStrategy extends AfterMatchSkipStrategy {
+
+	public static final SkipPastLastStrategy INSTANCE = new SkipPastLastStrategy();
+
+	private static final long serialVersionUID = -8450320065949093169L;
+
+	private SkipPastLastStrategy() {
+	}
+
+	@Override
+	public boolean isSkipStrategy() {
+		return true;
+	}
+
+	@Override
+	protected boolean shouldPrune(EventId startEventID, EventId pruningId) {
+		return startEventID != null && startEventID.compareTo(pruningId) <= 0;
+	}
+
+	@Override
+	protected EventId getPruningId(final Collection<Map<String, List<EventId>>> match) {
+		EventId pruningId = null;
+		for (Map<String, List<EventId>> resultMap : match) {
+			for (List<EventId> eventList : resultMap.values()) {
+				pruningId = max(pruningId, eventList.get(eventList.size() - 1));
+			}
+		}
+
+		return pruningId;
+	}
+
+	@Override
+	public String toString() {
+		return "SkipPastLastStrategy{}";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java
new file mode 100644
index 0000000..7d7be4a
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Discards every partial match that contains event of the match preceding the first of *PatternName*.
+ */
+public class SkipToFirstStrategy extends AfterMatchSkipStrategy {
+
+	private static final long serialVersionUID = 7127107527654629026L;
+	private final String patternName;
+
+	SkipToFirstStrategy(String patternName) {
+		this.patternName = checkNotNull(patternName);
+	}
+
+	@Override
+	public boolean isSkipStrategy() {
+		return true;
+	}
+
+	@Override
+	protected boolean shouldPrune(EventId startEventID, EventId pruningId) {
+		return startEventID != null && startEventID.compareTo(pruningId) < 0;
+	}
+
+	@Override
+	protected EventId getPruningId(Collection<Map<String, List<EventId>>> match) {
+		EventId pruniningId = null;
+		for (Map<String, List<EventId>> resultMap : match) {
+			List<EventId> pruningPattern = resultMap.get(patternName);
+			if (pruningPattern != null && !pruningPattern.isEmpty()) {
+				pruniningId = max(pruniningId, pruningPattern.get(0));
+			}
+		}
+
+		return pruniningId;
+	}
+
+	@Override
+	public Optional<String> getPatternName() {
+		return Optional.of(patternName);
+	}
+
+	@Override
+	public String toString() {
+		return "SkipToFirstStrategy{" +
+			"patternName='" + patternName + '\'' +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java
new file mode 100644
index 0000000..0f6c3ed
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Discards every partial match that contains event of the match preceding the last of *PatternName*.
+ */
+public class SkipToLastStrategy extends AfterMatchSkipStrategy {
+	private static final long serialVersionUID = 7585116990619594531L;
+	private final String patternName;
+
+	SkipToLastStrategy(String patternName) {
+		this.patternName = checkNotNull(patternName);
+	}
+
+	@Override
+	public boolean isSkipStrategy() {
+		return true;
+	}
+
+	@Override
+	protected boolean shouldPrune(EventId startEventID, EventId pruningId) {
+		return startEventID != null && startEventID.compareTo(pruningId) < 0;
+	}
+
+	@Override
+	protected EventId getPruningId(Collection<Map<String, List<EventId>>> match) {
+		EventId pruningId = null;
+		for (Map<String, List<EventId>> resultMap : match) {
+			List<EventId> pruningPattern = resultMap.get(patternName);
+
+			if (pruningPattern != null && !pruningPattern.isEmpty()) {
+				pruningId = max(pruningId, pruningPattern.get(pruningPattern.size() - 1));
+			}
+		}
+
+		return pruningId;
+	}
+
+	@Override
+	public Optional<String> getPatternName() {
+		return Optional.of(patternName);
+	}
+
+	@Override
+	public String toString() {
+		return "SkipToLastStrategy{" +
+			"patternName='" + patternName + '\'' +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index dbb654c..8f49f68 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -19,11 +19,11 @@
 package org.apache.flink.cep.nfa.compiler;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
 import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.pattern.GroupPattern;
 import org.apache.flink.cep.pattern.MalformedPatternException;
 import org.apache.flink.cep.pattern.Pattern;
@@ -136,15 +136,15 @@ public class NFACompiler {
 		 * Check pattern after match skip strategy.
 		 */
 		private void checkPatternSkipStrategy() {
-			if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
-				afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
+			if (afterMatchSkipStrategy.getPatternName().isPresent()) {
+				String patternName = afterMatchSkipStrategy.getPatternName().get();
 				Pattern<T, ?> pattern = currentPattern;
-				while (pattern.getPrevious() != null && !pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
+				while (pattern.getPrevious() != null && !pattern.getName().equals(patternName)) {
 					pattern = pattern.getPrevious();
 				}
 
 				// pattern name match check.
-				if (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
+				if (!pattern.getName().equals(patternName)) {
 					throw new MalformedPatternException("The pattern name specified in AfterMatchSkipStrategy " +
 						"can not be found in the given Pattern");
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
index 57d244a..c1a6ccb 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
@@ -26,12 +26,13 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.Objects;
 
 /**
  * Composite key for events in {@link SharedBuffer}.
  */
-public class EventId {
+public class EventId implements Comparable<EventId> {
 	private final int id;
 	private final long timestamp;
 
@@ -48,6 +49,9 @@ public class EventId {
 		return timestamp;
 	}
 
+	public static final Comparator<EventId> COMPARATOR = Comparator.comparingLong(EventId::getTimestamp)
+		.thenComparingInt(EventId::getId);
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
@@ -74,6 +78,11 @@ public class EventId {
 			'}';
 	}
 
+	@Override
+	public int compareTo(EventId o) {
+		return COMPARATOR.compare(this, o);
+	}
+
 	/** {@link TypeSerializer} for {@link EventId}. */
 	public static class EventIdSerializer extends TypeSerializerSingleton<EventId> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
index 32be5da..c35e4d7 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
@@ -35,6 +36,7 @@ import org.apache.commons.lang3.StringUtils;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -213,11 +215,11 @@ public class SharedBuffer<V> {
 	 * @return Collection of previous relations starting with the given value
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
-	public List<Map<String, List<V>>> extractPatterns(
+	public List<Map<String, List<EventId>>> extractPatterns(
 			final NodeId nodeId,
 			final DeweyNumber version) throws Exception {
 
-		List<Map<String, List<V>>> result = new ArrayList<>();
+		List<Map<String, List<EventId>>> result = new ArrayList<>();
 
 		// stack to remember the current extraction states
 		Stack<ExtractionState> extractionStates = new Stack<>();
@@ -238,15 +240,15 @@ public class SharedBuffer<V> {
 
 				// termination criterion
 				if (currentEntry == null) {
-					final Map<String, List<V>> completePath = new LinkedHashMap<>();
+					final Map<String, List<EventId>> completePath = new LinkedHashMap<>();
 
 					while (!currentPath.isEmpty()) {
 						final NodeId currentPathEntry = currentPath.pop().f0;
 
 						String page = currentPathEntry.getPageName();
-						List<V> values = completePath
+						List<EventId> values = completePath
 							.computeIfAbsent(page, k -> new ArrayList<>());
-						values.add(eventsBuffer.get(currentPathEntry.getEventId()).getElement());
+						values.add(currentPathEntry.getEventId());
 					}
 					result.add(completePath);
 				} else {
@@ -285,6 +287,32 @@ public class SharedBuffer<V> {
 		return result;
 	}
 
+	public Map<String, List<V>> materializeMatch(Map<String, List<EventId>> match) {
+		return materializeMatch(match, new HashMap<>());
+	}
+
+	public Map<String, List<V>> materializeMatch(Map<String, List<EventId>> match, Map<EventId, V> cache) {
+
+		Map<String, List<V>> materializedMatch = new LinkedHashMap<>(match.size());
+
+		for (Map.Entry<String, List<EventId>> pattern : match.entrySet()) {
+			List<V> events = new ArrayList<>(pattern.getValue().size());
+			for (EventId eventId : pattern.getValue()) {
+				V event = cache.computeIfAbsent(eventId, id -> {
+					try {
+						return eventsBuffer.get(id).getElement();
+					} catch (Exception ex) {
+						throw new WrappingRuntimeException(ex);
+					}
+				});
+				events.add(event);
+			}
+			materializedMatch.put(pattern.getKey(), events);
+		}
+
+		return materializedMatch;
+	}
+
 	/**
 	 * Increases the reference counter for the given entry so that it is not
 	 * accidentally removed.

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 7e3e6f3..9c263b0 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -29,11 +29,11 @@ import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.NFA.MigratedNFA;
 import org.apache.flink.cep.nfa.NFAState;
 import org.apache.flink.cep.nfa.NFAStateSerializer;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.runtime.state.KeyedStateFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index bc5e2b1..bdb2897 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -28,7 +28,7 @@ import org.apache.flink.cep.PatternFlatTimeoutFunction;
 import org.apache.flink.cep.PatternSelectFunction;
 import org.apache.flink.cep.PatternStream;
 import org.apache.flink.cep.PatternTimeoutFunction;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.TimeCharacteristic;

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
index fb7143f..df54b53 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
@@ -21,7 +21,7 @@ package org.apache.flink.cep.operator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.EventComparator;
 import org.apache.flink.cep.PatternFlatSelectFunction;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.util.OutputTag;

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
index cc9caae..642c92a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.EventComparator;
 import org.apache.flink.cep.PatternFlatSelectFunction;
 import org.apache.flink.cep.PatternFlatTimeoutFunction;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.util.OutputTag;

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
index 1b2b28d..ad335e5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
@@ -21,7 +21,7 @@ package org.apache.flink.cep.operator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.EventComparator;
 import org.apache.flink.cep.PatternSelectFunction;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.OutputTag;

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
index 344f374..73ac709 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.EventComparator;
 import org.apache.flink.cep.PatternSelectFunction;
 import org.apache.flink.cep.PatternTimeoutFunction;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.OutputTag;

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/GroupPattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/GroupPattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/GroupPattern.java
index fce408c..749ab27 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/GroupPattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/GroupPattern.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.cep.pattern;
 
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index a276d9a..c821120 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -19,8 +19,8 @@
 package org.apache.flink.cep.pattern;
 
 import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy;
 import org.apache.flink.cep.pattern.Quantifier.Times;
 import org.apache.flink.cep.pattern.conditions.AndCondition;

http://git-wip-us.apache.org/repos/asf/flink/blob/d934cb8f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 4f2383a..9b35788 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -21,8 +21,8 @@ package org.apache.flink.cep;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.core.fs.FileSystem;


[5/5] flink git commit: [FLINK-9588][cep] Reused context with same computation state calculate

Posted by dw...@apache.org.
[FLINK-9588][cep] Reused context with same computation state calculate

This closes #6168


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abd61cfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abd61cfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abd61cfa

Branch: refs/heads/master
Commit: abd61cfacf62d909a9a6a2d843f3be97a6f629ee
Parents: ce345e3
Author: minwenjun <mi...@didichuxing.com>
Authored: Thu Jun 14 22:24:02 2018 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Thu Jul 5 15:54:54 2018 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 33 +++++++-------------
 1 file changed, 12 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/abd61cfa/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 041a017..276cde7 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -536,7 +536,9 @@ public class NFA<T> {
 			final EventWrapper event,
 			final long timestamp) throws Exception {
 
-		final OutgoingEdges<T> outgoingEdges = createDecisionGraph(sharedBuffer, computationState, event.getEvent());
+		final ConditionContext<T> context = new ConditionContext<>(this, sharedBuffer, computationState);
+
+		final OutgoingEdges<T> outgoingEdges = createDecisionGraph(context, computationState, event.getEvent());
 
 		// Create the computing version based on the previously computed edges
 		// We need to defer the creation of computation states until we know how many edges start
@@ -609,7 +611,7 @@ public class NFA<T> {
 							startTimestamp);
 
 					//check if newly created state is optional (have a PROCEED path to Final state)
-					final State<T> finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState);
+					final State<T> finalState = findFinalStateAfterProceed(context, nextState, event.getEvent());
 					if (finalState != null) {
 						addComputationState(
 								sharedBuffer,
@@ -656,19 +658,17 @@ public class NFA<T> {
 	}
 
 	private State<T> findFinalStateAfterProceed(
-			SharedBuffer<T> sharedBuffer,
+			ConditionContext<T> context,
 			State<T> state,
-			T event,
-			ComputationState computationState) {
+			T event) {
 		final Stack<State<T>> statesToCheck = new Stack<>();
 		statesToCheck.push(state);
-
 		try {
 			while (!statesToCheck.isEmpty()) {
 				final State<T> currentState = statesToCheck.pop();
 				for (StateTransition<T> transition : currentState.getStateTransitions()) {
 					if (transition.getAction() == StateTransitionAction.PROCEED &&
-							checkFilterCondition(sharedBuffer, computationState, transition.getCondition(), event)) {
+							checkFilterCondition(context, transition.getCondition(), event)) {
 						if (transition.getTargetState().isFinal()) {
 							return transition.getTargetState();
 						} else {
@@ -689,7 +689,7 @@ public class NFA<T> {
 	}
 
 	private OutgoingEdges<T> createDecisionGraph(
-			SharedBuffer<T> sharedBuffer,
+			ConditionContext<T> context,
 			ComputationState computationState,
 			T event) {
 		State<T> state = getState(computationState);
@@ -706,7 +706,7 @@ public class NFA<T> {
 			// check all state transitions for each state
 			for (StateTransition<T> stateTransition : stateTransitions) {
 				try {
-					if (checkFilterCondition(sharedBuffer, computationState, stateTransition.getCondition(), event)) {
+					if (checkFilterCondition(context, stateTransition.getCondition(), event)) {
 						// filter condition is true
 						switch (stateTransition.getAction()) {
 							case PROCEED:
@@ -729,11 +729,10 @@ public class NFA<T> {
 	}
 
 	private boolean checkFilterCondition(
-			SharedBuffer<T> sharedBuffer,
-			ComputationState computationState,
+			ConditionContext<T> context,
 			IterativeCondition<T> condition,
 			T event) throws Exception {
-		return condition == null || condition.filter(event, new ConditionContext<>(this, sharedBuffer, computationState));
+		return condition == null || condition.filter(event, context);
 	}
 
 	/**
@@ -779,12 +778,6 @@ public class NFA<T> {
 	 */
 	private static class ConditionContext<T> implements IterativeCondition.Context<T> {
 
-		/**
-		 * A flag indicating if we should recompute the matching pattern, so that
-		 * the {@link IterativeCondition iterative condition} can be evaluated.
-		 */
-		private boolean shouldUpdate;
-
 		/** The current computation state. */
 		private ComputationState computationState;
 
@@ -806,7 +799,6 @@ public class NFA<T> {
 			this.computationState = computationState;
 			this.nfa = nfa;
 			this.sharedBuffer = sharedBuffer;
-			this.shouldUpdate = true;
 		}
 
 		@Override
@@ -816,9 +808,8 @@ public class NFA<T> {
 			// the (partially) matched pattern is computed lazily when this method is called.
 			// this is to avoid any overheads when using a simple, non-iterative condition.
 
-			if (shouldUpdate) {
+			if (matchedEvents == null) {
 				this.matchedEvents = nfa.extractCurrentMatches(sharedBuffer, computationState);
-				shouldUpdate = false;
 			}
 
 			return new Iterable<T>() {