You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/03/25 20:45:16 UTC
[3/5] flink git commit: [FLINK-1512] [java api] Add CsvReader for
reading into POJOs
http://git-wip-us.apache.org/repos/asf/flink/blob/7b1c19cf/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderWithPOJOITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderWithPOJOITCase.java b/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderWithPOJOITCase.java
new file mode 100644
index 0000000..6a614e9
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderWithPOJOITCase.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.io;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+import static org.junit.Assert.fail;
+
+@RunWith(Parameterized.class)
+public class CsvReaderWithPOJOITCase extends MultipleProgramsTestBase {
+ private String resultPath;
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ public CsvReaderWithPOJOITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Before
+ public void before() throws Exception {
+ resultPath = tempFolder.newFile("result").toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception {
+ compareResultsByLinesInMemory(expected, resultPath);
+ }
+
+ private String createInputData(String data) throws Exception {
+ File file = tempFolder.newFile("input");
+ Files.write(data, file, Charsets.UTF_8);
+
+ return file.toURI().toString();
+ }
+
+ @Test
+ public void testPOJOType() throws Exception {
+ final String inputData = "ABC,2.20,3\nDEF,5.1,5\nDEF,3.30,1\nGHI,3.30,10";
+ final String dataPath = createInputData(inputData);
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJOItem> data = env.readCsvFile(dataPath).pojoType(POJOItem.class, new String[]{"f1", "f3", "f2"});
+ data.writeAsText(resultPath);
+
+ env.execute();
+
+ expected = "ABC,3,2.20\nDEF,5,5.10\nDEF,1,3.30\nGHI,10,3.30";
+ }
+
+ @Test
+ public void testPOJOTypeWithFieldsOrder() throws Exception {
+ final String inputData = "2.20,ABC,3\n5.1,DEF,5\n3.30,DEF,1\n3.30,GHI,10";
+ final String dataPath = createInputData(inputData);
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJOItem> data = env.readCsvFile(dataPath).pojoType(POJOItem.class, new String[]{"f3", "f1", "f2"});
+ data.writeAsText(resultPath);
+
+ env.execute();
+
+ expected = "ABC,3,2.20\nDEF,5,5.10\nDEF,1,3.30\nGHI,10,3.30";
+ }
+
+ @Test
+ public void testPOJOTypeWithoutFieldsOrder() throws Exception {
+ final String inputData = "";
+ final String dataPath = createInputData(inputData);
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ try {
+ env.readCsvFile(dataPath).pojoType(POJOItem.class, null);
+ fail("POJO type without fields order must raise NullPointerException!");
+ } catch (NullPointerException e) {
+ // success
+ }
+
+ expected = "";
+ resultPath = dataPath;
+ }
+
+ @Test
+ public void testPOJOTypeWithFieldsOrderAndFieldsSelection() throws Exception {
+ final String inputData = "3,2.20,ABC\n5,5.1,DEF\n1,3.30,DEF\n10,3.30,GHI";
+ final String dataPath = createInputData(inputData);
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJOItem> data = env.readCsvFile(dataPath).includeFields(true, false, true).pojoType(POJOItem.class, new String[]{"f2", "f1"});
+ data.writeAsText(resultPath);
+
+ env.execute();
+
+ expected = "ABC,3,0.00\nDEF,5,0.00\nDEF,1,0.00\nGHI,10,0.00";
+ }
+
+ public static class POJOItem {
+ public String f1;
+ private int f2;
+ public double f3;
+
+ public int getF2() {
+ return f2;
+ }
+
+ public void setF2(int f2) {
+ this.f2 = f2;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s,%d,%.02f", f1, f2, f3);
+ }
+ }
+}