You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2016/11/24 21:23:15 UTC

[2/4] flink git commit: [FLINK-3702] Make FieldAccessors support nested field expressions.

[FLINK-3702] Make FieldAccessors support nested field expressions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f04542e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f04542e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f04542e

Branch: refs/heads/master
Commit: 1f04542e861f9c156d7b5c1f6db72a74e08d7a75
Parents: 5d2da12
Author: Gabor Gevay <gg...@gmail.com>
Authored: Sun May 22 19:48:50 2016 +0200
Committer: Marton Balassi <mb...@apache.org>
Committed: Thu Nov 24 22:22:42 2016 +0100

----------------------------------------------------------------------
 docs/dev/api_concepts.md                        |   4 +-
 .../api/common/typeinfo/BasicArrayTypeInfo.java |  18 +
 .../api/common/typeinfo/BasicTypeInfo.java      |  26 ++
 .../InvalidFieldReferenceException.java         |  31 ++
 .../common/typeinfo/PrimitiveArrayTypeInfo.java |  18 +
 .../api/common/typeinfo/TypeInformation.java    |  34 ++
 .../api/common/typeutils/CompositeType.java     |  10 -
 .../api/common/typeutils/TypeSerializer.java    |   2 +-
 .../flink/api/java/typeutils/FieldAccessor.java | 324 ++++++++++++++++++
 .../flink/api/java/typeutils/PojoField.java     |  22 +-
 .../flink/api/java/typeutils/PojoTypeInfo.java  |  42 ++-
 .../api/java/typeutils/TupleTypeInfoBase.java   |  32 +-
 .../java/typeutils/runtime/FieldSerializer.java |  54 +++
 .../java/typeutils/runtime/PojoComparator.java  |  21 +-
 .../java/typeutils/runtime/PojoSerializer.java  |  21 +-
 .../api/java/typeutils/FieldAccessorTest.java   | 343 +++++++++++++++++++
 .../api/java/functions/SemanticPropUtil.java    |   2 +-
 .../flink/api/java/operator/DataSinkTest.java   |   5 +-
 .../operator/FullOuterJoinOperatorTest.java     |   3 +-
 .../operator/LeftOuterJoinOperatorTest.java     |   3 +-
 .../operator/RightOuterJoinOperatorTest.java    |   3 +-
 .../scala/typeutils/ProductFieldAccessor.java   |  75 ++++
 .../api/scala/typeutils/CaseClassTypeInfo.scala |  38 +-
 .../scala/typeutils/CaseClassTypeInfoTest.scala | 110 ++++++
 .../streaming/api/datastream/KeyedStream.java   | 140 +++++---
 .../aggregation/ComparableAggregator.java       |   6 +-
 .../functions/aggregation/SumAggregator.java    |   6 +-
 .../flink/streaming/util/FieldAccessor.java     | 254 --------------
 .../flink/streaming/util/FieldAccessorTest.java |  75 ----
 .../flink/streaming/api/scala/KeyedStream.scala |  90 ++++-
 .../streaming/runtime/DataStreamPojoITCase.java |  42 ++-
 31 files changed, 1366 insertions(+), 488 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/docs/dev/api_concepts.md
----------------------------------------------------------------------
diff --git a/docs/dev/api_concepts.md b/docs/dev/api_concepts.md
index 49d2ded..07a81e7 100644
--- a/docs/dev/api_concepts.md
+++ b/docs/dev/api_concepts.md
@@ -385,7 +385,7 @@ while a key can be specified on a DataStream using
 {% highlight java %}
 DataStream<...> input = // [...]
 DataStream<...> windowed = input
-  .key(/*define key here*/)
+  .keyBy(/*define key here*/)
   .window(/*window specification*/);
 {% endhighlight %}
 
@@ -418,7 +418,7 @@ val keyed = input.keyBy(0)
 </div>
 </div>
 
-The tuples is grouped on the first field (the one of
+The tuples are grouped on the first field (the one of
 Integer type).
 
 <div class="codetabs" markdown="1">

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
index 25b2850..d04e7d9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicArrayTypeInfo.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
+import org.apache.flink.api.java.typeutils.FieldAccessor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -121,6 +122,23 @@ public final class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
 	}
 
 	@Override
+	@PublicEvolving
+	public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) {
+		return new FieldAccessor.ArrayFieldAccessor<>(pos, this);
+	}
+
+	@Override
+	@PublicEvolving
+	public <F> FieldAccessor<T, F> getFieldAccessor(String pos, ExecutionConfig config) {
+		try {
+			return new FieldAccessor.ArrayFieldAccessor<>(Integer.parseInt(pos), this);
+		} catch (NumberFormatException ex) {
+			throw new InvalidFieldReferenceException
+				("A field expression on an array must be an integer index (that might be given as a string).");
+		}
+	}
+
+	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof BasicArrayTypeInfo) {
 			BasicArrayTypeInfo<?, ?> other = (BasicArrayTypeInfo<?, ?>) obj;

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index e2fd74e..09efba6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -58,6 +58,7 @@ import org.apache.flink.api.common.typeutils.base.ShortSerializer;
 import org.apache.flink.api.common.typeutils.base.StringComparator;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.api.java.typeutils.FieldAccessor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -171,6 +172,31 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
 		}
 	}
 
+	@Override
+	@PublicEvolving
+	@SuppressWarnings("unchecked")
+	public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) {
+		if(pos != 0) {
+			throw new InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on a " +
+				"basic type (" + this.toString() + "). A field expression on a basic type can only select " +
+				"the 0th field (which means selecting the entire basic type).");
+		}
+		return (FieldAccessor<T, F>) new FieldAccessor.SimpleFieldAccessor<T>(this);
+	}
+
+	@Override
+	@PublicEvolving
+	public <F> FieldAccessor<T, F> getFieldAccessor(String field, ExecutionConfig config) {
+		try {
+			int pos = field.equals("*") ? 0 : Integer.parseInt(field);
+			return getFieldAccessor(pos, config);
+		} catch (NumberFormatException ex) {
+			throw new InvalidFieldReferenceException("You tried to select the field \"" + field +
+				"\" on a " + this.toString() + ". A field expression on a basic type can only be \"*\" or \"0\"" +
+				" (both of which mean selecting the entire basic type).");
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/InvalidFieldReferenceException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/InvalidFieldReferenceException.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/InvalidFieldReferenceException.java
new file mode 100644
index 0000000..3c67c46
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/InvalidFieldReferenceException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+@PublicEvolving
+public class InvalidFieldReferenceException extends IllegalArgumentException {
+
+	private static final long serialVersionUID = 1L;
+
+	public InvalidFieldReferenceException(String s) {
+		super(s);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
index 1c6ce00..2bd96d3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/PrimitiveArrayTypeInfo.java
@@ -40,6 +40,7 @@ import org.apache.flink.api.common.typeutils.base.array.LongPrimitiveArraySerial
 import org.apache.flink.api.common.typeutils.base.array.PrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArrayComparator;
 import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySerializer;
+import org.apache.flink.api.java.typeutils.FieldAccessor;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -138,6 +139,23 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements Ato
 		return this.serializer;
 	}
 
+	@Override
+	@PublicEvolving
+	public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) {
+		return new FieldAccessor.ArrayFieldAccessor<>(pos, this);
+	}
+
+	@Override
+	@PublicEvolving
+	public <F> FieldAccessor<T, F> getFieldAccessor(String pos, ExecutionConfig config) {
+		try {
+			return new FieldAccessor.ArrayFieldAccessor<>(Integer.parseInt(pos), this);
+		} catch (NumberFormatException ex) {
+			throw new InvalidFieldReferenceException
+				("A field expression on an array must be an integer index (that might be given as a string).");
+		}
+	}
+
 	/**
 	 * Gets the class that represents the component type.
 	 * @return The class of the component type.

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index 154ceb1..7be2b68 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -24,6 +24,7 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.FieldAccessor;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import java.io.Serializable;
@@ -172,6 +173,39 @@ public abstract class TypeInformation<T> implements Serializable {
 	@PublicEvolving
 	public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);
 
+
+	/**
+	 * Creates a {@link FieldAccessor} for the given field position, which can be used to get and set
+	 * the specified field on instances of this type.
+	 *
+	 * @param pos The field position (zero-based)
+	 * @param config Configuration object
+	 * @param <F> The type of the field to access
+	 * @return The created FieldAccessor
+	 */
+	@PublicEvolving
+	public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config){
+		throw new InvalidFieldReferenceException("Cannot reference field by position on " + this.toString()
+			+ "Referencing a field by position is supported on tuples, case classes, and arrays. "
+			+ "Additionally, you can select the 0th field of a primitive/basic type (e.g. int).");
+	}
+
+	/**
+	 * Creates a {@link FieldAccessor} for the field that is given by a field expression,
+	 * which can be used to get and set the specified field on instances of this type.
+	 *
+	 * @param field The field expression
+	 * @param config Configuration object
+	 * @param <F> The type of the field to access
+	 * @return The created FieldAccessor
+	 */
+	@PublicEvolving
+	public <F> FieldAccessor<T, F> getFieldAccessor(String field, ExecutionConfig config) {
+		throw new InvalidFieldReferenceException("Cannot reference field by field expression on " + this.toString()
+				+ "Field expressions are only supported on POJO types, tuples, and case classes. "
+				+ "(See the Flink documentation on what is considered a POJO.)");
+	}
+
 	@Override
 	public abstract String toString();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
index 4bf17ea..a4230f4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeType.java
@@ -265,16 +265,6 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
 	@PublicEvolving
 	public abstract int getFieldIndex(String fieldName);
 
-	@PublicEvolving
-	public static class InvalidFieldReferenceException extends IllegalArgumentException {
-
-		private static final long serialVersionUID = 1L;
-
-		public InvalidFieldReferenceException(String s) {
-			super(s);
-		}
-	}
-
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof CompositeType) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 0d56743..5e81db7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 /**
- * This interface describes the methods that are required for a data type to be handled by the pact
+ * This interface describes the methods that are required for a data type to be handled by the Flink
  * runtime. Specifically, this interface contains the serialization and copying methods.
  * <p>
  * The methods in this class are assumed to be stateless, such that it is effectively thread safe. Stateful

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java
new file mode 100644
index 0000000..97ef31a
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/FieldAccessor.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * These classes encapsulate the logic of accessing a field specified by the user as either an index
+ * or a field expression string. TypeInformation can also be requested for the field.
+ * The position index might specify a field of a Tuple, an array, or a simple type (only "0th field").
+ *
+ * Field expressions that specify nested fields (e.g. "f1.a.foo") result in nested field accessors.
+ * These penetrate one layer, and then delegate the rest of the work to an "innerAccessor".
+ * (see PojoFieldAccessor, RecursiveTupleFieldAccessor, ProductFieldAccessor)
+ */
+@PublicEvolving
+public abstract class FieldAccessor<T, F> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	protected TypeInformation fieldType;
+
+	/**
+	 * Gets the TypeInformation for the type of the field.
+	 * Note: For an array of a primitive type, it returns the corresponding basic type (Integer for int[]).
+	 */
+	@SuppressWarnings("unchecked")
+	public TypeInformation<F> getFieldType() {
+		return fieldType;
+	}
+
+
+	/**
+	 * Gets the value of the field (specified in the constructor) of the given record.
+	 * @param record The record on which the field will be accessed
+	 * @return The value of the field.
+	 */
+	public abstract F get(T record);
+
+	/**
+	 * Sets the field (specified in the constructor) of the given record to the given value.
+	 *
+	 * Warning: This might modify the original object, or might return a new object instance.
+	 * (This is necessary, because the record might be immutable.)
+	 *
+	 * @param record The record to modify
+	 * @param fieldValue The new value of the field
+	 * @return A record that has the given field value. (this might be a new instance or the original)
+	 */
+	public abstract T set(T record, F fieldValue);
+
+
+	// --------------------------------------------------------------------------------------------------
+
+
+	/**
+	 * This is when the entire record is considered as a single field. (e.g. field 0 of a basic type, or a
+	 * field of a POJO that is itself some composite type but is not further decomposed)
+	 */
+	public final static class SimpleFieldAccessor<T> extends FieldAccessor<T, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		public SimpleFieldAccessor(TypeInformation<T> typeInfo) {
+			checkNotNull(typeInfo, "typeInfo must not be null.");
+
+			this.fieldType = typeInfo;
+		}
+
+		@Override
+		public T get(T record) {
+			return record;
+		}
+
+		@Override
+		public T set(T record, T fieldValue) {
+			return fieldValue;
+		}
+	}
+
+	public final static class ArrayFieldAccessor<T, F> extends FieldAccessor<T, F> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final int pos;
+
+		public ArrayFieldAccessor(int pos, TypeInformation typeInfo) {
+			if(pos < 0) {
+				throw new InvalidFieldReferenceException("The " + ((Integer) pos).toString() + ". field selected on" +
+					" an array, which is an invalid index.");
+			}
+			checkNotNull(typeInfo, "typeInfo must not be null.");
+
+			this.pos = pos;
+			this.fieldType = BasicTypeInfo.getInfoFor(typeInfo.getTypeClass().getComponentType());
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public F get(T record) {
+			return (F) Array.get(record, pos);
+		}
+
+		@Override
+		public T set(T record, F fieldValue) {
+			Array.set(record, pos, fieldValue);
+			return record;
+		}
+	}
+
+	/**
+	 * There are two versions of TupleFieldAccessor, differing in whether there is another
+	 * FieldAccessor nested inside. The version without an inner accessor is probably a little faster.
+	 */
+	static final class SimpleTupleFieldAccessor<T, F> extends FieldAccessor<T, F> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final int pos;
+
+		SimpleTupleFieldAccessor(int pos, TypeInformation<T> typeInfo) {
+			int arity = ((TupleTypeInfo)typeInfo).getArity();
+			if(pos < 0 || pos >= arity) {
+				throw new InvalidFieldReferenceException(
+					"Tried to select " + ((Integer) pos).toString() + ". field on \"" +
+					typeInfo.toString() + "\", which is an invalid index.");
+			}
+			checkNotNull(typeInfo, "typeInfo must not be null.");
+
+			this.pos = pos;
+			this.fieldType = ((TupleTypeInfo)typeInfo).getTypeAt(pos);
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public F get(T record) {
+			final Tuple tuple = (Tuple) record;
+			return (F) tuple.getField(pos);
+		}
+
+		@Override
+		public T set(T record, F fieldValue) {
+			final Tuple tuple = (Tuple) record;
+			tuple.setField(fieldValue, pos);
+			return record;
+		}
+	}
+
+	/**
+	 * @param <T> The Tuple type
+	 * @param <R> The field type at the first level
+	 * @param <F> The field type at the innermost level
+	 */
+	static final class RecursiveTupleFieldAccessor<T, R, F> extends FieldAccessor<T, F> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final int pos;
+		private final FieldAccessor<R, F> innerAccessor;
+
+		RecursiveTupleFieldAccessor(int pos, FieldAccessor<R, F> innerAccessor) {
+			if(pos < 0) {
+				throw new InvalidFieldReferenceException("Tried to select " + ((Integer) pos).toString() + ". field.");
+			}
+			checkNotNull(innerAccessor, "innerAccessor must not be null.");
+
+			this.pos = pos;
+			this.innerAccessor = innerAccessor;
+			this.fieldType = innerAccessor.fieldType;
+		}
+
+		@Override
+		public F get(T record) {
+			final Tuple tuple = (Tuple) record;
+			final R inner = tuple.getField(pos);
+			return innerAccessor.get(inner);
+		}
+
+		@Override
+		public T set(T record, F fieldValue) {
+			final Tuple tuple = (Tuple) record;
+			final R inner = tuple.getField(pos);
+			tuple.setField(innerAccessor.set(inner, fieldValue), pos);
+			return record;
+		}
+	}
+
+	/**
+	 * @param <T> The POJO type
+	 * @param <R> The field type at the first level
+	 * @param <F> The field type at the innermost level
+	 */
+	static final class PojoFieldAccessor<T, R, F> extends FieldAccessor<T, F> {
+
+		private static final long serialVersionUID = 1L;
+
+		private transient Field field;
+		private final FieldAccessor<R, F> innerAccessor;
+
+		PojoFieldAccessor(Field field, FieldAccessor<R, F> innerAccessor) {
+			checkNotNull(field, "field must not be null.");
+			checkNotNull(innerAccessor, "innerAccessor must not be null.");
+
+			this.field = field;
+			this.innerAccessor = innerAccessor;
+			this.fieldType = innerAccessor.fieldType;
+		}
+
+		@Override
+		public F get(T pojo) {
+			try {
+				@SuppressWarnings("unchecked")
+				final R inner = (R)field.get(pojo);
+				return innerAccessor.get(inner);
+			} catch (IllegalAccessException iaex) {
+				throw new RuntimeException("This should not happen since we call setAccesssible(true) in readObject."
+						+ " fields: " + field + " obj: " + pojo);
+			}
+		}
+
+		@Override
+		public T set(T pojo, F valueToSet) {
+			try {
+				@SuppressWarnings("unchecked")
+				final R inner = (R)field.get(pojo);
+				field.set(pojo, innerAccessor.set(inner, valueToSet));
+				return pojo;
+			} catch (IllegalAccessException iaex) {
+				throw new RuntimeException("This should not happen since we call setAccesssible(true) in readObject."
+						+ " fields: " + field + " obj: " + pojo);
+			}
+		}
+
+		private void writeObject(ObjectOutputStream out)
+				throws IOException, ClassNotFoundException {
+			out.defaultWriteObject();
+			FieldSerializer.serializeField(field, out);
+		}
+
+		private void readObject(ObjectInputStream in)
+				throws IOException, ClassNotFoundException {
+			in.defaultReadObject();
+			field = FieldSerializer.deserializeField(in);
+		}
+	}
+
+
+	// --------------------------------------------------------------------------------------------------
+
+	private final static String REGEX_FIELD = "[\\p{L}\\p{Digit}_\\$]*"; // This can start with a digit (because of Tuples)
+	private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?";
+	private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS
+		+"|\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR
+		+"|\\"+ Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA;
+
+	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
+
+	public static FieldExpression decomposeFieldExpression(String fieldExpression) {
+		Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
+		if(!matcher.matches()) {
+			throw new InvalidFieldReferenceException("Invalid field expression \""+fieldExpression+"\".");
+		}
+
+		String head = matcher.group(0);
+		if(head.equals(Keys.ExpressionKeys.SELECT_ALL_CHAR) || head.equals(Keys.ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
+			throw new InvalidFieldReferenceException("No wildcards are allowed here.");
+		} else {
+			head = matcher.group(1);
+		}
+
+		String tail = matcher.group(3);
+
+		return new FieldExpression(head, tail);
+	}
+
+	/**
+	 * Represents a decomposition of a field expression into its first part, and the rest.
+	 * E.g. "foo.f1.bar" is decomposed into "foo" and "f1.bar".
+	 */
+	public static class FieldExpression implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public String head, tail; // tail can be null, if the field expression had just one part
+
+		FieldExpression(String head, String tail) {
+			this.head = head;
+			this.tail = tail;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
index 026cfa6..2e20415 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.runtime.FieldSerializer;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -57,30 +58,13 @@ public class PojoField implements Serializable {
 	private void writeObject(ObjectOutputStream out)
 			throws IOException, ClassNotFoundException {
 		out.defaultWriteObject();
-		out.writeObject(field.getDeclaringClass());
-		out.writeUTF(field.getName());
+		FieldSerializer.serializeField(field, out);
 	}
 
 	private void readObject(ObjectInputStream in)
 			throws IOException, ClassNotFoundException {
 		in.defaultReadObject();
-		Class<?> clazz = (Class<?>)in.readObject();
-		String fieldName = in.readUTF();
-		field = null;
-		// try superclasses as well
-		while (clazz != null) {
-			try {
-				field = clazz.getDeclaredField(fieldName);
-				field.setAccessible(true);
-				break;
-			} catch (NoSuchFieldException e) {
-				clazz = clazz.getSuperclass();
-			}
-		}
-		if (field == null) {
-			throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
-					+ " (" + fieldName + ")");
-		}
+		field = FieldSerializer.deserializeField(in);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 9c65263..72432d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -64,8 +65,8 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 	private final static String REGEX_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*";
 	private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?";
 	private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS
-			+"|\\"+ExpressionKeys.SELECT_ALL_CHAR
-			+"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA;
+					+"|\\"+ExpressionKeys.SELECT_ALL_CHAR
+					+"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA;
 
 	private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
 	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
@@ -132,7 +133,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 		//   gives only some undefined order.
 		return false;
 	}
-	
+
 
 	@Override
 	@PublicEvolving
@@ -264,6 +265,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 	}
 
 	@Override
+	@PublicEvolving
 	protected TypeComparatorBuilder<T> createTypeComparatorBuilder() {
 		return new PojoTypeComparatorBuilder();
 	}
@@ -317,7 +319,39 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
 
 		return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config);
 	}
-	
+
+	@Override
+	@PublicEvolving
+	public <F> FieldAccessor<T, F> getFieldAccessor(String fieldExpression, ExecutionConfig config) {
+		
+		FieldAccessor.FieldExpression decomp = FieldAccessor.decomposeFieldExpression(fieldExpression);
+
+		// get field
+		PojoField field = null;
+		TypeInformation<?> fieldType = null;
+		for (int i = 0; i < fields.length; i++) {
+			if (fields[i].getField().getName().equals(decomp.head)) {
+				field = fields[i];
+				fieldType = fields[i].getTypeInformation();
+				break;
+			}
+		}
+		if (field == null) {
+			throw new InvalidFieldReferenceException("Unable to find field \""+decomp.head+"\" in type "+this+".");
+		}
+
+		if(decomp.tail == null) {
+			@SuppressWarnings("unchecked")
+			FieldAccessor<F,F> innerAccessor = new FieldAccessor.SimpleFieldAccessor<F>((TypeInformation<F>) fieldType);
+			return new FieldAccessor.PojoFieldAccessor<T, F, F>(field.getField(), innerAccessor);
+		} else {
+			@SuppressWarnings("unchecked")
+			FieldAccessor<Object,F> innerAccessor =
+					(FieldAccessor<Object,F>)fieldType.<F>getFieldAccessor(decomp.tail, config);
+			return new FieldAccessor.PojoFieldAccessor<T, Object, F>(field.getField(), innerAccessor);
+		}
+	}
+
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof PojoTypeInfo) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index 807fd54..c9a55fc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -23,7 +23,10 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 
@@ -203,7 +206,34 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 		TypeInformation<X> typed = (TypeInformation<X>) this.types[pos];
 		return typed;
 	}
-	
+
+	@Override
+	@PublicEvolving
+	public <F> FieldAccessor<T, F> getFieldAccessor(int pos, ExecutionConfig config) {
+		return new FieldAccessor.SimpleTupleFieldAccessor<T, F>(pos, this);
+	}
+
+	@Override
+	@PublicEvolving
+	public <F> FieldAccessor<T, F> getFieldAccessor(String fieldExpression, ExecutionConfig config) {
+		FieldAccessor.FieldExpression decomp = FieldAccessor.decomposeFieldExpression(fieldExpression);
+		int fieldPos = this.getFieldIndex(decomp.head);
+		if (fieldPos == -1) {
+			try {
+				fieldPos = Integer.parseInt(decomp.head);
+			} catch (NumberFormatException ex) {
+				throw new InvalidFieldReferenceException("Tried to select field \"" + decomp.head
+					+ "\" on " + this.toString());
+			}
+		}
+		if (decomp.tail == null) {
+			return new FieldAccessor.SimpleTupleFieldAccessor<T, F>(fieldPos, this);
+		} else {
+			FieldAccessor<?, F> innerAccessor = getTypeAt(fieldPos).getFieldAccessor(decomp.tail, config);
+			return new FieldAccessor.RecursiveTupleFieldAccessor<>(fieldPos, innerAccessor);
+		}
+	}
+
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof TupleTypeInfoBase) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
new file mode 100644
index 0000000..057eee9
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/FieldSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.Field;
+
+/**
+ * This class is for the serialization of java.lang.reflect.Field, which doesn't implement Serializable, therefore
+ * readObject/writeObject need to be implemented in classes where there is a field of type java.lang.reflect.Field.
+ * The two static methods in this class are to be called from these readObject/writeObject methods.
+ */
+public class FieldSerializer {
+
+	public static void serializeField(Field field, ObjectOutputStream out) throws IOException {
+		out.writeObject(field.getDeclaringClass());
+		out.writeUTF(field.getName());
+	}
+
+	public static Field deserializeField(ObjectInputStream in) throws IOException, ClassNotFoundException  {
+		Class<?> clazz = (Class<?>) in.readObject();
+		String fieldName = in.readUTF();
+		// try superclasses as well
+		while (clazz != null) {
+			try {
+				Field field = clazz.getDeclaredField(fieldName);
+				field.setAccessible(true);
+				return field;
+			} catch (NoSuchFieldException e) {
+				clazz = clazz.getSuperclass();
+			}
+		}
+		throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
+				+ " (" + fieldName + ")");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
index c0c7797..fc4a305 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
@@ -142,8 +142,7 @@ public final class PojoComparator<T> extends CompositeTypeComparator<T> implemen
 		out.defaultWriteObject();
 		out.writeInt(keyFields.length);
 		for (Field field: keyFields) {
-			out.writeObject(field.getDeclaringClass());
-			out.writeUTF(field.getName());
+			FieldSerializer.serializeField(field, out);
 		}
 	}
 
@@ -153,23 +152,7 @@ public final class PojoComparator<T> extends CompositeTypeComparator<T> implemen
 		int numKeyFields = in.readInt();
 		keyFields = new Field[numKeyFields];
 		for (int i = 0; i < numKeyFields; i++) {
-			Class<?> clazz = (Class<?>) in.readObject();
-			String fieldName = in.readUTF();
-			// try superclasses as well
-			while (clazz != null) {
-				try {
-					Field field = clazz.getDeclaredField(fieldName);
-					field.setAccessible(true);
-					keyFields[i] = field;
-					break;
-				} catch (NoSuchFieldException e) {
-					clazz = clazz.getSuperclass();
-				}
-			}
-			if (keyFields[i] == null ) {
-				throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
-						+ " (" + fieldName + ")");
-			}
+			keyFields[i] = FieldSerializer.deserializeField(in);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 9958540..57928b8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -121,8 +121,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		out.defaultWriteObject();
 		out.writeInt(fields.length);
 		for (Field field: fields) {
-			out.writeObject(field.getDeclaringClass());
-			out.writeUTF(field.getName());
+			FieldSerializer.serializeField(field, out);
 		}
 	}
 
@@ -132,23 +131,7 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
 		int numFields = in.readInt();
 		fields = new Field[numFields];
 		for (int i = 0; i < numFields; i++) {
-			Class<?> clazz = (Class<?>)in.readObject();
-			String fieldName = in.readUTF();
-			fields[i] = null;
-			// try superclasses as well
-			while (clazz != null) {
-				try {
-					fields[i] = clazz.getDeclaredField(fieldName);
-					fields[i].setAccessible(true);
-					break;
-				} catch (NoSuchFieldException e) {
-					clazz = clazz.getSuperclass();
-				}
-			}
-			if (fields[i] == null) {
-				throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup."
-						+ " (" + fieldName + ")");
-			}
+			fields[i] = FieldSerializer.deserializeField(in);
 		}
 
 		cl = Thread.currentThread().getContextClassLoader();

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
new file mode 100644
index 0000000..f780447
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/FieldAccessorTest.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.junit.Test;
+
+public class FieldAccessorTest {
+
+	// Note, that AggregationFunctionTest indirectly also tests FieldAccessors.
+	// ProductFieldAccessor is tested in CaseClassTypeInfoTest.
+
+	@Test
+	public void testFlatTuple() {
+		Tuple2<String, Integer> t = Tuple2.of("aa", 5);
+		TupleTypeInfo<Tuple2<String, Integer>> tpeInfo =
+				(TupleTypeInfo<Tuple2<String, Integer>>) TypeExtractor.getForObject(t);
+
+		FieldAccessor<Tuple2<String, Integer>, String> f0 = tpeInfo.getFieldAccessor("f0", null);
+		assertEquals("aa", f0.get(t));
+		assertEquals("aa", t.f0);
+		t = f0.set(t, "b");
+		assertEquals("b", f0.get(t));
+		assertEquals("b", t.f0);
+
+		FieldAccessor<Tuple2<String, Integer>, Integer> f1 = tpeInfo.getFieldAccessor("f1", null);
+		assertEquals(5, (int) f1.get(t));
+		assertEquals(5, (int) t.f1);
+		t = f1.set(t, 7);
+		assertEquals(7, (int) f1.get(t));
+		assertEquals(7, (int) t.f1);
+		assertEquals("b", f0.get(t));
+		assertEquals("b", t.f0);
+
+		// Access by int position must read and write the same field as the "f1" expression.
+		FieldAccessor<Tuple2<String, Integer>, Integer> f1n = tpeInfo.getFieldAccessor(1, null);
+		assertEquals(7, (int) f1n.get(t));
+		assertEquals(7, (int) t.f1);
+		t = f1n.set(t, 10);
+		assertEquals(10, (int) f1n.get(t));
+		assertEquals(10, (int) f1.get(t));
+		assertEquals(10, (int) t.f1);
+		assertEquals("b", f0.get(t));
+		assertEquals("b", t.f0);
+
+		FieldAccessor<Tuple2<String, Integer>, Integer> f1ns = tpeInfo.getFieldAccessor("1", null);
+		assertEquals(10, (int) f1ns.get(t));
+		assertEquals(10, (int) t.f1);
+		t = f1ns.set(t, 11);
+		assertEquals(11, (int) f1ns.get(t));
+		assertEquals(11, (int) f1.get(t));
+		assertEquals(11, (int) t.f1);
+		assertEquals("b", f0.get(t));
+		assertEquals("b", t.f0);
+
+		// This is technically valid (the ".0" is selecting the 0th field of a basic type).
+		FieldAccessor<Tuple2<String, Integer>, String> f0_0 = tpeInfo.getFieldAccessor("f0.0", null);
+		assertEquals("b", f0_0.get(t));
+		assertEquals("b", t.f0);
+		t = f0_0.set(t, "cc");
+		assertEquals("cc", f0_0.get(t));
+		assertEquals("cc", t.f0);
+
+		try {
+			FieldAccessor<Tuple2<String, Integer>, String> bad = tpeInfo.getFieldAccessor("almafa", null);
+			fail("Expected exception, because of bad field name");
+		} catch (InvalidFieldReferenceException ex) {
+			// OK: the unknown field name was rejected (fail() throws AssertionError, which is not caught here)
+		}
+	}
+
+	@Test
+	public void testTupleInTuple() {
+		Tuple2<String, Tuple3<Integer, Long, Double>> t = Tuple2.of("aa", Tuple3.of(5, 9L, 2.0));
+		TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>> tpeInfo =
+				(TupleTypeInfo<Tuple2<String, Tuple3<Integer, Long, Double>>>)TypeExtractor.getForObject(t);
+
+		FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, String> f0 = tpeInfo.getFieldAccessor("f0", null);
+		assertEquals("aa", f0.get(t));
+		assertEquals("aa", t.f0);
+
+		FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Double> f1f2 = tpeInfo.getFieldAccessor("f1.f2", null);
+		assertEquals(2.0, f1f2.get(t), 0);
+		assertEquals(2.0, t.f1.f2, 0);
+		t = f1f2.set(t, 3.0);
+		assertEquals(3.0, f1f2.get(t), 0);
+		assertEquals(3.0, t.f1.f2, 0);
+		assertEquals("aa", f0.get(t));
+		assertEquals("aa", t.f0);
+
+		FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Tuple3<Integer, Long, Double>> f1 = tpeInfo.getFieldAccessor("f1", null);
+		assertEquals(Tuple3.of(5, 9L, 3.0), f1.get(t));
+		assertEquals(Tuple3.of(5, 9L, 3.0), t.f1);
+		t = f1.set(t, Tuple3.of(8, 12L, 4.0));
+		assertEquals(Tuple3.of(8, 12L, 4.0), f1.get(t));
+		assertEquals(Tuple3.of(8, 12L, 4.0), t.f1);
+		assertEquals("aa", f0.get(t));
+		assertEquals("aa", t.f0);
+
+		FieldAccessor<Tuple2<String, Tuple3<Integer, Long, Double>>, Tuple3<Integer, Long, Double>> f1n = tpeInfo.getFieldAccessor(1, null);
+		assertEquals(Tuple3.of(8, 12L, 4.0), f1n.get(t));
+		assertEquals(Tuple3.of(8, 12L, 4.0), t.f1);
+		t = f1n.set(t, Tuple3.of(10, 13L, 5.0));
+		assertEquals(Tuple3.of(10, 13L, 5.0), f1n.get(t));
+		assertEquals(Tuple3.of(10, 13L, 5.0), f1.get(t));
+		assertEquals(Tuple3.of(10, 13L, 5.0), t.f1);
+		assertEquals("aa", f0.get(t));
+		assertEquals("aa", t.f0);
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testTupleFieldAccessorOutOfBounds() {
+		try {
+			TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class).getFieldAccessor(2, null);
+			fail();
+		} catch (InvalidFieldReferenceException e) {
+			// Nothing to do here
+		}
+	}
+
+	public static class Foo {
+		public int x;
+		public Tuple2<String, Long> t;
+		public Short y;
+
+		public Foo() {}
+
+		public Foo(int x, Tuple2<String, Long> t, Short y) {
+			this.x = x;
+			this.t = t;
+			this.y = y;
+		}
+	}
+
+	@Test
+	public void testTupleInPojoInTuple() {
+		Tuple2<String, Foo> t = Tuple2.of("aa", new Foo(8, Tuple2.of("ddd", 9L), (short) 2));
+		TupleTypeInfo<Tuple2<String, Foo>> tpeInfo =
+				(TupleTypeInfo<Tuple2<String, Foo>>)TypeExtractor.getForObject(t);
+
+		FieldAccessor<Tuple2<String, Foo>, Long> f1tf1 = tpeInfo.getFieldAccessor("f1.t.f1", null);
+		assertEquals(9L, (long) f1tf1.get(t));
+		assertEquals(9L, (long) t.f1.t.f1);
+		t = f1tf1.set(t, 12L);
+		assertEquals(12L, (long) f1tf1.get(t));
+		assertEquals(12L, (long) t.f1.t.f1);
+
+		FieldAccessor<Tuple2<String, Foo>, String> f1tf0 = tpeInfo.getFieldAccessor("f1.t.f0", null);
+		assertEquals("ddd", f1tf0.get(t));
+		assertEquals("ddd", t.f1.t.f0);
+		t = f1tf0.set(t, "alma");
+		assertEquals("alma", f1tf0.get(t));
+		assertEquals("alma", t.f1.t.f0);
+
+		FieldAccessor<Tuple2<String, Foo>, Foo> f1 = tpeInfo.getFieldAccessor("f1", null);
+		FieldAccessor<Tuple2<String, Foo>, Foo> f1n = tpeInfo.getFieldAccessor(1, null);
+		assertEquals(Tuple2.of("alma", 12L), f1.get(t).t);
+		assertEquals(Tuple2.of("alma", 12L), f1n.get(t).t);
+		assertEquals(Tuple2.of("alma", 12L), t.f1.t);
+		Foo newFoo = new Foo(8, Tuple2.of("ddd", 9L), (short) 2);
+		f1.set(t, newFoo);
+		assertEquals(newFoo, f1.get(t));
+		assertEquals(newFoo, f1n.get(t));
+		assertEquals(newFoo, t.f1);
+	}
+
+
+	public static class Inner {
+		public long x;
+		public boolean b;
+
+		public Inner(){}
+
+		public Inner(long x) {
+			this.x = x;
+		}
+
+		public Inner(long x, boolean b) {
+			this.x = x;
+			this.b = b;
+		}
+
+		@Override
+		public String toString() {
+			return ((Long)x).toString() + ", " + b;
+		}
+	}
+
+	public static class Outer {
+		public int a;
+		public Inner i;
+		public short b;
+
+		public Outer(){}
+
+		public Outer(int a, Inner i, short b) {
+			this.a = a;
+			this.i = i;
+			this.b = b;
+		}
+
+		@Override
+		public String toString() {
+			return a+", "+i.toString()+", "+b;
+		}
+	}
+
+	@Test
+	public void testPojoInPojo() {
+		Outer o = new Outer(10, new Inner(4L), (short)12);
+		PojoTypeInfo<Outer> tpeInfo = (PojoTypeInfo<Outer>)TypeInformation.of(Outer.class);
+
+		FieldAccessor<Outer, Long> fix = tpeInfo.getFieldAccessor("i.x", null);
+		assertEquals(4L, (long) fix.get(o));
+		assertEquals(4L, o.i.x);
+		o = fix.set(o, 22L);
+		assertEquals(22L, (long) fix.get(o));
+		assertEquals(22L, o.i.x);
+
+		FieldAccessor<Outer, Inner> fi = tpeInfo.getFieldAccessor("i", null);
+		assertEquals(22L, fi.get(o).x);
+		assertEquals(22L, (long) fix.get(o));
+		assertEquals(22L, o.i.x);
+		o = fi.set(o, new Inner(30L));
+		assertEquals(30L, fi.get(o).x);
+		assertEquals(30L, (long) fix.get(o));
+		assertEquals(30L, o.i.x);
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testArray() {
+		int[] a = new int[]{3,5};
+		FieldAccessor<int[], Integer> fieldAccessor =
+				(FieldAccessor<int[], Integer>) (Object)
+						PrimitiveArrayTypeInfo.getInfoFor(a.getClass()).getFieldAccessor(1, null);
+
+		assertEquals(Integer.class, fieldAccessor.getFieldType().getTypeClass());
+
+		assertEquals((Integer)a[1], fieldAccessor.get(a));
+
+		a = fieldAccessor.set(a, 6);
+		assertEquals((Integer)a[1], fieldAccessor.get(a));
+
+
+
+		Integer[] b = new Integer[]{3,5};
+		FieldAccessor<Integer[], Integer> fieldAccessor2 =
+				(FieldAccessor<Integer[], Integer>) (Object)
+						BasicArrayTypeInfo.getInfoFor(b.getClass()).getFieldAccessor(1, null);
+
+		assertEquals(Integer.class, fieldAccessor2.getFieldType().getTypeClass());
+
+		assertEquals(b[1], fieldAccessor2.get(b));
+
+		b = fieldAccessor2.set(b, 6);
+		assertEquals(b[1], fieldAccessor2.get(b));
+	}
+
+	public static class ArrayInPojo {
+		public long x;
+		public int[] arr;
+		public int y;
+
+		public ArrayInPojo() {}
+
+		public ArrayInPojo(long x, int[] arr, int y) {
+			this.x = x;
+			this.arr = arr;
+			this.y = y;
+		}
+	}
+
+	@Test
+	public void testArrayInPojo() {
+		ArrayInPojo o = new ArrayInPojo(10L, new int[]{3,4,5}, 12);
+		PojoTypeInfo<ArrayInPojo> tpeInfo = (PojoTypeInfo<ArrayInPojo>)TypeInformation.of(ArrayInPojo.class);
+
+		FieldAccessor<ArrayInPojo, Integer> fix = tpeInfo.getFieldAccessor("arr.1", null);
+		assertEquals(4, (int) fix.get(o));
+		assertEquals(4L, o.arr[1]);
+		o = fix.set(o, 8);
+		assertEquals(8, (int) fix.get(o));
+		assertEquals(8, o.arr[1]);
+	}
+
+	@Test
+	public void testBasicType() {
+		Long x = 7L;
+		TypeInformation<Long> tpeInfo = BasicTypeInfo.LONG_TYPE_INFO;
+
+		try {
+			FieldAccessor<Long, Long> f = tpeInfo.getFieldAccessor(1, null);
+			fail("Expected exception, because not the 0th field selected for a basic type.");
+		} catch (InvalidFieldReferenceException ex) {
+			// OK: position 1 was rejected (fail() throws AssertionError, which is not caught here)
+		}
+
+		try {
+			FieldAccessor<Long, Long> f = tpeInfo.getFieldAccessor("foo", null);
+			fail("Expected exception, because not the 0th field selected for a basic type.");
+		} catch (InvalidFieldReferenceException ex) {
+			// OK: the named field "foo" was rejected
+		}
+
+		FieldAccessor<Long, Long> f = tpeInfo.getFieldAccessor(0, null);
+		assertEquals(7L, (long) f.get(x));
+		x = f.set(x, 12L);
+		assertEquals(12L, (long) f.get(x));
+		assertEquals(12L, (long) x);
+
+		FieldAccessor<Long, Long> f2 = tpeInfo.getFieldAccessor("*", null);
+		assertEquals(12L, (long) f2.get(x));
+		x = f2.set(x, 14L);
+		assertEquals(14L, (long) f2.get(x));
+		assertEquals(14L, (long) x);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
index f8c76e1..4a0b2fc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
-import org.apache.flink.api.common.typeutils.CompositeType.InvalidFieldReferenceException;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
index 0493583..0da417b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DataSinkTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.operator;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -166,7 +167,7 @@ public class DataSinkTest {
 			.sortLocalOutput(5, Order.DESCENDING);
 	}
 
-	@Test(expected = CompositeType.InvalidFieldReferenceException.class)
+	@Test(expected = InvalidFieldReferenceException.class)
 	public void testFailTupleInv() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment
@@ -284,7 +285,7 @@ public class DataSinkTest {
 			.sortLocalOutput(1, Order.DESCENDING);
 	}
 
-	@Test(expected = CompositeType.InvalidFieldReferenceException.class)
+	@Test(expected = InvalidFieldReferenceException.class)
 	public void testFailPojoInvalidField() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
index 9f2aa41..9f5cfb2 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -135,7 +136,7 @@ public class FullOuterJoinOperatorTest {
 				.with(new DummyJoin());
 	}
 
-	@Test(expected = CompositeType.InvalidFieldReferenceException.class)
+	@Test(expected = InvalidFieldReferenceException.class)
 	public void testFullOuter8() {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
index bfcc3e8..914c75c 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -136,7 +137,7 @@ public class LeftOuterJoinOperatorTest {
 				.with(new DummyJoin());
 	}
 
-	@Test(expected = CompositeType.InvalidFieldReferenceException.class)
+	@Test(expected = InvalidFieldReferenceException.class)
 	public void testLeftOuter8() {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
index 709d830..f5d8129 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -135,7 +136,7 @@ public class RightOuterJoinOperatorTest {
 				.with(new DummyJoin());
 	}
 
-	@Test(expected = CompositeType.InvalidFieldReferenceException.class)
+	@Test(expected = InvalidFieldReferenceException.class)
 	public void testRightOuter8() {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData, tupleTypeInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java
new file mode 100644
index 0000000..0be6f33
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ProductFieldAccessor.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.FieldAccessor;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import scala.Product;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link FieldAccessor} that addresses the element at a fixed position of a Scala
+ * {@link Product} and hands that element to an inner accessor, which allows field
+ * accesses to be nested.
+ *
+ * <p>{@link #set} returns a record newly created by the serializer instead of assigning
+ * into the given one. The array used to collect the elements is instance state that is
+ * reused by every call, so concurrent {@code set} calls on one accessor would interfere.
+ *
+ * @param <T> type of the record this accessor is applied to
+ * @param <R> type of the record element at the accessed position
+ * @param <F> type of the field exposed by the inner accessor
+ */
+public final class ProductFieldAccessor<T, R, F> extends FieldAccessor<T, F> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Position of the accessed element within the product. */
+	private final int pos;
+	/** Serializer of the record type, used by {@link #set} to create the result. */
+	private final TupleSerializerBase<T> serializer;
+	/** Buffer for the elements of the record that is being rebuilt in {@link #set}. */
+	private final Object[] fields;
+	/** Arity reported by the serializer; also the length of {@link #fields}. */
+	private final int length;
+	/** Accessor applied to the element at {@link #pos}. */
+	private final FieldAccessor<R, F> innerAccessor;
+
+	/**
+	 * Creates an accessor for the element at {@code pos} of the given product type.
+	 *
+	 * @param pos position of the element within the product, starting at 0
+	 * @param typeInfo type information of the record; it is cast to {@link TupleTypeInfoBase}
+	 * @param innerAccessor accessor that is applied to the element at {@code pos}
+	 * @param config execution config that is passed to {@code typeInfo.createSerializer}
+	 * @throws NullPointerException if {@code typeInfo} or {@code innerAccessor} is null
+	 * @throws InvalidFieldReferenceException if {@code pos} is negative or not smaller than
+	 *         the arity of {@code typeInfo}
+	 */
+	ProductFieldAccessor(int pos, TypeInformation<T> typeInfo, FieldAccessor<R, F> innerAccessor, ExecutionConfig config) {
+		// Check typeInfo before it is dereferenced, so that a null argument is reported with
+		// the message below rather than by an unexplained NullPointerException.
+		checkNotNull(typeInfo, "typeInfo must not be null.");
+		TupleTypeInfoBase<T> tupleTypeInfo = (TupleTypeInfoBase<T>) typeInfo;
+
+		if (pos < 0 || pos >= tupleTypeInfo.getArity()) {
+			throw new InvalidFieldReferenceException(
+				"Tried to select " + pos + ". field on \"" + typeInfo + "\", which is an invalid index.");
+		}
+		checkNotNull(innerAccessor, "innerAccessor must not be null.");
+
+		this.pos = pos;
+		this.fieldType = tupleTypeInfo.getTypeAt(pos);
+		this.serializer = (TupleSerializerBase<T>) typeInfo.createSerializer(config);
+		this.length = this.serializer.getArity();
+		this.fields = new Object[this.length];
+		this.innerAccessor = innerAccessor;
+	}
+
+	/**
+	 * Applies the inner accessor's {@code get} to the element at {@code pos} of {@code record}.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public F get(T record) {
+		return innerAccessor.get((R) ((Product) record).productElement(pos));
+	}
+
+	/**
+	 * Returns a record created by the serializer from the elements of {@code record}, with the
+	 * element at {@code pos} replaced by the result of the inner accessor's {@code set}.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public T set(T record, F fieldValue) {
+		Product prod = (Product) record;
+		for (int i = 0; i < length; i++) {
+			fields[i] = prod.productElement(i);
+		}
+		fields[pos] = innerAccessor.set((R) fields[pos], fieldValue);
+		return serializer.createInstance(fields);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
index 2aecd7a..d970dfd 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
@@ -19,17 +19,18 @@
 package org.apache.flink.api.scala.typeutils
 
 import java.util
-import java.util.regex.{Pattern, Matcher}
+import java.util.regex.{Matcher, Pattern}
 
-import org.apache.flink.annotation.{PublicEvolving, Public}
+import org.apache.flink.annotation.{Public, PublicEvolving}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.operators.Keys
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType.{TypeComparatorBuilder,
-InvalidFieldReferenceException, FlatFieldDescriptor}
+import org.apache.flink.api.common.typeutils.CompositeType.{FlatFieldDescriptor, TypeComparatorBuilder}
 import org.apache.flink.api.common.typeutils._
 import Keys.ExpressionKeys
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
+import org.apache.flink.api.common.typeinfo.InvalidFieldReferenceException
+import org.apache.flink.api.java.typeutils.FieldAccessor.SimpleFieldAccessor
+import org.apache.flink.api.java.typeutils.{FieldAccessor, TupleTypeInfoBase}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -202,7 +203,7 @@ abstract class CaseClassTypeInfo[T <: Product](
   override def getFieldIndex(fieldName: String): Int = {
     val result = fieldNames.indexOf(fieldName)
     if (result != fieldNames.lastIndexOf(fieldName)) {
-      -2
+      -1
     } else {
       result
     }
@@ -238,6 +239,60 @@ abstract class CaseClassTypeInfo[T <: Product](
     }
   }
 
+  /**
+   * Creates an accessor for the field at position `pos` of this case class.
+   *
+   * @param pos zero-based position of the field
+   * @param config execution config passed on to the [[ProductFieldAccessor]]
+   * @tparam F type of the accessed field
+   * @throws InvalidFieldReferenceException if `pos` is not a valid field position
+   */
+  override def getFieldAccessor[F](pos: Int, config: ExecutionConfig): FieldAccessor[T, F] = {
+    // Validate before indexing into `types`: otherwise an invalid position fails with an
+    // index-out-of-bounds error and never reaches the check in ProductFieldAccessor.
+    if (pos < 0 || pos >= getArity) {
+      throw new InvalidFieldReferenceException(
+        "Tried to select " + pos + ". field on \"" + toString + "\", which is an invalid index.")
+    }
+    new ProductFieldAccessor[T,F,F](
+      pos, this, new SimpleFieldAccessor[F](types(pos).asInstanceOf[TypeInformation[F]]), config)
+  }
+
+  /**
+   * Creates an accessor for the field selected by `fieldExpression`.
+   *
+   * The head of the decomposed expression is looked up with `getFieldIndex`; if the
+   * expression has a tail, it is resolved by the type information of that field and the
+   * resulting accessor is nested into a [[ProductFieldAccessor]].
+   *
+   * @param fieldExpression expression selecting a (possibly nested) field
+   * @param config execution config passed on to the created accessors
+   * @tparam F type of the accessed field
+   * @throws InvalidFieldReferenceException if `getFieldIndex` yields a negative index
+   *                                        for the head of the expression
+   */
+  override def getFieldAccessor[F](fieldExpression: String, config: ExecutionConfig):
+    FieldAccessor[T, F] = {
+
+    val decomp = FieldAccessor.decomposeFieldExpression(fieldExpression)
+
+    val pos = getFieldIndex(decomp.head)
+    if (pos < 0) {
+      throw new InvalidFieldReferenceException("Invalid field selected: " + fieldExpression)
+    }
+
+    if (decomp.tail == null) {
+      // no nested part: access the field itself
+      getFieldAccessor(pos, config)
+    } else {
+      // delegate the remaining expression to the type of the selected field
+      val innerAccessor = types(pos)
+        .getFieldAccessor[F](decomp.tail, config)
+        .asInstanceOf[FieldAccessor[AnyRef, F]]
+      new ProductFieldAccessor[T,Object,F](pos, this, innerAccessor, config)
+    }
+  }
+
   override def toString: String = {
     clazz.getName + "(" + fieldNames.zip(types).map {
       case (n, t) => n + ": " + t

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
index 479483f..a9abea1 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfoTest.scala
@@ -21,9 +21,11 @@ package org.apache.flink.api.scala.typeutils
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
 import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.FieldAccessorTest
 import org.apache.flink.util.TestLogger
 import org.junit.Test
 import org.scalatest.junit.JUnitSuiteLike
+import org.apache.flink.api.scala._
 
 class CaseClassTypeInfoTest extends TestLogger with JUnitSuiteLike {
 
@@ -70,4 +72,108 @@ class CaseClassTypeInfoTest extends TestLogger with JUnitSuiteLike {
     assert(!tpeInfo1.equals(tpeInfo2))
   }
 
+  /** Reads and updates both fields of a flat case class, addressed by name and by position. */
+  @Test
+  def testFieldAccessorFlatCaseClass(): Unit = {
+    case class IntBoolean(foo: Int, bar: Boolean)
+    val tpeInfo = createTypeInformation[IntBoolean]
+
+    // The same checks are run for both ways of obtaining the accessors.
+    val accessorPairs = Seq(
+      // by field name
+      (tpeInfo.getFieldAccessor[Int]("foo", null),
+        tpeInfo.getFieldAccessor[Boolean]("bar", null)),
+      // by field pos
+      (tpeInfo.getFieldAccessor[Int](0, null),
+        tpeInfo.getFieldAccessor[Boolean](1, null)))
+
+    for ((fooAccessor, barAccessor) <- accessorPairs) {
+      val initial = IntBoolean(5, false)
+      assert(fooAccessor.get(initial) == 5)
+      assert(barAccessor.get(initial) == false)
+      assert(initial.foo == 5)
+      assert(initial.bar == false)
+
+      val fooUpdated: IntBoolean = fooAccessor.set(initial, 6)
+      assert(fooAccessor.get(fooUpdated) == 6)
+      assert(fooUpdated.foo == 6)
+
+      val barUpdated = barAccessor.set(fooUpdated, true)
+      assert(barUpdated.bar == true)
+      assert(barAccessor.get(barUpdated) == true)
+      assert(barUpdated.foo == 6)
+    }
+  }
+
+  /** Reads and updates the first field of a Scala tuple, addressed by position. */
+  @Test
+  def testFieldAccessorTuple(): Unit = {
+    val tpeInfo = createTypeInformation[(Int, Long)]
+    val firstField = tpeInfo.getFieldAccessor[Int](0, null)
+
+    val before = (5, 6L)
+    assert(firstField.get(before) == 5)
+
+    val after = firstField.set(before, 8)
+    assert(firstField.get(after) == 8)
+    assert(after._1 == 8)
+  }
+
+  /** Accesses a case class nested in a case class, both as a whole and through "i.b". */
+  @Test
+  def testFieldAccessorCaseClassInCaseClass(): Unit = {
+    case class Inner(a: Short, b: String)
+    case class Outer(a: Int, i: Inner, b: Boolean)
+    val tpeInfo = createTypeInformation[Outer]
+
+    val initial = Outer(1, Inner(2, "alma"), true)
+
+    // nested field expression
+    val nestedB = tpeInfo.getFieldAccessor[String]("i.b", null)
+    assert(nestedB.get(initial) == "alma")
+    assert(initial.i.b == "alma")
+    val renamed = nestedB.set(initial, "korte")
+    assert(nestedB.get(renamed) == "korte")
+    assert(renamed.i.b == "korte")
+
+    // the nested case class as a whole
+    val wholeInner = tpeInfo.getFieldAccessor[Inner]("i", null)
+    assert(wholeInner.get(renamed) == Inner(2, "korte"))
+    val replaced = wholeInner.set(renamed, Inner(3, "aaa"))
+    assert(replaced.i == Inner(3, "aaa"))
+  }
+
+  /** Accesses a POJO nested in a case class by expression, by name and by position. */
+  @Test
+  def testFieldAccessorPojoInCaseClass(): Unit = {
+    case class Outer(a: Int, i: FieldAccessorTest.Inner, b: Boolean)
+    val tpeInfo = createTypeInformation[Outer]
+    val cfg = new ExecutionConfig
+
+    val initial = Outer(1, new FieldAccessorTest.Inner(3L, true), false)
+
+    // nested field expression
+    val nestedB = tpeInfo.getFieldAccessor[Boolean]("i.b", cfg)
+    assert(nestedB.get(initial) == true)
+    assert(initial.i.b == true)
+    val flipped = nestedB.set(initial, false)
+    assert(nestedB.get(flipped) == false)
+    assert(flipped.i.b == false)
+
+    // the POJO as a whole, by field name
+    val byName = tpeInfo.getFieldAccessor[FieldAccessorTest.Inner]("i", cfg)
+    assert(byName.get(flipped).x == 3L)
+    assert(flipped.i.x == 3L)
+    val second = byName.set(flipped, new FieldAccessorTest.Inner(4L, true))
+    assert(byName.get(second).x == 4L)
+    assert(second.i.x == 4L)
+
+    // the POJO as a whole, by field position
+    val byPos = tpeInfo.getFieldAccessor[FieldAccessorTest.Inner](1, cfg)
+    assert(byPos.get(second).x == 4L)
+    assert(second.i.x == 4L)
+    val third = byPos.set(second, new FieldAccessorTest.Inner(5L, true))
+    assert(byPos.get(third).x == 5L)
+    assert(third.i.x == 5L)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f04542e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 4063b60..264d5d0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -365,7 +365,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * per key.
 	 *
 	 * @param positionToSum
-	 *            The position in the data point to sum
+	 *            The field position in the data points to sum. This is applicable to
+	 *            Tuple types, basic and primitive array types, Scala case classes,
+	 *            and primitive types (which are considered as having one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<T> sum(int positionToSum) {
@@ -373,16 +375,18 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the current sum of the pojo data
-	 * stream at the given field expressionby the given key. An independent
-	 * aggregate is kept per key. A field expression is either the name of a
-	 * public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * Applies an aggregation that gives the current sum of the data
+	 * stream at the given field by the given key. An independent
+	 * aggregate is kept per key.
 	 *
 	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
+	 *            In case of a POJO, Scala case class, or Tuple type, the
+	 *            name of the (public) field on which to perform the aggregation.
+	 *            Additionally, a dot can be used to drill down into nested
+	 *            objects, as in {@code "field1.fieldxy" }.
+	 *            Furthermore, an array index can also be specified in case of an array of
+	 *            a primitive or basic type; or "0" or "*" can be specified in case of a
+	 *            basic type (which is considered as having only one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<T> sum(String field) {
@@ -390,12 +394,14 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the current minimum of the data
+	 * Applies an aggregation that gives the current minimum of the data
 	 * stream at the given position by the given key. An independent aggregate
 	 * is kept per key.
 	 *
 	 * @param positionToMin
-	 *            The position in the data point to minimize
+	 *            The field position in the data points to minimize. This is applicable to
+	 *            Tuple types, basic and primitive array types, Scala case classes,
+	 *            and primitive types (which are considered as having one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<T> min(int positionToMin) {
@@ -404,16 +410,21 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the current minimum of the pojo
+	 * Applies an aggregation that gives the current minimum of the
 	 * data stream at the given field expression by the given key. An
 	 * independent aggregate is kept per key. A field expression is either the
 	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * {@link DataStream}'s underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.fieldxy" }.
 	 *
 	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
+	 *            In case of a POJO, Scala case class, or Tuple type, the
+	 *            name of the (public) field on which to perform the aggregation.
+	 *            Additionally, a dot can be used to drill down into nested
+	 *            objects, as in {@code "field1.fieldxy" }.
+	 *            Furthermore, an array index can also be specified in case of an array of
+	 *            a primitive or basic type; or "0" or "*" can be specified in case of a
+	 *            basic type (which is considered as having only one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<T> min(String field) {
@@ -427,7 +438,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * per key.
 	 *
 	 * @param positionToMax
-	 *            The position in the data point to maximize
+	 *            The field position in the data points to maximize. This is applicable to
+	 *            Tuple types, basic and primitive array types, Scala case classes,
+	 *            and primitive types (which are considered as having one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<T> max(int positionToMax) {
@@ -436,16 +449,21 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the current maximum of the pojo
+	 * Applies an aggregation that gives the current maximum of the
 	 * data stream at the given field expression by the given key. An
 	 * independent aggregate is kept per key. A field expression is either the
 	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * {@link DataStream}'s underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.fieldxy" }.
 	 *
 	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
+	 *            In case of a POJO, Scala case class, or Tuple type, the
+	 *            name of the (public) field on which to perform the aggregation.
+	 *            Additionally, a dot can be used to drill down into nested
+	 *            objects, as in {@code "field1.fieldxy" }.
+	 *            Furthermore, an array index can also be specified in case of an array of
+	 *            a primitive or basic type; or "0" or "*" can be specified in case of a
+	 *            basic type (which is considered as having only one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<T> max(String field) {
@@ -454,16 +472,21 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the current minimum element of the
-	 * pojo data stream by the given field expression by the given key. An
+	 * Applies an aggregation that gives the current minimum element of the
+	 * data stream by the given field expression by the given key. An
 	 * independent aggregate is kept per key. A field expression is either the
 	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * {@link DataStream}'s underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.fieldxy" }.
 	 *
 	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
+	 *            In case of a POJO, Scala case class, or Tuple type, the
+	 *            name of the (public) field on which to perform the aggregation.
+	 *            Additionally, a dot can be used to drill down into nested
+	 *            objects, as in {@code "field1.fieldxy" }.
+	 *            Furthermore, an array index can also be specified in case of an array of
+	 *            a primitive or basic type; or "0" or "*" can be specified in case of a
+	 *            basic type (which is considered as having only one field).
 	 * @param first
 	 *            If True then in case of field equality the first object will
 	 *            be returned
@@ -476,16 +499,21 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the current maximum element of the
-	 * pojo data stream by the given field expression by the given key. An
+	 * Applies an aggregation that gives the current maximum element of the
+	 * data stream by the given field expression by the given key. An
 	 * independent aggregate is kept per key. A field expression is either the
 	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * {@link DataStream}'s underlying type. A dot can be used to drill down into
+	 * objects, as in {@code "field1.fieldxy" }.
 	 *
 	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
+	 *            In case of a POJO, Scala case class, or Tuple type, the
+	 *            name of the (public) field on which to perform the aggregation.
+	 *            Additionally, a dot can be used to drill down into nested
+	 *            objects, as in {@code "field1.fieldxy" }.
+	 *            Furthermore, an array index can also be specified in case of an array of
+	 *            a primitive or basic type; or "0" or "*" can be specified in case of a
+	 *            basic type (which is considered as having only one field).
 	 * @param first
 	 *            If True then in case of field equality the first object will
 	 *            be returned
@@ -497,13 +525,15 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the current element with the
+	 * Applies an aggregation that gives the current element with the
 	 * minimum value at the given position by the given key. An independent
 	 * aggregate is kept per key. If more elements have the minimum value at the
 	 * given position, the operator returns the first one by default.
 	 *
 	 * @param positionToMinBy
-	 *            The position in the data point to minimize
+	 *            The field position in the data points to minimize. This is applicable to
+	 *            Tuple types, basic and primitive array types, Scala case classes,
+	 *            and primitive types (which are considered as having one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<T> minBy(int positionToMinBy) {
@@ -511,13 +541,19 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the current element with the
+	 * Applies an aggregation that gives the current element with the
 	 * minimum value at the given position by the given key. An independent
 	 * aggregate is kept per key. If more elements have the minimum value at the
 	 * given position, the operator returns the first one by default.
 	 *
 	 * @param positionToMinBy
-	 *            The position in the data point to minimize
+	 *            In case of a POJO, Scala case class, or Tuple type, the
+	 *            name of the (public) field on which to perform the aggregation.
+	 *            Additionally, a dot can be used to drill down into nested
+	 *            objects, as in {@code "field1.fieldxy" }.
+	 *            Furthermore, an array index can also be specified in case of an array of
+	 *            a primitive or basic type; or "0" or "*" can be specified in case of a
+	 *            basic type (which is considered as having only one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<T> minBy(String positionToMinBy) {
@@ -525,14 +561,16 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the current element with the
+	 * Applies an aggregation that gives the current element with the
 	 * minimum value at the given position by the given key. An independent
 	 * aggregate is kept per key. If more elements have the minimum value at the
 	 * given position, the operator returns either the first or last one,
 	 * depending on the parameter set.
 	 *
 	 * @param positionToMinBy
-	 *            The position in the data point to minimize
+	 *            The field position in the data points to minimize. This is applicable to
+	 *            Tuple types, basic and primitive array types, Scala case classes,
+	 *            and primitive types (which are considered as having one field).
 	 * @param first
 	 *            If true, then the operator return the first element with the
 	 *            minimal value, otherwise returns the last
@@ -544,13 +582,15 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the current element with the
+	 * Applies an aggregation that gives the current element with the
 	 * maximum value at the given position by the given key. An independent
 	 * aggregate is kept per key. If more elements have the maximum value at the
 	 * given position, the operator returns the first one by default.
 	 *
 	 * @param positionToMaxBy
-	 *            The position in the data point to maximize
+	 *            The field position in the data points to maximize. This is applicable to
+	 *            Tuple types, basic and primitive array types, Scala case classes,
+	 *            and primitive types (which are considered as having one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) {
@@ -558,13 +598,19 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the current element with the
+	 * Applies an aggregation that gives the current element with the
 	 * maximum value at the given position by the given key. An independent
 	 * aggregate is kept per key. If more elements have the maximum value at the
 	 * given position, the operator returns the first one by default.
 	 *
 	 * @param positionToMaxBy
-	 *            The position in the data point to maximize
+	 *            In case of a POJO, Scala case class, or Tuple type, the
+	 *            name of the (public) field on which to perform the aggregation.
+	 *            Additionally, a dot can be used to drill down into nested
+	 *            objects, as in {@code "field1.fieldxy" }.
+	 *            Furthermore, an array index can also be specified in case of an array of
+	 *            a primitive or basic type; or "0" or "*" can be specified in case of a
+	 *            basic type (which is considered as having only one field).
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
@@ -572,14 +618,16 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the current element with the
+	 * Applies an aggregation that gives the current element with the
 	 * maximum value at the given position by the given key. An independent
 	 * aggregate is kept per key. If more elements have the maximum value at the
 	 * given position, the operator returns either the first or last one,
 	 * depending on the parameter set.
 	 *
 	 * @param positionToMaxBy
-	 *            The position in the data point to maximize.
+	 *            The field position in the data points to maximize. This is applicable to
+	 *            Tuple types, basic and primitive array types, Scala case classes,
+	 *            and primitive types (which are considered as having one field).
 	 * @param first
 	 *            If true, then the operator return the first element with the
 	 *            maximum value, otherwise returns the last