You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/15 10:49:52 UTC

[2/7] flink git commit: [FLINK-5188] [table] [connectors] [core] Adjust imports and method calls to new Row type.

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
new file mode 100644
index 0000000..a68e81e
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
@@ -0,0 +1,879 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public class RowCsvInputFormatTest {
+
+	private static Path PATH = new Path("an/ignored/file/");
+
+	// static variables for testing the removal of \r\n to \n
+	private static String FIRST_PART = "That is the first part";
+	private static String SECOND_PART = "That is the second part";
+
+	@Test
+	public void ignoreInvalidLines() throws Exception {
+		String fileContent =
+			"#description of the data\n" +
+				"header1|header2|header3|\n" +
+				"this is|1|2.0|\n" +
+				"//a comment\n" +
+				"a test|3|4.0|\n" +
+				"#next|5|6.0|\n";
+
+		FileInputSplit split = createTempFile(fileContent);
+
+		RowTypeInfo typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+		format.setLenient(false);
+		Configuration parameters = new Configuration();
+		format.configure(new Configuration());
+		format.open(split);
+
+		Row result = new Row(3);
+		try {
+			result = format.nextRecord(result);
+			fail("Parse Exception was not thrown! (Row too short)");
+		} catch (ParseException ignored) {
+		} // => ok
+
+		try {
+			result = format.nextRecord(result);
+			fail("Parse Exception was not thrown! (Invalid int value)");
+		} catch (ParseException ignored) {
+		} // => ok
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("this is", result.getField(0));
+		assertEquals(1, result.getField(1));
+		assertEquals(2.0, result.getField(2));
+
+		try {
+			result = format.nextRecord(result);
+			fail("Parse Exception was not thrown! (Row too short)");
+		} catch (ParseException ignored) {
+		} // => ok
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("a test", result.getField(0));
+		assertEquals(3, result.getField(1));
+		assertEquals(4.0, result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("#next", result.getField(0));
+		assertEquals(5, result.getField(1));
+		assertEquals(6.0, result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNull(result);
+
+		// re-open with lenient = true
+		format.setLenient(true);
+		format.configure(parameters);
+		format.open(split);
+
+		result = new Row(3);
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("header1", result.getField(0));
+		assertNull(result.getField(1));
+		assertNull(result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("this is", result.getField(0));
+		assertEquals(1, result.getField(1));
+		assertEquals(2.0, result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("a test", result.getField(0));
+		assertEquals(3, result.getField(1));
+		assertEquals(4.0, result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("#next", result.getField(0));
+		assertEquals(5, result.getField(1));
+		assertEquals(6.0, result.getField(2));
+		result = format.nextRecord(result);
+		assertNull(result);
+	}
+
+	@Test
+	public void ignoreSingleCharPrefixComments() throws Exception {
+		String fileContent =
+			"#description of the data\n" +
+				"#successive commented line\n" +
+				"this is|1|2.0|\n" +
+				"a test|3|4.0|\n" +
+				"#next|5|6.0|\n";
+
+		FileInputSplit split = createTempFile(fileContent);
+
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+		format.setCommentPrefix("#");
+		format.configure(new Configuration());
+		format.open(split);
+
+		Row result = new Row(3);
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("this is", result.getField(0));
+		assertEquals(1, result.getField(1));
+		assertEquals(2.0, result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("a test", result.getField(0));
+		assertEquals(3, result.getField(1));
+		assertEquals(4.0, result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNull(result);
+	}
+
+	@Test
+	public void ignoreMultiCharPrefixComments() throws Exception {
+		String fileContent =
+			"//description of the data\n" +
+				"//successive commented line\n" +
+				"this is|1|2.0|\n" +
+				"a test|3|4.0|\n" +
+				"//next|5|6.0|\n";
+
+		FileInputSplit split = createTempFile(fileContent);
+
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+		format.setCommentPrefix("//");
+		format.configure(new Configuration());
+		format.open(split);
+
+		Row result = new Row(3);
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("this is", result.getField(0));
+		assertEquals(1, result.getField(1));
+		assertEquals(2.0, result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("a test", result.getField(0));
+		assertEquals(3, result.getField(1));
+		assertEquals(4.0, result.getField(2));
+		result = format.nextRecord(result);
+		assertNull(result);
+	}
+
+	@Test
+	public void readStringFields() throws Exception {
+		String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
+
+		FileInputSplit split = createTempFile(fileContent);
+
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
+
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+		format.configure(new Configuration());
+		format.open(split);
+
+		Row result = new Row(3);
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("abc", result.getField(0));
+		assertEquals("def", result.getField(1));
+		assertEquals("ghijk", result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("abc", result.getField(0));
+		assertEquals("", result.getField(1));
+		assertEquals("hhg", result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("", result.getField(0));
+		assertEquals("", result.getField(1));
+		assertEquals("", result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNull(result);
+		assertTrue(format.reachedEnd());
+	}
+
+	@Test
+	public void readMixedQuotedStringFields() throws Exception {
+		String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||";
+
+		FileInputSplit split = createTempFile(fileContent);
+
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
+
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+		format.configure(new Configuration());
+		format.enableQuotedStringParsing('@');
+		format.open(split);
+
+		Row result = new Row(3);
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("a|b|c", result.getField(0));
+		assertEquals("def", result.getField(1));
+		assertEquals("ghijk", result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("abc", result.getField(0));
+		assertEquals("", result.getField(1));
+		assertEquals("|hhg", result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("", result.getField(0));
+		assertEquals("", result.getField(1));
+		assertEquals("", result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNull(result);
+		assertTrue(format.reachedEnd());
+	}
+
+	@Test
+	public void readStringFieldsWithTrailingDelimiters() throws Exception {
+		String fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n";
+
+		FileInputSplit split = createTempFile(fileContent);
+
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
+
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+		format.setFieldDelimiter("|-");
+		format.configure(new Configuration());
+		format.open(split);
+
+		Row result = new Row(3);
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("abc", result.getField(0));
+		assertEquals("def", result.getField(1));
+		assertEquals("ghijk", result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("abc", result.getField(0));
+		assertEquals("", result.getField(1));
+		assertEquals("hhg", result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals("", result.getField(0));
+		assertEquals("", result.getField(1));
+		assertEquals("", result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNull(result);
+		assertTrue(format.reachedEnd());
+	}
+
+	@Test
+	public void testIntegerFields() throws Exception {
+		String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
+
+		FileInputSplit split = createTempFile(fileContent);
+
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO);
+
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+
+		format.setFieldDelimiter("|");
+		format.configure(new Configuration());
+		format.open(split);
+
+		Row result = new Row(5);
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals(111, result.getField(0));
+		assertEquals(222, result.getField(1));
+		assertEquals(333, result.getField(2));
+		assertEquals(444, result.getField(3));
+		assertEquals(555, result.getField(4));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals(666, result.getField(0));
+		assertEquals(777, result.getField(1));
+		assertEquals(888, result.getField(2));
+		assertEquals(999, result.getField(3));
+		assertEquals(0, result.getField(4));
+
+		result = format.nextRecord(result);
+		assertNull(result);
+		assertTrue(format.reachedEnd());
+	}
+
+	@Test
+	public void testEmptyFields() throws Exception {
+		String fileContent =
+			",,,,,,,,\n" +
+				",,,,,,,,\n" +
+				",,,,,,,,\n" +
+				",,,,,,,,\n" +
+				",,,,,,,,\n" +
+				",,,,,,,,\n" +
+				",,,,,,,,\n" +
+				",,,,,,,,\n";
+
+		FileInputSplit split = createTempFile(fileContent);
+
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.BOOLEAN_TYPE_INFO,
+			BasicTypeInfo.BYTE_TYPE_INFO,
+			BasicTypeInfo.DOUBLE_TYPE_INFO,
+			BasicTypeInfo.FLOAT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.SHORT_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
+
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, true);
+		format.setFieldDelimiter(",");
+		format.configure(new Configuration());
+		format.open(split);
+
+		Row result = new Row(8);
+		int linesCnt = fileContent.split("\n").length;
+
+		for (int i = 0; i < linesCnt; i++) {
+			result = format.nextRecord(result);
+			assertNull(result.getField(i));
+		}
+
+		// ensure no more rows
+		assertNull(format.nextRecord(result));
+		assertTrue(format.reachedEnd());
+	}
+
+	@Test
+	public void testDoubleFields() throws Exception {
+		String fileContent = "11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n";
+
+		FileInputSplit split = createTempFile(fileContent);
+
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.DOUBLE_TYPE_INFO,
+			BasicTypeInfo.DOUBLE_TYPE_INFO,
+			BasicTypeInfo.DOUBLE_TYPE_INFO,
+			BasicTypeInfo.DOUBLE_TYPE_INFO,
+			BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo);
+		format.setFieldDelimiter("|");
+		format.configure(new Configuration());
+		format.open(split);
+
+		Row result = new Row(5);
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals(11.1, result.getField(0));
+		assertEquals(22.2, result.getField(1));
+		assertEquals(33.3, result.getField(2));
+		assertEquals(44.4, result.getField(3));
+		assertEquals(55.5, result.getField(4));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals(66.6, result.getField(0));
+		assertEquals(77.7, result.getField(1));
+		assertEquals(88.8, result.getField(2));
+		assertEquals(99.9, result.getField(3));
+		assertEquals(0.0, result.getField(4));
+
+		result = format.nextRecord(result);
+		assertNull(result);
+		assertTrue(format.reachedEnd());
+	}
+
+	@Test
+	public void testReadFirstN() throws Exception {
+		String fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n";
+
+		FileInputSplit split = createTempFile(fileContent);
+
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO);
+
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo);
+		format.setFieldDelimiter("|");
+		format.configure(new Configuration());
+		format.open(split);
+
+		Row result = new Row(2);
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals(111, result.getField(0));
+		assertEquals(222, result.getField(1));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals(666, result.getField(0));
+		assertEquals(777, result.getField(1));
+
+		result = format.nextRecord(result);
+		assertNull(result);
+		assertTrue(format.reachedEnd());
+	}
+
+	@Test
+	public void testReadSparseWithNullFieldsForTypes() throws Exception {
+		String fileContent = "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" +
+			"000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|";
+
+		FileInputSplit split = createTempFile(fileContent);
+
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO);
+
+		RowCsvInputFormat format = new RowCsvInputFormat(
+			PATH,
+			typeInfo,
+			new boolean[]{true, false, false, true, false, false, false, true});
+		format.setFieldDelimiter("|x|");
+		format.configure(new Configuration());
+		format.open(split);
+
+		Row result = new Row(3);
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals(111, result.getField(0));
+		assertEquals(444, result.getField(1));
+		assertEquals(888, result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals(0, result.getField(0));
+		assertEquals(777, result.getField(1));
+		assertEquals(333, result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNull(result);
+		assertTrue(format.reachedEnd());
+	}
+
+	@Test
+	public void testReadSparseWithPositionSetter() throws Exception {
+		String fileContent = "111|222|333|444|555|666|777|888|999|000|\n" +
+			"000|999|888|777|666|555|444|333|222|111|";
+
+		FileInputSplit split = createTempFile(fileContent);
+
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO);
+
+		RowCsvInputFormat format = new RowCsvInputFormat(
+			PATH,
+			typeInfo,
+			new int[]{0, 3, 7});
+		format.setFieldDelimiter("|");
+		format.configure(new Configuration());
+		format.open(split);
+
+		Row result = new Row(3);
+		result = format.nextRecord(result);
+
+		assertNotNull(result);
+		assertEquals(111, result.getField(0));
+		assertEquals(444, result.getField(1));
+		assertEquals(888, result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals(0, result.getField(0));
+		assertEquals(777, result.getField(1));
+		assertEquals(333, result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNull(result);
+		assertTrue(format.reachedEnd());
+	}
+
+	@Test
+	public void testReadSparseWithMask() throws Exception {
+		String fileContent = "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" +
+			"000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&";
+
+		FileInputSplit split = RowCsvInputFormatTest.createTempFile(fileContent);
+
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO);
+
+		RowCsvInputFormat format = new RowCsvInputFormat(
+			PATH,
+			typeInfo,
+			new boolean[]{true, false, false, true, false, false, false, true});
+		format.setFieldDelimiter("&&");
+		format.configure(new Configuration());
+		format.open(split);
+
+		Row result = new Row(3);
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals(111, result.getField(0));
+		assertEquals(444, result.getField(1));
+		assertEquals(888, result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals(0, result.getField(0));
+		assertEquals(777, result.getField(1));
+		assertEquals(333, result.getField(2));
+
+		result = format.nextRecord(result);
+		assertNull(result);
+		assertTrue(format.reachedEnd());
+	}
+
+	@Test
+	public void testParseStringErrors() throws Exception {
+		StringParser stringParser = new StringParser();
+
+		stringParser.enableQuotedStringParsing((byte) '"');
+
+		Map<String, StringParser.ParseErrorState> failures = new HashMap<>();
+		failures.put("\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING);
+		failures.put("\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING);
+
+		for (Map.Entry<String, StringParser.ParseErrorState> failure : failures.entrySet()) {
+			int result = stringParser.parseField(
+				failure.getKey().getBytes(),
+				0,
+				failure.getKey().length(),
+				new byte[]{(byte) '|'},
+				null);
+			assertEquals(-1, result);
+			assertEquals(failure.getValue(), stringParser.getErrorState());
+		}
+	}
+
+	// Test disabled because we do not support double-quote escaped quotes right now.
+	@Test
+	@Ignore
+	public void testParserCorrectness() throws Exception {
+		// RFC 4180 Compliance Test content
+		// Taken from http://en.wikipedia.org/wiki/Comma-separated_values#Example
+		String fileContent = "Year,Make,Model,Description,Price\n" +
+			"1997,Ford,E350,\"ac, abs, moon\",3000.00\n" +
+			"1999,Chevy,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00\n" +
+			"1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, loaded\",4799.00\n" +
+			"1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n" +
+			",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00";
+
+		FileInputSplit split = createTempFile(fileContent);
+
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo);
+		format.setSkipFirstLineAsHeader(true);
+		format.setFieldDelimiter(",");
+		format.configure(new Configuration());
+		format.open(split);
+
+		Row result = new Row(5);
+		Row r1 = new Row(5);
+		r1.setField(0, 1997);
+		r1.setField(1, "Ford");
+		r1.setField(2, "E350");
+		r1.setField(3, "ac, abs, moon");
+		r1.setField(4, 3000.0);
+
+		Row r2 = new Row(5);
+		r2.setField(0, 1999);
+		r2.setField(1, "Chevy");
+		r2.setField(2, "Venture \"Extended Edition\"");
+		r2.setField(3, "");
+		r2.setField(4, 4900.0);
+
+		Row r3 = new Row(5);
+		r3.setField(0, 1996);
+		r3.setField(1, "Jeep");
+		r3.setField(2, "Grand Cherokee");
+		r3.setField(3, "MUST SELL! air, moon roof, loaded");
+		r3.setField(4, 4799.0);
+
+		Row r4 = new Row(5);
+		r4.setField(0, 1999);
+		r4.setField(1, "Chevy");
+		r4.setField(2, "Venture \"Extended Edition, Very Large\"");
+		r4.setField(3, "");
+		r4.setField(4, 5000.0);
+
+		Row r5 = new Row(5);
+		r5.setField(0, 0);
+		r5.setField(1, "");
+		r5.setField(2, "Venture \"Extended Edition\"");
+		r5.setField(3, "");
+		r5.setField(4, 4900.0);
+
+		Row[] expectedLines = new Row[]{r1, r2, r3, r4, r5};
+		for (Row expected : expectedLines) {
+			result = format.nextRecord(result);
+			assertEquals(expected, result);
+		}
+		assertNull(format.nextRecord(result));
+		assertTrue(format.reachedEnd());
+	}
+
+	@Test
+	public void testWindowsLineEndRemoval() throws Exception {
+
+		// check typical use case -- linux file is correct and it is set up to linux(\n)
+		testRemovingTrailingCR("\n", "\n");
+
+		// check typical windows case -- windows file endings and file has windows file endings set up
+		testRemovingTrailingCR("\r\n", "\r\n");
+
+		// check problematic case windows file -- windows file endings(\r\n)
+		// but linux line endings (\n) set up
+		testRemovingTrailingCR("\r\n", "\n");
+
+		// check problematic case linux file -- linux file endings (\n)
+		// but windows file endings set up (\r\n)
+		// specific setup for windows line endings will expect \r\n because
+		// it has to be set up and is not standard.
+	}
+
+	@Test
+	public void testQuotedStringParsingWithIncludeFields() throws Exception {
+		String fileContent = "\"20:41:52-1-3-2015\"|\"Re: Taskmanager memory error in Eclipse\"|" +
+			"\"Blahblah <bl...@blahblah.org>\"|\"blaaa|\"blubb\"";
+		File tempFile = File.createTempFile("CsvReaderQuotedString", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile));
+		writer.write(fileContent);
+		writer.close();
+
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
+
+		RowCsvInputFormat inputFormat = new RowCsvInputFormat(
+			new Path(tempFile.toURI().toString()),
+			typeInfo,
+			new boolean[]{true, false, true});
+		inputFormat.enableQuotedStringParsing('"');
+		inputFormat.setFieldDelimiter("|");
+		inputFormat.setDelimiter('\n');
+		inputFormat.configure(new Configuration());
+
+		FileInputSplit[] splits = inputFormat.createInputSplits(1);
+		inputFormat.open(splits[0]);
+
+		Row record = inputFormat.nextRecord(new Row(2));
+		assertEquals("20:41:52-1-3-2015", record.getField(0));
+		assertEquals("Blahblah <bl...@blahblah.org>", record.getField(1));
+	}
+
+	@Test
+	public void testQuotedStringParsingWithEscapedQuotes() throws Exception {
+		String fileContent = "\"\\\"Hello\\\" World\"|\"We are\\\" young\"";
+		File tempFile = File.createTempFile("CsvReaderQuotedString", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile));
+		writer.write(fileContent);
+		writer.close();
+
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			BasicTypeInfo.STRING_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
+
+		RowCsvInputFormat inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI().toString()), typeInfo);
+		inputFormat.enableQuotedStringParsing('"');
+		inputFormat.setFieldDelimiter("|");
+		inputFormat.setDelimiter('\n');
+		inputFormat.configure(new Configuration());
+
+		FileInputSplit[] splits = inputFormat.createInputSplits(1);
+		inputFormat.open(splits[0]);
+
+		Row record = inputFormat.nextRecord(new Row(2));
+		assertEquals("\\\"Hello\\\" World", record.getField(0));
+		assertEquals("We are\\\" young", record.getField(1));
+	}
+
+	@Test
+	public void testSqlTimeFields() throws Exception {
+		String fileContent = "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5\n" +
+			"1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5.3\n";
+
+		FileInputSplit split = createTempFile(fileContent);
+
+		RowTypeInfo typeInfo = new RowTypeInfo(
+			SqlTimeTypeInfo.DATE,
+			SqlTimeTypeInfo.TIME,
+			SqlTimeTypeInfo.TIMESTAMP,
+			SqlTimeTypeInfo.TIMESTAMP);
+
+		RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo);
+		format.setFieldDelimiter("|");
+		format.configure(new Configuration());
+		format.open(split);
+
+		Row result = new Row(4);
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals(Date.valueOf("1990-10-14"), result.getField(0));
+		assertEquals(Time.valueOf("02:42:25"), result.getField(1));
+		assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.getField(2));
+		assertEquals(Timestamp.valueOf("1990-01-04 02:02:05"), result.getField(3));
+
+		result = format.nextRecord(result);
+		assertNotNull(result);
+		assertEquals(Date.valueOf("1990-10-14"), result.getField(0));
+		assertEquals(Time.valueOf("02:42:25"), result.getField(1));
+		assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.getField(2));
+		assertEquals(Timestamp.valueOf("1990-01-04 02:02:05.3"), result.getField(3));
+
+		result = format.nextRecord(result);
+		assertNull(result);
+		assertTrue(format.reachedEnd());
+	}
+
+	private static FileInputSplit createTempFile(String content) throws IOException {
+		File tempFile = File.createTempFile("test_contents", "tmp");
+		tempFile.deleteOnExit();
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8);
+		wrt.write(content);
+		wrt.close();
+		return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[]{"localhost"});
+	}
+
+	private static void testRemovingTrailingCR(String lineBreakerInFile, String lineBreakerSetup) throws IOException {
+		String fileContent = FIRST_PART + lineBreakerInFile + SECOND_PART + lineBreakerInFile;
+
+		// create input file
+		File tempFile = File.createTempFile("CsvInputFormatTest", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+		wrt.write(fileContent);
+		wrt.close();
+
+		RowTypeInfo typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO);
+
+		RowCsvInputFormat inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI().toString()), typeInfo);
+		inputFormat.configure(new Configuration());
+		inputFormat.setDelimiter(lineBreakerSetup);
+
+		FileInputSplit[] splits = inputFormat.createInputSplits(1);
+		inputFormat.open(splits[0]);
+
+		Row result = inputFormat.nextRecord(new Row(1));
+		assertNotNull("Expecting to not return null", result);
+		assertEquals(FIRST_PART, result.getField(0));
+
+		result = inputFormat.nextRecord(result);
+		assertNotNull("Expecting to not return null", result);
+		assertEquals(SECOND_PART, result.getField(0));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
index 3517338..0f748c5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
@@ -133,7 +133,7 @@ class BatchTableEnvironment(
     * Converts the given [[Table]] into a [[DataSet]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
-    * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
     *
@@ -150,7 +150,7 @@ class BatchTableEnvironment(
     * Converts the given [[Table]] into a [[DataSet]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
-    * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
index 83293e3..3218ced 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
@@ -135,7 +135,7 @@ class StreamTableEnvironment(
     * Converts the given [[Table]] into a [[DataStream]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
-    * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
     *
@@ -152,7 +152,7 @@ class StreamTableEnvironment(
     * Converts the given [[Table]] into a [[DataStream]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
-    * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
index f4bfe31..26fe51e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
@@ -128,7 +128,7 @@ class BatchTableEnvironment(
     * Converts the given [[Table]] into a [[DataSet]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
-    * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
index dde69d5..044ace8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
@@ -131,7 +131,7 @@ class StreamTableEnvironment(
     * Converts the given [[Table]] into a [[DataStream]] of a specified type.
     *
     * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
-    * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
     * types: Fields are mapped by position, field types must match.
     * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
index 3bce5cf..1e8bf39 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
@@ -18,7 +18,8 @@
 package org.apache.flink.api.scala
 
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.{Row, Table}
+import org.apache.flink.api.table.Table
+import org.apache.flink.types.Row
 import scala.language.implicitConversions
 import org.apache.flink.streaming.api.scala._
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index 918b01b..6d00ab3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -37,6 +37,7 @@ import org.apache.flink.api.table.plan.rules.FlinkRuleSets
 import org.apache.flink.api.table.plan.schema.{DataSetTable, TableSourceTable}
 import org.apache.flink.api.table.sinks.{BatchTableSink, TableSink}
 import org.apache.flink.api.table.sources.BatchTableSource
+import org.apache.flink.types.Row
 
 /**
   * The abstract base class for batch TableEnvironments.
@@ -168,7 +169,7 @@ abstract class BatchTableEnvironment(
   private[flink] def explain(table: Table, extended: Boolean): String = {
     val ast = table.getRelNode
     val optimizedPlan = optimize(ast)
-    val dataSet = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
+    val dataSet = translate[Row](optimizedPlan) (TypeExtractor.createTypeInfo(classOf[Row]))
     dataSet.output(new DiscardingOutputFormat[Row])
     val env = dataSet.getExecutionEnvironment
     val jasonSqlPlan = env.getExecutionPlan

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index 8f00586..da20e07 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -37,6 +37,7 @@ import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink}
 import org.apache.flink.api.table.sources.StreamTableSource
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.types.Row
 
 /**
   * The base class for stream TableEnvironments.

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index b6d0e31..07ea860 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -32,15 +32,15 @@ import org.apache.calcite.sql.util.ChainedSqlOperatorTable
 import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets}
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
 import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
 import org.apache.flink.api.table.codegen.ExpressionReducer
 import org.apache.flink.api.table.expressions.{Alias, Expression, UnresolvedFieldReference}
-import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createTableSqlFunctions, createScalarSqlFunction}
-import org.apache.flink.api.table.functions.{TableFunction, ScalarFunction}
+import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
+import org.apache.flink.api.table.functions.{ScalarFunction, TableFunction}
 import org.apache.flink.api.table.plan.cost.DataSetCostFactory
 import org.apache.flink.api.table.plan.schema.RelTable
 import org.apache.flink.api.table.sinks.TableSink
@@ -347,6 +347,7 @@ abstract class TableEnvironment(val config: TableConfig) {
       case t: TupleTypeInfo[A] => t.getFieldNames
       case c: CaseClassTypeInfo[A] => c.getFieldNames
       case p: PojoTypeInfo[A] => p.getFieldNames
+      case r: RowTypeInfo => r.getFieldNames
       case tpe =>
         throw new TableException(s"Type $tpe lacks explicit field naming")
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
index 4092a24..a706309 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
@@ -28,7 +28,8 @@ import org.apache.flink.api.common.typeinfo.{FractionalTypeInfo, SqlTimeTypeInfo
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo, TypeExtractor}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.typeutils.{TimeIntervalTypeInfo, RowTypeInfo, TypeCheckUtils}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.{TimeIntervalTypeInfo, TypeCheckUtils}
 
 object CodeGenUtils {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index 7caad12..cdb3753 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -37,7 +37,8 @@ import org.apache.flink.api.table.codegen.Indenter.toISC
 import org.apache.flink.api.table.codegen.calls.FunctionGenerator
 import org.apache.flink.api.table.codegen.calls.ScalarOperators._
 import org.apache.flink.api.table.functions.UserDefinedFunction
-import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.TypeConverter
 import org.apache.flink.api.table.typeutils.TypeCheckUtils._
 import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
 
@@ -1139,7 +1140,7 @@ class CodeGenerator(
 
           case ProductAccessor(i) =>
             // Object
-            val inputCode = s"($fieldTypeTerm) $inputTerm.productElement($i)"
+            val inputCode = s"($fieldTypeTerm) $inputTerm.getField($i)"
             generateInputFieldUnboxing(fieldType, inputCode)
 
           case ObjectPrivateFieldAccessor(field) =>

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
index 731452f..871264e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
@@ -25,8 +25,10 @@ import org.apache.calcite.rex.{RexBuilder, RexNode}
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
-import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableConfig}
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
 
 import scala.collection.JavaConverters._
 
@@ -69,7 +71,7 @@ class ExpressionReducer(config: TableConfig)
     }
 
     val literalTypes = literals.map(e => FlinkTypeFactory.toTypeInfo(e.getType))
-    val resultType = new RowTypeInfo(literalTypes)
+    val resultType = new RowTypeInfo(literalTypes: _*)
 
     // generate MapFunction
     val generator = new CodeGenerator(config, false, EMPTY_ROW_INFO)
@@ -105,7 +107,7 @@ class ExpressionReducer(config: TableConfig)
           reducedValues.add(unreduced)
         case _ =>
           val literal = rexBuilder.makeLiteral(
-            reduced.productElement(reducedIdx),
+            reduced.getField(reducedIdx),
             unreduced.getType,
             true)
           reducedValues.add(literal)

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index e85ade0..94513d9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -28,8 +28,10 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.plan.nodes.FlinkAggregate
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair
-import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
-import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory, Row}
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory}
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConverters._
 
@@ -119,7 +121,7 @@ class DataSetAggregate(
       .map(mapFunction)
       .name(prepareOpName)
 
-    val rowTypeInfo = new RowTypeInfo(fieldTypes)
+    val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
 
     val result = {
       if (groupingKeys.length > 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
index c7d5131..7133773 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
@@ -32,8 +32,10 @@ import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate._
 import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
 import org.apache.flink.api.table.runtime.aggregate.{Aggregate, _}
 import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, RowTypeInfo, TimeIntervalTypeInfo, TypeConverter}
-import org.apache.flink.api.table.{FlinkTypeFactory, Row, StreamTableEnvironment}
+import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeConverter}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{FlinkTypeFactory, StreamTableEnvironment}
+import org.apache.flink.types.Row
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.Time
@@ -117,7 +119,7 @@ class DataStreamAggregate(
       .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
       .toArray
 
-    val rowTypeInfo = new RowTypeInfo(fieldTypes)
+    val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
 
     val aggString = aggregationToString(
       inputType,

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
index 54cb8d1..3bf3e0c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.logical.{LogicalValues, LogicalUnion, LogicalAggregate}
 import org.apache.calcite.rex.RexLiteral
 import org.apache.flink.api.table._
+import org.apache.flink.types.Row
 import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
index a11e8c1..72be00c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
@@ -18,13 +18,13 @@
 
 package org.apache.flink.api.table.plan.schema
 
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.api.table.sources.TableSource
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 
 /** Table which defines an external table via a [[TableSource]] */
 class TableSourceTable(val tableSource: TableSource[_])
   extends FlinkTable[Row](
-    typeInfo = new RowTypeInfo(tableSource.getFieldTypes),
+    typeInfo = new RowTypeInfo(tableSource.getFieldTypes: _*),
     fieldIndexes = 0.until(tableSource.getNumberOfFields).toArray,
     fieldNames = tableSource.getFieldsNames)

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
index 1e91711..273aa60 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.api.table.runtime.aggregate
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 
 /**
  * The interface for all Flink aggregate functions, which expressed in terms of initiate(),

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
index 7ace2c5..4c473d4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
index 4b045be..db5f477 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
index 7559cec..0699bfa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.Preconditions
 
@@ -47,11 +47,11 @@ class AggregateMapFunction[IN, OUT](
     
     val input = value.asInstanceOf[Row]
     for (i <- 0 until aggregates.length) {
-      val fieldValue = input.productElement(aggFields(i))
+      val fieldValue = input.getField(aggFields(i))
       aggregates(i).prepare(fieldValue, output)
     }
     for (i <- 0 until groupingKeys.length) {
-      output.setField(i, input.productElement(groupingKeys(i)))
+      output.setField(i, input.getField(groupingKeys(i)))
     }
     output.asInstanceOf[OUT]
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
index ebf0ca7..b2cf07e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction}
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.{Collector, Preconditions}
 
@@ -74,7 +74,7 @@ class AggregateReduceCombineFunction(
 
     // Set group keys to aggregateBuffer.
     for (i <- groupKeysMapping.indices) {
-      aggregateBuffer.setField(i, last.productElement(i))
+      aggregateBuffer.setField(i, last.getField(i))
     }
 
     aggregateBuffer

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
index 8f096cc..6fe712b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.{Collector, Preconditions}
 
@@ -78,7 +78,7 @@ class AggregateReduceGroupFunction(
     // Set group keys value to final output.
     groupKeysMapping.foreach {
       case (after, previous) =>
-        output.setField(after, last.productElement(previous))
+        output.setField(after, last.getField(previous))
     }
 
     // Evaluate final aggregate value and set to output.

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
index 9b7ea0b..ff8f6fb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
@@ -22,7 +22,7 @@ import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
index 4428963..a181068 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
@@ -31,9 +31,10 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.api.table.expressions.{WindowEnd, WindowStart}
 import org.apache.flink.api.table.plan.logical._
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.table.typeutils.TypeCheckUtils._
-import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableException}
+import org.apache.flink.api.table.{FlinkTypeFactory,  TableException}
+import org.apache.flink.types.Row
 import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
 import scala.collection.JavaConversions._
@@ -529,7 +530,7 @@ object AggregateUtil {
 
     // concat group key types and aggregation types
     val allFieldTypes = groupingTypes ++: aggTypes
-    val partialType = new RowTypeInfo(allFieldTypes)
+    val partialType = new RowTypeInfo(allFieldTypes: _*)
     partialType
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
index 6fd890d..4e77549 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
@@ -22,7 +22,7 @@ import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
index ce5bc81..998ae62 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate
 
 import com.google.common.math.LongMath
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import java.math.BigDecimal
 import java.math.BigInteger
 
@@ -52,10 +52,10 @@ abstract class IntegralAvgAggregate[T] extends AvgAggregate[T] {
   }
 
   override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = partial.productElement(partialSumIndex).asInstanceOf[Long]
-    val partialCount = partial.productElement(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
-    val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val partialSum = partial.getField(partialSumIndex).asInstanceOf[Long]
+    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     buffer.setField(partialSumIndex, LongMath.checkedAdd(partialSum, bufferSum))
     buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
   }
@@ -81,8 +81,8 @@ class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
   }
 
   override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
-    val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     if (bufferCount == 0L) {
       null
     } else {
@@ -100,8 +100,8 @@ class ShortAvgAggregate extends IntegralAvgAggregate[Short] {
   }
 
   override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
-    val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     if (bufferCount == 0L) {
       null
     } else {
@@ -119,8 +119,8 @@ class IntAvgAggregate extends IntegralAvgAggregate[Int] {
   }
 
   override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
-    val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     if (bufferCount == 0L) {
       null
     } else {
@@ -156,17 +156,17 @@ class LongAvgAggregate extends IntegralAvgAggregate[Long] {
   }
 
   override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = partial.productElement(partialSumIndex).asInstanceOf[BigInteger]
-    val partialCount = partial.productElement(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[BigInteger]
-    val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigInteger]
+    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigInteger]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     buffer.setField(partialSumIndex, partialSum.add(bufferSum))
     buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
   }
 
   override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[BigInteger]
-    val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigInteger]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     if (bufferCount == 0L) {
       null
     } else {
@@ -192,10 +192,10 @@ abstract class FloatingAvgAggregate[T: Numeric] extends AvgAggregate[T] {
   }
 
   override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = partial.productElement(partialSumIndex).asInstanceOf[Double]
-    val partialCount = partial.productElement(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double]
-    val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val partialSum = partial.getField(partialSumIndex).asInstanceOf[Double]
+    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
 
     buffer.setField(partialSumIndex, partialSum + bufferSum)
     buffer.setField(partialCountIndex, partialCount + bufferCount)
@@ -224,8 +224,8 @@ class FloatAvgAggregate extends FloatingAvgAggregate[Float] {
 
 
   override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double]
-    val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     if (bufferCount == 0L) {
       null
     } else {
@@ -243,8 +243,8 @@ class DoubleAvgAggregate extends FloatingAvgAggregate[Double] {
   }
 
   override def doEvaluate(buffer: Row): Any = {
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double]
-    val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     if (bufferCount == 0L) {
       null
     } else {
@@ -275,18 +275,18 @@ class DecimalAvgAggregate extends AvgAggregate[BigDecimal] {
   }
 
   override def merge(partial: Row, buffer: Row): Unit = {
-    val partialSum = partial.productElement(partialSumIndex).asInstanceOf[BigDecimal]
-    val partialCount = partial.productElement(partialCountIndex).asInstanceOf[Long]
-    val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[BigDecimal]
-    val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigDecimal]
+    val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
+    val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     buffer.setField(partialSumIndex, partialSum.add(bufferSum))
     buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
   }
 
   override def evaluate(buffer: Row): BigDecimal = {
-    val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
     if (bufferCount != 0) {
-      val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[BigDecimal]
+      val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal]
       bufferSum.divide(BigDecimal.valueOf(bufferCount))
     } else {
       null.asInstanceOf[BigDecimal]

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
index d9f288a..4d6d20b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.api.table.runtime.aggregate
 
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 
 class CountAggregate extends Aggregate[Long] {
   private var countIndex: Int = _
@@ -28,13 +28,13 @@ class CountAggregate extends Aggregate[Long] {
   }
 
   override def merge(intermediate: Row, buffer: Row): Unit = {
-    val partialCount = intermediate.productElement(countIndex).asInstanceOf[Long]
-    val bufferCount = buffer.productElement(countIndex).asInstanceOf[Long]
+    val partialCount = intermediate.getField(countIndex).asInstanceOf[Long]
+    val bufferCount = buffer.getField(countIndex).asInstanceOf[Long]
     buffer.setField(countIndex, partialCount + bufferCount)
   }
 
   override def evaluate(buffer: Row): Long = {
-    buffer.productElement(countIndex).asInstanceOf[Long]
+    buffer.getField(countIndex).asInstanceOf[Long]
   }
 
   override def prepare(value: Any, intermediate: Row): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
index 85ad8e5..48e2313 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate
 
 import java.lang.Iterable
 
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
 import org.apache.flink.util.Collector

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
index d3f871a..1a85dca 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate
 
 import java.lang.Iterable
 
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
@@ -66,7 +66,7 @@ class IncrementalAggregateAllWindowFunction[W <: Window](
       // Set group keys value to final output.
       groupKeysMapping.foreach {
         case (after, previous) =>
-          output.setField(after, record.productElement(previous))
+          output.setField(after, record.getField(previous))
       }
       // Evaluate final aggregate value and set to output.
       aggregateMapping.foreach {

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
index e2830da..5c36821 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.api.table.runtime.aggregate
 
 import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.util.Preconditions
 
 /**
@@ -54,7 +54,7 @@ class IncrementalAggregateReduceFunction(
 
     // copy all fields of value1 into accumulatorRow
     (0 until intermediateRowArity)
-    .foreach(i => accumulatorRow.setField(i, value1.productElement(i)))
+    .foreach(i => accumulatorRow.setField(i, value1.getField(i)))
     // merge value2 to accumulatorRow
     aggregates.foreach(_.merge(value2, accumulatorRow))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
index c880f87..2513383 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import java.lang.Iterable
 
 import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.util.Collector

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
index 81e6890..d0d71ee 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import java.lang.Iterable
 
 import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
@@ -68,7 +68,7 @@ class IncrementalAggregateWindowFunction[W <: Window](
       // Set group keys value to final output.
       groupKeysMapping.foreach {
         case (after, previous) =>
-          output.setField(after, record.productElement(previous))
+          output.setField(after, record.getField(previous))
       }
       // Evaluate final aggregate value and set to output.
       aggregateMapping.foreach {

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
index 9267527..2cb3dc7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import java.math.BigDecimal
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 
 abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
 
@@ -57,9 +57,9 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
    * @param buffer
    */
   override def merge(intermediate: Row, buffer: Row): Unit = {
-    val partialValue = intermediate.productElement(maxIndex).asInstanceOf[T]
+    val partialValue = intermediate.getField(maxIndex).asInstanceOf[T]
     if (partialValue != null) {
-      val bufferValue = buffer.productElement(maxIndex).asInstanceOf[T]
+      val bufferValue = buffer.getField(maxIndex).asInstanceOf[T]
       if (bufferValue != null) {
         val max : T = if (ord.compare(partialValue, bufferValue) > 0) partialValue else bufferValue
         buffer.setField(maxIndex, max)
@@ -76,7 +76,7 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
    * @return
    */
   override def evaluate(buffer: Row): T = {
-    buffer.productElement(maxIndex).asInstanceOf[T]
+    buffer.getField(maxIndex).asInstanceOf[T]
   }
 
   override def supportPartial: Boolean = true
@@ -147,9 +147,9 @@ class DecimalMaxAggregate extends Aggregate[BigDecimal] {
   }
 
   override def merge(partial: Row, buffer: Row): Unit = {
-    val partialValue = partial.productElement(minIndex).asInstanceOf[BigDecimal]
+    val partialValue = partial.getField(minIndex).asInstanceOf[BigDecimal]
     if (partialValue != null) {
-      val bufferValue = buffer.productElement(minIndex).asInstanceOf[BigDecimal]
+      val bufferValue = buffer.getField(minIndex).asInstanceOf[BigDecimal]
       if (bufferValue != null) {
         val min = if (partialValue.compareTo(bufferValue) > 0) partialValue else bufferValue
         buffer.setField(minIndex, min)
@@ -160,7 +160,7 @@ class DecimalMaxAggregate extends Aggregate[BigDecimal] {
   }
 
   override def evaluate(buffer: Row): BigDecimal = {
-    buffer.productElement(minIndex).asInstanceOf[BigDecimal]
+    buffer.getField(minIndex).asInstanceOf[BigDecimal]
   }
 
   override def supportPartial: Boolean = true

http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
index 7e2ebf4..c1c79ec 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate
 
 import java.math.BigDecimal
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
 
 abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
 
@@ -56,9 +56,9 @@ abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
    * @param buffer
    */
   override def merge(partial: Row, buffer: Row): Unit = {
-    val partialValue = partial.productElement(minIndex).asInstanceOf[T]
+    val partialValue = partial.getField(minIndex).asInstanceOf[T]
     if (partialValue != null) {
-      val bufferValue = buffer.productElement(minIndex).asInstanceOf[T]
+      val bufferValue = buffer.getField(minIndex).asInstanceOf[T]
       if (bufferValue != null) {
         val min : T = if (ord.compare(partialValue, bufferValue) < 0) partialValue else bufferValue
         buffer.setField(minIndex, min)
@@ -75,7 +75,7 @@ abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
    * @return
    */
   override def evaluate(buffer: Row): T = {
-    buffer.productElement(minIndex).asInstanceOf[T]
+    buffer.getField(minIndex).asInstanceOf[T]
   }
 
   override def supportPartial: Boolean = true
@@ -146,9 +146,9 @@ class DecimalMinAggregate extends Aggregate[BigDecimal] {
   }
 
   override def merge(partial: Row, buffer: Row): Unit = {
-    val partialValue = partial.productElement(minIndex).asInstanceOf[BigDecimal]
+    val partialValue = partial.getField(minIndex).asInstanceOf[BigDecimal]
     if (partialValue != null) {
-      val bufferValue = buffer.productElement(minIndex).asInstanceOf[BigDecimal]
+      val bufferValue = buffer.getField(minIndex).asInstanceOf[BigDecimal]
       if (bufferValue != null) {
         val min = if (partialValue.compareTo(bufferValue) < 0) partialValue else bufferValue
         buffer.setField(minIndex, min)
@@ -159,7 +159,7 @@ class DecimalMinAggregate extends Aggregate[BigDecimal] {
   }
 
   override def evaluate(buffer: Row): BigDecimal = {
-    buffer.productElement(minIndex).asInstanceOf[BigDecimal]
+    buffer.getField(minIndex).asInstanceOf[BigDecimal]
   }
 
   override def supportPartial: Boolean = true