Posted to commits@flink.apache.org by fh...@apache.org on 2017/04/06 21:08:43 UTC

[01/11] flink git commit: [FLINK-6268] [core] Object reuse for Either type [Forced Update!]

Repository: flink
Updated Branches:
  refs/heads/table-retraction ff2625089 -> 07a59ae0e (forced update)


[FLINK-6268] [core] Object reuse for Either type

Implement object reuse for Flink's Either type by storing a reference to
Right in Left and Left in Right. These references are private and remain
null until set by EitherSerializer when copying or deserializing with
object reuse.
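
As a rough illustration (a sketch only, not part of the commit text: it uses
the obtainLeft/obtainRight and setValue members added below, and borrows the
LongValue/DoubleValue serializer setup from the new test), repeated copies
with mismatched reuse records now ping-pong between one cached Left and one
cached Right instance instead of allocating new objects:

    // Assumes an EitherSerializer<LongValue, DoubleValue> named 'serializer',
    // created as in EitherSerializerTest below.
    Either<LongValue, DoubleValue> left = Left(new LongValue());
    Either<LongValue, DoubleValue> right = Right(new DoubleValue());

    // The first copy of a Left into a Right reuse record creates one Left
    // instance and caches it inside the Right (and vice versa).
    Either<LongValue, DoubleValue> copy0 = serializer.copy(left, right);

    // Later copies return the cached cross-references; no new instances.
    Either<LongValue, DoubleValue> copy1 = serializer.copy(right, copy0); // same object as 'right'
    Either<LongValue, DoubleValue> copy2 = serializer.copy(left, copy1);  // same object as 'copy0'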

This closes #3680


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae17718f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae17718f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae17718f

Branch: refs/heads/table-retraction
Commit: ae17718fb9262f65b89b4551d2aecaab3ee23d9e
Parents: c5173fa
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Apr 5 10:12:04 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Apr 6 12:55:49 2017 -0400

----------------------------------------------------------------------
 .../typeutils/runtime/EitherSerializer.java     |  51 +++-----
 .../java/org/apache/flink/types/Either.java     |  87 ++++++++++++-
 .../typeutils/runtime/EitherSerializerTest.java | 129 +++++++++++++++++--
 3 files changed, 227 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ae17718f/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
index 4066e9a..d9018da 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java
@@ -51,7 +51,7 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
 
 	@Override
 	public boolean isImmutableType() {
-		return leftSerializer.isImmutableType() && rightSerializer.isImmutableType();
+		return false;
 	}
 
 	@Override
@@ -91,23 +91,16 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
 
 	@Override
 	public Either<L, R> copy(Either<L, R> from, Either<L, R> reuse) {
-		if (from.isRight()) {
-			final R right = from.right();
-			if (reuse.isRight()) {
-				R copyRight = rightSerializer.copy(right, reuse.right());
-				return Right(copyRight);
-			}
-			else {
-				// if the reuse record isn't a right value, we cannot reuse
-				R copyRight = rightSerializer.copy(right);
-				return Right(copyRight);
-			}
-		}
-		else {
-			L left = from.left();
-			// reuse record is never a left value because we always create a right instance
-			L copyLeft = leftSerializer.copy(left);
-			return Left(copyLeft);
+		if (from.isLeft()) {
+			Left<L, R> to = Either.obtainLeft(reuse, leftSerializer);
+			L left = leftSerializer.copy(from.left(), to.left());
+			to.setValue(left);
+			return to;
+		} else {
+			Right<L, R> to = Either.obtainRight(reuse, rightSerializer);
+			R right = rightSerializer.copy(from.right(), to.right());
+			to.setValue(right);
+			return to;
 		}
 	}
 
@@ -142,18 +135,16 @@ public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
 	@Override
 	public Either<L, R> deserialize(Either<L, R> reuse, DataInputView source) throws IOException {
 		boolean isLeft = source.readBoolean();
-		if (!isLeft) {
-			if (reuse.isRight()) {
-				return Right(rightSerializer.deserialize(reuse.right(), source));
-			}
-			else {
-				// if the reuse record isn't a right value, we cannot reuse
-				return Right(rightSerializer.deserialize(source));
-			}
-		}
-		else {
-			// reuse record is never a left value because we always create a right instance
-			return Left(leftSerializer.deserialize(source));
+		if (isLeft) {
+			Left<L, R> to = Either.obtainLeft(reuse, leftSerializer);
+			L left = leftSerializer.deserialize(to.left(), source);
+			to.setValue(left);
+			return to;
+		} else {
+			Right<L, R> to = Either.obtainRight(reuse, rightSerializer);
+			R right = rightSerializer.deserialize(to.right(), source);
+			to.setValue(right);
+			return to;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae17718f/flink-core/src/main/java/org/apache/flink/types/Either.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Either.java b/flink-core/src/main/java/org/apache/flink/types/Either.java
index a08e968..80f62f6 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Either.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Either.java
@@ -18,9 +18,12 @@
 
 package org.apache.flink.types;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.EitherTypeInfoFactory;
+import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
 
 /**
 * This type represents a value of one of two possible types, Left or Right (a
@@ -92,7 +95,9 @@ public abstract class Either<L, R> {
 	 *            the type of Right
 	 */
 	public static class Left<L, R> extends Either<L, R> {
-		private final L value;
+		private L value;
+
+		private Right<L, R> right;
 
 		public Left(L value) {
 			this.value = java.util.Objects.requireNonNull(value);
@@ -108,6 +113,15 @@ public abstract class Either<L, R> {
 			throw new IllegalStateException("Cannot retrieve Right value on a Left");
 		}
 
+		/**
+		 * Sets the encapsulated value to another value.
+		 *
+		 * @param value the new value to encapsulate
+		 */
+		public void setValue(L value) {
+			this.value = value;
+		}
+
 		@Override
 		public boolean equals(Object object) {
 			if (object instanceof Left<?, ?>) {
@@ -145,7 +159,9 @@ public abstract class Either<L, R> {
 	 *            the type of Right
 	 */
 	public static class Right<L, R> extends Either<L, R> {
-		private final R value;
+		private R value;
+
+		private Left<L, R> left;
 
 		public Right(R value) {
 			this.value = java.util.Objects.requireNonNull(value);
@@ -161,6 +177,15 @@ public abstract class Either<L, R> {
 			return value;
 		}
 
+		/**
+		 * Sets the encapsulated value to another value.
+		 *
+		 * @param value the new value to encapsulate
+		 */
+		public void setValue(R value) {
+			this.value = value;
+		}
+
 		@Override
 		public boolean equals(Object object) {
 			if (object instanceof Right<?, ?>) {
@@ -188,4 +213,62 @@ public abstract class Either<L, R> {
 			return new Right<L, R>(right);
 		}
 	}
+
+	/**
+	 * Utility function for {@link EitherSerializer} to support object reuse.
+	 *
+	 * To support object reuse, both subclasses of Either contain a reference to
+	 * an instance of the other type. This method provides access to and
+	 * initializes the cross-reference.
+	 *
+	 * @param input container for Left or Right value
+	 * @param leftSerializer for creating an instance of the left type
+	 * @param <L>
+	 *            the type of Left
+	 * @param <R>
+	 *            the type of Right
+	 * @return input if it is a Left, otherwise input's cached Left reference
+	 */
+	@Internal
+	public static <L, R> Left<L, R> obtainLeft(Either<L, R> input, TypeSerializer<L> leftSerializer) {
+		if (input.isLeft()) {
+			return (Left<L, R>) input;
+		} else {
+			Right<L, R> right = (Right<L, R>) input;
+			if (right.left == null) {
+				right.left = Left.of(leftSerializer.createInstance());
+				right.left.right = right;
+			}
+			return right.left;
+		}
+	}
+
+	/**
+	 * Utility function for {@link EitherSerializer} to support object reuse.
+	 *
+	 * To support object reuse, both subclasses of Either contain a reference to
+	 * an instance of the other type. This method provides access to and
+	 * initializes the cross-reference.
+	 *
+	 * @param input container for Left or Right value
+	 * @param rightSerializer for creating an instance of the right type
+	 * @param <L>
+	 *            the type of Left
+	 * @param <R>
+	 *            the type of Right
+	 * @return input if it is a Right, otherwise input's cached Right reference
+	 */
+	@Internal
+	public static <L, R> Right<L, R> obtainRight(Either<L, R> input, TypeSerializer<R> rightSerializer) {
+		if (input.isRight()) {
+			return (Right<L, R>) input;
+		} else {
+			Left<L, R> left = (Left<L, R>) input;
+			if (left.right == null) {
+				left.right = Right.of(rightSerializer.createInstance());
+				left.right.left = left;
+			}
+			return left.right;
+		}
+	}
 }
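
For clarity, a minimal usage sketch of the cross-reference contract above
(not from the commit; StringSerializer.INSTANCE is flink-core's stock
serializer for String, used here only to satisfy the signature):

    Right<String, Integer> right = Right.of(42);

    // The first call lazily creates a Left, caches it inside the Right,
    // and links it back; later calls return the same cached instance.
    Left<String, Integer> l1 = Either.obtainLeft(right, StringSerializer.INSTANCE);
    Left<String, Integer> l2 = Either.obtainLeft(right, StringSerializer.INSTANCE);
    assert l1 == l2;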

http://git-wip-us.apache.org/repos/asf/flink/blob/ae17718f/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
index acf0d2e..2bf6abc 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
@@ -18,21 +18,30 @@
 
 package org.apache.flink.api.java.typeutils.runtime;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.apache.flink.types.Either.Left;
-import static org.apache.flink.types.Either.Right;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase.TestInputView;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase.TestOutputView;
 import org.apache.flink.api.common.typeutils.SerializerTestInstance;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.Either;
 import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
 import org.junit.Test;
 
+import java.io.IOException;
+
+import static junit.framework.TestCase.assertSame;
+import static org.apache.flink.types.Either.Left;
+import static org.apache.flink.types.Either.Right;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
 public class EitherSerializerTest {
 
 	@SuppressWarnings("unchecked")
@@ -55,6 +64,26 @@ public class EitherSerializerTest {
 	testInstance.testAll();
 	}
 
+	@Test
+	public void testStringValueDoubleValueEither() {
+		@SuppressWarnings("unchecked")
+		Either<StringValue, DoubleValue>[] testData = new Either[] {
+			Left(new StringValue("banana")),
+			Left.of(new StringValue("apple")),
+			new Left(new StringValue("")),
+			Right(new DoubleValue(32.0)),
+			Right.of(new DoubleValue(Double.MIN_VALUE)),
+			new Right(new DoubleValue(Double.MAX_VALUE))};
+
+		EitherTypeInfo<StringValue, DoubleValue> eitherTypeInfo = new EitherTypeInfo<>(
+			ValueTypeInfo.STRING_VALUE_TYPE_INFO, ValueTypeInfo.DOUBLE_VALUE_TYPE_INFO);
+		EitherSerializer<StringValue, DoubleValue> eitherSerializer =
+			(EitherSerializer<StringValue, DoubleValue>) eitherTypeInfo.createSerializer(new ExecutionConfig());
+		SerializerTestInstance<Either<StringValue, DoubleValue>> testInstance =
+			new EitherSerializerTestInstance<>(eitherSerializer, eitherTypeInfo.getTypeClass(), -1, testData);
+		testInstance.testAll();
+	}
+
 	@SuppressWarnings("unchecked")
 	@Test
 	public void testEitherWithTuple() {
@@ -78,6 +107,90 @@ public class EitherSerializerTest {
 	testInstance.testAll();
 	}
 
+	@Test
+	public void testEitherWithTupleValues() {
+		@SuppressWarnings("unchecked")
+		Either<Tuple2<LongValue, LongValue>, DoubleValue>[] testData = new Either[] {
+			Left(new Tuple2<>(new LongValue(2L), new LongValue(9L))),
+			new Left<>(new Tuple2<>(new LongValue(Long.MIN_VALUE), new LongValue(Long.MAX_VALUE))),
+			new Right<>(new DoubleValue(32.0)),
+			Right(new DoubleValue(Double.MIN_VALUE)),
+			Right(new DoubleValue(Double.MAX_VALUE))};
+
+		EitherTypeInfo<Tuple2<LongValue, LongValue>, DoubleValue> eitherTypeInfo = new EitherTypeInfo<>(
+			new TupleTypeInfo<Tuple2<LongValue, LongValue>>(ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.LONG_VALUE_TYPE_INFO),
+			ValueTypeInfo.DOUBLE_VALUE_TYPE_INFO);
+		EitherSerializer<Tuple2<LongValue, LongValue>, DoubleValue> eitherSerializer =
+			(EitherSerializer<Tuple2<LongValue, LongValue>, DoubleValue>) eitherTypeInfo.createSerializer(new ExecutionConfig());
+		SerializerTestInstance<Either<Tuple2<LongValue, LongValue>, DoubleValue>> testInstance =
+			new EitherSerializerTestInstance<>(eitherSerializer, eitherTypeInfo.getTypeClass(), -1, testData);
+		testInstance.testAll();
+	}
+
+	@Test
+	public void testEitherWithObjectReuse() {
+		EitherTypeInfo<LongValue, DoubleValue> eitherTypeInfo = new EitherTypeInfo<>(
+			ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.DOUBLE_VALUE_TYPE_INFO);
+		EitherSerializer<LongValue, DoubleValue> eitherSerializer =
+			(EitherSerializer<LongValue, DoubleValue>) eitherTypeInfo.createSerializer(new ExecutionConfig());
+
+		LongValue lv = new LongValue();
+		DoubleValue dv = new DoubleValue();
+
+		Either<LongValue, DoubleValue> left = Left(lv);
+		Either<LongValue, DoubleValue> right = Right(dv);
+
+		// the first copy creates a new instance of Left
+		Either<LongValue, DoubleValue> copy0 = eitherSerializer.copy(left, right);
+
+		// then the cross-references are used for future copies
+		Either<LongValue, DoubleValue> copy1 = eitherSerializer.copy(right, copy0);
+		Either<LongValue, DoubleValue> copy2 = eitherSerializer.copy(left, copy1);
+
+		// validate reference equality
+		assertSame(right, copy1);
+		assertSame(copy0, copy2);
+
+		// validate reference equality of contained objects
+		assertSame(right.right(), copy1.right());
+		assertSame(copy0.left(), copy2.left());
+	}
+
+	@Test
+	public void testSerializeIndividually() throws IOException {
+		EitherTypeInfo<LongValue, DoubleValue> eitherTypeInfo = new EitherTypeInfo<>(
+			ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.DOUBLE_VALUE_TYPE_INFO);
+		EitherSerializer<LongValue, DoubleValue> eitherSerializer =
+			(EitherSerializer<LongValue, DoubleValue>) eitherTypeInfo.createSerializer(new ExecutionConfig());
+
+		LongValue lv = new LongValue();
+		DoubleValue dv = new DoubleValue();
+
+		Either<LongValue, DoubleValue> left = Left(lv);
+		Either<LongValue, DoubleValue> right = Right(dv);
+
+		TestOutputView out = new TestOutputView();
+		eitherSerializer.serialize(left, out);
+		eitherSerializer.serialize(right, out);
+		eitherSerializer.serialize(left, out);
+
+		TestInputView in = out.getInputView();
+		// the first deserialization creates a new instance of Left
+		Either<LongValue, DoubleValue> copy0 = eitherSerializer.deserialize(right, in);
+
+		// then the cross-references are used for future copies
+		Either<LongValue, DoubleValue> copy1 = eitherSerializer.deserialize(copy0, in);
+		Either<LongValue, DoubleValue> copy2 = eitherSerializer.deserialize(copy1, in);
+
+		// validate reference equality
+		assertSame(right, copy1);
+		assertSame(copy0, copy2);
+
+		// validate reference equality of contained objects
+		assertSame(right.right(), copy1.right());
+		assertSame(copy0.left(), copy2.left());
+	}
+
 	/**
 	 * {@link org.apache.flink.api.common.typeutils.SerializerTestBase#testInstantiate()}
 	 * checks that the type of the created instance is the same as the type class parameter.
@@ -98,7 +211,7 @@ public class EitherSerializerTest {
 
 				T instance = serializer.createInstance();
 				assertNotNull("The created instance must not be null.", instance);
-				
+
 				Class<T> type = getTypeClass();
 				assertNotNull("The test is corrupt: type class is null.", type);
 			}
@@ -108,6 +221,6 @@ public class EitherSerializerTest {
 				fail("Exception in test: " + e.getMessage());
 			}
 		}
-		
+
 	}
 }


[10/11] flink git commit: [FLINK-6216] [table] Add non-windowed GroupBy aggregation for streams.

Posted by fh...@apache.org.
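
For context, the operation this commit enables is a groupBy/select
aggregation on a streaming Table without any window definition, producing a
continuously updated per-key aggregate. A rough sketch in the Java Table API
(the input stream and field names are made up for illustration; the hunk
below removing testSelectWithAggregation from UnsupportedOpsTest shows the
corresponding validation restriction being lifted):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    // No .window(...) clause needed anymore for a grouped aggregate.
    Table result = tEnv.fromDataStream(wordCounts, "word, num")
        .groupBy("word")
        .select("word, num.sum as total");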
http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
index fde7682..c44443e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
@@ -181,11 +181,11 @@ class GroupWindowTest extends TableTestBase {
       .select('string.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         unaryNode(
-          "DataStreamAggregate",
+          "DataStreamGroupWindowAggregate",
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
@@ -222,7 +222,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -251,7 +251,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -279,7 +279,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -299,7 +299,7 @@ class GroupWindowTest extends TableTestBase {
   }
 
   @Test
-  @Ignore // see comments in DataStreamAggregate
+  @Ignore // see comments in DataStreamGroupWindowAggregate
   def testEventTimeTumblingGroupWindowOverCount(): Unit = {
     val util = streamTestUtil()
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
@@ -310,7 +310,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       streamTableNode(0),
       term("groupBy", "string"),
       term(
@@ -335,7 +335,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -364,7 +364,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -393,7 +393,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -412,7 +412,7 @@ class GroupWindowTest extends TableTestBase {
   }
 
   @Test
-  @Ignore // see comments in DataStreamAggregate
+  @Ignore // see comments in DataStreamGroupWindowAggregate
   def testEventTimeSlidingGroupWindowOverCount(): Unit = {
     val util = streamTestUtil()
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
@@ -423,7 +423,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       streamTableNode(0),
       term("groupBy", "string"),
       term(
@@ -448,7 +448,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -477,7 +477,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -506,7 +506,7 @@ class GroupWindowTest extends TableTestBase {
       .select('int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -534,7 +534,7 @@ class GroupWindowTest extends TableTestBase {
       .select('int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -552,7 +552,7 @@ class GroupWindowTest extends TableTestBase {
   }
 
   @Test
-  @Ignore // see comments in DataStreamAggregate
+  @Ignore // see comments in DataStreamGroupWindowAggregate
   def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
     val util = streamTestUtil()
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
@@ -563,7 +563,7 @@ class GroupWindowTest extends TableTestBase {
       .select('int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -592,7 +592,7 @@ class GroupWindowTest extends TableTestBase {
       .select('int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -620,7 +620,7 @@ class GroupWindowTest extends TableTestBase {
       .select('int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -648,7 +648,7 @@ class GroupWindowTest extends TableTestBase {
       .select('int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -666,7 +666,7 @@ class GroupWindowTest extends TableTestBase {
   }
 
   @Test
-  @Ignore // see comments in DataStreamAggregate
+  @Ignore // see comments in DataStreamGroupWindowAggregate
   def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
     val util = streamTestUtil()
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
@@ -677,7 +677,7 @@ class GroupWindowTest extends TableTestBase {
       .select('int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -705,7 +705,7 @@ class GroupWindowTest extends TableTestBase {
       .select('int.count)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -733,7 +733,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -766,7 +766,7 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val expected = unaryNode(
-      "DataStreamAggregate",
+      "DataStreamGroupWindowAggregate",
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
@@ -802,7 +802,7 @@ class GroupWindowTest extends TableTestBase {
     val expected = unaryNode(
       "DataStreamCalc",
       unaryNode(
-        "DataStreamAggregate",
+        "DataStreamGroupWindowAggregate",
         unaryNode(
           "DataStreamCalc",
           streamTableNode(0),
@@ -840,7 +840,7 @@ class GroupWindowTest extends TableTestBase {
     val expected = unaryNode(
       "DataStreamCalc",
       unaryNode(
-        "DataStreamAggregate",
+        "DataStreamGroupWindowAggregate",
         unaryNode(
           "DataStreamCalc",
           streamTableNode(0),

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
index b6a6660..c72249a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
@@ -28,13 +28,6 @@ import org.junit.Test
 class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
 
   @Test(expected = classOf[ValidationException])
-  def testSelectWithAggregation(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1.min)
-  }
-
-  @Test(expected = classOf[ValidationException])
   def testDistinct(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)


[06/11] flink git commit: [FLINK-6270] port some memory and network task manager options to ConfigOption

Posted by fh...@apache.org.
[FLINK-6270] port some memory and network task manager options to ConfigOption

This closes #3683.
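
The pattern being ported, sketched from the hunks below (the key and default
value are exactly as defined in TaskManagerOptions in this commit):

    // Definition: the key and its default value travel together.
    public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
            key("taskmanager.memory.size")
            .defaultValue(-1L);

    // Call sites no longer repeat the raw string key and its default:
    //   before: config.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
    //   after:
    long managedMemory = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);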


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2a4f47e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2a4f47e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2a4f47e

Branch: refs/heads/table-retraction
Commit: e2a4f47ed8c95c7045f79bf9fe59cab39518710b
Parents: d1d761e
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Apr 5 11:45:57 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 6 19:35:50 2017 +0200

----------------------------------------------------------------------
 .../kafka/KafkaShortRetentionTestBase.java      |  3 +-
 .../connectors/kafka/KafkaTestBase.java         |  3 +-
 .../flink/storm/api/FlinkLocalCluster.java      |  3 +-
 .../flink/configuration/ConfigConstants.java    | 44 ++++++++++++++++----
 .../flink/configuration/TaskManagerOptions.java | 43 +++++++++++++++++++
 .../ContaineredTaskManagerParameters.java       | 12 ++----
 .../io/network/buffer/NetworkBufferPool.java    |  4 +-
 .../partition/SpillableSubpartition.java        | 11 +++--
 .../minicluster/MiniClusterConfiguration.java   | 32 +++++---------
 .../TaskManagerServicesConfiguration.java       | 34 +++++++--------
 .../minicluster/LocalFlinkMiniCluster.scala     | 20 ++++-----
 .../PartialConsumePipelinedResultTest.java      |  3 +-
 .../runtime/jobmanager/JobManagerTest.java      |  4 +-
 .../TaskCancelAsyncProducerConsumerITCase.java  |  5 ++-
 .../TaskManagerProcessReapingTestBase.java      |  5 ++-
 .../taskmanager/TaskManagerStartupTest.java     | 11 ++---
 .../runtime/taskmanager/TaskManagerTest.java    |  2 +-
 .../runtime/testutils/TaskManagerProcess.java   | 10 ++---
 .../runtime/testingUtils/TestingUtils.scala     |  4 +-
 .../Flip6LocalStreamEnvironment.java            |  4 +-
 .../api/environment/LocalStreamEnvironment.java |  3 +-
 .../apache/flink/test/util/TestBaseUtils.java   |  3 +-
 .../accumulators/AccumulatorErrorITCase.java    |  3 +-
 .../test/cancelling/CancelingTestBase.java      |  5 ++-
 ...tractEventTimeWindowCheckpointingITCase.java |  3 +-
 .../EventTimeAllWindowCheckpointingITCase.java  |  3 +-
 .../test/checkpointing/SavepointITCase.java     |  3 +-
 .../StreamFaultToleranceTestBase.java           |  3 +-
 .../WindowCheckpointingITCase.java              |  3 +-
 .../JobSubmissionFailsITCase.java               |  3 +-
 .../test/manual/NotSoMiniClusterIterations.java |  7 ++--
 .../manual/StreamingScalabilityAndLatency.java  |  5 ++-
 .../test/misc/CustomSerializationITCase.java    |  3 +-
 .../test/misc/MiscellaneousIssuesITCase.java    |  3 +-
 ...SuccessAfterNetworkBuffersFailureITCase.java |  5 ++-
 .../query/AbstractQueryableStateITCase.java     |  3 +-
 ...ctTaskManagerProcessFailureRecoveryTest.java |  5 ++-
 ...agerHAProcessFailureBatchRecoveryITCase.java |  5 ++-
 .../TaskManagerFailureRecoveryITCase.java       |  3 +-
 .../flink/test/runtime/IPv6HostnamesITCase.java |  3 +-
 .../test/streaming/runtime/TimestampITCase.java |  3 +-
 .../flink/test/web/WebFrontendITCase.java       |  3 +-
 42 files changed, 209 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index 1e85370..954dc7d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -98,7 +99,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
 		// start also a re-usable Flink mini cluster
 		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 
 		flink = new LocalFlinkMiniCluster(flinkConfig, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 5cec4f0..0c6bfa9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -113,7 +114,7 @@ public abstract class KafkaTestBase extends TestLogger {
 		Configuration flinkConfig = new Configuration();
 		flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
index 367b313..d69d345 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.storm.api;
 
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.generated.ClusterSummary;
 import org.apache.storm.generated.KillOptions;
@@ -92,7 +93,7 @@ public class FlinkLocalCluster {
 			Configuration configuration = new Configuration();
 			configuration.addAll(jobGraph.getJobConfiguration());
 
-			configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+			configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
 			configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
 
 			this.flink = new LocalFlinkMiniCluster(configuration, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index ce44ab8..de06b59 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -210,34 +210,52 @@ public final class ConfigConstants {
 	 * The config parameter defining the amount of memory to be allocated by the task manager's
 	 * memory manager (in megabytes). If not set, a relative fraction will be allocated, as defined
 	 * by {@link #TASK_MANAGER_MEMORY_FRACTION_KEY}.
+	 *
+	 * @deprecated Use {@link TaskManagerOptions#MANAGED_MEMORY_SIZE} instead
 	 */
+	@Deprecated
 	public static final String TASK_MANAGER_MEMORY_SIZE_KEY = "taskmanager.memory.size";
 	
 	/**
 	 * The config parameter defining the fraction of free memory allocated by the memory manager.
+	 *
+	 * @deprecated Use {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION} instead
 	 */
+	@Deprecated
 	public static final String TASK_MANAGER_MEMORY_FRACTION_KEY = "taskmanager.memory.fraction";
 
 	/**
 	 * The config parameter defining the memory allocation method (JVM heap or off-heap).
-	*/
+	 *
+	 * @deprecated Use {@link TaskManagerOptions#MEMORY_OFF_HEAP} instead
+	 */
+	@Deprecated
 	public static final String TASK_MANAGER_MEMORY_OFF_HEAP_KEY = "taskmanager.memory.off-heap";
 
 	/**
 	 * The config parameter for specifying whether TaskManager managed memory should be preallocated
 	 * when the TaskManager is starting. (default is false)
+	 *
+	 * @deprecated Use {@link TaskManagerOptions#MANAGED_MEMORY_PRE_ALLOCATE} instead
 	 */
+	@Deprecated
 	public static final String TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY = "taskmanager.memory.preallocate";
 
 	/**
 	 * The config parameter defining the number of buffers used in the network stack. This defines the
 	 * number of possible tasks and shuffles.
+	 *
+	 * @deprecated Use {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} instead
 	 */
+	@Deprecated
 	public static final String TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY = "taskmanager.network.numberOfBuffers";
 
 	/**
 	 * Config parameter defining the size of memory buffers used by the network stack and the memory manager.
+	 *
+	 * @deprecated Use {@link TaskManagerOptions#MEMORY_SEGMENT_SIZE} instead
 	 */
+	@Deprecated
 	public static final String TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY = "taskmanager.memory.segment-size";
 	
 	/**
@@ -1126,20 +1144,29 @@ public final class ConfigConstants {
 	 * The default directory for temporary files of the task manager.
 	 */
 	public static final String DEFAULT_TASK_MANAGER_TMP_PATH = System.getProperty("java.io.tmpdir");
-	
+
 	/**
-	 * The default fraction of the free memory allocated by the task manager's memory manager.
+	 * Config key has been deprecated. Therefore, no default value required.
+	 *
+	 * @deprecated {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION} provides the default value now
 	 */
+	@Deprecated
 	public static final float DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION = 0.7f;
-	
+
 	/**
-	 * Default number of buffers used in the network stack.
+	 * Config key has been deprecated. Therefore, no default value required.
+	 *
+	 * @deprecated {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} provides the default value now
 	 */
+	@Deprecated
 	public static final int DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS = 2048;
 
 	/**
-	 * Default size of memory segments in the network stack and the memory manager.
+	 * Config key has been deprecated. Therefore, no default value required.
+	 *
+	 * @deprecated {@link TaskManagerOptions#MEMORY_SEGMENT_SIZE} provides the default value now
 	 */
+	@Deprecated
 	public static final int DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE = 32768;
 
 	/**
@@ -1179,8 +1206,11 @@ public final class ConfigConstants {
 	public static final String DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = "10 s";
 
 	/**
-	 * The default setting for TaskManager memory eager allocation of managed memory
+	 * Config key has been deprecated. Therefore, no default value required.
+	 *
+	 * @deprecated {@link TaskManagerOptions#MANAGED_MEMORY_PRE_ALLOCATE} provides the default value now
 	 */
+	@Deprecated
 	public static final boolean DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE = false;
 
 	/** @deprecated Please use {@link TaskManagerOptions#TASK_CANCELLATION_INTERVAL}. */

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index b891e35..adfc8e9 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -39,10 +39,53 @@ public class TaskManagerOptions {
 			key("taskmanager.jvm-exit-on-oom")
 			.defaultValue(false);
 
+	/** Size of memory buffers used by the network stack and the memory manager (in bytes). */
+	public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
+			key("taskmanager.memory.segment-size")
+			.defaultValue(32768);
+
+	/**
+	 * Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not
+	 * set, a relative fraction will be allocated, as defined by {@link #MANAGED_MEMORY_FRACTION}.
+	 */
+	public static final ConfigOption<Long> MANAGED_MEMORY_SIZE =
+			key("taskmanager.memory.size")
+			.defaultValue(-1L);
+
+	/**
+	 * Fraction of free memory allocated by the memory manager if {@link #MANAGED_MEMORY_SIZE} is
+	 * not set.
+	 */
+	public static final ConfigOption<Float> MANAGED_MEMORY_FRACTION =
+			key("taskmanager.memory.fraction")
+			.defaultValue(0.7f);
+
+	/**
+	 * Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager
+	 * as well as the network buffers.
+	 **/
+	public static final ConfigOption<Boolean> MEMORY_OFF_HEAP =
+			key("taskmanager.memory.off-heap")
+			.defaultValue(false);
+
+	/** Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting. */
+	public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
+			key("taskmanager.memory.preallocate")
+			.defaultValue(false);
+
 	// ------------------------------------------------------------------------
 	//  Network Options
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Number of buffers used in the network stack. This defines the number of possible tasks and
+	 * shuffles.
+	 */
+	public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
+			key("taskmanager.network.numberOfBuffers")
+			.defaultValue(2048);
+
+
 	/** Minimum backoff for partition requests of input channels. */
 	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
 			key("taskmanager.net.request-backoff.initial")

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index 3dc4394..0fc0870 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -142,19 +143,14 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 
 		// (2) split the Java memory between heap and off-heap
 
-		final boolean useOffHeap = config.getBoolean(
-			ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false);
+		final boolean useOffHeap = config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP);
 
 		final long heapSizeMB;
 		if (useOffHeap) {
-			long offHeapSize = config.getLong(
-				ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+			long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
 
 			if (offHeapSize <= 0) {
-				double fraction = config.getFloat(
-					ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-					ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
-
+				double fraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
 
 				offHeapSize = (long) (fraction * javaMemorySizeMB);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 5f2da03..a36bdf4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemoryType;
@@ -199,7 +199,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 						numRequiredBuffers,
 						totalNumberOfMemorySegments - numTotalRequiredBuffers,
 						totalNumberOfMemorySegments,
-						ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY));
+						TaskManagerOptions.NETWORK_NUM_BUFFERS.key()));
 			}
 
 			this.numTotalRequiredBuffers += numRequiredBuffers;

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index ad04e97..ae97c42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -50,9 +50,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * this state, different reader variants are returned (see
  * {@link SpillableSubpartitionView} and {@link SpilledSubpartitionView}).
  *
- * <p>Since the network buffer pool size is usually quite small (default is
- * {@link ConfigConstants#DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS}), most
- * spillable partitions will be spilled for real-world data sets.
+ * <p>Since the network buffer pool size for outgoing partitions is usually
+ * quite small, e.g. via the {@link TaskManagerOptions#NETWORK_BUFFERS_PER_CHANNEL}
+ * and {@link TaskManagerOptions#NETWORK_EXTRA_BUFFERS_PER_GATE} parameters
+ * for bounded channels or from the default value of
+ * {@link TaskManagerOptions#NETWORK_NUM_BUFFERS}, most spillable partitions
+ * will be spilled for real-world data sets.
  */
 class SpillableSubpartition extends ResultSubpartition {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 3a03ca3..823b3f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -20,9 +20,8 @@ package org.apache.flink.runtime.minicluster;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import scala.concurrent.duration.FiniteDuration;
@@ -165,7 +164,7 @@ public class MiniClusterConfiguration {
 		Configuration newConfiguration = new Configuration(config);
 		// set the memory
 		long memory = getOrCalculateManagedMemoryPerTaskManager();
-		newConfiguration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memory);
+		newConfiguration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memory);
 
 		return newConfiguration;
 	}
@@ -196,29 +195,20 @@ public class MiniClusterConfiguration {
 	private long getOrCalculateManagedMemoryPerTaskManager() {
 		if (managedMemoryPerTaskManager == -1) {
 			// no memory set in the mini cluster configuration
-			final ConfigOption<Integer> memorySizeOption = ConfigOptions
-				.key(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY)
-				.defaultValue(-1);
 
-			int memorySize = config.getInteger(memorySizeOption);
+			long memorySize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
 
-			if (memorySize == -1) {
+			// we could probably use config.contains() but the previous implementation compared to
+			// the default (-1) thus allowing the user to explicitly specify this as well
+			// -> don't change this behaviour now
+			if (memorySize == TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) {
 				// no memory set in the flink configuration
 				// share the available memory among all running components
-				final ConfigOption<Integer> bufferSizeOption = ConfigOptions
-					.key(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY)
-					.defaultValue(ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
 
-				final ConfigOption<Long> bufferMemoryOption = ConfigOptions
-					.key(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)
-					.defaultValue((long) ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
-
-				final ConfigOption<Float> memoryFractionOption = ConfigOptions
-					.key(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY)
-					.defaultValue(ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
-
-				float memoryFraction = config.getFloat(memoryFractionOption);
-				long networkBuffersMemory = config.getLong(bufferMemoryOption) * config.getInteger(bufferSizeOption);
+				float memoryFraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
+				long networkBuffersMemory =
+					(long) config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS) *
+						(long) config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
 
 				long freeMemory = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 8ad318a..366be34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -182,22 +182,20 @@ public class TaskManagerServicesConfiguration {
 				parseQueryableStateConfiguration(configuration);
 
 		// extract memory settings
-		long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
-		checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
-			ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
+		long configuredMemory = configuration.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
+		checkConfigParameter(
+			configuredMemory == TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue() ||
+				configuredMemory > 0, configuredMemory,
+			TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
 			"MemoryManager needs at least one MB of memory. " +
 				"If you leave this config parameter empty, the system automatically " +
 				"picks a fraction of the available memory.");
 
-		boolean preAllocateMemory = configuration.getBoolean(
-			ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);
+		boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE);
 
-		float memoryFraction = configuration.getFloat(
-			ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-			ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
+		float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
 		checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction,
-			ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+			TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
 			"MemoryManager fraction of the free memory must be between 0.0 and 1.0");
 
 		final MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
@@ -247,30 +245,26 @@ public class TaskManagerServicesConfiguration {
 		checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
 			"Number of task slots must be at least one.");
 
-		final int numNetworkBuffers = configuration.getInteger(
-			ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
+		final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
 
 		checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
-			ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, "");
+			TaskManagerOptions.NETWORK_NUM_BUFFERS.key(), "");
 
-		final int pageSize = configuration.getInteger(
-			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);
+		final int pageSize = configuration.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE);
 
 		// check page size for minimum size
 		checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
-			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+			TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
 			"Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);
 
 		// check page size for power of two
 		checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
-			ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+			TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
 			"Memory segment size must be a power of 2.");
 
 		// check whether we use heap or off-heap memory
 		final MemoryType memType;
-		if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
+		if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
 			memType = MemoryType.OFF_HEAP;
 		} else {
 			memType = MemoryType.HEAP;

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 21e0d28..3d43da5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.{Executor, ScheduledExecutorService}
 import akka.actor.{ActorRef, ActorSystem, Props}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
-import org.apache.flink.configuration.{ConfigConstants, Configuration, QueryableStateOptions}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, QueryableStateOptions, TaskManagerOptions}
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -350,23 +350,19 @@ class LocalFlinkMiniCluster(
 
   def setMemory(config: Configuration): Unit = {
     // set this only if no memory was pre-configured
-    if (config.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1) == -1) {
+    if (config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE) ==
+        TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) {
 
-      val bufferSize: Int = config.getInteger(
-        ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-        ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
+      val bufferSize: Int = config.getInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE)
 
-      val bufferMem: Long = config.getLong(
-        ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
-        ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS) * bufferSize.toLong
+      val bufferMem: Long = config.getInteger(
+        TaskManagerOptions.NETWORK_NUM_BUFFERS).toLong * bufferSize.toLong
 
       val numTaskManager = config.getInteger(
         ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
         ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER)
 
-      val memoryFraction = config.getFloat(
-        ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
-        ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
+      val memoryFraction = config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION)
 
       // full memory size
       var memorySize: Long = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag
@@ -379,7 +375,7 @@ class LocalFlinkMiniCluster(
       memorySize -= bufferMem
       memorySize = (memorySize * memoryFraction).toLong
       memorySize >>>= 20 // bytes to megabytes
-      config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize)
+      config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memorySize)
     }
   }
 

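The setMemory() change keeps the original sizing arithmetic: take the free heap per task manager, subtract the network buffer pool, apply the managed-memory fraction, and shift from bytes to megabytes. A worked sketch with illustrative numbers (the real code samples the free heap at runtime):

public class SetMemorySketch {
	public static void main(String[] args) {
		long freeHeap = 1L << 30;            // assume 1 GiB of free heap
		int bufferSize = 32 * 1024;          // 32 KiB memory segments
		long bufferMem = 2048L * bufferSize; // 2048 network buffers = 64 MiB
		int numTaskManager = 1;
		float memoryFraction = 0.7f;

		long memorySize = freeHeap / numTaskManager;
		memorySize -= bufferMem;                      // reserve network memory first
		memorySize = (long) (memorySize * memoryFraction);
		memorySize >>>= 20;                           // bytes -> megabytes
		System.out.println(memorySize + " MB");       // 672 MB of managed memory
	}
}
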
http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 4a826b7..f19ca4e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -51,7 +52,7 @@ public class PartialConsumePipelinedResultTest {
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUMBER_OF_TMS);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUMBER_OF_SLOTS_PER_TM);
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-		config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, NUMBER_OF_NETWORK_BUFFERS);
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS);
 
 		flink = new TestingCluster(config, true);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 3944752c..4dec84b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -26,6 +26,7 @@ import com.typesafe.config.Config;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
@@ -105,7 +106,6 @@ import scala.reflect.ClassTag$;
 import java.io.File;
 import java.net.InetAddress;
 import java.util.Collections;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
@@ -588,7 +588,7 @@ public class JobManagerTest extends TestLogger {
 				AkkaUtils.getAkkaURL(system, jobManager.actor()));
 
 		Configuration tmConfig = new Configuration();
-		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+		tmConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
 
 		ActorRef taskManager = TaskManager.startTaskManagerComponentsAndActor(

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index 5a14b40..4ea6511 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -78,8 +79,8 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 8);
+			config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
+			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8);
 
 			flink = new TestingCluster(config, true);
 			flink.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index 2528e24..2fafe5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
@@ -228,8 +229,8 @@ public abstract class TaskManagerProcessReapingTestBase {
 				Configuration cfg = new Configuration();
 				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 256);
+				cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+				cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 256);
 
 				TaskManager.runTaskManager("localhost", ResourceID.generate(), taskManagerPort, cfg);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index b2a905d..4df8db3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -24,6 +24,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.StartupUtils;
@@ -113,7 +114,7 @@ public class TaskManagerStartupTest {
 		try {
 			Configuration cfg = new Configuration();
 			cfg.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, nonWritable.getAbsolutePath());
-			cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 			cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 			cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 21656);
 
@@ -154,7 +155,7 @@ public class TaskManagerStartupTest {
 			cfg.setString(ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY, "true");
 
 			// something invalid
-			cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -42);
+			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -42L);
 			try {
 				TaskManager.runTaskManager("localhost", ResourceID.generate(), 0, cfg);
 				fail("Should fail synchronously with an exception");
@@ -165,8 +166,8 @@ public class TaskManagerStartupTest {
 
 			// something ridiculously high
 			final long memSize = (((long) Integer.MAX_VALUE - 1) *
-					ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE) >> 20;
-			cfg.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memSize);
+					TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue()) >> 20;
+			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, memSize);
 			try {
 				TaskManager.runTaskManager("localhost", ResourceID.generate(), 0, cfg);
 				fail("Should fail synchronously with an exception");
@@ -197,7 +198,7 @@ public class TaskManagerStartupTest {
 			final Configuration cfg = new Configuration();
 			cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
 			cfg.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, blocker.getLocalPort());
-			cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
+			cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 1L);
 
 			TaskManager.startTaskManagerComponentsAndActor(
 				cfg,

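For the "ridiculously high" case above, the arithmetic lands just under 64 TiB of requested managed memory, assuming the 32 KiB default segment size of this era (an assumption here, since the constant itself moves in this commit):

public class HugeMemorySketch {
	public static void main(String[] args) {
		int segmentSize = 32 * 1024; // assumed default memory segment size
		long memSizeMb = (((long) Integer.MAX_VALUE - 1) * segmentSize) >> 20;
		System.out.println(memSizeMb + " MB"); // 67108863 MB, just under 64 TiB
	}
}
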
http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index a754cff..4530ade 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -1498,7 +1498,7 @@ public class TaskManagerTest extends TestLogger {
 			// set the memory segment to the smallest size possible, because we have to fill one
 			// memory buffer to trigger the schedule-or-update-consumers message to the downstream
 			// operators
-			configuration.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096);
+			configuration.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
 
 			final JobID jid = new JobID();
 			final JobVertexID vid = new JobVertexID();

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
index 58bc50e..36eb47f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.slf4j.Logger;
@@ -102,12 +102,12 @@ public class TaskManagerProcess extends TestJvmProcess {
 			try {
 				Configuration config = ParameterTool.fromArgs(args).getConfiguration();
 
-				if (!config.containsKey(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY)) {
-					config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+				if (!config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
+					config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 				}
 
-				if (!config.containsKey(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY)) {
-					config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+				if (!config.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+					config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
 				}
 
 

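The contains(option) idiom above deliberately ignores the option's built-in default: it asks only whether a value was explicitly set, so the test process can plant small sizes without clobbering user-provided configuration. A self-contained sketch of the idiom, using the option this commit introduces:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;

public class DefaultInjectionSketch {
	public static void main(String[] args) {
		Configuration config = new Configuration();

		// contains() is true only for explicitly set values, never defaults.
		if (!config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L); // tiny test heap
		}
		System.out.println(config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE)); // 4
	}
}
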
http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index d6221f5..49baff1 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -28,7 +28,7 @@ import com.google.common.util.concurrent.MoreExecutors
 import com.typesafe.config.ConfigFactory
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobExecutionResult
-import org.apache.flink.configuration.{ConfigConstants, Configuration, HighAvailabilityOptions}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, HighAvailabilityOptions, TaskManagerOptions}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.client.JobClient
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
@@ -304,7 +304,7 @@ object TestingUtils {
 
     val resultingConfiguration = new Configuration()
 
-    resultingConfiguration.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10)
+    resultingConfiguration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 10L)
 
     resultingConfiguration.addAll(configuration)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index 4a5f20d..244660a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.minicluster.MiniCluster;
@@ -98,7 +98,7 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 
 		Configuration configuration = new Configuration();
 		configuration.addAll(jobGraph.getJobConfiguration());
-		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+		configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
 
 		// add (and override) the settings with what the user defined
 		configuration.addAll(this.conf);

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index cb60552..117f6d8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
@@ -113,7 +114,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 		Configuration configuration = new Configuration();
 		configuration.addAll(jobGraph.getJobConfiguration());
 
-		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+		configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
 		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
 		
 		// add (and override) the settings with what the user defined

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index cc7c0e2..f96ab3d 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -141,7 +142,7 @@ public class TestBaseUtils extends TestLogger {
 		Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
 		Files.createFile(new File(logDir, "jobmanager.out").toPath());
 
-		config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, TASK_MANAGER_MEMORY_SIZE);
 		config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, true);
 
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
index cc70fee..0303202 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.AfterClass;
@@ -52,7 +53,7 @@ public class AccumulatorErrorITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
 			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
index 8d8ee64..06233d6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java
@@ -22,6 +22,7 @@ package org.apache.flink.test.cancelling;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
@@ -88,8 +89,8 @@ public abstract class CancelingTestBase extends TestLogger {
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
 		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 4096);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 2048);
+		config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
 
 		this.executor = new LocalFlinkMiniCluster(config, false);
 		this.executor.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 5fc2083..462d3a4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -99,7 +100,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
-		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
 
 		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 09c1437..3345b9c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -70,7 +71,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
-		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
 		config.setString(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT, "60 s");
 		config.setString(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT, "60 s");
 		cluster = new LocalFlinkMiniCluster(config, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index a5c994a..3718a94 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
@@ -783,7 +784,7 @@ public class SavepointITCase extends TestLogger {
 
 		Configuration config = new Configuration();
 		config.addAll(jobGraph.getJobConfiguration());
-		config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2 * jobGraph.getMaximumParallelism());
 		final File checkpointDir = new File(tmpDir, "checkpoints");
 		final File savepointDir = new File(tmpDir, "savepoints");

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 10f78d4..2839bc1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.TestUtils;
@@ -52,7 +53,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
 			
 			cluster = new LocalFlinkMiniCluster(config, false);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index a45349d..56d8c66 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -81,7 +82,7 @@ public class WindowCheckpointingITCase extends TestLogger {
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
-		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 48);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
 
 		cluster = new LocalFlinkMiniCluster(config, false);
 		cluster.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index 256b1ae..93ab29d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.failingPrograms;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -54,7 +55,7 @@ public class JobSubmissionFailsITCase {
 	public static void setup() {
 		try {
 			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
index bd9955c..ee3b4b2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -50,10 +51,10 @@ public class NotSoMiniClusterIterations {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, PARALLELISM);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 8);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 8L);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 1000);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, 8 * 1024);
+			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1000);
+			config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 8 * 1024);
 			
 			config.setInteger("taskmanager.net.server.numThreads", 1);
 			config.setInteger("taskmanager.net.client.numThreads", 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
index ec617b1..90dbe80 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -46,9 +47,9 @@ public class StreamingScalabilityAndLatency {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, TASK_MANAGERS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 80);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 20000);
+			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 20000);
 
 			config.setInteger("taskmanager.net.server.numThreads", 1);
 			config.setInteger("taskmanager.net.client.numThreads", 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
index 51f3534..fda731e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -50,7 +51,7 @@ public class CustomSerializationITCase {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARLLELISM);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 30);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 30L);
 			cluster = new LocalFlinkMiniCluster(config, false);
 			cluster.start();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
index 06b93ea..d9cf574 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
@@ -60,7 +61,7 @@ public class MiscellaneousIssuesITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 3);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
 			cluster = new LocalFlinkMiniCluster(config, false);
 
 			cluster.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index a43bab6..5761bf2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.examples.java.clustering.KMeans;
 import org.apache.flink.examples.java.clustering.util.KMeansData;
 import org.apache.flink.examples.java.graph.ConnectedComponents;
@@ -48,9 +49,9 @@ public class SuccessAfterNetworkBuffersFailureITCase {
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 80);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 840);
+			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 840);
 			
 			cluster = new LocalFlinkMiniCluster(config, false);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
index 1912c0f..3c8eb48 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -109,7 +110,7 @@ public abstract class AbstractQueryableStateITCase extends TestLogger {
 	public static void setup() {
 		try {
 			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
 			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 515570d..27d1aa1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -26,6 +26,7 @@ import akka.util.Timeout;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -389,8 +390,8 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 				Configuration cfg = new Configuration();
 				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
-				cfg.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+				cfg.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+				cfg.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
 				cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
 				cfg.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index a51f88b..b6a1bd4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -258,8 +259,8 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 			jmProcess[0].startProcess();
 
 			// Task manager configuration
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 4);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 100);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
 
 			// Start the task manager process

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 5d29905..d256ccf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.junit.Test;
@@ -72,7 +73,7 @@ public class TaskManagerFailureRecoveryITCase {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 			
 			config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
 			config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s");

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index 0b008eb..4c77ef0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.testdata.WordCountData;
@@ -73,7 +74,7 @@ public class IPv6HostnamesITCase extends TestLogger {
 			conf.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, addressString);
 			conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 			conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-			conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+			conf.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 			
 			flink = new LocalFlinkMiniCluster(conf, false);
 			flink.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 13add4b..229d3fd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -89,7 +90,7 @@ public class TimestampITCase extends TestLogger {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
 
 			cluster = new LocalFlinkMiniCluster(config, false);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f47e/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index 3b0c364..003eb0c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -27,6 +27,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -71,7 +72,7 @@ public class WebFrontendITCase extends TestLogger {
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS);
-		config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
 		config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
 		File logDir = File.createTempFile("TestBaseUtils-logdir", null);


[02/11] flink git commit: [hotfix] [jdbc] Add generic parameter to ResultTypeQueryable

Posted by fh...@apache.org.
[hotfix] [jdbc] Add generic parameter to ResultTypeQueryable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b3ac6932
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b3ac6932
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b3ac6932

Branch: refs/heads/table-retraction
Commit: b3ac693200aacd6f8a581e25056067fabe0ef356
Parents: 8890a8d
Author: zentol <ch...@apache.org>
Authored: Thu Apr 6 13:11:35 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 6 19:35:50 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b3ac6932/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
index 3153f96..e714867 100644
--- a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
+++ b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
@@ -95,7 +95,7 @@ import org.slf4j.LoggerFactory;
  * @see PreparedStatement
  * @see DriverManager
  */
-public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable {
+public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable<Row> {
 
 	private static final long serialVersionUID = 1L;
 	private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);
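
With the type parameter, getProducedType() can be declared as
TypeInformation<Row> instead of a raw type. A short sketch of an implementer of
the parameterized interface (the class name here is illustrative, not part of
the commit):

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
    import org.apache.flink.types.Row

    // Illustrative implementer: callers now obtain TypeInformation[Row]
    // directly, without the unchecked cast a raw ResultTypeQueryable forces.
    class ExampleRowSource extends ResultTypeQueryable[Row] {
      override def getProducedType(): TypeInformation[Row] =
        new RowTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
    }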


[11/11] flink git commit: [FLINK-6216] [table] Add non-windowed GroupBy aggregation for streams.

Posted by fh...@apache.org.
[FLINK-6216] [table] Add non-windowed GroupBy aggregation for streams.

This closes #3646.
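
A usage sketch of what this enables (assembled from the new SqlITCase test
included below; names and data are illustrative):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val t = env.fromElements((1L, 1, "Hi"), (2L, 2, "Hello"), (3L, 2, "Hello"))
      .toTable(tEnv).as('a, 'b, 'c)
    tEnv.registerTable("MyTable", t)

    // Continuous, non-windowed aggregation: every input row updates the
    // aggregate of its group and the updated result is emitted immediately.
    tEnv.sql("SELECT b, COUNT(a) FROM MyTable GROUP BY b")
      .toDataStream[Row]
      .print()

    env.execute()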


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07a59ae0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07a59ae0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07a59ae0

Branch: refs/heads/table-retraction
Commit: 07a59ae0ef0b2343b8e93e8b6c06f6d668206411
Parents: 6353947
Author: shaoxuan-wang <ws...@gmail.com>
Authored: Thu Mar 30 03:57:58 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Apr 6 23:08:41 2017 +0200

----------------------------------------------------------------------
 .../flink/table/plan/logical/operators.scala    |   3 -
 .../nodes/datastream/DataStreamAggregate.scala  | 267 -------------------
 .../datastream/DataStreamGroupAggregate.scala   | 133 +++++++++
 .../DataStreamGroupWindowAggregate.scala        | 267 +++++++++++++++++++
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   3 +-
 .../datastream/DataStreamAggregateRule.scala    |  76 ------
 .../DataStreamGroupAggregateRule.scala          |  77 ++++++
 .../DataStreamGroupWindowAggregateRule.scala    |  75 ++++++
 .../table/runtime/aggregate/AggregateUtil.scala |  46 +++-
 .../aggregate/GroupAggProcessFunction.scala     | 100 +++++++
 .../scala/batch/table/FieldProjectionTest.scala |   4 +-
 .../table/api/scala/stream/sql/SqlITCase.scala  |  21 ++
 .../scala/stream/sql/WindowAggregateTest.scala  |  50 ++--
 .../scala/stream/table/AggregationsITCase.scala | 167 ------------
 .../stream/table/GroupAggregationsITCase.scala  | 132 +++++++++
 .../stream/table/GroupAggregationsTest.scala    | 218 +++++++++++++++
 .../table/GroupWindowAggregationsITCase.scala   | 167 ++++++++++++
 .../scala/stream/table/GroupWindowTest.scala    |  56 ++--
 .../scala/stream/table/UnsupportedOpsTest.scala |   7 -
 19 files changed, 1286 insertions(+), 583 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 559bd75..7438082 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -221,9 +221,6 @@ case class Aggregate(
   }
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = {
-    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
-      failValidation(s"Aggregate on stream tables is currently not supported.")
-    }
 
     val resolvedAggregate = super.validate(tableEnv).asInstanceOf[Aggregate]
     val groupingExprs = resolvedAggregate.groupingExpressions

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
deleted file mode 100644
index 50f8281..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.nodes.datastream
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
-import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.StreamTableEnvironment
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.plan.nodes.CommonAggregate
-import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
-import org.apache.flink.table.runtime.aggregate.AggregateUtil._
-import org.apache.flink.table.runtime.aggregate._
-import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
-import org.apache.flink.types.Row
-
-class DataStreamAggregate(
-    window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty],
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    inputNode: RelNode,
-    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    rowRelDataType: RelDataType,
-    inputType: RelDataType,
-    grouping: Array[Int])
-  extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataStreamRel {
-
-  override def deriveRowType(): RelDataType = rowRelDataType
-
-  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
-    new DataStreamAggregate(
-      window,
-      namedProperties,
-      cluster,
-      traitSet,
-      inputs.get(0),
-      namedAggregates,
-      getRowType,
-      inputType,
-      grouping)
-  }
-
-  override def toString: String = {
-    s"Aggregate(${
-      if (!grouping.isEmpty) {
-        s"groupBy: (${groupingToString(inputType, grouping)}), "
-      } else {
-        ""
-      }
-    }window: ($window), " +
-      s"select: (${
-        aggregationToString(
-          inputType,
-          grouping,
-          getRowType,
-          namedAggregates,
-          namedProperties)
-      }))"
-  }
-
-  override def explainTerms(pw: RelWriter): RelWriter = {
-    super.explainTerms(pw)
-      .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
-      .item("window", window)
-      .item(
-        "select", aggregationToString(
-          inputType,
-          grouping,
-          getRowType,
-          namedAggregates,
-          namedProperties))
-  }
-
-  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
-
-    val groupingKeys = grouping.indices.toArray
-    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
-
-    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
-
-    val aggString = aggregationToString(
-      inputType,
-      grouping,
-      getRowType,
-      namedAggregates,
-      namedProperties)
-
-    val keyedAggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
-      s"window: ($window), " +
-      s"select: ($aggString)"
-    val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
-
-    // grouped / keyed aggregation
-    if (groupingKeys.length > 0) {
-      val windowFunction = AggregateUtil.createAggregationGroupWindowFunction(
-        window,
-        groupingKeys.length,
-        namedAggregates.size,
-        rowRelDataType.getFieldCount,
-        namedProperties)
-
-      val keyedStream = inputDS.keyBy(groupingKeys: _*)
-      val windowedStream =
-        createKeyedWindowedStream(window, keyedStream)
-          .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-      val (aggFunction, accumulatorRowType, aggResultRowType) =
-        AggregateUtil.createDataStreamAggregateFunction(
-          namedAggregates,
-          inputType,
-          rowRelDataType,
-          grouping)
-
-      windowedStream
-        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
-        .name(keyedAggOpName)
-    }
-    // global / non-keyed aggregation
-    else {
-      val windowFunction = AggregateUtil.createAggregationAllWindowFunction(
-        window,
-        rowRelDataType.getFieldCount,
-        namedProperties)
-
-      val windowedStream =
-        createNonKeyedWindowedStream(window, inputDS)
-          .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
-      val (aggFunction, accumulatorRowType, aggResultRowType) =
-        AggregateUtil.createDataStreamAggregateFunction(
-          namedAggregates,
-          inputType,
-          rowRelDataType,
-          grouping)
-
-      windowedStream
-        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
-        .name(nonKeyedAggOpName)
-    }
-  }
-}
-
-object DataStreamAggregate {
-
-
-  private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
-    : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
-
-    case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
-      stream.window(TumblingProcessingTimeWindows.of(asTime(size)))
-
-    case ProcessingTimeTumblingGroupWindow(_, size) =>
-      stream.countWindow(asCount(size))
-
-    case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
-      stream.window(TumblingEventTimeWindows.of(asTime(size)))
-
-    case EventTimeTumblingGroupWindow(_, _, size) =>
-      // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
-      // before applying the  windowing logic. Otherwise, this would be the same as a
-      // ProcessingTimeTumblingGroupWindow
-      throw new UnsupportedOperationException(
-        "Event-time grouping windows on row intervals are currently not supported.")
-
-    case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
-      stream.window(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
-
-    case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
-      stream.countWindow(asCount(size), asCount(slide))
-
-    case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
-      stream.window(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
-
-    case EventTimeSlidingGroupWindow(_, _, size, slide) =>
-      // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
-      // before applying the  windowing logic. Otherwise, this would be the same as a
-      // ProcessingTimeTumblingGroupWindow
-      throw new UnsupportedOperationException(
-        "Event-time grouping windows on row intervals are currently not supported.")
-
-    case ProcessingTimeSessionGroupWindow(_, gap: Expression) =>
-      stream.window(ProcessingTimeSessionWindows.withGap(asTime(gap)))
-
-    case EventTimeSessionGroupWindow(_, _, gap) =>
-      stream.window(EventTimeSessionWindows.withGap(asTime(gap)))
-  }
-
-  private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
-    : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {
-
-    case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
-      stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size)))
-
-    case ProcessingTimeTumblingGroupWindow(_, size) =>
-      stream.countWindowAll(asCount(size))
-
-    case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
-      stream.windowAll(TumblingEventTimeWindows.of(asTime(size)))
-
-    case EventTimeTumblingGroupWindow(_, _, size) =>
-      // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
-      // before applying the  windowing logic. Otherwise, this would be the same as a
-      // ProcessingTimeTumblingGroupWindow
-      throw new UnsupportedOperationException(
-        "Event-time grouping windows on row intervals are currently not supported.")
-
-    case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
-      stream.windowAll(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
-
-    case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
-      stream.countWindowAll(asCount(size), asCount(slide))
-
-    case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
-      stream.windowAll(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
-
-    case EventTimeSlidingGroupWindow(_, _, size, slide) =>
-      // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
-      // before applying the  windowing logic. Otherwise, this would be the same as a
-      // ProcessingTimeTumblingGroupWindow
-      throw new UnsupportedOperationException(
-        "Event-time grouping windows on row intervals are currently not supported.")
-
-    case ProcessingTimeSessionGroupWindow(_, gap) =>
-      stream.windowAll(ProcessingTimeSessionWindows.withGap(asTime(gap)))
-
-    case EventTimeSessionGroupWindow(_, _, gap) =>
-      stream.windowAll(EventTimeSessionWindows.withGap(asTime(gap)))
-  }
-
-  def asTime(expr: Expression): Time = expr match {
-    case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => Time.milliseconds(value)
-    case _ => throw new IllegalArgumentException()
-  }
-
-  def asCount(expr: Expression): Long = expr match {
-    case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
-    case _ => throw new IllegalArgumentException()
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
new file mode 100644
index 0000000..c2d4fb7
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.functions.NullByteKeySelector
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+
+/**
+  *
+  * Flink RelNode for an unbounded group aggregate on a data stream.
+  *
+  * @param cluster         Cluster of the RelNode, i.e. the environment of related
+  *                        relational expressions during the optimization of a query.
+  * @param traitSet        Trait set of the RelNode
+  * @param inputNode       The input RelNode of aggregation
+  * @param namedAggregates List of calls to aggregate functions and their output field names
+  * @param rowRelDataType  The row type of the RelNode
+  * @param inputType       The row type of the aggregation's input RelNode
+  * @param groupings       The position (in the input Row) of the grouping keys
+  */
+class DataStreamGroupAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    rowRelDataType: RelDataType,
+    inputType: RelDataType,
+    groupings: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+    with CommonAggregate
+    with DataStreamRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamGroupAggregate(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      namedAggregates,
+      getRowType,
+      inputType,
+      groupings)
+  }
+
+  override def toString: String = {
+    s"Aggregate(${
+      if (!groupings.isEmpty) {
+        s"groupBy: (${groupingToString(inputType, groupings)}), "
+      } else {
+        ""
+      }
+    }select:(${aggregationToString(inputType, groupings, getRowType, namedAggregates, Nil)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .itemIf("groupBy", groupingToString(inputType, groupings), !groupings.isEmpty)
+      .item("select", aggregationToString(inputType, groupings, getRowType, namedAggregates, Nil))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+
+    val aggString = aggregationToString(
+      inputType,
+      groupings,
+      getRowType,
+      namedAggregates,
+      Nil)
+
+    val keyedAggOpName = s"groupBy: (${groupingToString(inputType, groupings)}), " +
+      s"select: ($aggString)"
+    val nonKeyedAggOpName = s"select: ($aggString)"
+
+    val processFunction = AggregateUtil.createGroupAggregateFunction(
+      namedAggregates,
+      inputType,
+      groupings)
+
+    val result: DataStream[Row] =
+    // grouped / keyed aggregation
+      if (groupings.nonEmpty) {
+        inputDS
+        .keyBy(groupings: _*)
+        .process(processFunction)
+        .returns(rowTypeInfo)
+        .name(keyedAggOpName)
+        .asInstanceOf[DataStream[Row]]
+      }
+      // global / non-keyed aggregation
+      else {
+        inputDS
+        .keyBy(new NullByteKeySelector[Row])
+        .process(processFunction)
+        .setParallelism(1)
+        .setMaxParallelism(1)
+        .returns(rowTypeInfo)
+        .name(nonKeyedAggOpName)
+        .asInstanceOf[DataStream[Row]]
+      }
+    result
+  }
+}
+
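
The else-branch of translateToPlan above uses the standard trick for non-keyed
aggregation over keyed state: route every record to one constant key and pin
the parallelism to 1. A standalone sketch of that pattern, assuming a given
ProcessFunction[Row, Row] and output type (the helper name is illustrative):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.functions.NullByteKeySelector
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.types.Row

    // All rows share the single null-byte key, so the keyed state of `fn`
    // holds exactly one global accumulator; parallelism 1 keeps it unique.
    def globalAggregate(
        input: DataStream[Row],
        fn: ProcessFunction[Row, Row],
        rowType: TypeInformation[Row]): DataStream[Row] =
      input
        .keyBy(new NullByteKeySelector[Row])
        .process(fn)
        .setParallelism(1)
        .setMaxParallelism(1)
        .returns(rowType)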

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
new file mode 100644
index 0000000..a0c1dec
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.CommonAggregate
+import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate._
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.types.Row
+
+class DataStreamGroupWindowAggregate(
+    window: LogicalWindow,
+    namedProperties: Seq[NamedWindowProperty],
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    rowRelDataType: RelDataType,
+    inputType: RelDataType,
+    grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode) with CommonAggregate with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamGroupWindowAggregate(
+      window,
+      namedProperties,
+      cluster,
+      traitSet,
+      inputs.get(0),
+      namedAggregates,
+      getRowType,
+      inputType,
+      grouping)
+  }
+
+  override def toString: String = {
+    s"Aggregate(${
+      if (!grouping.isEmpty) {
+        s"groupBy: (${groupingToString(inputType, grouping)}), "
+      } else {
+        ""
+      }
+    }window: ($window), " +
+      s"select: (${
+        aggregationToString(
+          inputType,
+          grouping,
+          getRowType,
+          namedAggregates,
+          namedProperties)
+      }))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
+      .item("window", window)
+      .item(
+        "select", aggregationToString(
+          inputType,
+          grouping,
+          getRowType,
+          namedAggregates,
+          namedProperties))
+  }
+
+  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+
+    val groupingKeys = grouping.indices.toArray
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+
+    val aggString = aggregationToString(
+      inputType,
+      grouping,
+      getRowType,
+      namedAggregates,
+      namedProperties)
+
+    val keyedAggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
+      s"window: ($window), " +
+      s"select: ($aggString)"
+    val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
+
+    // grouped / keyed aggregation
+    if (groupingKeys.length > 0) {
+      val windowFunction = AggregateUtil.createAggregationGroupWindowFunction(
+        window,
+        groupingKeys.length,
+        namedAggregates.size,
+        rowRelDataType.getFieldCount,
+        namedProperties)
+
+      val keyedStream = inputDS.keyBy(groupingKeys: _*)
+      val windowedStream =
+        createKeyedWindowedStream(window, keyedStream)
+          .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+      val (aggFunction, accumulatorRowType, aggResultRowType) =
+        AggregateUtil.createDataStreamGroupWindowAggregateFunction(
+          namedAggregates,
+          inputType,
+          rowRelDataType,
+          grouping)
+
+      windowedStream
+        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
+        .name(keyedAggOpName)
+    }
+    // global / non-keyed aggregation
+    else {
+      val windowFunction = AggregateUtil.createAggregationAllWindowFunction(
+        window,
+        rowRelDataType.getFieldCount,
+        namedProperties)
+
+      val windowedStream =
+        createNonKeyedWindowedStream(window, inputDS)
+          .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+      val (aggFunction, accumulatorRowType, aggResultRowType) =
+        AggregateUtil.createDataStreamGroupWindowAggregateFunction(
+          namedAggregates,
+          inputType,
+          rowRelDataType,
+          grouping)
+
+      windowedStream
+        .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
+        .name(nonKeyedAggOpName)
+    }
+  }
+}
+
+object DataStreamGroupWindowAggregate {
+
+
+  private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
+    : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
+
+    case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
+      stream.window(TumblingProcessingTimeWindows.of(asTime(size)))
+
+    case ProcessingTimeTumblingGroupWindow(_, size) =>
+      stream.countWindow(asCount(size))
+
+    case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
+      stream.window(TumblingEventTimeWindows.of(asTime(size)))
+
+    case EventTimeTumblingGroupWindow(_, _, size) =>
+      // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+      // before applying the windowing logic. Otherwise, this would be the same as a
+      // ProcessingTimeTumblingGroupWindow
+      throw new UnsupportedOperationException(
+        "Event-time grouping windows on row intervals are currently not supported.")
+
+    case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
+      stream.window(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
+
+    case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
+      stream.countWindow(asCount(size), asCount(slide))
+
+    case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+      stream.window(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
+
+    case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+      // TODO: EventTimeSlidingGroupWindow should sort the stream on event time
+      // before applying the windowing logic. Otherwise, this would be the same as a
+      // ProcessingTimeSlidingGroupWindow
+      throw new UnsupportedOperationException(
+        "Event-time grouping windows on row intervals are currently not supported.")
+
+    case ProcessingTimeSessionGroupWindow(_, gap: Expression) =>
+      stream.window(ProcessingTimeSessionWindows.withGap(asTime(gap)))
+
+    case EventTimeSessionGroupWindow(_, _, gap) =>
+      stream.window(EventTimeSessionWindows.withGap(asTime(gap)))
+  }
+
+  private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
+    : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {
+
+    case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
+      stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size)))
+
+    case ProcessingTimeTumblingGroupWindow(_, size) =>
+      stream.countWindowAll(asCount(size))
+
+    case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
+      stream.windowAll(TumblingEventTimeWindows.of(asTime(size)))
+
+    case EventTimeTumblingGroupWindow(_, _, size) =>
+      // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+      // before applying the windowing logic. Otherwise, this would be the same as a
+      // ProcessingTimeTumblingGroupWindow
+      throw new UnsupportedOperationException(
+        "Event-time grouping windows on row intervals are currently not supported.")
+
+    case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
+      stream.windowAll(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
+
+    case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
+      stream.countWindowAll(asCount(size), asCount(slide))
+
+    case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+      stream.windowAll(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
+
+    case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+      // TODO: EventTimeSlidingGroupWindow should sort the stream on event time
+      // before applying the windowing logic. Otherwise, this would be the same as a
+      // ProcessingTimeSlidingGroupWindow
+      throw new UnsupportedOperationException(
+        "Event-time grouping windows on row intervals are currently not supported.")
+
+    case ProcessingTimeSessionGroupWindow(_, gap) =>
+      stream.windowAll(ProcessingTimeSessionWindows.withGap(asTime(gap)))
+
+    case EventTimeSessionGroupWindow(_, _, gap) =>
+      stream.windowAll(EventTimeSessionWindows.withGap(asTime(gap)))
+  }
+
+  def asTime(expr: Expression): Time = expr match {
+    case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => Time.milliseconds(value)
+    case _ => throw new IllegalArgumentException()
+  }
+
+  def asCount(expr: Expression): Long = expr match {
+    case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
+    case _ => throw new IllegalArgumentException()
+  }
+}
+
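
The asTime and asCount helpers above unwrap literal interval expressions into
DataStream window parameters. A small sketch of the expected inputs, assuming
the two-argument Literal(value, resultType) expression matched above (values
are illustrative):

    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.table.expressions.Literal
    import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}

    // A millisecond interval literal becomes a Time; any other expression
    // makes asTime throw IllegalArgumentException.
    val size: Time = DataStreamGroupWindowAggregate.asTime(
      Literal(3600000L, TimeIntervalTypeInfo.INTERVAL_MILLIS))

    // A row interval literal becomes a plain row count for count windows.
    val rows: Long = DataStreamGroupWindowAggregate.asCount(
      Literal(100L, RowIntervalTypeInfo.INTERVAL_ROWS))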

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 222021a..6b22b4a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -174,8 +174,9 @@ object FlinkRuleSets {
       UnionEliminatorRule.INSTANCE,
 
       // translate to DataStream nodes
+      DataStreamGroupAggregateRule.INSTANCE,
       DataStreamOverAggregateRule.INSTANCE,
-      DataStreamAggregateRule.INSTANCE,
+      DataStreamGroupWindowAggregateRule.INSTANCE,
       DataStreamCalcRule.INSTANCE,
       DataStreamScanRule.INSTANCE,
       DataStreamUnionRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
deleted file mode 100644
index 09f05d7..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.rules.datastream
-
-import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.expressions.Alias
-import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamAggregate, DataStreamConvention}
-
-import scala.collection.JavaConversions._
-
-class DataStreamAggregateRule
-  extends ConverterRule(
-      classOf[LogicalWindowAggregate],
-      Convention.NONE,
-      DataStreamConvention.INSTANCE,
-      "DataStreamAggregateRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val agg: LogicalWindowAggregate = call.rel(0).asInstanceOf[LogicalWindowAggregate]
-
-    // check if we have distinct aggregates
-    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
-    if (distinctAggs) {
-      throw TableException("DISTINCT aggregates are currently not supported.")
-    }
-
-    // check if we have grouping sets
-    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
-    if (groupSets || agg.indicator) {
-      throw TableException("GROUPING SETS are currently not supported.")
-    }
-
-    !distinctAggs && !groupSets && !agg.indicator
-  }
-
-  override def convert(rel: RelNode): RelNode = {
-    val agg: LogicalWindowAggregate = rel.asInstanceOf[LogicalWindowAggregate]
-    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
-    val convInput: RelNode = RelOptRule.convert(agg.getInput, DataStreamConvention.INSTANCE)
-
-    new DataStreamAggregate(
-      agg.getWindow,
-      agg.getNamedProperties,
-      rel.getCluster,
-      traitSet,
-      convInput,
-      agg.getNamedAggCalls,
-      rel.getRowType,
-      agg.getInput.getRowType,
-      agg.getGroupSet.toArray)
-    }
-  }
-
-object DataStreamAggregateRule {
-  val INSTANCE: RelOptRule = new DataStreamAggregateRule
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala
new file mode 100644
index 0000000..82d7104
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamConvention, DataStreamGroupAggregate}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Rule to convert a [[LogicalAggregate]] into a [[DataStreamGroupAggregate]].
+  */
+class DataStreamGroupAggregateRule
+  extends ConverterRule(
+    classOf[LogicalAggregate],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamGroupAggregateRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
+
+    // check if we have distinct aggregates
+    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+    if (distinctAggs) {
+      throw TableException("DISTINCT aggregates are currently not supported.")
+    }
+
+    // check if we have grouping sets
+    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
+    if (groupSets || agg.indicator) {
+      throw TableException("GROUPING SETS are currently not supported.")
+    }
+
+    !distinctAggs && !groupSets && !agg.indicator
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val convInput: RelNode = RelOptRule.convert(agg.getInput, DataStreamConvention.INSTANCE)
+
+    new DataStreamGroupAggregate(
+      rel.getCluster,
+      traitSet,
+      convInput,
+      agg.getNamedAggCalls,
+      rel.getRowType,
+      agg.getInput.getRowType,
+      agg.getGroupSet.toArray)
+  }
+}
+
+object DataStreamGroupAggregateRule {
+  val INSTANCE: RelOptRule = new DataStreamGroupAggregateRule
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala
new file mode 100644
index 0000000..7ec1d40
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamConvention, DataStreamGroupWindowAggregate}
+
+import scala.collection.JavaConversions._
+
+class DataStreamGroupWindowAggregateRule
+  extends ConverterRule(
+      classOf[LogicalWindowAggregate],
+      Convention.NONE,
+      DataStreamConvention.INSTANCE,
+      "DataStreamGroupWindowAggregateRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg: LogicalWindowAggregate = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // check if we have distinct aggregates
+    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+    if (distinctAggs) {
+      throw TableException("DISTINCT aggregates are currently not supported.")
+    }
+
+    // check if we have grouping sets
+    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
+    if (groupSets || agg.indicator) {
+      throw TableException("GROUPING SETS are currently not supported.")
+    }
+
+    !distinctAggs && !groupSets && !agg.indicator
+  }
+
+  override def convert(rel: RelNode): RelNode = {
+    val agg: LogicalWindowAggregate = rel.asInstanceOf[LogicalWindowAggregate]
+    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val convInput: RelNode = RelOptRule.convert(agg.getInput, DataStreamConvention.INSTANCE)
+
+    new DataStreamGroupWindowAggregate(
+      agg.getWindow,
+      agg.getNamedProperties,
+      rel.getCluster,
+      traitSet,
+      convInput,
+      agg.getNamedAggCalls,
+      rel.getRowType,
+      agg.getInput.getRowType,
+      agg.getGroupSet.toArray)
+  }
+}
+
+object DataStreamGroupWindowAggregateRule {
+  val INSTANCE: RelOptRule = new DataStreamGroupWindowAggregateRule
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 09d1a13..634f7c8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -79,8 +79,7 @@ object AggregateUtil {
         inputType,
         needRetraction = false)
 
-    val aggregationStateType: RowTypeInfo =
-      createDataSetAggregateBufferDataType(Array(), aggregates, inputType)
+    val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
 
     val forwardMapping = (0 until inputType.getFieldCount).map(x => (x, x)).toArray
     val aggMapping = aggregates.indices.map(x => x + inputType.getFieldCount).toArray
@@ -125,7 +124,36 @@ object AggregateUtil {
   }
 
   /**
-    * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for
+    * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for a non-windowed
+    * group aggregate that evaluates the final aggregate value.
+    *
+    * @param namedAggregates List of calls to aggregate functions and their output field names
+    * @param inputType       Input row type
+    * @param groupings       The position (in the input Row) of the grouping keys
+    * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
+    */
+  private[flink] def createGroupAggregateFunction(
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      groupings: Array[Int]): ProcessFunction[Row, Row] = {
+
+    val (aggFields, aggregates) =
+      transformToAggregateFunctions(
+        namedAggregates.map(_.getKey),
+        inputType,
+        needRetraction = false)
+
+    val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
+
+    new GroupAggProcessFunction(
+      aggregates,
+      aggFields,
+      groupings,
+      aggregationStateType)
+  }
+
+  /**
+    * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for ROWS clause
     * bounded OVER window to evaluate final aggregate value.
     *
     * @param generator       code generator instance
@@ -238,7 +266,7 @@ object AggregateUtil {
       needRetraction = false)
 
     val mapReturnType: RowTypeInfo =
-      createDataSetAggregateBufferDataType(
+      createRowTypeForKeysAndAggregates(
         groupings,
         aggregates,
         inputType,
@@ -321,7 +349,7 @@ object AggregateUtil {
       inputType,
       needRetraction = false)._2
 
-    val returnType: RowTypeInfo = createDataSetAggregateBufferDataType(
+    val returnType: RowTypeInfo = createRowTypeForKeysAndAggregates(
       groupings,
       aggregates,
       inputType,
@@ -550,7 +578,7 @@ object AggregateUtil {
     window match {
       case EventTimeSessionGroupWindow(_, _, gap) =>
         val combineReturnType: RowTypeInfo =
-          createDataSetAggregateBufferDataType(
+          createRowTypeForKeysAndAggregates(
             groupings,
             aggregates,
             inputType,
@@ -600,7 +628,7 @@ object AggregateUtil {
 
       case EventTimeSessionGroupWindow(_, _, gap) =>
         val combineReturnType: RowTypeInfo =
-          createDataSetAggregateBufferDataType(
+          createRowTypeForKeysAndAggregates(
             groupings,
             aggregates,
             inputType,
@@ -745,7 +773,7 @@ object AggregateUtil {
     }
   }
 
-  private[flink] def createDataStreamAggregateFunction(
+  private[flink] def createDataStreamGroupWindowAggregateFunction(
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
       outputType: RelDataType,
@@ -1125,7 +1153,7 @@ object AggregateUtil {
     aggTypes
   }
 
-  private def createDataSetAggregateBufferDataType(
+  private def createRowTypeForKeysAndAggregates(
       groupings: Array[Int],
       aggregates: Array[TableAggregateFunction[_]],
       inputType: RelDataType,

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
new file mode 100644
index 0000000..3813aa0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+/**
+  * ProcessFunction used for non-windowed groupBy aggregation.
+  *
+  * @param aggregates           the list of all
+  *                             [[org.apache.flink.table.functions.AggregateFunction]] used for
+  *                             this aggregation
+  * @param aggFields            the position (in the input Row) of the input value for each
+  *                             aggregate
+  * @param groupings            the position (in the input Row) of the grouping keys
+  * @param aggregationStateType the row type info of the accumulator state
+  */
+class GroupAggProcessFunction(
+    private val aggregates: Array[AggregateFunction[_]],
+    private val aggFields: Array[Array[Int]],
+    private val groupings: Array[Int],
+    private val aggregationStateType: RowTypeInfo)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var state: ValueState[Row] = _
+
+  override def open(config: Configuration) {
+    output = new Row(groupings.length + aggregates.length)
+    val stateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("GroupAggregateState", aggregationStateType)
+    state = getRuntimeContext.getState(stateDescriptor)
+  }
+
+  override def processElement(
+      input: Row,
+      ctx: ProcessFunction[Row, Row]#Context,
+      out: Collector[Row]): Unit = {
+
+    var i = 0
+
+    var accumulators = state.value()
+
+    if (null == accumulators) {
+      accumulators = new Row(aggregates.length)
+      i = 0
+      while (i < aggregates.length) {
+        accumulators.setField(i, aggregates(i).createAccumulator())
+        i += 1
+      }
+    }
+
+    // Set group keys value to the final output
+    i = 0
+    while (i < groupings.length) {
+      output.setField(i, input.getField(groupings(i)))
+      i += 1
+    }
+
+    // Set aggregate result to the final output
+    i = 0
+    while (i < aggregates.length) {
+      val index = groupings.length + i
+      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
+      output.setField(index, aggregates(i).getValue(accumulator))
+      i += 1
+    }
+    state.update(accumulators)
+
+    out.collect(output)
+  }
+
+}
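
GroupAggProcessFunction above keeps one accumulator row per key in ValueState,
initializes it lazily on the first element, and re-emits the updated aggregate
for every input row. A stripped-down sketch of the same per-key state pattern
(class and state names are illustrative):

    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.util.Collector

    // Per-key running count; like the function above it must run on a keyed
    // stream, e.g. stream.keyBy(_._1).process(new RunningCount).
    class RunningCount extends ProcessFunction[(String, Long), (String, Long)] {

      @transient private var count: ValueState[java.lang.Long] = _

      override def open(config: Configuration): Unit = {
        count = getRuntimeContext.getState(
          new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))
      }

      override def processElement(
          in: (String, Long),
          ctx: ProcessFunction[(String, Long), (String, Long)]#Context,
          out: Collector[(String, Long)]): Unit = {
        // Lazy initialization: state is null until the key's first element.
        val current = Option(count.value()).map(_.longValue()).getOrElse(0L)
        val updated = current + 1L
        count.update(updated)
        // As above, emit the new result immediately (no retraction yet).
        out.collect((in._1, updated))
      }
    }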

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
index 4d0d9aa..ebdf9de 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
@@ -231,7 +231,7 @@ class FieldProjectionTest extends TableTestBase {
 
     val expected =
       unaryNode(
-        "DataStreamAggregate",
+        "DataStreamGroupWindowAggregate",
         unaryNode(
           "DataStreamCalc",
           streamTableNode(0),
@@ -259,7 +259,7 @@ class FieldProjectionTest extends TableTestBase {
     val expected = unaryNode(
         "DataStreamCalc",
         unaryNode(
-          "DataStreamAggregate",
+          "DataStreamGroupWindowAggregate",
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 67d13b0..f7bdccf 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -47,6 +47,27 @@ class SqlITCase extends StreamingWithStateTestBase {
     (8L, 8, "Hello World"),
     (20L, 20, "Hello World"))
 
+  /** test unbounded groupby (without window) **/
+  @Test
+  def testUnboundedGroupby(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT b, COUNT(a) FROM MyTable GROUP BY b"
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("1,1", "2,1", "2,2")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
   /** test selection **/
   @Test
   def testSelectExpressionFromTable(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index f4befa6..39a33ea 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -17,12 +17,10 @@
  */
 package org.apache.flink.table.api.scala.stream.sql
 
-import java.sql.Timestamp
-
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, ProcessingTimeSessionGroupWindow, ProcessingTimeSlidingGroupWindow, ProcessingTimeTumblingGroupWindow}
+import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
 import org.junit.Test
@@ -63,7 +61,7 @@ class WindowAggregateTest extends TableTestBase {
     val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" +
       "RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
       "FROM MyTable"
-      val expected =
+    val expected =
       unaryNode(
         "DataStreamCalc",
         unaryNode(
@@ -73,7 +71,7 @@ class WindowAggregateTest extends TableTestBase {
             streamTableNode(0),
             term("select", "a", "c", "PROCTIME() AS $2")
           ),
-          term("partitionBy","a"),
+          term("partitionBy", "a"),
           term("orderBy", "PROCTIME"),
           term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
           term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0", "$SUM0(c) AS w0$o1")
@@ -85,13 +83,34 @@ class WindowAggregateTest extends TableTestBase {
   }
 
   @Test
+  def testGroupbyWithoutWindow() = {
+    val sql = "SELECT COUNT(a) FROM MyTable GROUP BY b"
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "b", "a")
+          ),
+          term("groupBy", "b"),
+          term("select", "b", "COUNT(a) AS EXPR$0")
+        ),
+        term("select", "EXPR$0")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
   def testTumbleFunction() = {
     val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(rowtime(), INTERVAL '15' MINUTE)"
     val expected =
       unaryNode(
         "DataStreamCalc",
         unaryNode(
-          "DataStreamAggregate",
+          "DataStreamGroupWindowAggregate",
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
@@ -113,7 +132,7 @@ class WindowAggregateTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         unaryNode(
-          "DataStreamAggregate",
+          "DataStreamGroupWindowAggregate",
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
@@ -136,7 +155,7 @@ class WindowAggregateTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         unaryNode(
-          "DataStreamAggregate",
+          "DataStreamGroupWindowAggregate",
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
@@ -186,21 +205,6 @@ class WindowAggregateTest extends TableTestBase {
     streamUtil.verifySql(sql, "n/a")
   }
 
-  @Test(expected = classOf[TableException])
-  def testMultiWindow() = {
-    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +
-      "FLOOR(rowtime() TO HOUR), FLOOR(rowtime() TO MINUTE)"
-    val expected = ""
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testInvalidWindowExpression() = {
-    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(localTimestamp TO HOUR)"
-    val expected = ""
-    streamUtil.verifySql(sql, expected)
-  }
-
   @Test
   def testUnboundPartitionedProcessingWindowWithRange() = {
     val sql = "SELECT " +

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
deleted file mode 100644
index 3e7b66b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.table.AggregationsITCase.TimestampWithEqualWatermark
-import org.apache.flink.table.api.scala.stream.utils.StreamITCase
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-/**
-  * We only test some aggregations until better testing of constructed DataStream
-  * programs is possible.
-  */
-class AggregationsITCase extends StreamingMultipleProgramsTestBase {
-
-  val data = List(
-    (1L, 1, "Hi"),
-    (2L, 2, "Hello"),
-    (4L, 2, "Hello"),
-    (8L, 3, "Hello world"),
-    (16L, 3, "Hello world"))
-
-  @Test
-  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Slide over 2.rows every 1.rows as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count, 'int.avg)
-
-    val results = windowedTable.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = Seq("Hello world,1,3", "Hello world,2,3", "Hello,1,2", "Hello,2,2", "Hi,1,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testEventTimeSessionGroupWindowOverTime(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env
-      .fromCollection(data)
-      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
-    val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Session withGap 7.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val results = windowedTable.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = Seq("Hello world,1", "Hello world,1", "Hello,2", "Hi,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 2.rows as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val results = windowedTable.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = Seq("2", "2")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testEventTimeTumblingWindow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env
-      .fromCollection(data)
-      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
-    val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count, 'int.avg, 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
-
-    val results = windowedTable.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = Seq(
-      "Hello world,1,3,3,3,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
-      "Hello world,1,3,3,3,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
-      "Hello,2,2,2,2,4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
-      "Hi,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}
-
-object AggregationsITCase {
-  class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
-
-    override def checkAndGetNextWatermark(
-        lastElement: (Long, Int, String),
-        extractedTimestamp: Long)
-      : Watermark = {
-      new Watermark(extractedTimestamp)
-    }
-
-    override def extractTimestamp(
-        element: (Long, Int, String),
-        previousElementTimestamp: Long): Long = {
-      element._1
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
new file mode 100644
index 0000000..271e90b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+  * Tests for non-windowed (GROUP BY) aggregations.
+  */
+class GroupAggregationsITCase extends StreamingWithStateTestBase {
+
+  @Test
+  def testNonKeyedGroupAggregate(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+            .select('a.sum, 'b.sum)
+
+    val results = t.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1", "3,3", "6,5", "10,8", "15,11", "21,14", "28,18", "36,22", "45,26", "55,30", "66,35",
+      "78,40", "91,45", "105,50", "120,55", "136,61", "153,67", "171,73", "190,79", "210,85",
+      "231,91")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testGroupAggregate(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+      .groupBy('b)
+      .select('b, 'a.sum)
+
+    val results = t.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1", "2,2", "2,5", "3,4", "3,9", "3,15", "4,7", "4,15",
+      "4,24", "4,34", "5,11", "5,23", "5,36", "5,50", "5,65", "6,16", "6,33", "6,51", "6,70",
+      "6,90", "6,111")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testDoubleGroupAggregation(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+      .groupBy('b)
+      .select('a.sum as 'd, 'b)
+      .groupBy('b, 'd)
+      .select('b)
+
+    val results = t.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1",
+      "2", "2",
+      "3", "3", "3",
+      "4", "4", "4", "4",
+      "5", "5", "5", "5", "5",
+      "6", "6", "6", "6", "6", "6")
+
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testGroupAggregateWithExpression(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .groupBy('e, 'b % 3)
+      .select('c.min, 'e, 'a.avg, 'd.count)
+
+    val results = t.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "0,1,1,1", "1,2,2,1", "2,1,2,1", "3,2,3,1", "1,2,2,2",
+      "5,3,3,1", "3,2,3,2", "7,1,4,1", "2,1,3,2", "3,2,3,3", "7,1,4,2", "5,3,4,2", "12,3,5,1",
+      "1,2,3,3", "14,2,5,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala
new file mode 100644
index 0000000..1f4a694
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsTest.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+
+class GroupAggregationsTest extends TableTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupingOnNonExistentField(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+    val ds = table
+             // must fail. '_foo is not a valid field
+             .groupBy('_foo)
+             .select('a.avg)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupingInvalidSelection(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+    val ds = table
+             .groupBy('a, 'b)
+             // must fail. 'c is not a grouping key or aggregation
+             .select('c)
+  }
+
+  @Test
+  def testGroupbyWithoutWindow() = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+    val resultTable = table
+                      .groupBy('b)
+                      .select('a.count)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "b")
+          ),
+          term("groupBy", "b"),
+          term("select", "b", "COUNT(a) AS TMP_0")
+        ),
+        term("select", "TMP_0")
+      )
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testGroupAggregateWithConstant1(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+    val resultTable = table
+            .select('a, 4 as 'four, 'b)
+            .groupBy('four, 'a)
+            .select('four, 'b.sum)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "4 AS four", "b", "a")
+          ),
+          term("groupBy", "four", "a"),
+          term("select", "four", "a", "SUM(b) AS TMP_0")
+        ),
+        term("select", "4 AS four", "TMP_0")
+      )
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testGroupAggregateWithConstant2(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+    val resultTable = table
+            .select('b, 4 as 'four, 'a)
+            .groupBy('b, 'four)
+            .select('four, 'a.sum)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "4 AS four", "a", "b")
+          ),
+          term("groupBy", "four", "b"),
+          term("select", "four", "b", "SUM(a) AS TMP_0")
+        ),
+        term("select", "4 AS four", "TMP_0")
+      )
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testGroupAggregateWithExpressionInSelect(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+    val resultTable = table
+            .select('a as 'a, 'b % 3 as 'd, 'c as 'c)
+            .groupBy('d)
+            .select('c.min, 'a.avg)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "c", "a", "MOD(b, 3) AS d")
+          ),
+          term("groupBy", "d"),
+          term("select", "d", "MIN(c) AS TMP_0", "AVG(a) AS TMP_1")
+        ),
+        term("select", "TMP_0", "TMP_1")
+      )
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testGroupAggregateWithFilter(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+    val resultTable = table
+            .groupBy('b)
+            .select('b, 'a.sum)
+            .where('b === 2)
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "b", "a")
+          ),
+          term("groupBy", "b"),
+          term("select", "b", "SUM(a) AS TMP_0")
+        ),
+        term("select", "b", "TMP_0"),
+        term("where", "=(b, 2)")
+      )
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testGroupAggregateWithAverage(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+    val resultTable = table
+            .groupBy('b)
+            .select('b, 'a.cast(BasicTypeInfo.DOUBLE_TYPE_INFO).avg)
+
+    val expected =
+      unaryNode(
+        "DataStreamGroupAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          streamTableNode(0),
+          term("select", "b", "a", "CAST(a) AS a0")
+        ),
+        term("groupBy", "b"),
+        term("select", "b", "AVG(a0) AS TMP_0")
+      )
+
+    util.verifyTable(resultTable, expected)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/07a59ae0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
new file mode 100644
index 0000000..b8fc49b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.table.GroupWindowAggregationsITCase.TimestampWithEqualWatermark
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+  * We only test some aggregations until better testing of constructed DataStream
+  * programs is possible.
+  */
+class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
+
+  val data = List(
+    (1L, 1, "Hi"),
+    (2L, 2, "Hello"),
+    (4L, 2, "Hello"),
+    (8L, 3, "Hello world"),
+    (16L, 3, "Hello world"))
+
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'int.avg)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("Hello world,1,3", "Hello world,2,3", "Hello,1,2", "Hello,2,2", "Hi,1,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSessionGroupWindowOverTime(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Session withGap 7.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("Hello world,1", "Hello world,1", "Hello,2", "Hi,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 2.rows as 'w)
+      .groupBy('w)
+      .select('int.count)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("2", "2")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeTumblingWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, 'int.count, 'int.avg, 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hello world,1,3,3,3,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
+      "Hello world,1,3,3,3,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
+      "Hello,2,2,2,2,4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hi,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}
+
+object GroupWindowAggregationsITCase {
+  class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
+
+    override def checkAndGetNextWatermark(
+        lastElement: (Long, Int, String),
+        extractedTimestamp: Long)
+      : Watermark = {
+      new Watermark(extractedTimestamp)
+    }
+
+    override def extractTimestamp(
+        element: (Long, Int, String),
+        previousElementTimestamp: Long): Long = {
+      element._1
+    }
+  }
+}


[07/11] flink git commit: [FLINK-6270] extend Configuration with contains(configOption)

Posted by fh...@apache.org.
[FLINK-6270] extend Configuration with contains(configOption)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1d761ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1d761ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1d761ed

Branch: refs/heads/table-retraction
Commit: d1d761ed6b4de050402905b8c196e781479245a0
Parents: 2cb6004
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Apr 5 10:59:00 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 6 19:35:50 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/Configuration.java      | 30 ++++++++++++++++++++
 .../configuration/DelegatingConfiguration.java  |  5 ++++
 2 files changed, 35 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d1d761ed/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index 8f23435..ea0c419 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -595,6 +595,36 @@ public class Configuration extends ExecutionConfig.GlobalJobParameters
 		}
 	}
 
+	/**
+	 * Checks whether there is an entry for the given config option.
+	 *
+	 * @param configOption The configuration option
+	 *
+	 * @return <tt>true</tt> if a valid (current or deprecated) key of the config option is stored,
+	 * <tt>false</tt> otherwise
+	 */
+	@PublicEvolving
+	public boolean contains(ConfigOption<?> configOption) {
+		synchronized (this.confData) {
+			// first try the current key
+			if (this.confData.containsKey(configOption.key())) {
+				return true;
+			}
+			else if (configOption.hasDeprecatedKeys()) {
+				// try the deprecated keys
+				for (String deprecatedKey : configOption.deprecatedKeys()) {
+					if (this.confData.containsKey(deprecatedKey)) {
+						LOG.warn("Config uses deprecated configuration key '{}' instead of proper key '{}'",
+							deprecatedKey, configOption.key());
+						return true;
+					}
+				}
+			}
+
+			return false;
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d1d761ed/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index bd9a962..1b14e9e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -290,6 +290,11 @@ public final class DelegatingConfiguration extends Configuration {
 		return backingConfig.containsKey(prefix + key);
 	}
 
+	@Override
+	public boolean contains(ConfigOption<?> configOption) {
+		return backingConfig.contains(prefixOption(configOption, prefix));
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override
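
For reference, a minimal usage sketch of the new method (Scala; the option name and deprecated key are hypothetical):

    import org.apache.flink.configuration.{ConfigOptions, Configuration}

    object ContainsCheck {
      def main(args: Array[String]): Unit = {
        // hypothetical option with one deprecated key
        val timeout = ConfigOptions
          .key("rpc.timeout")
          .defaultValue("10 s")
          .withDeprecatedKeys("askTimeout")

        val conf = new Configuration()
        println(conf.contains(timeout))  // false: neither key is set

        conf.setString("askTimeout", "60 s")
        println(conf.contains(timeout))  // true: the deprecated key matches (and a warning is logged)
      }
    }

Note that DelegatingConfiguration simply prefixes the option's keys and forwards the call, as the second hunk shows.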


[03/11] flink git commit: [FLINK-6079] [kafka] Provide meaningful error message if TopicPartitions are null

Posted by fh...@apache.org.
[FLINK-6079] [kafka] Provide meaningful error message if TopicPartitions are null

This closes #3685.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8890a8db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8890a8db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8890a8db

Branch: refs/heads/table-retraction
Commit: 8890a8db41c45504aa658a1942f40bb9af7dcf30
Parents: a6355ed
Author: zentol <ch...@apache.org>
Authored: Thu Apr 6 11:55:29 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 6 19:35:50 2017 +0200

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBase.java           | 114 +++++++++----------
 1 file changed, 57 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8890a8db/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index d409027..a35e710 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -348,71 +349,70 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 		// initialize subscribed partitions
 		List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
+		Preconditions.checkNotNull(kafkaTopicPartitions, "TopicPartitions must not be null.");
 
 		subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size());
 
-		if (kafkaTopicPartitions != null) {
-			if (restoredState != null) {
-				for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
-					if (restoredState.containsKey(kafkaTopicPartition)) {
-						subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
-					}
+		if (restoredState != null) {
+			for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
+				if (restoredState.containsKey(kafkaTopicPartition)) {
+					subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
 				}
+			}
 
-				LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
-					getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
-			} else {
-				initializeSubscribedPartitionsToStartOffsets(
-					subscribedPartitionsToStartOffsets,
-					kafkaTopicPartitions,
-					getRuntimeContext().getIndexOfThisSubtask(),
-					getRuntimeContext().getNumberOfParallelSubtasks(),
-					startupMode,
-					specificStartupOffsets);
-
-				if (subscribedPartitionsToStartOffsets.size() != 0) {
-					switch (startupMode) {
-						case EARLIEST:
-							LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
-								getRuntimeContext().getIndexOfThisSubtask(),
-								subscribedPartitionsToStartOffsets.size(),
-								subscribedPartitionsToStartOffsets.keySet());
-							break;
-						case LATEST:
-							LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
-								getRuntimeContext().getIndexOfThisSubtask(),
-								subscribedPartitionsToStartOffsets.size(),
-								subscribedPartitionsToStartOffsets.keySet());
-							break;
-						case SPECIFIC_OFFSETS:
-							LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
-								getRuntimeContext().getIndexOfThisSubtask(),
-								subscribedPartitionsToStartOffsets.size(),
-								specificStartupOffsets,
-								subscribedPartitionsToStartOffsets.keySet());
-
-							List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
-							for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
-								if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
-									partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
-								}
+			LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
+				getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
+		} else {
+			initializeSubscribedPartitionsToStartOffsets(
+				subscribedPartitionsToStartOffsets,
+				kafkaTopicPartitions,
+				getRuntimeContext().getIndexOfThisSubtask(),
+				getRuntimeContext().getNumberOfParallelSubtasks(),
+				startupMode,
+				specificStartupOffsets);
+
+			if (subscribedPartitionsToStartOffsets.size() != 0) {
+				switch (startupMode) {
+					case EARLIEST:
+						LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
+							getRuntimeContext().getIndexOfThisSubtask(),
+							subscribedPartitionsToStartOffsets.size(),
+							subscribedPartitionsToStartOffsets.keySet());
+						break;
+					case LATEST:
+						LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
+							getRuntimeContext().getIndexOfThisSubtask(),
+							subscribedPartitionsToStartOffsets.size(),
+							subscribedPartitionsToStartOffsets.keySet());
+						break;
+					case SPECIFIC_OFFSETS:
+						LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
+							getRuntimeContext().getIndexOfThisSubtask(),
+							subscribedPartitionsToStartOffsets.size(),
+							specificStartupOffsets,
+							subscribedPartitionsToStartOffsets.keySet());
+
+						List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
+						for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
+							if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+								partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
 							}
+						}
 
-							if (partitionsDefaultedToGroupOffsets.size() > 0) {
-								LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
-										"; their startup offsets will be defaulted to their committed group offsets in Kafka.",
-									getRuntimeContext().getIndexOfThisSubtask(),
-									partitionsDefaultedToGroupOffsets.size(),
-									partitionsDefaultedToGroupOffsets);
-							}
-							break;
-						default:
-						case GROUP_OFFSETS:
-							LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
+						if (partitionsDefaultedToGroupOffsets.size() > 0) {
+							LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
+									"; their startup offsets will be defaulted to their committed group offsets in Kafka.",
 								getRuntimeContext().getIndexOfThisSubtask(),
-								subscribedPartitionsToStartOffsets.size(),
-								subscribedPartitionsToStartOffsets.keySet());
-					}
+								partitionsDefaultedToGroupOffsets.size(),
+								partitionsDefaultedToGroupOffsets);
+						}
+						break;
+					default:
+					case GROUP_OFFSETS:
+						LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
+							getRuntimeContext().getIndexOfThisSubtask(),
+							subscribedPartitionsToStartOffsets.size(),
+							subscribedPartitionsToStartOffsets.keySet());
 				}
 			}
 		}
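
The fix boils down to failing fast with a descriptive message instead of silently skipping initialization when partition discovery returns null. A minimal sketch of the pattern (Scala; fetchPartitions is a hypothetical stand-in for getKafkaPartitions):

    import org.apache.flink.util.Preconditions

    object FailFast {
      def main(args: Array[String]): Unit = {
        val partitions: java.util.List[String] = fetchPartitions()
        // throws a NullPointerException carrying the message below if partitions is null
        Preconditions.checkNotNull(partitions, "TopicPartitions must not be null.")
        println(s"subscribing to ${partitions.size()} partitions")
      }

      // hypothetical stand-in; a broken connector setup might return null here
      private def fetchPartitions(): java.util.List[String] =
        java.util.Collections.emptyList[String]()
    }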


[09/11] flink git commit: [FLINK-6261] [table] Support TUMBLE, HOP, SESSION group window functions for SQL queries on batch tables.

Posted by fh...@apache.org.
[FLINK-6261] [table] Support TUMBLE, HOP, SESSION group window functions for SQL queries on batch tables.

- Drop support for group window translation of "GROUP BY FLOOR/CEIL".

This closes #3675.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63539475
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63539475
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63539475

Branch: refs/heads/table-retraction
Commit: 635394751dce6e532fcd5a758c3d1bdb25303712
Parents: e2a4f47
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Apr 4 15:19:25 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Apr 6 22:37:45 2017 +0200

----------------------------------------------------------------------
 docs/dev/table_api.md                           |  90 +++++--
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   7 +-
 .../common/LogicalWindowAggregateRule.scala     | 144 +++++++++++
 .../DataSetLogicalWindowAggregateRule.scala     |  92 +++++++
 .../DataStreamLogicalWindowAggregateRule.scala  | 112 +++++++++
 .../datastream/LogicalWindowAggregateRule.scala | 222 -----------------
 .../scala/batch/sql/AggregationsITCase.scala    |  92 ++++++-
 .../scala/batch/sql/WindowAggregateTest.scala   | 244 +++++++++++++++++++
 .../scala/stream/sql/WindowAggregateTest.scala  |  86 -------
 9 files changed, 762 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 7156393..6f96920 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1418,45 +1418,103 @@ val result2 = tableEnv.sql(
 </div>
 </div>
 
-#### Group windows
+#### Limitations
+
+Joins, set operations, and non-windowed aggregations are not supported yet.
+
+{% top %}
+
+### Group Windows
 
-Streaming SQL supports aggregation on group windows by specifying the windows in the `GROUP BY` clause. The following table describes the syntax of the group windows:
+Group windows are defined in the `GROUP BY` clause of a SQL query. Just like queries with regular `GROUP BY` clauses, a query with a `GROUP BY` clause that includes a group window function computes a single result row per group. The following group window functions are supported for SQL on batch and streaming tables.
 
 <table class="table table-bordered">
   <thead>
     <tr>
-      <th><code>GROUP BY</code> clause</th>
+      <th class="text-left" style="width: 30%">Group Window Function</th>
       <th class="text-left">Description</th>
     </tr>
   </thead>
 
   <tbody>
     <tr>
-      <td><code>TUMBLE(mode, interval)</code></td>
-      <td>A tumbling window over the time period specified by <code>interval</code>.</td>
+      <td><code>TUMBLE(time_attr, interval)</code></td>
+      <td>Defines a tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (<code>interval</code>). For example, a tumbling window of 5 minutes groups rows in 5 minute intervals. Tumbling windows can be defined on event-time (stream + batch) or processing-time (stream).</td>
     </tr>
     <tr>
-      <td><code>HOP(mode, slide, size)</code></td>
-      <td>A sliding window with the length of <code>size</code> and moves every <code>slide</code>.</td>
+      <td><code>HOP(time_attr, interval, interval)</code></td>
+      <td>Defines a hopping time window (called sliding window in the Table API). A hopping time window has a fixed duration (second <code>interval</code> parameter) and hops by a specified hop interval (first <code>interval</code> parameter). If the hop interval is smaller than the window size, hopping windows overlap and rows can be assigned to multiple windows. For example, a hopping window of 15 minutes size and a 5 minute hop interval assigns each row to 3 different windows of 15 minute size, which are evaluated at an interval of 5 minutes. Hopping windows can be defined on event-time (stream + batch) or processing-time (stream).</td>
     </tr>
     <tr>
-      <td><code>SESSION(mode, gap)</code></td>
-      <td>A session window that has <code>gap</code> as the gap between two windows.</td>
+      <td><code>SESSION(time_attr, interval)</code></td>
+      <td>Defines a session time window. Session time windows do not have a fixed duration; their bounds are defined by a time <code>interval</code> of inactivity, i.e., a session window is closed if no event appears for the defined gap period. For example, a session window with a 30 minute gap starts when a row is observed after 30 minutes of inactivity (otherwise the row would be added to an existing window) and is closed if no row is added within 30 minutes. Session windows can be defined on event-time (stream + batch) or processing-time (stream).</td>
     </tr>
   </tbody>
 </table>
 
-The parameters `interval`, `slide`, `size`, `gap` must be constant time intervals. The `mode` can be either `proctime()` or `rowtime()`, which specifies the window is over the processing time or the event time.
+For SQL queries on streaming tables, the `time_attr` argument of the group window function must be either the `rowtime()` or the `proctime()` time indicator, which distinguishes between event and processing time, respectively. For SQL on batch tables, the `time_attr` argument of the group window function must be an attribute of type `TIMESTAMP`.
 
-As an example, the following SQL computes the total number of records over a 15 minute tumbling window over processing time:
+The following examples show how to specify SQL queries with group windows on streaming tables. 
 
-```
-SELECT COUNT(*) FROM $table GROUP BY TUMBLE(proctime(), INTERVAL '15' MINUTE)
-```
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
 
-#### Limitations
+// ingest a DataStream from an external source
+DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
+// register the DataStream as table "Orders"
+tableEnv.registerDataStream("Orders", ds, "user, product, amount");
+
+// compute SUM(amount) per day (in event-time)
+Table result1 = tableEnv.sql(
+  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user");
+
+// compute SUM(amount) per day (in processing-time)
+Table result2 = tableEnv.sql(
+  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime(), INTERVAL '1' DAY), user");
+
+// compute every hour the SUM(amount) of the last 24 hours in event-time
+Table result3 = tableEnv.sql(
+  "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime(), INTERVAL '1' HOUR, INTERVAL '1' DAY), product");
 
-The current version of streaming SQL only supports `SELECT`, `FROM`, `WHERE`, and `UNION` clauses. Aggregations or joins are not fully supported yet.
+// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
+Table result4 = tableEnv.sql(
+  "SELECT user, SUM(amount) FROM Orders GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user");
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+// read a DataStream from an external source
+val ds: DataStream[(Long, String, Int)] = env.addSource(...)
+// register the DataStream under the name "Orders"
+tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
+
+// compute SUM(amount) per day (in event-time)
+val result1 = tableEnv.sql(
+  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime(), INTERVAL '1' DAY), user")
+
+// compute SUM(amount) per day (in processing-time)
+val result2 = tableEnv.sql(
+  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime(), INTERVAL '1' DAY), user")
+
+// compute every hour the SUM(amount) of the last 24 hours in event-time
+val result3 = tableEnv.sql(
+  "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime(), INTERVAL '1' HOUR, INTERVAL '1' DAY), product")
+
+// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
+val result4 = tableEnv.sql(
+  "SELECT user, SUM(amount) FROM Orders GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user")
+
+{% endhighlight %}
+</div>
+</div>
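+
+The same group window functions can be used in SQL queries on batch tables. In that case, the `time_attr` argument must be a `TIMESTAMP` attribute of the table. A sketch, assuming a batch `Orders` table with a `TIMESTAMP` column `ts`:
+
+{% highlight scala %}
+// compute SUM(amount) per day on a batch table (time given by the TIMESTAMP column 'ts')
+val resultBatch = tableEnv.sql(
+  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(ts, INTERVAL '1' DAY), user")
+{% endhighlight %}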
 
 {% top %}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 5caaf1f..222021a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -35,7 +35,10 @@ object FlinkRuleSets {
     ReduceExpressionsRule.PROJECT_INSTANCE,
     ReduceExpressionsRule.CALC_INSTANCE,
     ReduceExpressionsRule.JOIN_INSTANCE,
-    ProjectToWindowRule.PROJECT
+    ProjectToWindowRule.PROJECT,
+
+    // Transform window to LogicalWindowAggregate
+    DataSetLogicalWindowAggregateRule.INSTANCE
   )
 
   /**
@@ -132,7 +135,7 @@ object FlinkRuleSets {
     */
   val DATASTREAM_NORM_RULES: RuleSet = RuleSets.ofList(
     // Transform window to LogicalWindowAggregate
-    LogicalWindowAggregateRule.INSTANCE,
+    DataStreamLogicalWindowAggregateRule.INSTANCE,
 
     // simplify expressions rules
     ReduceExpressionsRule.FILTER_INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
new file mode 100644
index 0000000..34433f9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.common
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.api._
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+
+import _root_.scala.collection.JavaConversions._
+
+abstract class LogicalWindowAggregateRule(ruleName: String)
+  extends RelOptRule(
+    RelOptRule.operand(classOf[LogicalAggregate],
+      RelOptRule.operand(classOf[LogicalProject], RelOptRule.none())),
+    ruleName) {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalAggregate]
+
+    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
+
+    val windowExpressions = getWindowExpressions(agg)
+    if (windowExpressions.length > 1) {
+      throw new TableException("Only a single window group function may be used in GROUP BY")
+    }
+
+    !distinctAggs && !groupSets && !agg.indicator && windowExpressions.nonEmpty
+  }
+
+  /**
+    * Transforms a LogicalAggregate with a windowing expression into
+    * LogicalProject + LogicalWindowAggregate + LogicalProject.
+    *
+    * The transformation adds an additional LogicalProject at the top to ensure
+    * that the types are equivalent.
+    */
+  override def onMatch(call: RelOptRuleCall): Unit = {
+    val agg = call.rel[LogicalAggregate](0)
+    val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
+
+    val (windowExpr, windowExprIdx) = getWindowExpressions(agg).head
+    val window = translateWindowExpression(windowExpr, project.getInput.getRowType)
+
+    val builder = call.builder()
+    val rexBuilder = builder.getRexBuilder
+
+    val inAggGroupExpression = getInAggregateGroupExpression(rexBuilder, windowExpr)
+    val newGroupSet = agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx))
+    val newAgg = builder
+      .push(project.getInput)
+      .project(project.getChildExps.updated(windowExprIdx, inAggGroupExpression))
+      .aggregate(builder.groupKey(
+        newGroupSet,
+        agg.indicator, ImmutableList.of(newGroupSet)), agg.getAggCallList)
+      .build().asInstanceOf[LogicalAggregate]
+
+    // Create an additional project to conform with types
+    val outAggGroupExpression = getOutAggregateGroupExpression(rexBuilder, windowExpr)
+    val transformed = call.builder()
+    transformed.push(LogicalWindowAggregate.create(
+      window.toLogicalWindow,
+      Seq[NamedWindowProperty](),
+      newAgg))
+      .project(transformed.fields().patch(windowExprIdx, Seq(outAggGroupExpression), 0))
+
+    call.transformTo(transformed.build())
+  }
+
+  private[table] def getWindowExpressions(agg: LogicalAggregate): Seq[(RexCall, Int)] = {
+
+    val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
+    val groupKeys = agg.getGroupSet
+
+    // get grouping expressions
+    val groupExpr = project.getProjects.zipWithIndex.filter(p => groupKeys.get(p._2))
+
+    // filter grouping expressions for window expressions
+    groupExpr.filter { g =>
+      g._1 match {
+        case call: RexCall =>
+          call.getOperator match {
+            case SqlStdOperatorTable.TUMBLE =>
+              if (call.getOperands.size() == 2) {
+                true
+              } else {
+                throw TableException("TUMBLE window with alignment is not supported yet.")
+              }
+            case SqlStdOperatorTable.HOP =>
+              if (call.getOperands.size() == 3) {
+                true
+              } else {
+                throw TableException("HOP window with alignment is not supported yet.")
+              }
+            case SqlStdOperatorTable.SESSION =>
+              if (call.getOperands.size() == 2) {
+                true
+              } else {
+                throw TableException("SESSION window with alignment is not supported yet.")
+              }
+            case _ => false
+          }
+        case _ => false
+      }
+    }.map(w => (w._1.asInstanceOf[RexCall], w._2))
+  }
+
+  /** Returns the expression that replaces the window expression before the aggregation. */
+  private[table] def getInAggregateGroupExpression(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode
+
+  /** Returns the expression that replaces the window expression after the aggregation. */
+  private[table] def getOutAggregateGroupExpression(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode
+
+  /** Translates the group window expression into a Flink Table window. */
+  private[table] def translateWindowExpression(windowExpr: RexCall, rowType: RelDataType): Window
+
+}
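
To make the onMatch transformation above concrete, here is a sketch of the rewrite for a query such as SELECT b, SUM(a) FROM T GROUP BY b, TUMBLE(ts, INTERVAL '3' SECOND); the operator rendering is illustrative, not actual planner output:

    Before:
      LogicalAggregate(group = [b, $f1], SUM(a))
        LogicalProject(b, TUMBLE(ts, 3000) AS $f1, a)
          TableScan(T)

    After:
      LogicalProject(b, <outAggGroupExpression>, sumA)
        LogicalWindowAggregate(window = [w$], group = [b], SUM(a))
          LogicalProject(b, <inAggGroupExpression>, a)
            TableScan(T)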

http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala
new file mode 100644
index 0000000..883f5ae
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetLogicalWindowAggregateRule.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.dataSet
+
+import java.math.BigDecimal
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
+import org.apache.flink.table.api.{TableException, Window}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference}
+import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+
+class DataSetLogicalWindowAggregateRule
+  extends LogicalWindowAggregateRule("DataSetLogicalWindowAggregateRule") {
+
+  /** Returns the operand of the group window function. */
+  override private[table] def getInAggregateGroupExpression(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode = windowExpression.getOperands.get(0)
+
+  /** Returns a zero literal of the correct type. */
+  override private[table] def getOutAggregateGroupExpression(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode = {
+
+    val literalType = windowExpression.getOperands.get(0).getType
+    rexBuilder.makeZeroLiteral(literalType)
+  }
+
+  override private[table] def translateWindowExpression(
+      windowExpr: RexCall,
+      rowType: RelDataType): Window = {
+
+    def getOperandAsLong(call: RexCall, idx: Int): Long =
+      call.getOperands.get(idx) match {
+        case v: RexLiteral => v.getValue.asInstanceOf[BigDecimal].longValue()
+        case _ => throw new TableException("Only constant window descriptors are supported")
+      }
+
+    def getFieldReference(operand: RexNode): Expression = {
+      operand match {
+        case ref: RexInputRef =>
+          // resolve field name of window attribute
+          val fieldName = rowType.getFieldList.get(ref.getIndex).getName
+          val fieldType = rowType.getFieldList.get(ref.getIndex).getType
+          ResolvedFieldReference(fieldName, FlinkTypeFactory.toTypeInfo(fieldType))
+      }
+    }
+
+    windowExpr.getOperator match {
+      case SqlStdOperatorTable.TUMBLE =>
+        val interval = getOperandAsLong(windowExpr, 1)
+        val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+        w.on(getFieldReference(windowExpr.getOperands.get(0))).as("w$")
+
+      case SqlStdOperatorTable.HOP =>
+        val (slide, size) = (getOperandAsLong(windowExpr, 1), getOperandAsLong(windowExpr, 2))
+        val w = Slide
+          .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+          .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+        w.on(getFieldReference(windowExpr.getOperands.get(0))).as("w$")
+
+      case SqlStdOperatorTable.SESSION =>
+        val gap = getOperandAsLong(windowExpr, 1)
+        val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+        w.on(getFieldReference(windowExpr.getOperands.get(0))).as("w$")
+    }
+  }
+}
+
+object DataSetLogicalWindowAggregateRule {
+  val INSTANCE = new DataSetLogicalWindowAggregateRule
+}
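
As a worked example of translateWindowExpression (the field name and its type info are illustrative assumptions): for TUMBLE(ts, INTERVAL '3' SECOND) the second operand is a BigDecimal literal of 3000 milliseconds, so getOperandAsLong(windowExpr, 1) yields 3000L and the rule builds:

    // Table API equivalent of TUMBLE(ts, INTERVAL '3' SECOND).
    Tumble
      .over(Literal(3000L, TimeIntervalTypeInfo.INTERVAL_MILLIS))
      .on(ResolvedFieldReference("ts", SqlTimeTypeInfo.TIMESTAMP))
      .as("w$")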

http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
new file mode 100644
index 0000000..175a202
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import java.math.BigDecimal
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexBuilder, RexCall, RexLiteral, RexNode}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.table.api.{TableException, Window}
+import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
+import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.functions.TimeModeTypes
+import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+
+class DataStreamLogicalWindowAggregateRule
+  extends LogicalWindowAggregateRule("DataStreamLogicalWindowAggregateRule") {
+
+  /** Returns a zero literal of the correct time type. */
+  override private[table] def getInAggregateGroupExpression(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode = createZeroLiteral(rexBuilder, windowExpression)
+
+  /** Returns a zero literal of the correct time type. */
+  override private[table] def getOutAggregateGroupExpression(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode = createZeroLiteral(rexBuilder, windowExpression)
+
+  private def createZeroLiteral(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode = {
+
+    val timeType = windowExpression.operands.get(0).getType
+    timeType match {
+      case TimeModeTypes.ROWTIME =>
+        rexBuilder.makeAbstractCast(
+          TimeModeTypes.ROWTIME,
+          rexBuilder.makeLiteral(0L, TimeModeTypes.ROWTIME, true))
+      case TimeModeTypes.PROCTIME =>
+        rexBuilder.makeAbstractCast(
+          TimeModeTypes.PROCTIME,
+          rexBuilder.makeLiteral(0L, TimeModeTypes.PROCTIME, true))
+      case _ =>
+        throw TableException(s"Unexpected time type $timeType encountered")
+    }
+  }
+
+  override private[table] def translateWindowExpression(
+      windowExpr: RexCall,
+      rowType: RelDataType): Window = {
+
+    def getOperandAsLong(call: RexCall, idx: Int): Long =
+      call.getOperands.get(idx) match {
+        case v: RexLiteral => v.getValue.asInstanceOf[BigDecimal].longValue()
+        case _ => throw new TableException("Only constant window descriptors are supported")
+      }
+
+    windowExpr.getOperator match {
+      case SqlStdOperatorTable.TUMBLE =>
+        val interval = getOperandAsLong(windowExpr, 1)
+        val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+
+        val window = windowExpr.getType match {
+          case TimeModeTypes.PROCTIME => w
+          case TimeModeTypes.ROWTIME => w.on("rowtime")
+        }
+        window.as("w$")
+
+      case SqlStdOperatorTable.HOP =>
+        val (slide, size) = (getOperandAsLong(windowExpr, 1), getOperandAsLong(windowExpr, 2))
+        val w = Slide
+          .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+          .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+
+        val window = windowExpr.getType match {
+          case TimeModeTypes.PROCTIME => w
+          case TimeModeTypes.ROWTIME => w.on("rowtime")
+        }
+        window.as("w$")
+      case SqlStdOperatorTable.SESSION =>
+        val gap = getOperandAsLong(windowExpr, 1)
+        val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
+
+        val window = windowExpr.getType match {
+          case TimeModeTypes.PROCTIME => w
+          case TimeModeTypes.ROWTIME => w.on("rowtime")
+        }
+        window.as("w$")
+    }
+  }
+}
+
+object DataStreamLogicalWindowAggregateRule {
+  val INSTANCE = new DataStreamLogicalWindowAggregateRule
+}
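
Compared to the DataSet variant, this rule dispatches on the time type of the window call rather than resolving a field reference. A sketch under those assumptions:

    // TUMBLE(rowtime(), INTERVAL '15' MINUTE): the call type is
    // TimeModeTypes.ROWTIME, so an event-time window is produced.
    Tumble
      .over(Literal(900000L, TimeIntervalTypeInfo.INTERVAL_MILLIS))
      .on("rowtime")
      .as("w$")
    // With proctime() the call type is TimeModeTypes.PROCTIME and the
    // .on(...) clause is omitted, yielding a processing-time window.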

http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
deleted file mode 100644
index 7572e46..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.plan.rules.datastream
-
-import java.math.BigDecimal
-
-import com.google.common.collect.ImmutableList
-import org.apache.calcite.avatica.util.TimeUnitRange
-import org.apache.calcite.plan._
-import org.apache.calcite.plan.hep.HepRelVertex
-import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
-import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
-import org.apache.calcite.sql.fun.{SqlFloorFunction, SqlStdOperatorTable}
-import org.apache.calcite.util.ImmutableBitSet
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
-import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.expressions._
-import org.apache.flink.table.functions.TimeModeTypes
-import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
-import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
-
-import _root_.scala.collection.JavaConversions._
-
-class LogicalWindowAggregateRule
-  extends RelOptRule(
-    LogicalWindowAggregateRule.LOGICAL_WINDOW_PREDICATE,
-    "LogicalWindowAggregateRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val agg = call.rel(0).asInstanceOf[LogicalAggregate]
-
-    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
-    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
-
-    val windowClause = recognizeWindow(agg)
-    !distinctAggs && !groupSets && !agg.indicator && windowClause.isDefined
-  }
-
-  /**
-    * Transform LogicalAggregate with windowing expression to LogicalProject
-    * + LogicalWindowAggregate + LogicalProject.
-    *
-    * The transformation adds an additional LogicalProject at the top to ensure
-    * that the types are equivalent.
-    */
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val agg = call.rel[LogicalAggregate](0)
-    val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
-    val (windowExprIdx, window) = recognizeWindow(agg).get
-    val newGroupSet = agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx))
-
-    val builder = call.builder()
-    val rexBuilder = builder.getRexBuilder
-
-    // build dummy literal with type depending on time semantics
-    val zero = window match {
-      case _: EventTimeWindow =>
-        rexBuilder.makeAbstractCast(
-          TimeModeTypes.ROWTIME,
-          rexBuilder.makeLiteral(0L, TimeModeTypes.ROWTIME, true))
-      case _ =>
-        rexBuilder.makeAbstractCast(
-          TimeModeTypes.PROCTIME,
-          rexBuilder.makeLiteral(0L, TimeModeTypes.PROCTIME, true))
-    }
-
-    val newAgg = builder
-      .push(project.getInput)
-      .project(project.getChildExps.updated(windowExprIdx, zero))
-      .aggregate(builder.groupKey(
-        newGroupSet,
-        agg.indicator, ImmutableList.of(newGroupSet)), agg.getAggCallList)
-      .build().asInstanceOf[LogicalAggregate]
-
-    // Create an additional project to conform with types
-    val transformed = call.builder()
-    transformed.push(LogicalWindowAggregate.create(
-      window.toLogicalWindow,
-      Seq[NamedWindowProperty](),
-      newAgg))
-      .project(transformed.fields().patch(windowExprIdx, Seq(zero), 0))
-    call.transformTo(transformed.build())
-  }
-
-  private def recognizeWindow(agg: LogicalAggregate) : Option[(Int, Window)] = {
-    val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
-    val groupKeys = agg.getGroupSet
-
-    // filter expressions on which is grouped
-    val groupExpr = project.getProjects.zipWithIndex.filter(p => groupKeys.get(p._2))
-
-    // check for window expressions in group expressions
-    val windowExpr = groupExpr
-      .map(g => (g._2, identifyWindow(g._1)) )
-      .filter(_._2.isDefined)
-      .map(g => (g._1, g._2.get.as("w$")) )
-
-    windowExpr.size match {
-      case 0 => None
-      case 1 => Some(windowExpr.head)
-      case _ => throw new TableException("Multiple windows are not supported")
-    }
-  }
-
-  private def identifyWindow(field: RexNode): Option[Window] = {
-    field match {
-      case call: RexCall =>
-        call.getOperator match {
-          case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow
-          case SqlStdOperatorTable.TUMBLE => TumbleWindowTranslator(call).toWindow
-          case SqlStdOperatorTable.HOP => SlidingWindowTranslator(call).toWindow
-          case SqlStdOperatorTable.SESSION => SessionWindowTranslator(call).toWindow
-          case _ => None
-        }
-      case _ => None
-    }
-  }
-}
-
-private abstract class WindowTranslator {
-  val call: RexCall
-
-  protected def unwrapLiteral[T](node: RexNode): T =
-    node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T]
-
-  protected def getOperandAsLong(idx: Int): Long =
-    call.getOperands.get(idx) match {
-      case v : RexLiteral => v.getValue.asInstanceOf[BigDecimal].longValue()
-      case _ => throw new TableException("Only constant window descriptors are supported")
-    }
-
-  def toWindow: Option[Window]
-}
-
-private case class FloorWindowTranslator(call: RexCall) extends WindowTranslator {
-  override def toWindow: Option[Window] = {
-    val range = unwrapLiteral[TimeUnitRange](call.getOperands.get(1))
-    val w = Tumble.over(Literal(range.startUnit.multiplier.longValue(),
-      TimeIntervalTypeInfo.INTERVAL_MILLIS))
-    call.getType match {
-      case TimeModeTypes.PROCTIME => Some(w)
-      case TimeModeTypes.ROWTIME => Some(w.on("rowtime"))
-      case _ => None
-    }
-  }
-}
-
-private case class TumbleWindowTranslator(call: RexCall) extends WindowTranslator {
-  override def toWindow: Option[Window] = {
-
-    if (call.getOperands.size() != 2) {
-      throw new TableException("TUMBLE with alignment is not supported yet.")
-    }
-
-    val interval = getOperandAsLong(1)
-    val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS))
-    call.getType match {
-      case TimeModeTypes.PROCTIME => Some(w)
-      case TimeModeTypes.ROWTIME => Some(w.on("rowtime"))
-      case _ => None
-    }
-  }
-}
-
-private case class SlidingWindowTranslator(call: RexCall) extends WindowTranslator {
-  override def toWindow: Option[Window] = {
-
-    if (call.getOperands.size() != 3) {
-      throw new TableException("HOP with alignment is not supported yet.")
-    }
-
-    val (slide, size) = (getOperandAsLong(1), getOperandAsLong(2))
-    val w = Slide
-      .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
-      .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
-    call.getType match {
-      case TimeModeTypes.PROCTIME => Some(w)
-      case TimeModeTypes.ROWTIME => Some(w.on("rowtime"))
-      case _ => None
-    }
-  }
-}
-
-private case class SessionWindowTranslator(call: RexCall) extends WindowTranslator {
-  override def toWindow: Option[Window] = {
-
-    if (call.getOperands.size() != 2) {
-      throw new TableException("SESSION with alignment is not supported yet")
-    }
-
-    val gap = getOperandAsLong(1)
-    val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
-    call.getType match {
-      case TimeModeTypes.PROCTIME => Some(w)
-      case TimeModeTypes.ROWTIME => Some(w.on("rowtime"))
-      case _ => None
-    }
-  }
-}
-
-object LogicalWindowAggregateRule {
-  private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate],
-    RelOptRule.operand(classOf[LogicalProject], RelOptRule.none()))
-
-  private[flink] val INSTANCE = new LogicalWindowAggregateRule
-}
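
One observable consequence of this removal: the FLOOR-based window recognition handled by FloorWindowTranslator has no counterpart in the new rules, which is why the FLOOR-based tests are deleted from the stream WindowAggregateTest further below. The pattern that is no longer translated to a group window:

    // Previously rewritten to a tumbling group window by FloorWindowTranslator;
    // now planned as a regular aggregation.
    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(rowtime() TO HOUR)"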

http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
index cceb272..600c15b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsITCase.scala
@@ -18,13 +18,15 @@
 
 package org.apache.flink.table.api.scala.batch.sql
 
+import java.sql.Timestamp
+
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.types.Row
-import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
@@ -292,4 +294,92 @@ class AggregationsITCase(
     TestBaseUtils.compareResultAsText(results2.asJava, expected2)
     TestBaseUtils.compareResultAsText(results3.asJava, expected3)
   }
+
+  @Test
+  def testTumbleWindowAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery =
+      "SELECT b, SUM(a), COUNT(*) " +
+        "FROM T " +
+        "GROUP BY b, TUMBLE(ts, INTERVAL '3' SECOND)"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+      // create timestamps
+      .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
+    tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
+
+    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val expected = Seq(
+      "1,1,1",
+      "2,2,1", "2,3,1",
+      "3,9,2", "3,6,1",
+      "4,15,2", "4,19,2",
+      "5,11,1", "5,39,3", "5,15,1",
+      "6,33,2", "6,57,3", "6,21,1"
+    ).mkString("\n")
+
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testHopWindowAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    env.setParallelism(1)
+
+    val sqlQuery =
+      "SELECT b, SUM(a), COUNT(*) " +
+        "FROM T " +
+        "GROUP BY b, HOP(ts, INTERVAL '2' SECOND, INTERVAL '4' SECOND)"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+      // create timestamps
+      .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
+    tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
+
+    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val expected = Seq(
+      "1,1,1", "1,1,1",
+      "2,5,2", "2,5,2",
+      "3,9,2", "3,15,3", "3,6,1",
+      "4,7,1", "4,24,3", "4,27,3", "4,10,1",
+      "5,11,1", "5,36,3", "5,54,4", "5,29,2",
+      "6,33,2", "6,70,4", "6,78,4", "6,41,2"
+    ).mkString("\n")
+
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
+
+  @Test
+  def testSessionWindowAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    env.setParallelism(1)
+
+    val sqlQuery =
+      "SELECT MIN(a), MAX(a), SUM(a), COUNT(*) " +
+        "FROM T " +
+        "GROUP BY SESSION(ts, INTERVAL '4' SECOND)"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+      // create timestamps
+      .filter(x => (x._2 % 2) == 0)
+      .map(x => (x._1, x._2, x._3, new Timestamp(x._1 * 1000)))
+    tEnv.registerDataSet("T", ds, 'a, 'b, 'c, 'ts)
+
+    val result = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+    val expected = Seq(
+      "2,10,39,6",
+      "16,21,111,6"
+    ).mkString("\n")
+
+    TestBaseUtils.compareResultAsText(result.asJava, expected)
+  }
 }
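
For readers verifying the expected results: under the standard tumbling-window semantics assumed here, a record with timestamp ts is assigned to the window starting at ts - (ts % size). A sketch:

    // Window-start assignment for tumbling windows.
    def tumbleStart(ts: Long, size: Long): Long = ts - (ts % size)

    // Group b = 3 has rows a = 4, 5, 6 with ts = 4s, 5s, 6s and a 3s window:
    //   tumbleStart(4000, 3000) == tumbleStart(5000, 3000) == 3000 -> [3s, 6s): "3,9,2"
    //   tumbleStart(6000, 3000) == 6000                            -> [6s, 9s): "3,6,1"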

http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
new file mode 100644
index 0000000..e84ede6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.sql
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class WindowAggregateTest extends TableTestBase {
+
+  @Test
+  def testNonPartitionedTumbleWindow(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB FROM T GROUP BY TUMBLE(ts, INTERVAL '2' HOUR)"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(0),
+            term("select", "ts, a, b")
+          ),
+          term("window", EventTimeTumblingGroupWindow(Some('w$), 'ts, 7200000.millis)),
+          term("select", "SUM(a) AS sumA, COUNT(b) AS cntB")
+        ),
+        term("select", "sumA, cntB")
+      )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testPartitionedTumbleWindow(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT c, SUM(a) AS sumA, MIN(b) AS minB FROM T GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          batchTableNode(0),
+          term("groupBy", "c"),
+          term("window", EventTimeTumblingGroupWindow(Some('w$), 'ts, 240000.millis)),
+          term("select", "c, SUM(a) AS sumA, MIN(b) AS minB")
+        ),
+        term("select", "c, sumA, minB")
+      )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testNonPartitionedHopWindow(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+        "FROM T " +
+        "GROUP BY HOP(ts, INTERVAL '15' MINUTE, INTERVAL '90' MINUTE)"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(0),
+            term("select", "ts, a, b")
+          ),
+          term("window",
+            EventTimeSlidingGroupWindow(Some('w$), 'ts, 5400000.millis, 900000.millis)),
+          term("select", "SUM(a) AS sumA, COUNT(b) AS cntB")
+        ),
+        term("select", "sumA, cntB")
+      )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testPartitionedHopWindow(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Long, Timestamp)]("T", 'a, 'b, 'c, 'd, 'ts)
+
+    val sqlQuery =
+      "SELECT c, SUM(a) AS sumA, AVG(b) AS avgB " +
+        "FROM T " +
+        "GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '3' HOUR), d, c"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          batchTableNode(0),
+          term("groupBy", "c, d"),
+          term("window",
+            EventTimeSlidingGroupWindow(Some('w$), 'ts, 10800000.millis, 3600000.millis)),
+          term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB")
+        ),
+        term("select", "c, sumA, avgB")
+      )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testNonPartitionedSessionWindow(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT COUNT(*) AS cnt FROM T GROUP BY SESSION(ts, INTERVAL '30' MINUTE)"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(0),
+            term("select", "ts")
+          ),
+          term("window", EventTimeSessionGroupWindow(Some('w$), 'ts, 1800000.millis)),
+          term("select", "COUNT(*) AS cnt")
+        ),
+        term("select", "cnt")
+      )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testPartitionedSessionWindow(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Int, Timestamp)]("T", 'a, 'b, 'c, 'd, 'ts)
+
+    val sqlQuery =
+      "SELECT c, d, SUM(a) AS sumA, MIN(b) AS minB " +
+        "FROM T " +
+        "GROUP BY SESSION(ts, INTERVAL '12' HOUR), c, d"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          batchTableNode(0),
+          term("groupBy", "c, d"),
+          term("window", EventTimeSessionGroupWindow(Some('w$), 'ts, 43200000.millis)),
+          term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB")
+        ),
+        term("select", "c, d, sumA, minB")
+      )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testTumbleWindowNoOffset(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+        "FROM T " +
+        "GROUP BY TUMBLE(ts, INTERVAL '2' HOUR, TIME '10:00:00')"
+
+    util.verifySql(sqlQuery, "n/a")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testHopWindowNoOffset(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+        "FROM T " +
+        "GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME '10:00:00')"
+
+    util.verifySql(sqlQuery, "n/a")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testSessionWindowNoOffset(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sqlQuery =
+      "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+        "FROM T " +
+        "GROUP BY SESSION(ts, INTERVAL '2' HOUR, TIME '10:00:00')"
+
+    util.verifySql(sqlQuery, "n/a")
+  }
+
+  @Test(expected = classOf[TableException])
+  def testVariableWindowSize(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sql = "SELECT COUNT(*) " +
+      "FROM T " +
+      "GROUP BY TUMBLE(proctime(), b * INTERVAL '1' MINUTE)"
+    util.verifySql(sql, "n/a")
+  }
+
+}
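
The interval literals in the expected plans above are plain millisecond counts, e.g.:

    // INTERVAL '2' HOUR    = 2 * 60 * 60 * 1000  = 7200000.millis
    // INTERVAL '4' MINUTE  = 4 * 60 * 1000       = 240000.millis
    // INTERVAL '90' MINUTE = 90 * 60 * 1000      = 5400000.millis
    // INTERVAL '12' HOUR   = 12 * 60 * 60 * 1000 = 43200000.millis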

http://git-wip-us.apache.org/repos/asf/flink/blob/63539475/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 1c1752f..f4befa6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -85,92 +85,6 @@ class WindowAggregateTest extends TableTestBase {
   }
 
   @Test
-  def testNonPartitionedTumbleWindow() = {
-    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(rowtime() TO HOUR)"
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "1970-01-01 00:00:00 AS $f0")
-          ),
-          term("window", EventTimeTumblingGroupWindow(Some('w$), 'rowtime, 3600000.millis)),
-          term("select", "COUNT(*) AS EXPR$0")
-        ),
-        term("select", "EXPR$0")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testPartitionedTumbleWindow1() = {
-    val sql = "SELECT a, COUNT(*) FROM MyTable GROUP BY a, FLOOR(rowtime() TO MINUTE)"
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "1970-01-01 00:00:00 AS $f1")
-          ),
-          term("groupBy", "a"),
-          term("window", EventTimeTumblingGroupWindow(Some('w$), 'rowtime, 60000.millis)),
-          term("select", "a", "COUNT(*) AS EXPR$1")
-        ),
-        term("select", "a", "EXPR$1")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testPartitionedTumbleWindow2() = {
-    val sql = "SELECT a, SUM(c), b FROM MyTable GROUP BY a, FLOOR(rowtime() TO SECOND), b"
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "a", "1970-01-01 00:00:00 AS $f1, b, c")
-          ),
-          term("groupBy", "a, b"),
-          term("window", EventTimeTumblingGroupWindow(Some('w$), 'rowtime, 1000.millis)),
-          term("select", "a", "b", "SUM(c) AS EXPR$1")
-        ),
-        term("select", "a", "EXPR$1", "b")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
-  def testProcessingTime() = {
-    val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(proctime() TO HOUR)"
-    val expected =
-      unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "1970-01-01 00:00:00 AS $f0")
-          ),
-          term("window", ProcessingTimeTumblingGroupWindow(Some('w$), 3600000.millis)),
-          term("select", "COUNT(*) AS EXPR$0")
-        ),
-        term("select", "EXPR$0")
-      )
-    streamUtil.verifySql(sql, expected)
-  }
-
-  @Test
   def testTumbleFunction() = {
     val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(rowtime(), INTERVAL '15' MINUTE)"
     val expected =


[04/11] flink git commit: [FLINK-6269] Make UserCodeClassLoader final

Posted by fh...@apache.org.
[FLINK-6269] Make UserCodeClassLoader final

This closes #3682.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cb60043
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cb60043
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cb60043

Branch: refs/heads/table-retraction
Commit: 2cb60043d8fce4a291fbefeddce6598b442b8f01
Parents: b3ac693
Author: zcb <zh...@huawei.com>
Authored: Thu Apr 6 15:31:46 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 6 19:35:50 2017 +0200

----------------------------------------------------------------------
 .../scala/org/apache/flink/runtime/jobmanager/JobManager.scala     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2cb60043/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1e6d8d3..f2ecde5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1250,7 +1250,7 @@ class JobManager(
               "Cannot set up the user code libraries: " + t.getMessage, t)
         }
 
-        var userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
+        val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
         if (userCodeLoader == null) {
           throw new JobSubmissionException(jobId,
             "The user code class loader could not be initialized.")


[08/11] flink git commit: [FLINK-6256] Fix outputTag variable name in Side Output docs

Posted by fh...@apache.org.
[FLINK-6256] Fix outputTag variable name in Side Output docs

This closes #3684.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6355edd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6355edd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6355edd

Branch: refs/heads/table-retraction
Commit: a6355edd904c2efe2ee37118132b0add5a2e5588
Parents: 153d73a
Author: zentol <ch...@apache.org>
Authored: Thu Apr 6 11:45:56 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 6 19:35:50 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/side_output.md | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a6355edd/docs/dev/stream/side_output.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/side_output.md b/docs/dev/stream/side_output.md
index 3633b75..e4c4c19 100644
--- a/docs/dev/stream/side_output.md
+++ b/docs/dev/stream/side_output.md
@@ -41,13 +41,13 @@ side output stream:
 
 {% highlight java %}
 // this needs to be an anonymous inner class, so that we can analyze the type
-OutputTag<String> outputTag = new OutputTag<String>("string-side-output") {};
+OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val outputTag = OutputTag[String]("string-side-output")
+val outputTag = OutputTag[String]("side-output")
 {% endhighlight %}
 </div>
 </div>
@@ -79,7 +79,7 @@ SingleOutputStreamOperator<Integer> mainDataStream = input
         out.collect(value);
 
         // emit data to side output
-        ctx.output(sideOutputTag, "sideout-" + String.valueOf(value));
+        ctx.output(outputTag, "sideout-" + String.valueOf(value));
       }
     });
 {% endhighlight %}
@@ -90,7 +90,7 @@ SingleOutputStreamOperator<Integer> mainDataStream = input
 {% highlight scala %}
 
 val input: DataStream[Int] = ...
-val outputTag = OutputTag[String]("string-side-output")
+val outputTag = OutputTag[String]("side-output")
 
 val mainDataStream = input
   .process(new ProcessFunction[Int, Int] {
@@ -128,7 +128,7 @@ DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-val outputTag = OutputTag[String]("string-side-output")
+val outputTag = OutputTag[String]("side-output")
 
 val mainDataStream = ...
 


[05/11] flink git commit: [FLINK-5376] Fix log statement in UnorderedStreamElementQueue

Posted by fh...@apache.org.
[FLINK-5376] Fix log statement in UnorderedStreamElementQueue


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/153d73a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/153d73a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/153d73a6

Branch: refs/heads/table-retraction
Commit: 153d73a6f4ac51dcc02ccdb734d548855f1a1e45
Parents: ae17718
Author: zentol <ch...@apache.org>
Authored: Thu Apr 6 13:05:11 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 6 19:35:50 2017 +0200

----------------------------------------------------------------------
 .../api/operators/async/queue/UnorderedStreamElementQueue.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/153d73a6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
index 396dbe8..f2c78f8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
@@ -123,7 +123,7 @@ public class UnorderedStreamElementQueue implements StreamElementQueue {
 			if (numberEntries < capacity) {
 				addEntry(streamElementQueueEntry);
 
-				LOG.debug("Put element into ordered stream element queue. New filling degree " +
+				LOG.debug("Put element into unordered stream element queue. New filling degree " +
 					"({}/{}).", numberEntries, capacity);
 
 				return true;