Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/15 10:49:52 UTC
[2/7] flink git commit: [FLINK-5188] [table] [connectors] [core] Adjust imports and method calls to new Row type.
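
The change moves Row from org.apache.flink.api.table (where it was a Scala Product) to org.apache.flink.types, moves RowTypeInfo from org.apache.flink.api.table.typeutils to org.apache.flink.api.java.typeutils, and replaces Product-style field access with getField()/setField(). A minimal sketch of the new API, using only calls that appear in the diffs below:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.types.Row;

    // a Row is a fixed-arity record with positional, untyped, nullable fields
    Row row = new Row(2);
    row.setField(0, 42);
    row.setField(1, "hello");
    Object first = row.getField(0); // replaces the old row.productElement(0)

    // RowTypeInfo now takes its field types as varargs
    RowTypeInfo typeInfo = new RowTypeInfo(
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO);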
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
new file mode 100644
index 0000000..a68e81e
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
@@ -0,0 +1,879 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public class RowCsvInputFormatTest {
+
+ private static final Path PATH = new Path("an/ignored/file/");
+
+ // constants used to test the conversion of \r\n line endings to \n
+ private static final String FIRST_PART = "That is the first part";
+ private static final String SECOND_PART = "That is the second part";
+
+ @Test
+ public void ignoreInvalidLines() throws Exception {
+ String fileContent =
+ "#description of the data\n" +
+ "header1|header2|header3|\n" +
+ "this is|1|2.0|\n" +
+ "//a comment\n" +
+ "a test|3|4.0|\n" +
+ "#next|5|6.0|\n";
+
+ FileInputSplit split = createTempFile(fileContent);
+
+ RowTypeInfo typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+ format.setLenient(false);
+ Configuration parameters = new Configuration();
+ format.configure(parameters);
+ format.open(split);
+
+ Row result = new Row(3);
+ try {
+ result = format.nextRecord(result);
+ fail("Parse Exception was not thrown! (Row too short)");
+ } catch (ParseException ignored) {
+ } // => ok
+
+ try {
+ result = format.nextRecord(result);
+ fail("Parse Exception was not thrown! (Invalid int value)");
+ } catch (ParseException ignored) {
+ } // => ok
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("this is", result.getField(0));
+ assertEquals(1, result.getField(1));
+ assertEquals(2.0, result.getField(2));
+
+ try {
+ result = format.nextRecord(result);
+ fail("Parse Exception was not thrown! (Row too short)");
+ } catch (ParseException ignored) {
+ } // => ok
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("a test", result.getField(0));
+ assertEquals(3, result.getField(1));
+ assertEquals(4.0, result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("#next", result.getField(0));
+ assertEquals(5, result.getField(1));
+ assertEquals(6.0, result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNull(result);
+
+ // re-open with lenient = true
+ format.setLenient(true);
+ format.configure(parameters);
+ format.open(split);
+
+ result = new Row(3);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("header1", result.getField(0));
+ assertNull(result.getField(1));
+ assertNull(result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("this is", result.getField(0));
+ assertEquals(1, result.getField(1));
+ assertEquals(2.0, result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("a test", result.getField(0));
+ assertEquals(3, result.getField(1));
+ assertEquals(4.0, result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("#next", result.getField(0));
+ assertEquals(5, result.getField(1));
+ assertEquals(6.0, result.getField(2));
+ result = format.nextRecord(result);
+ assertNull(result);
+ }
+
+ @Test
+ public void ignoreSingleCharPrefixComments() throws Exception {
+ String fileContent =
+ "#description of the data\n" +
+ "#successive commented line\n" +
+ "this is|1|2.0|\n" +
+ "a test|3|4.0|\n" +
+ "#next|5|6.0|\n";
+
+ FileInputSplit split = createTempFile(fileContent);
+
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+ format.setCommentPrefix("#");
+ format.configure(new Configuration());
+ format.open(split);
+
+ Row result = new Row(3);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("this is", result.getField(0));
+ assertEquals(1, result.getField(1));
+ assertEquals(2.0, result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("a test", result.getField(0));
+ assertEquals(3, result.getField(1));
+ assertEquals(4.0, result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ }
+
+ @Test
+ public void ignoreMultiCharPrefixComments() throws Exception {
+ String fileContent =
+ "//description of the data\n" +
+ "//successive commented line\n" +
+ "this is|1|2.0|\n" +
+ "a test|3|4.0|\n" +
+ "//next|5|6.0|\n";
+
+ FileInputSplit split = createTempFile(fileContent);
+
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+ format.setCommentPrefix("//");
+ format.configure(new Configuration());
+ format.open(split);
+
+ Row result = new Row(3);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("this is", result.getField(0));
+ assertEquals(1, result.getField(1));
+ assertEquals(2.0, result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("a test", result.getField(0));
+ assertEquals(3, result.getField(1));
+ assertEquals(4.0, result.getField(2));
+ result = format.nextRecord(result);
+ assertNull(result);
+ }
+
+ @Test
+ public void readStringFields() throws Exception {
+ String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
+
+ FileInputSplit split = createTempFile(fileContent);
+
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+ format.configure(new Configuration());
+ format.open(split);
+
+ Row result = new Row(3);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("abc", result.getField(0));
+ assertEquals("def", result.getField(1));
+ assertEquals("ghijk", result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("abc", result.getField(0));
+ assertEquals("", result.getField(1));
+ assertEquals("hhg", result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("", result.getField(0));
+ assertEquals("", result.getField(1));
+ assertEquals("", result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ assertTrue(format.reachedEnd());
+ }
+
+ @Test
+ public void readMixedQuotedStringFields() throws Exception {
+ String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||";
+
+ FileInputSplit split = createTempFile(fileContent);
+
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+ format.configure(new Configuration());
+ format.enableQuotedStringParsing('@');
+ format.open(split);
+
+ Row result = new Row(3);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("a|b|c", result.getField(0));
+ assertEquals("def", result.getField(1));
+ assertEquals("ghijk", result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("abc", result.getField(0));
+ assertEquals("", result.getField(1));
+ assertEquals("|hhg", result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("", result.getField(0));
+ assertEquals("", result.getField(1));
+ assertEquals("", result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ assertTrue(format.reachedEnd());
+ }
+
+ @Test
+ public void readStringFieldsWithTrailingDelimiters() throws Exception {
+ String fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n";
+
+ FileInputSplit split = createTempFile(fileContent);
+
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+ format.setFieldDelimiter("|-");
+ format.configure(new Configuration());
+ format.open(split);
+
+ Row result = new Row(3);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("abc", result.getField(0));
+ assertEquals("def", result.getField(1));
+ assertEquals("ghijk", result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("abc", result.getField(0));
+ assertEquals("", result.getField(1));
+ assertEquals("hhg", result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals("", result.getField(0));
+ assertEquals("", result.getField(1));
+ assertEquals("", result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ assertTrue(format.reachedEnd());
+ }
+
+ @Test
+ public void testIntegerFields() throws Exception {
+ String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n";
+
+ FileInputSplit split = createTempFile(fileContent);
+
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
+
+ format.setFieldDelimiter("|");
+ format.configure(new Configuration());
+ format.open(split);
+
+ Row result = new Row(5);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(111, result.getField(0));
+ assertEquals(222, result.getField(1));
+ assertEquals(333, result.getField(2));
+ assertEquals(444, result.getField(3));
+ assertEquals(555, result.getField(4));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(666, result.getField(0));
+ assertEquals(777, result.getField(1));
+ assertEquals(888, result.getField(2));
+ assertEquals(999, result.getField(3));
+ assertEquals(0, result.getField(4));
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ assertTrue(format.reachedEnd());
+ }
+
+ @Test
+ public void testEmptyFields() throws Exception {
+ String fileContent =
+ ",,,,,,,,\n" +
+ ",,,,,,,,\n" +
+ ",,,,,,,,\n" +
+ ",,,,,,,,\n" +
+ ",,,,,,,,\n" +
+ ",,,,,,,,\n" +
+ ",,,,,,,,\n" +
+ ",,,,,,,,\n";
+
+ FileInputSplit split = createTempFile(fileContent);
+
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.BOOLEAN_TYPE_INFO,
+ BasicTypeInfo.BYTE_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.FLOAT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.SHORT_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, true);
+ format.setFieldDelimiter(",");
+ format.configure(new Configuration());
+ format.open(split);
+
+ Row result = new Row(8);
+ int linesCnt = fileContent.split("\n").length;
+
+ for (int i = 0; i < linesCnt; i++) {
+ result = format.nextRecord(result);
+ assertNull(result.getField(i));
+ }
+
+ // ensure no more rows
+ assertNull(format.nextRecord(result));
+ assertTrue(format.reachedEnd());
+ }
+
+ @Test
+ public void testDoubleFields() throws Exception {
+ String fileContent = "11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n";
+
+ FileInputSplit split = createTempFile(fileContent);
+
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo);
+ format.setFieldDelimiter("|");
+ format.configure(new Configuration());
+ format.open(split);
+
+ Row result = new Row(5);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(11.1, result.getField(0));
+ assertEquals(22.2, result.getField(1));
+ assertEquals(33.3, result.getField(2));
+ assertEquals(44.4, result.getField(3));
+ assertEquals(55.5, result.getField(4));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(66.6, result.getField(0));
+ assertEquals(77.7, result.getField(1));
+ assertEquals(88.8, result.getField(2));
+ assertEquals(99.9, result.getField(3));
+ assertEquals(0.0, result.getField(4));
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ assertTrue(format.reachedEnd());
+ }
+
+ @Test
+ public void testReadFirstN() throws Exception {
+ String fileContent = "111|222|333|444|555|\n666|777|888|999|000|\n";
+
+ FileInputSplit split = createTempFile(fileContent);
+
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo);
+ format.setFieldDelimiter("|");
+ format.configure(new Configuration());
+ format.open(split);
+
+ Row result = new Row(2);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(111, result.getField(0));
+ assertEquals(222, result.getField(1));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(666, result.getField(0));
+ assertEquals(777, result.getField(1));
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ assertTrue(format.reachedEnd());
+ }
+
+ @Test
+ public void testReadSparseWithNullFieldsForTypes() throws Exception {
+ String fileContent = "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" +
+ "000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|";
+
+ FileInputSplit split = createTempFile(fileContent);
+
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ RowCsvInputFormat format = new RowCsvInputFormat(
+ PATH,
+ typeInfo,
+ new boolean[]{true, false, false, true, false, false, false, true});
+ format.setFieldDelimiter("|x|");
+ format.configure(new Configuration());
+ format.open(split);
+
+ Row result = new Row(3);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(111, result.getField(0));
+ assertEquals(444, result.getField(1));
+ assertEquals(888, result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(0, result.getField(0));
+ assertEquals(777, result.getField(1));
+ assertEquals(333, result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ assertTrue(format.reachedEnd());
+ }
+
+ @Test
+ public void testReadSparseWithPositionSetter() throws Exception {
+ String fileContent = "111|222|333|444|555|666|777|888|999|000|\n" +
+ "000|999|888|777|666|555|444|333|222|111|";
+
+ FileInputSplit split = createTempFile(fileContent);
+
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ RowCsvInputFormat format = new RowCsvInputFormat(
+ PATH,
+ typeInfo,
+ new int[]{0, 3, 7});
+ format.setFieldDelimiter("|");
+ format.configure(new Configuration());
+ format.open(split);
+
+ Row result = new Row(3);
+ result = format.nextRecord(result);
+
+ assertNotNull(result);
+ assertEquals(111, result.getField(0));
+ assertEquals(444, result.getField(1));
+ assertEquals(888, result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(0, result.getField(0));
+ assertEquals(777, result.getField(1));
+ assertEquals(333, result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ assertTrue(format.reachedEnd());
+ }
+
+ @Test
+ public void testReadSparseWithMask() throws Exception {
+ String fileContent = "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" +
+ "000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&";
+
+ FileInputSplit split = RowCsvInputFormatTest.createTempFile(fileContent);
+
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ RowCsvInputFormat format = new RowCsvInputFormat(
+ PATH,
+ typeInfo,
+ new boolean[]{true, false, false, true, false, false, false, true});
+ format.setFieldDelimiter("&&");
+ format.configure(new Configuration());
+ format.open(split);
+
+ Row result = new Row(3);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(111, result.getField(0));
+ assertEquals(444, result.getField(1));
+ assertEquals(888, result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(0, result.getField(0));
+ assertEquals(777, result.getField(1));
+ assertEquals(333, result.getField(2));
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ assertTrue(format.reachedEnd());
+ }
+
+ @Test
+ public void testParseStringErrors() throws Exception {
+ StringParser stringParser = new StringParser();
+
+ stringParser.enableQuotedStringParsing((byte) '"');
+
+ Map<String, StringParser.ParseErrorState> failures = new HashMap<>();
+ failures.put("\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING);
+ failures.put("\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING);
+
+ for (Map.Entry<String, StringParser.ParseErrorState> failure : failures.entrySet()) {
+ int result = stringParser.parseField(
+ failure.getKey().getBytes(),
+ 0,
+ failure.getKey().length(),
+ new byte[]{(byte) '|'},
+ null);
+ assertEquals(-1, result);
+ assertEquals(failure.getValue(), stringParser.getErrorState());
+ }
+ }
+
+ // Test disabled because we do not support double-quote escaped quotes right now.
+ @Test
+ @Ignore
+ public void testParserCorrectness() throws Exception {
+ // RFC 4180 Compliance Test content
+ // Taken from http://en.wikipedia.org/wiki/Comma-separated_values#Example
+ String fileContent = "Year,Make,Model,Description,Price\n" +
+ "1997,Ford,E350,\"ac, abs, moon\",3000.00\n" +
+ "1999,Chevy,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00\n" +
+ "1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, loaded\",4799.00\n" +
+ "1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n" +
+ ",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00";
+
+ FileInputSplit split = createTempFile(fileContent);
+
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO);
+
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo);
+ format.setSkipFirstLineAsHeader(true);
+ format.setFieldDelimiter(",");
+ format.configure(new Configuration());
+ format.open(split);
+
+ Row result = new Row(5);
+ Row r1 = new Row(5);
+ r1.setField(0, 1997);
+ r1.setField(1, "Ford");
+ r1.setField(2, "E350");
+ r1.setField(3, "ac, abs, moon");
+ r1.setField(4, 3000.0);
+
+ Row r2 = new Row(5);
+ r2.setField(0, 1999);
+ r2.setField(1, "Chevy");
+ r2.setField(2, "Venture \"Extended Edition\"");
+ r2.setField(3, "");
+ r2.setField(4, 4900.0);
+
+ Row r3 = new Row(5);
+ r3.setField(0, 1996);
+ r3.setField(1, "Jeep");
+ r3.setField(2, "Grand Cherokee");
+ r3.setField(3, "MUST SELL! air, moon roof, loaded");
+ r3.setField(4, 4799.0);
+
+ Row r4 = new Row(5);
+ r4.setField(0, 1999);
+ r4.setField(1, "Chevy");
+ r4.setField(2, "Venture \"Extended Edition, Very Large\"");
+ r4.setField(3, "");
+ r4.setField(4, 5000.0);
+
+ Row r5 = new Row(5);
+ r5.setField(0, 0);
+ r5.setField(1, "");
+ r5.setField(2, "Venture \"Extended Edition\"");
+ r5.setField(3, "");
+ r5.setField(4, 4900.0);
+
+ Row[] expectedLines = new Row[]{r1, r2, r3, r4, r5};
+ for (Row expected : expectedLines) {
+ result = format.nextRecord(result);
+ assertEquals(expected, result);
+ }
+ assertNull(format.nextRecord(result));
+ assertTrue(format.reachedEnd());
+ }
+
+ @Test
+ public void testWindowsLineEndRemoval() throws Exception {
+
+ // typical Linux case -- file uses \n and the delimiter is set up as \n
+ testRemovingTrailingCR("\n", "\n");
+
+ // typical Windows case -- file uses \r\n and the delimiter is set up as \r\n
+ testRemovingTrailingCR("\r\n", "\r\n");
+
+ // problematic Windows case -- file uses \r\n but the delimiter is set up as \n
+ testRemovingTrailingCR("\r\n", "\n");
+
+ // the inverse case (a \n file with a \r\n setup) is not tested: a \r\n
+ // delimiter is non-standard, must be configured explicitly, and is then
+ // strictly expected in the input.
+ }
+
+ @Test
+ public void testQuotedStringParsingWithIncludeFields() throws Exception {
+ String fileContent = "\"20:41:52-1-3-2015\"|\"Re: Taskmanager memory error in Eclipse\"|" +
+ "\"Blahblah <bl...@blahblah.org>\"|\"blaaa|\"blubb\"";
+ File tempFile = File.createTempFile("CsvReaderQuotedString", "tmp");
+ tempFile.deleteOnExit();
+ tempFile.setWritable(true);
+
+ OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8);
+ writer.write(fileContent);
+ writer.close();
+
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ RowCsvInputFormat inputFormat = new RowCsvInputFormat(
+ new Path(tempFile.toURI().toString()),
+ typeInfo,
+ new boolean[]{true, false, true});
+ inputFormat.enableQuotedStringParsing('"');
+ inputFormat.setFieldDelimiter("|");
+ inputFormat.setDelimiter('\n');
+ inputFormat.configure(new Configuration());
+
+ FileInputSplit[] splits = inputFormat.createInputSplits(1);
+ inputFormat.open(splits[0]);
+
+ Row record = inputFormat.nextRecord(new Row(2));
+ assertEquals("20:41:52-1-3-2015", record.getField(0));
+ assertEquals("Blahblah <bl...@blahblah.org>", record.getField(1));
+ }
+
+ @Test
+ public void testQuotedStringParsingWithEscapedQuotes() throws Exception {
+ String fileContent = "\"\\\"Hello\\\" World\"|\"We are\\\" young\"";
+ File tempFile = File.createTempFile("CsvReaderQuotedString", "tmp");
+ tempFile.deleteOnExit();
+ tempFile.setWritable(true);
+
+ OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8);
+ writer.write(fileContent);
+ writer.close();
+
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ RowCsvInputFormat inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI().toString()), typeInfo);
+ inputFormat.enableQuotedStringParsing('"');
+ inputFormat.setFieldDelimiter("|");
+ inputFormat.setDelimiter('\n');
+ inputFormat.configure(new Configuration());
+
+ FileInputSplit[] splits = inputFormat.createInputSplits(1);
+ inputFormat.open(splits[0]);
+
+ Row record = inputFormat.nextRecord(new Row(2));
+ assertEquals("\\\"Hello\\\" World", record.getField(0));
+ assertEquals("We are\\\" young", record.getField(1));
+ }
+
+ @Test
+ public void testSqlTimeFields() throws Exception {
+ String fileContent = "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5\n" +
+ "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5.3\n";
+
+ FileInputSplit split = createTempFile(fileContent);
+
+ RowTypeInfo typeInfo = new RowTypeInfo(
+ SqlTimeTypeInfo.DATE,
+ SqlTimeTypeInfo.TIME,
+ SqlTimeTypeInfo.TIMESTAMP,
+ SqlTimeTypeInfo.TIMESTAMP);
+
+ RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo);
+ format.setFieldDelimiter("|");
+ format.configure(new Configuration());
+ format.open(split);
+
+ Row result = new Row(4);
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(Date.valueOf("1990-10-14"), result.getField(0));
+ assertEquals(Time.valueOf("02:42:25"), result.getField(1));
+ assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.getField(2));
+ assertEquals(Timestamp.valueOf("1990-01-04 02:02:05"), result.getField(3));
+
+ result = format.nextRecord(result);
+ assertNotNull(result);
+ assertEquals(Date.valueOf("1990-10-14"), result.getField(0));
+ assertEquals(Time.valueOf("02:42:25"), result.getField(1));
+ assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.getField(2));
+ assertEquals(Timestamp.valueOf("1990-01-04 02:02:05.3"), result.getField(3));
+
+ result = format.nextRecord(result);
+ assertNull(result);
+ assertTrue(format.reachedEnd());
+ }
+
+ private static FileInputSplit createTempFile(String content) throws IOException {
+ File tempFile = File.createTempFile("test_contents", "tmp");
+ tempFile.deleteOnExit();
+ OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8);
+ wrt.write(content);
+ wrt.close();
+ return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[]{"localhost"});
+ }
+
+ private static void testRemovingTrailingCR(String lineBreakerInFile, String lineBreakerSetup) throws IOException {
+ String fileContent = FIRST_PART + lineBreakerInFile + SECOND_PART + lineBreakerInFile;
+
+ // create input file
+ File tempFile = File.createTempFile("CsvInputFormatTest", "tmp");
+ tempFile.deleteOnExit();
+ tempFile.setWritable(true);
+
+ OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8);
+ wrt.write(fileContent);
+ wrt.close();
+
+ RowTypeInfo typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO);
+
+ RowCsvInputFormat inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI().toString()), typeInfo);
+ inputFormat.configure(new Configuration());
+ inputFormat.setDelimiter(lineBreakerSetup);
+
+ FileInputSplit[] splits = inputFormat.createInputSplits(1);
+ inputFormat.open(splits[0]);
+
+ Row result = inputFormat.nextRecord(new Row(1));
+ assertNotNull("Expecting to not return null", result);
+ assertEquals(FIRST_PART, result.getField(0));
+
+ result = inputFormat.nextRecord(result);
+ assertNotNull("Expecting to not return null", result);
+ assertEquals(SECOND_PART, result.getField(0));
+ }
+}
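
The tests above all follow the same read loop; a condensed sketch of the pattern, assuming PATH, a FileInputSplit, and a three-field RowTypeInfo as in the tests:

    RowCsvInputFormat format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|");
    format.configure(new Configuration());
    format.open(split);

    Row reuse = new Row(3);
    Row record;
    // nextRecord() returns null once the split is exhausted
    while ((record = format.nextRecord(reuse)) != null) {
        Object firstField = record.getField(0);
    }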
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
index 3517338..0f748c5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
@@ -133,7 +133,7 @@ class BatchTableEnvironment(
* Converts the given [[Table]] into a [[DataSet]] of a specified type.
*
* The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
- * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
* types: Fields are mapped by position, field types must match.
* - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
*
@@ -150,7 +150,7 @@ class BatchTableEnvironment(
* Converts the given [[Table]] into a [[DataSet]] of a specified type.
*
* The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
- * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
* types: Fields are mapped by position, field types must match.
* - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
*
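
A short Java-side sketch of the conversion this documents (table, tableEnv, and the WordCount POJO are illustrative names, not part of the diff):

    // Row and Tuple targets: Table fields are matched by position
    DataSet<Row> rows = tableEnv.toDataSet(table, Row.class);

    // POJO targets: Table fields are matched by field name
    DataSet<WordCount> counts = tableEnv.toDataSet(table, WordCount.class);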
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
index 83293e3..3218ced 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
@@ -135,7 +135,7 @@ class StreamTableEnvironment(
* Converts the given [[Table]] into a [[DataStream]] of a specified type.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
- * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
* types: Fields are mapped by position, field types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
@@ -152,7 +152,7 @@ class StreamTableEnvironment(
* Converts the given [[Table]] into a [[DataStream]] of a specified type.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
- * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
* types: Fields are mapped by position, field types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
index f4bfe31..26fe51e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
@@ -128,7 +128,7 @@ class BatchTableEnvironment(
* Converts the given [[Table]] into a [[DataSet]] of a specified type.
*
* The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
- * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
* types: Fields are mapped by position, field types must match.
* - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
index dde69d5..044ace8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/StreamTableEnvironment.scala
@@ -131,7 +131,7 @@ class StreamTableEnvironment(
* Converts the given [[Table]] into a [[DataStream]] of a specified type.
*
* The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
- * - [[org.apache.flink.api.table.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
+ * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
* types: Fields are mapped by position, field types must match.
* - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
index 3bce5cf..1e8bf39 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala
@@ -18,7 +18,8 @@
package org.apache.flink.api.scala
import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.{Row, Table}
+import org.apache.flink.api.table.Table
+import org.apache.flink.types.Row
import scala.language.implicitConversions
import org.apache.flink.streaming.api.scala._
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
index 918b01b..6d00ab3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala
@@ -37,6 +37,7 @@ import org.apache.flink.api.table.plan.rules.FlinkRuleSets
import org.apache.flink.api.table.plan.schema.{DataSetTable, TableSourceTable}
import org.apache.flink.api.table.sinks.{BatchTableSink, TableSink}
import org.apache.flink.api.table.sources.BatchTableSource
+import org.apache.flink.types.Row
/**
* The abstract base class for batch TableEnvironments.
@@ -168,7 +169,7 @@ abstract class BatchTableEnvironment(
private[flink] def explain(table: Table, extended: Boolean): String = {
val ast = table.getRelNode
val optimizedPlan = optimize(ast)
- val dataSet = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
+ val dataSet = translate[Row](optimizedPlan) (TypeExtractor.createTypeInfo(classOf[Row]))
dataSet.output(new DiscardingOutputFormat[Row])
val env = dataSet.getExecutionEnvironment
val jasonSqlPlan = env.getExecutionPlan
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
index 8f00586..da20e07 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala
@@ -37,6 +37,7 @@ import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink}
import org.apache.flink.api.table.sources.StreamTableSource
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.types.Row
/**
* The base class for stream TableEnvironments.
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index b6d0e31..07ea860 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -32,15 +32,15 @@ import org.apache.calcite.sql.util.ChainedSqlOperatorTable
import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets}
import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.java.table.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
import org.apache.flink.api.scala.table.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
import org.apache.flink.api.table.codegen.ExpressionReducer
import org.apache.flink.api.table.expressions.{Alias, Expression, UnresolvedFieldReference}
-import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createTableSqlFunctions, createScalarSqlFunction}
-import org.apache.flink.api.table.functions.{TableFunction, ScalarFunction}
+import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
+import org.apache.flink.api.table.functions.{ScalarFunction, TableFunction}
import org.apache.flink.api.table.plan.cost.DataSetCostFactory
import org.apache.flink.api.table.plan.schema.RelTable
import org.apache.flink.api.table.sinks.TableSink
@@ -347,6 +347,7 @@ abstract class TableEnvironment(val config: TableConfig) {
case t: TupleTypeInfo[A] => t.getFieldNames
case c: CaseClassTypeInfo[A] => c.getFieldNames
case p: PojoTypeInfo[A] => p.getFieldNames
+ case r: RowTypeInfo => r.getFieldNames
case tpe =>
throw new TableException(s"Type $tpe lacks explicit field naming")
}
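
With this case added, getFieldNames works on the relocated RowTypeInfo like on the other composite types; a small sketch (f0 and f1 are the default names RowTypeInfo generates when none are supplied):

    RowTypeInfo rowType = new RowTypeInfo(
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO);
    String[] names = rowType.getFieldNames(); // ["f0", "f1"]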
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
index 4092a24..a706309 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala
@@ -28,7 +28,8 @@ import org.apache.flink.api.common.typeinfo.{FractionalTypeInfo, SqlTimeTypeInfo
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo, TypeExtractor}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.apache.flink.api.table.typeutils.{TimeIntervalTypeInfo, RowTypeInfo, TypeCheckUtils}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.{TimeIntervalTypeInfo, TypeCheckUtils}
object CodeGenUtils {
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index 7caad12..cdb3753 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -37,7 +37,8 @@ import org.apache.flink.api.table.codegen.Indenter.toISC
import org.apache.flink.api.table.codegen.calls.FunctionGenerator
import org.apache.flink.api.table.codegen.calls.ScalarOperators._
import org.apache.flink.api.table.functions.UserDefinedFunction
-import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.typeutils.TypeConverter
import org.apache.flink.api.table.typeutils.TypeCheckUtils._
import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
@@ -1139,7 +1140,7 @@ class CodeGenerator(
case ProductAccessor(i) =>
// Object
- val inputCode = s"($fieldTypeTerm) $inputTerm.productElement($i)"
+ val inputCode = s"($fieldTypeTerm) $inputTerm.getField($i)"
generateInputFieldUnboxing(fieldType, inputCode)
case ObjectPrivateFieldAccessor(field) =>
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
index 731452f..871264e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionReducer.scala
@@ -25,8 +25,10 @@ import org.apache.calcite.rex.{RexBuilder, RexNode}
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
-import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableConfig}
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.flink.api.table.{FlinkTypeFactory, TableConfig}
import scala.collection.JavaConverters._
@@ -69,7 +71,7 @@ class ExpressionReducer(config: TableConfig)
}
val literalTypes = literals.map(e => FlinkTypeFactory.toTypeInfo(e.getType))
- val resultType = new RowTypeInfo(literalTypes)
+ val resultType = new RowTypeInfo(literalTypes: _*)
// generate MapFunction
val generator = new CodeGenerator(config, false, EMPTY_ROW_INFO)
@@ -105,7 +107,7 @@ class ExpressionReducer(config: TableConfig)
reducedValues.add(unreduced)
case _ =>
val literal = rexBuilder.makeLiteral(
- reduced.productElement(reducedIdx),
+ reduced.getField(reducedIdx),
unreduced.getType,
true)
reducedValues.add(literal)
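
The ": _*" expansion above reflects the new constructor signature: org.apache.flink.api.java.typeutils.RowTypeInfo takes its field types as varargs rather than a Scala Seq, so Scala call sites must splat their collections. On the Java side an array satisfies the varargs parameter directly; a small sketch:

    TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
        BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO };
    RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);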
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index e85ade0..94513d9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -28,8 +28,10 @@ import org.apache.flink.api.java.DataSet
import org.apache.flink.api.table.plan.nodes.FlinkAggregate
import org.apache.flink.api.table.runtime.aggregate.AggregateUtil
import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair
-import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
-import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory, Row}
+import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory}
+import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -119,7 +121,7 @@ class DataSetAggregate(
.map(mapFunction)
.name(prepareOpName)
- val rowTypeInfo = new RowTypeInfo(fieldTypes)
+ val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
val result = {
if (groupingKeys.length > 0) {
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
index c7d5131..7133773 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
@@ -32,8 +32,10 @@ import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate._
import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
import org.apache.flink.api.table.runtime.aggregate.{Aggregate, _}
import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, RowTypeInfo, TimeIntervalTypeInfo, TypeConverter}
-import org.apache.flink.api.table.{FlinkTypeFactory, Row, StreamTableEnvironment}
+import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeConverter}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.table.{FlinkTypeFactory, StreamTableEnvironment}
+import org.apache.flink.types.Row
import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
@@ -117,7 +119,7 @@ class DataStreamAggregate(
.map(field => FlinkTypeFactory.toTypeInfo(field.getType))
.toArray
- val rowTypeInfo = new RowTypeInfo(fieldTypes)
+ val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
val aggString = aggregationToString(
inputType,
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
index 54cb8d1..3bf3e0c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.convert.ConverterRule
import org.apache.calcite.rel.logical.{LogicalValues, LogicalUnion, LogicalAggregate}
import org.apache.calcite.rex.RexLiteral
import org.apache.flink.api.table._
+import org.apache.flink.types.Row
import org.apache.flink.api.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
index a11e8c1..72be00c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/TableSourceTable.scala
@@ -18,13 +18,13 @@
package org.apache.flink.api.table.plan.schema
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
import org.apache.flink.api.table.sources.TableSource
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
/** Table which defines an external table via a [[TableSource]] */
class TableSourceTable(val tableSource: TableSource[_])
extends FlinkTable[Row](
- typeInfo = new RowTypeInfo(tableSource.getFieldTypes),
+ typeInfo = new RowTypeInfo(tableSource.getFieldTypes: _*),
fieldIndexes = 0.until(tableSource.getNumberOfFields).toArray,
fieldNames = tableSource.getFieldsNames)
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
index 1e91711..273aa60 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
@@ -18,7 +18,7 @@
package org.apache.flink.api.table.runtime.aggregate
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
/**
* The interface for all Flink aggregate functions, which expressed in terms of initiate(),
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
index 7ace2c5..4c473d4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.aggregate
import java.lang.Iterable
import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
index 4b045be..db5f477 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.aggregate
import java.lang.Iterable
import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
index 7559cec..0699bfa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Preconditions
@@ -47,11 +47,11 @@ class AggregateMapFunction[IN, OUT](
val input = value.asInstanceOf[Row]
for (i <- 0 until aggregates.length) {
- val fieldValue = input.productElement(aggFields(i))
+ val fieldValue = input.getField(aggFields(i))
aggregates(i).prepare(fieldValue, output)
}
for (i <- 0 until groupingKeys.length) {
- output.setField(i, input.productElement(groupingKeys(i)))
+ output.setField(i, input.getField(groupingKeys(i)))
}
output.asInstanceOf[OUT]
}
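
The hunks above capture the core of the migration: the old org.apache.flink.api.table.Row was a Scala Product read positionally via productElement, while the new org.apache.flink.types.Row reads via getField and writes via setField. A minimal sketch of the new access pattern, using only the calls visible in this patch (the arity and field values are made up for illustration):

    import org.apache.flink.types.Row

    object RowAccessSketch {
      def main(args: Array[String]): Unit = {
        // arity is fixed at construction; fields are set positionally
        val row = new Row(3)
        row.setField(0, "key")
        row.setField(1, 42L)
        row.setField(2, 2.0)

        // reads go through getField instead of productElement; the result
        // is an AnyRef, so call sites cast exactly as they did before
        val count = row.getField(1).asInstanceOf[Long]
        println(count)
      }
    }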
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
index ebf0ca7..b2cf07e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api.table.runtime.aggregate
import java.lang.Iterable
import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction}
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.{Collector, Preconditions}
@@ -74,7 +74,7 @@ class AggregateReduceCombineFunction(
// Set group keys to aggregateBuffer.
for (i <- groupKeysMapping.indices) {
- aggregateBuffer.setField(i, last.productElement(i))
+ aggregateBuffer.setField(i, last.getField(i))
}
aggregateBuffer
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
index 8f096cc..6fe712b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate
import java.lang.Iterable
import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.{Collector, Preconditions}
@@ -78,7 +78,7 @@ class AggregateReduceGroupFunction(
// Set group keys value to final output.
groupKeysMapping.foreach {
case (after, previous) =>
- output.setField(after, last.productElement(previous))
+ output.setField(after, last.getField(previous))
}
// Evaluate final aggregate value and set to output.
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
index 9b7ea0b..ff8f6fb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
@@ -22,7 +22,7 @@ import java.lang.Iterable
import org.apache.flink.api.common.functions.RichGroupReduceFunction
import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
index 4428963..a181068 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
@@ -31,9 +31,10 @@ import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.api.table.expressions.{WindowEnd, WindowStart}
import org.apache.flink.api.table.plan.logical._
-import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.table.typeutils.TypeCheckUtils._
-import org.apache.flink.api.table.{FlinkTypeFactory, Row, TableException}
+import org.apache.flink.api.table.{FlinkTypeFactory, TableException}
+import org.apache.flink.types.Row
import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
import scala.collection.JavaConversions._
@@ -529,7 +530,7 @@ object AggregateUtil {
// concat group key types and aggregation types
val allFieldTypes = groupingTypes ++: aggTypes
- val partialType = new RowTypeInfo(allFieldTypes)
+ val partialType = new RowTypeInfo(allFieldTypes: _*)
partialType
}
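
Note that the RowTypeInfo change is more than an import swap: the table-internal RowTypeInfo took a Scala Seq of field types, whereas the Java-API class takes a TypeInformation varargs, hence the ": _*" expansion above. A small sketch of building the partial-aggregate row type against that signature (the concrete field types are chosen arbitrarily):

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo

    object PartialTypeSketch {
      def main(args: Array[String]): Unit = {
        // group key types followed by intermediate aggregate types,
        // mirroring the concatenation in AggregateUtil
        val groupingTypes: Seq[TypeInformation[_]] = Seq(BasicTypeInfo.STRING_TYPE_INFO)
        val aggTypes: Seq[TypeInformation[_]] =
          Seq(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)

        // the Java varargs constructor needs the Scala Seq expanded with : _*
        val partialType = new RowTypeInfo(groupingTypes ++ aggTypes: _*)
        println(partialType)
      }
    }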
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
index 6fd890d..4e77549 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateWindowFunction.scala
@@ -22,7 +22,7 @@ import java.lang.Iterable
import org.apache.flink.api.common.functions.RichGroupReduceFunction
import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
index ce5bc81..998ae62 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate
import com.google.common.math.LongMath
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
import java.math.BigDecimal
import java.math.BigInteger
@@ -52,10 +52,10 @@ abstract class IntegralAvgAggregate[T] extends AvgAggregate[T] {
}
override def merge(partial: Row, buffer: Row): Unit = {
- val partialSum = partial.productElement(partialSumIndex).asInstanceOf[Long]
- val partialCount = partial.productElement(partialCountIndex).asInstanceOf[Long]
- val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
- val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+ val partialSum = partial.getField(partialSumIndex).asInstanceOf[Long]
+ val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
+ val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
+ val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
buffer.setField(partialSumIndex, LongMath.checkedAdd(partialSum, bufferSum))
buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
}
@@ -81,8 +81,8 @@ class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
}
override def doEvaluate(buffer: Row): Any = {
- val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
- val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+ val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
+ val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
if (bufferCount == 0L) {
null
} else {
@@ -100,8 +100,8 @@ class ShortAvgAggregate extends IntegralAvgAggregate[Short] {
}
override def doEvaluate(buffer: Row): Any = {
- val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
- val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+ val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
+ val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
if (bufferCount == 0L) {
null
} else {
@@ -119,8 +119,8 @@ class IntAvgAggregate extends IntegralAvgAggregate[Int] {
}
override def doEvaluate(buffer: Row): Any = {
- val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Long]
- val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+ val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Long]
+ val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
if (bufferCount == 0L) {
null
} else {
@@ -156,17 +156,17 @@ class LongAvgAggregate extends IntegralAvgAggregate[Long] {
}
override def merge(partial: Row, buffer: Row): Unit = {
- val partialSum = partial.productElement(partialSumIndex).asInstanceOf[BigInteger]
- val partialCount = partial.productElement(partialCountIndex).asInstanceOf[Long]
- val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[BigInteger]
- val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+ val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigInteger]
+ val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
+ val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigInteger]
+ val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
buffer.setField(partialSumIndex, partialSum.add(bufferSum))
buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
}
override def doEvaluate(buffer: Row): Any = {
- val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[BigInteger]
- val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+ val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigInteger]
+ val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
if (bufferCount == 0L) {
null
} else {
@@ -192,10 +192,10 @@ abstract class FloatingAvgAggregate[T: Numeric] extends AvgAggregate[T] {
}
override def merge(partial: Row, buffer: Row): Unit = {
- val partialSum = partial.productElement(partialSumIndex).asInstanceOf[Double]
- val partialCount = partial.productElement(partialCountIndex).asInstanceOf[Long]
- val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double]
- val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+ val partialSum = partial.getField(partialSumIndex).asInstanceOf[Double]
+ val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
+ val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
+ val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
buffer.setField(partialSumIndex, partialSum + bufferSum)
buffer.setField(partialCountIndex, partialCount + bufferCount)
@@ -224,8 +224,8 @@ class FloatAvgAggregate extends FloatingAvgAggregate[Float] {
override def doEvaluate(buffer: Row): Any = {
- val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double]
- val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+ val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
+ val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
if (bufferCount == 0L) {
null
} else {
@@ -243,8 +243,8 @@ class DoubleAvgAggregate extends FloatingAvgAggregate[Double] {
}
override def doEvaluate(buffer: Row): Any = {
- val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[Double]
- val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+ val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[Double]
+ val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
if (bufferCount == 0L) {
null
} else {
@@ -275,18 +275,18 @@ class DecimalAvgAggregate extends AvgAggregate[BigDecimal] {
}
override def merge(partial: Row, buffer: Row): Unit = {
- val partialSum = partial.productElement(partialSumIndex).asInstanceOf[BigDecimal]
- val partialCount = partial.productElement(partialCountIndex).asInstanceOf[Long]
- val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[BigDecimal]
- val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+ val partialSum = partial.getField(partialSumIndex).asInstanceOf[BigDecimal]
+ val partialCount = partial.getField(partialCountIndex).asInstanceOf[Long]
+ val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal]
+ val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
buffer.setField(partialSumIndex, partialSum.add(bufferSum))
buffer.setField(partialCountIndex, LongMath.checkedAdd(partialCount, bufferCount))
}
override def evaluate(buffer: Row): BigDecimal = {
- val bufferCount = buffer.productElement(partialCountIndex).asInstanceOf[Long]
+ val bufferCount = buffer.getField(partialCountIndex).asInstanceOf[Long]
if (bufferCount != 0) {
- val bufferSum = buffer.productElement(partialSumIndex).asInstanceOf[BigDecimal]
+ val bufferSum = buffer.getField(partialSumIndex).asInstanceOf[BigDecimal]
bufferSum.divide(BigDecimal.valueOf(bufferCount))
} else {
null.asInstanceOf[BigDecimal]
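
Beyond the renamed accessors, these merge methods show the intermediate-row protocol: each partial Row carries a sum and a count at fixed offsets, and merging adds both, with LongMath.checkedAdd raising ArithmeticException rather than silently wrapping on overflow. A standalone sketch of the integral merge step, assuming offsets 0 (sum) and 1 (count):

    import com.google.common.math.LongMath
    import org.apache.flink.types.Row

    object AvgMergeSketch {
      // assumed layout of the intermediate row: [sum: Long, count: Long]
      val sumIdx = 0
      val countIdx = 1

      def merge(partial: Row, buffer: Row): Unit = {
        val partialSum = partial.getField(sumIdx).asInstanceOf[Long]
        val partialCount = partial.getField(countIdx).asInstanceOf[Long]
        val bufferSum = buffer.getField(sumIdx).asInstanceOf[Long]
        val bufferCount = buffer.getField(countIdx).asInstanceOf[Long]
        // checkedAdd throws instead of wrapping if the sum overflows a Long
        buffer.setField(sumIdx, LongMath.checkedAdd(partialSum, bufferSum))
        buffer.setField(countIdx, LongMath.checkedAdd(partialCount, bufferCount))
      }

      def main(args: Array[String]): Unit = {
        val a = new Row(2); a.setField(sumIdx, 10L); a.setField(countIdx, 2L)
        val b = new Row(2); b.setField(sumIdx, 32L); b.setField(countIdx, 8L)
        merge(a, b)
        val avg = b.getField(sumIdx).asInstanceOf[Long].toDouble /
          b.getField(countIdx).asInstanceOf[Long]
        println(avg) // 4.2
      }
    }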
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
index d9f288a..4d6d20b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/CountAggregate.scala
@@ -18,7 +18,7 @@
package org.apache.flink.api.table.runtime.aggregate
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
class CountAggregate extends Aggregate[Long] {
private var countIndex: Int = _
@@ -28,13 +28,13 @@ class CountAggregate extends Aggregate[Long] {
}
override def merge(intermediate: Row, buffer: Row): Unit = {
- val partialCount = intermediate.productElement(countIndex).asInstanceOf[Long]
- val bufferCount = buffer.productElement(countIndex).asInstanceOf[Long]
+ val partialCount = intermediate.getField(countIndex).asInstanceOf[Long]
+ val bufferCount = buffer.getField(countIndex).asInstanceOf[Long]
buffer.setField(countIndex, partialCount + bufferCount)
}
override def evaluate(buffer: Row): Long = {
- buffer.productElement(countIndex).asInstanceOf[Long]
+ buffer.getField(countIndex).asInstanceOf[Long]
}
override def prepare(value: Any, intermediate: Row): Unit = {
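
CountAggregate is the smallest illustration of the three-phase contract that runs through this patch: prepare seeds an intermediate row from a single input, merge folds intermediate rows into a buffer, and evaluate reads the final value back out. A toy end-to-end run of that contract (the countIndex offset and the driver loop are illustrative, not Flink runtime code):

    import org.apache.flink.types.Row

    object CountLifecycleSketch {
      val countIndex = 0 // assumed offset of the count in the intermediate row

      def prepare(intermediate: Row): Unit =
        intermediate.setField(countIndex, 1L) // each input contributes one

      def merge(intermediate: Row, buffer: Row): Unit = {
        val partialCount = intermediate.getField(countIndex).asInstanceOf[Long]
        val bufferCount = buffer.getField(countIndex).asInstanceOf[Long]
        buffer.setField(countIndex, partialCount + bufferCount)
      }

      def evaluate(buffer: Row): Long =
        buffer.getField(countIndex).asInstanceOf[Long]

      def main(args: Array[String]): Unit = {
        val buffer = new Row(1)
        buffer.setField(countIndex, 0L)
        // fold three inputs through prepare + merge, as a group reduce would
        for (_ <- 1 to 3) {
          val intermediate = new Row(1)
          prepare(intermediate)
          merge(intermediate, buffer)
        }
        println(evaluate(buffer)) // 3
      }
    }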
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
index 85ad8e5..48e2313 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate
import java.lang.Iterable
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
import org.apache.flink.util.Collector
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
index d3f871a..1a85dca 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate
import java.lang.Iterable
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
@@ -66,7 +66,7 @@ class IncrementalAggregateAllWindowFunction[W <: Window](
// Set group keys value to final output.
groupKeysMapping.foreach {
case (after, previous) =>
- output.setField(after, record.productElement(previous))
+ output.setField(after, record.getField(previous))
}
// Evaluate final aggregate value and set to output.
aggregateMapping.foreach {
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
index e2830da..5c36821 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
@@ -18,7 +18,7 @@
package org.apache.flink.api.table.runtime.aggregate
import org.apache.flink.api.common.functions.ReduceFunction
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
import org.apache.flink.util.Preconditions
/**
@@ -54,7 +54,7 @@ class IncrementalAggregateReduceFunction(
// copy all fields of value1 into accumulatorRow
(0 until intermediateRowArity)
- .foreach(i => accumulatorRow.setField(i, value1.productElement(i)))
+ .foreach(i => accumulatorRow.setField(i, value1.getField(i)))
// merge value2 to accumulatorRow
aggregates.foreach(_.merge(value2, accumulatorRow))
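
The reduce function above first copies value1 into a fresh accumulator row so that neither input object is mutated, and only then merges value2 into it; with the new Row that defensive copy is a getField/setField loop over the row's arity. A sketch of just the copy step (intermediateRowArity is whatever the planner computed; it is hard-coded here):

    import org.apache.flink.types.Row

    object RowCopySketch {
      // positional, field-by-field transfer, as in the patch above
      def copyInto(source: Row, target: Row, arity: Int): Unit =
        (0 until arity).foreach(i => target.setField(i, source.getField(i)))

      def main(args: Array[String]): Unit = {
        val value1 = new Row(2)
        value1.setField(0, "k")
        value1.setField(1, 7L)

        val accumulatorRow = new Row(2)
        copyInto(value1, accumulatorRow, 2)
        println(accumulatorRow.getField(1)) // 7
      }
    }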
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
index c880f87..2513383 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate
import java.lang.Iterable
import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
index 81e6890..d0d71ee 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate
import java.lang.Iterable
import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
@@ -68,7 +68,7 @@ class IncrementalAggregateWindowFunction[W <: Window](
// Set group keys value to final output.
groupKeysMapping.foreach {
case (after, previous) =>
- output.setField(after, record.productElement(previous))
+ output.setField(after, record.getField(previous))
}
// Evaluate final aggregate value and set to output.
aggregateMapping.foreach {
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
index 9267527..2cb3dc7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.table.runtime.aggregate
import java.math.BigDecimal
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
@@ -57,9 +57,9 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
* @param buffer
*/
override def merge(intermediate: Row, buffer: Row): Unit = {
- val partialValue = intermediate.productElement(maxIndex).asInstanceOf[T]
+ val partialValue = intermediate.getField(maxIndex).asInstanceOf[T]
if (partialValue != null) {
- val bufferValue = buffer.productElement(maxIndex).asInstanceOf[T]
+ val bufferValue = buffer.getField(maxIndex).asInstanceOf[T]
if (bufferValue != null) {
val max : T = if (ord.compare(partialValue, bufferValue) > 0) partialValue else bufferValue
buffer.setField(maxIndex, max)
@@ -76,7 +76,7 @@ abstract class MaxAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
* @return
*/
override def evaluate(buffer: Row): T = {
- buffer.productElement(maxIndex).asInstanceOf[T]
+ buffer.getField(maxIndex).asInstanceOf[T]
}
override def supportPartial: Boolean = true
@@ -147,9 +147,9 @@ class DecimalMaxAggregate extends Aggregate[BigDecimal] {
}
override def merge(partial: Row, buffer: Row): Unit = {
- val partialValue = partial.productElement(minIndex).asInstanceOf[BigDecimal]
+ val partialValue = partial.getField(minIndex).asInstanceOf[BigDecimal]
if (partialValue != null) {
- val bufferValue = buffer.productElement(minIndex).asInstanceOf[BigDecimal]
+ val bufferValue = buffer.getField(minIndex).asInstanceOf[BigDecimal]
if (bufferValue != null) {
val min = if (partialValue.compareTo(bufferValue) > 0) partialValue else bufferValue
buffer.setField(minIndex, min)
@@ -160,7 +160,7 @@ class DecimalMaxAggregate extends Aggregate[BigDecimal] {
}
override def evaluate(buffer: Row): BigDecimal = {
- buffer.productElement(minIndex).asInstanceOf[BigDecimal]
+ buffer.getField(minIndex).asInstanceOf[BigDecimal]
}
override def supportPartial: Boolean = true
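
The Max and Min merges share one null-handling shape: a null partial leaves the buffer untouched, a null buffer simply adopts the partial, and otherwise Ordering[T] picks the winner. A compact sketch of that comparison step (the field offset and the boxed-Long instantiation are illustrative):

    import org.apache.flink.types.Row

    object MaxMergeSketch {
      val maxIndex = 0 // assumed offset of the running maximum

      def merge[T](partial: Row, buffer: Row)(implicit ord: Ordering[T]): Unit = {
        val partialValue = partial.getField(maxIndex).asInstanceOf[T]
        if (partialValue != null) {
          val bufferValue = buffer.getField(maxIndex).asInstanceOf[T]
          // a null buffer adopts the partial; otherwise keep the larger value
          if (bufferValue == null || ord.compare(partialValue, bufferValue) > 0) {
            buffer.setField(maxIndex, partialValue)
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val p = new Row(1); p.setField(maxIndex, java.lang.Long.valueOf(9L))
        val b = new Row(1); b.setField(maxIndex, java.lang.Long.valueOf(4L))
        merge(p, b)(Ordering.by((x: java.lang.Long) => x.longValue))
        println(b.getField(maxIndex)) // 9
      }
    }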
http://git-wip-us.apache.org/repos/asf/flink/blob/4d27f8f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
index 7e2ebf4..c1c79ec 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate
import java.math.BigDecimal
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.Row
+import org.apache.flink.types.Row
abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
@@ -56,9 +56,9 @@ abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
* @param buffer
*/
override def merge(partial: Row, buffer: Row): Unit = {
- val partialValue = partial.productElement(minIndex).asInstanceOf[T]
+ val partialValue = partial.getField(minIndex).asInstanceOf[T]
if (partialValue != null) {
- val bufferValue = buffer.productElement(minIndex).asInstanceOf[T]
+ val bufferValue = buffer.getField(minIndex).asInstanceOf[T]
if (bufferValue != null) {
val min : T = if (ord.compare(partialValue, bufferValue) < 0) partialValue else bufferValue
buffer.setField(minIndex, min)
@@ -75,7 +75,7 @@ abstract class MinAggregate[T](implicit ord: Ordering[T]) extends Aggregate[T] {
* @return
*/
override def evaluate(buffer: Row): T = {
- buffer.productElement(minIndex).asInstanceOf[T]
+ buffer.getField(minIndex).asInstanceOf[T]
}
override def supportPartial: Boolean = true
@@ -146,9 +146,9 @@ class DecimalMinAggregate extends Aggregate[BigDecimal] {
}
override def merge(partial: Row, buffer: Row): Unit = {
- val partialValue = partial.productElement(minIndex).asInstanceOf[BigDecimal]
+ val partialValue = partial.getField(minIndex).asInstanceOf[BigDecimal]
if (partialValue != null) {
- val bufferValue = buffer.productElement(minIndex).asInstanceOf[BigDecimal]
+ val bufferValue = buffer.getField(minIndex).asInstanceOf[BigDecimal]
if (bufferValue != null) {
val min = if (partialValue.compareTo(bufferValue) < 0) partialValue else bufferValue
buffer.setField(minIndex, min)
@@ -159,7 +159,7 @@ class DecimalMinAggregate extends Aggregate[BigDecimal] {
}
override def evaluate(buffer: Row): BigDecimal = {
- buffer.productElement(minIndex).asInstanceOf[BigDecimal]
+ buffer.getField(minIndex).asInstanceOf[BigDecimal]
}
override def supportPartial: Boolean = true