You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/31 16:23:01 UTC
[1/4] flink git commit: [FLINK-7185] Activate checkstyle flink-java/io
Repository: flink
Updated Branches:
refs/heads/master d578810b4 -> 3c42557f3
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index d047aa6..8939c5a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -16,11 +16,14 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.io;
import org.apache.flink.api.common.io.ParseException;
-import org.apache.flink.api.java.tuple.*;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -30,6 +33,7 @@ import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.types.parser.StringParser;
+
import org.junit.Assert;
import org.junit.Test;
@@ -49,13 +53,16 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+/**
+ * Tests for {@link CsvInputFormat}.
+ */
public class CsvInputFormatTest {
-
+
private static final Path PATH = new Path("an/ignored/file/");
-
+
//Static variables for testing the removal of \r\n to \n
private static final String FIRST_PART = "That is the first part";
-
+
private static final String SECOND_PART = "That is the second part";
@Test
@@ -70,7 +77,7 @@ public class CsvInputFormatTest {
private void testSplitCsvInputStream(int bufferSize, boolean failAtStart) throws Exception {
final String fileContent =
- "this is|1|2.0|\n"+
+ "this is|1|2.0|\n" +
"a test|3|4.0|\n" +
"#next|5|6.0|\n" +
"asdadas|5|30.0|\n";
@@ -173,9 +180,9 @@ public class CsvInputFormatTest {
private void ignoreInvalidLines(int bufferSize) {
try {
- final String fileContent = "#description of the data\n" +
- "header1|header2|header3|\n"+
- "this is|1|2.0|\n"+
+ final String fileContent = "#description of the data\n" +
+ "header1|header2|header3|\n" +
+ "this is|1|2.0|\n" +
"//a comment\n" +
"a test|3|4.0|\n" +
"#next|5|6.0|\n" +
@@ -191,8 +198,7 @@ public class CsvInputFormatTest {
final Configuration parameters = new Configuration();
format.configure(parameters);
format.open(split);
-
-
+
Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
result = format.nextRecord(result);
assertNotNull(result);
@@ -224,34 +230,34 @@ public class CsvInputFormatTest {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
-
+
@Test
public void ignoreSingleCharPrefixComments() {
try {
final String fileContent = "#description of the data\n" +
- "#successive commented line\n" +
- "this is|1|2.0|\n" +
- "a test|3|4.0|\n" +
- "#next|5|6.0|\n";
-
+ "#successive commented line\n" +
+ "this is|1|2.0|\n" +
+ "a test|3|4.0|\n" +
+ "#next|5|6.0|\n";
+
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
final CsvInputFormat<Tuple3<String, Integer, Double>> format = new TupleCsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
format.setCommentPrefix("#");
-
+
final Configuration parameters = new Configuration();
format.configure(parameters);
format.open(split);
-
+
Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("this is", result.f0);
assertEquals(Integer.valueOf(1), result.f1);
assertEquals(new Double(2.0), result.f2);
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("a test", result.f0);
@@ -266,42 +272,41 @@ public class CsvInputFormatTest {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
-
+
@Test
public void ignoreMultiCharPrefixComments() {
try {
-
-
+
final String fileContent = "//description of the data\n" +
- "//successive commented line\n" +
- "this is|1|2.0|\n"+
- "a test|3|4.0|\n" +
- "//next|5|6.0|\n";
-
+ "//successive commented line\n" +
+ "this is|1|2.0|\n" +
+ "a test|3|4.0|\n" +
+ "//next|5|6.0|\n";
+
final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple3<String, Integer, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class);
final CsvInputFormat<Tuple3<String, Integer, Double>> format = new TupleCsvInputFormat<Tuple3<String, Integer, Double>>(PATH, "\n", "|", typeInfo);
format.setCommentPrefix("//");
-
+
final Configuration parameters = new Configuration();
format.configure(parameters);
format.open(split);
-
+
Tuple3<String, Integer, Double> result = new Tuple3<String, Integer, Double>();
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("this is", result.f0);
assertEquals(Integer.valueOf(1), result.f1);
assertEquals(new Double(2.0), result.f2);
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("a test", result.f0);
assertEquals(Integer.valueOf(3), result.f1);
assertEquals(new Double(4.0), result.f2);
-
+
result = format.nextRecord(result);
assertNull(result);
}
@@ -323,27 +328,27 @@ public class CsvInputFormatTest {
final Configuration parameters = new Configuration();
format.configure(parameters);
format.open(split);
-
+
Tuple3<String, String, String> result = new Tuple3<String, String, String>();
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("abc", result.f0);
assertEquals("def", result.f1);
assertEquals("ghijk", result.f2);
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("abc", result.f0);
assertEquals("", result.f1);
assertEquals("hhg", result.f2);
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("", result.f0);
assertEquals("", result.f1);
assertEquals("", result.f2);
-
+
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
@@ -397,7 +402,7 @@ public class CsvInputFormatTest {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
-
+
@Test
public void readStringFieldsWithTrailingDelimiters() {
try {
@@ -406,26 +411,26 @@ public class CsvInputFormatTest {
final TupleTypeInfo<Tuple3<String, String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class);
final CsvInputFormat<Tuple3<String, String, String>> format = new TupleCsvInputFormat<Tuple3<String, String, String>>(PATH, typeInfo);
-
+
format.setFieldDelimiter("|-");
format.configure(new Configuration());
format.open(split);
Tuple3<String, String, String> result = new Tuple3<String, String, String>();
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("abc", result.f0);
assertEquals("def", result.f1);
assertEquals("ghijk", result.f2);
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("abc", result.f0);
assertEquals("", result.f1);
assertEquals("hhg", result.f2);
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals("", result.f0);
@@ -594,19 +599,19 @@ public class CsvInputFormatTest {
public void testDoubleFields() throws IOException {
try {
final String fileContent = "11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n";
- final FileInputSplit split = createTempFile(fileContent);
+ final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple5<Double, Double, Double, Double, Double>> typeInfo =
TupleTypeInfo.getBasicTupleTypeInfo(Double.class, Double.class, Double.class, Double.class, Double.class);
final CsvInputFormat<Tuple5<Double, Double, Double, Double, Double>> format = new TupleCsvInputFormat<Tuple5<Double, Double, Double, Double, Double>>(PATH, typeInfo);
-
+
format.setFieldDelimiter("|");
format.configure(new Configuration());
format.open(split);
-
+
Tuple5<Double, Double, Double, Double, Double> result = new Tuple5<Double, Double, Double, Double, Double>();
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Double.valueOf(11.1), result.f0);
@@ -614,7 +619,7 @@ public class CsvInputFormatTest {
assertEquals(Double.valueOf(33.3), result.f2);
assertEquals(Double.valueOf(44.4), result.f3);
assertEquals(Double.valueOf(55.5), result.f4);
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Double.valueOf(66.6), result.f0);
@@ -622,7 +627,7 @@ public class CsvInputFormatTest {
assertEquals(Double.valueOf(88.8), result.f2);
assertEquals(Double.valueOf(99.9), result.f3);
assertEquals(Double.valueOf(00.0), result.f4);
-
+
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
@@ -631,7 +636,7 @@ public class CsvInputFormatTest {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
-
+
@Test
public void testReadFirstN() throws IOException {
try {
@@ -640,24 +645,24 @@ public class CsvInputFormatTest {
final TupleTypeInfo<Tuple2<Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class);
final CsvInputFormat<Tuple2<Integer, Integer>> format = new TupleCsvInputFormat<Tuple2<Integer, Integer>>(PATH, typeInfo);
-
+
format.setFieldDelimiter("|");
format.configure(new Configuration());
format.open(split);
-
+
Tuple2<Integer, Integer> result = new Tuple2<Integer, Integer>();
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(111), result.f0);
assertEquals(Integer.valueOf(222), result.f1);
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(666), result.f0);
assertEquals(Integer.valueOf(777), result.f1);
-
+
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
@@ -665,38 +670,38 @@ public class CsvInputFormatTest {
catch (Exception ex) {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
-
+
}
-
+
@Test
public void testReadSparseWithNullFieldsForTypes() throws IOException {
try {
final String fileContent = "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" +
"000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|";
- final FileInputSplit split = createTempFile(fileContent);
+ final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new TupleCsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo, new boolean[]{true, false, false, true, false, false, false, true});
-
+
format.setFieldDelimiter("|x|");
-
+
format.configure(new Configuration());
format.open(split);
-
+
Tuple3<Integer, Integer, Integer> result = new Tuple3<Integer, Integer, Integer>();
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(111), result.f0);
assertEquals(Integer.valueOf(444), result.f1);
assertEquals(Integer.valueOf(888), result.f2);
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(000), result.f0);
assertEquals(Integer.valueOf(777), result.f1);
assertEquals(Integer.valueOf(333), result.f2);
-
+
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
@@ -705,35 +710,35 @@ public class CsvInputFormatTest {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
-
+
@Test
public void testReadSparseWithPositionSetter() throws IOException {
try {
final String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|";
- final FileInputSplit split = createTempFile(fileContent);
+ final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new TupleCsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo, new int[]{0, 3, 7});
-
+
format.setFieldDelimiter("|");
-
+
format.configure(new Configuration());
format.open(split);
-
+
Tuple3<Integer, Integer, Integer> result = new Tuple3<Integer, Integer, Integer>();
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(111), result.f0);
assertEquals(Integer.valueOf(444), result.f1);
assertEquals(Integer.valueOf(888), result.f2);
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(000), result.f0);
assertEquals(Integer.valueOf(777), result.f1);
assertEquals(Integer.valueOf(333), result.f2);
-
+
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
@@ -748,30 +753,30 @@ public class CsvInputFormatTest {
try {
final String fileContent = "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" +
"000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&";
- final FileInputSplit split = createTempFile(fileContent);
+ final FileInputSplit split = createTempFile(fileContent);
final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new TupleCsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo, new boolean[]{true, false, false, true, false, false, false, true});
-
+
format.setFieldDelimiter("&&");
-
+
format.configure(new Configuration());
format.open(split);
-
+
Tuple3<Integer, Integer, Integer> result = new Tuple3<Integer, Integer, Integer>();
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(111), result.f0);
assertEquals(Integer.valueOf(444), result.f1);
assertEquals(Integer.valueOf(888), result.f2);
-
+
result = format.nextRecord(result);
assertNotNull(result);
assertEquals(Integer.valueOf(000), result.f0);
assertEquals(Integer.valueOf(777), result.f1);
assertEquals(Integer.valueOf(333), result.f2);
-
+
result = format.nextRecord(result);
assertNull(result);
assertTrue(format.reachedEnd());
@@ -784,7 +789,7 @@ public class CsvInputFormatTest {
@Test
public void testParseStringErrors() throws Exception {
StringParser stringParser = new StringParser();
- stringParser.enableQuotedStringParsing((byte)'"');
+ stringParser.enableQuotedStringParsing((byte) '"');
Object[][] failures = {
{"\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING},
@@ -801,7 +806,6 @@ public class CsvInputFormatTest {
assertThat(stringParser.getErrorState(), is(failure[1]));
}
-
}
// Test disabled because we do not support double-quote escaped quotes right now.
@@ -836,7 +840,7 @@ public class CsvInputFormatTest {
new Tuple5<Integer, String, String, String, Double>(1997, "Ford", "E350", "ac, abs, moon", 3000.0),
new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition\"", "", 4900.0),
new Tuple5<Integer, String, String, String, Double>(1996, "Jeep", "Grand Cherokee", "MUST SELL! air, moon roof, loaded", 4799.00),
- new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition, Very Large\"", "", 5000.00 ),
+ new Tuple5<Integer, String, String, String, Double>(1999, "Chevy", "Venture \"Extended Edition, Very Large\"", "", 5000.00),
new Tuple5<Integer, String, String, String, Double>(0, "", "Venture \"Extended Edition\"", "", 4900.0)
};
@@ -864,37 +868,37 @@ public class CsvInputFormatTest {
);
wrt.write(content);
wrt.close();
-
+
return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"});
}
-
+
@Test
public void testWindowsLineEndRemoval() {
-
+
//Check typical use case -- linux file is correct and it is set up to linux (\n)
this.testRemovingTrailingCR("\n", "\n");
-
+
//Check typical windows case -- windows file endings and file has windows file endings set up
this.testRemovingTrailingCR("\r\n", "\r\n");
-
+
//Check problematic case windows file -- windows file endings(\r\n) but linux line endings (\n) set up
this.testRemovingTrailingCR("\r\n", "\n");
-
+
//Check problematic case linux file -- linux file endings (\n) but windows file endings set up (\r\n)
//Specific setup for windows line endings will expect \r\n because it has to be set up and is not standard.
}
-
+
private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreakerSetup) {
- File tempFile=null;
-
+ File tempFile = null;
+
String fileContent = CsvInputFormatTest.FIRST_PART + lineBreakerInFile + CsvInputFormatTest.SECOND_PART + lineBreakerInFile;
-
+
try {
// create input file
tempFile = File.createTempFile("CsvInputFormatTest", "tmp");
tempFile.deleteOnExit();
tempFile.setWritable(true);
-
+
OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
wrt.write(fileContent);
wrt.close();
@@ -902,28 +906,26 @@ public class CsvInputFormatTest {
final TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
final CsvInputFormat<Tuple1<String>> inputFormat = new TupleCsvInputFormat<Tuple1<String>>(new Path(tempFile.toURI().toString()), typeInfo);
- Configuration parameters = new Configuration();
+ Configuration parameters = new Configuration();
inputFormat.configure(parameters);
-
+
inputFormat.setDelimiter(lineBreakerSetup);
-
+
FileInputSplit[] splits = inputFormat.createInputSplits(1);
-
+
inputFormat.open(splits[0]);
-
+
Tuple1<String> result = inputFormat.nextRecord(new Tuple1<String>());
-
+
assertNotNull("Expecting to not return null", result);
-
-
-
+
assertEquals(FIRST_PART, result.f0);
-
+
result = inputFormat.nextRecord(result);
-
+
assertNotNull("Expecting to not return null", result);
assertEquals(SECOND_PART, result.f0);
-
+
}
catch (Throwable t) {
System.err.println("test failed with exception: " + t.getMessage());
@@ -1219,7 +1221,7 @@ public class CsvInputFormatTest {
writer.close();
@SuppressWarnings("unchecked")
- PojoTypeInfo<TwitterPOJO> typeInfo = (PojoTypeInfo<TwitterPOJO>)TypeExtractor.createTypeInfo(TwitterPOJO.class);
+ PojoTypeInfo<TwitterPOJO> typeInfo = (PojoTypeInfo<TwitterPOJO>) TypeExtractor.createTypeInfo(TwitterPOJO.class);
CsvInputFormat<TwitterPOJO> inputFormat = new PojoCsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo);
inputFormat.configure(new Configuration());
@@ -1238,7 +1240,7 @@ public class CsvInputFormatTest {
TwitterPOJO pojo;
- while((pojo = inputFormat.nextRecord(new TwitterPOJO())) != null) {
+ while ((pojo = inputFormat.nextRecord(new TwitterPOJO())) != null) {
actual.add(pojo);
}
@@ -1249,6 +1251,9 @@ public class CsvInputFormatTest {
// Custom types for testing
// --------------------------------------------------------------------------------------------
+ /**
+ * Sample test pojo.
+ */
public static class PojoItem {
public int field1;
public String field2;
@@ -1256,6 +1261,9 @@ public class CsvInputFormatTest {
public String field4;
}
+ /**
+ * Sample test pojo with private fields.
+ */
public static class PrivatePojoItem {
private int field1;
private String field2;
@@ -1295,6 +1303,9 @@ public class CsvInputFormatTest {
}
}
+ /**
+ * Sample test pojo.
+ */
public static class POJO {
public String table;
public String time;
@@ -1319,6 +1330,9 @@ public class CsvInputFormatTest {
}
}
+ /**
+ * Sample test pojo representing tweets.
+ */
public static class TwitterPOJO extends POJO {
public String tweet;
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
index a8ce495..a244306 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java
@@ -37,13 +37,16 @@ import java.util.List;
import static org.junit.Assert.fail;
+/**
+ * Tests for {@link CsvOutputFormat}.
+ */
public class CsvOutputFormatTest {
private String path = null;
@Before
public void createFile() throws Exception {
- path = File.createTempFile("csv_output_test_file",".csv").getAbsolutePath();
+ path = File.createTempFile("csv_output_test_file", ".csv").getAbsolutePath();
}
@Test
@@ -80,7 +83,7 @@ public class CsvOutputFormatTest {
} catch (RuntimeException e) {
// expected
}
-
+
}
finally {
csvOutputFormat.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java
index 2f403aa..9280009 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java
@@ -18,8 +18,12 @@
package org.apache.flink.api.java.io;
import org.apache.flink.api.java.ExecutionEnvironment;
+
import org.junit.Test;
+/**
+ * Tests for {@link ExecutionEnvironment#fromElements}.
+ */
public class FromElementsTest {
@Test
@@ -34,7 +38,7 @@ public class FromElementsTest {
executionEnvironment.fromElements(SubType.class, new SubType(1, "Java"), new ParentType(1, "hello"));
}
- public static class ParentType {
+ private static class ParentType {
int num;
String string;
public ParentType(int num, String string) {
@@ -43,7 +47,7 @@ public class FromElementsTest {
}
}
- public static class SubType extends ParentType{
+ private static class SubType extends ParentType{
public SubType(int num, String string) {
super(num, string);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
index f9dc28a..d90d657 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java
@@ -18,26 +18,28 @@
package org.apache.flink.api.java.io;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Test;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
-import org.junit.Test;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+/**
+ * Tests for {@link PrimitiveInputFormat}.
+ */
public class PrimitiveInputFormatTest {
private static final Path PATH = new Path("an/ignored/file/");
-
@Test
public void testStringInput() {
try {
@@ -71,15 +73,13 @@ public class PrimitiveInputFormatTest {
}
}
-
-
@Test
public void testIntegerInput() throws IOException {
try {
final String fileContent = "111|222|";
final FileInputSplit split = createInputSplit(fileContent);
- final PrimitiveInputFormat<Integer> format = new PrimitiveInputFormat<Integer>(PATH,"|", Integer.class);
+ final PrimitiveInputFormat<Integer> format = new PrimitiveInputFormat<Integer>(PATH, "|", Integer.class);
format.configure(new Configuration());
format.open(split);
@@ -99,7 +99,6 @@ public class PrimitiveInputFormatTest {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
-
@Test
public void testDoubleInputLinewise() throws IOException {
@@ -136,7 +135,7 @@ public class PrimitiveInputFormatTest {
String fileContent = first + "\r\n" + second + "\r\n";
final FileInputSplit split = createInputSplit(fileContent);
- final PrimitiveInputFormat<String> format = new PrimitiveInputFormat<String>(PATH ,String.class);
+ final PrimitiveInputFormat<String> format = new PrimitiveInputFormat<String>(PATH, String.class);
format.configure(new Configuration());
format.open(split);
@@ -153,14 +152,14 @@ public class PrimitiveInputFormatTest {
fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
}
}
-
+
@Test(expected = IOException.class)
public void testFailingInput() throws IOException {
-
+
final String fileContent = "111|222|asdf|17";
final FileInputSplit split = createInputSplit(fileContent);
- final PrimitiveInputFormat<Integer> format = new PrimitiveInputFormat<Integer>(PATH,"|", Integer.class);
+ final PrimitiveInputFormat<Integer> format = new PrimitiveInputFormat<Integer>(PATH, "|", Integer.class);
format.configure(new Configuration());
format.open(split);
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
index 943db36..f44d5bf 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
@@ -30,6 +29,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.types.Row;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.types.parser.StringParser;
+
import org.junit.Ignore;
import org.junit.Test;
@@ -45,18 +45,21 @@ import java.util.HashMap;
import java.util.Map;
import static junit.framework.TestCase.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
+/**
+ * Tests for {@link RowCsvInputFormat}.
+ */
public class RowCsvInputFormatTest {
- private static Path PATH = new Path("an/ignored/file/");
+ private static final Path PATH = new Path("an/ignored/file/");
// static variables for testing the removal of \r\n to \n
- private static String FIRST_PART = "That is the first part";
- private static String SECOND_PART = "That is the second part";
+ private static final String FIRST_PART = "That is the first part";
+ private static final String SECOND_PART = "That is the second part";
@Test
public void ignoreInvalidLines() throws Exception {
@@ -588,7 +591,7 @@ public class RowCsvInputFormatTest {
RowCsvInputFormat format = new RowCsvInputFormat(
PATH,
fieldTypes,
- new int[]{0,3,7});
+ new int[]{0, 3, 7});
format.setFieldDelimiter("|x|");
format.configure(new Configuration());
format.open(split);
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
index 6bff9db..e78232a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
@@ -16,15 +16,14 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.io;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
@@ -34,51 +33,55 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+/**
+ * Tests for {@link TextInputFormat}.
+ */
public class TextInputFormatTest {
@Test
public void testSimpleRead() {
- final String FIRST = "First line";
- final String SECOND = "Second line";
-
+ final String first = "First line";
+ final String second = "Second line";
+
try {
// create input file
File tempFile = File.createTempFile("TextInputFormatTest", "tmp");
tempFile.deleteOnExit();
tempFile.setWritable(true);
-
+
PrintStream ps = new PrintStream(tempFile);
- ps.println(FIRST);
- ps.println(SECOND);
+ ps.println(first);
+ ps.println(second);
ps.close();
-
+
TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
-
- Configuration parameters = new Configuration();
+
+ Configuration parameters = new Configuration();
inputFormat.configure(parameters);
-
+
FileInputSplit[] splits = inputFormat.createInputSplits(1);
assertTrue("expected at least one input split", splits.length >= 1);
-
+
inputFormat.open(splits[0]);
-
+
String result = "";
-
+
assertFalse(inputFormat.reachedEnd());
result = inputFormat.nextRecord("");
assertNotNull("Expecting first record here", result);
- assertEquals(FIRST, result);
-
+ assertEquals(first, result);
+
assertFalse(inputFormat.reachedEnd());
result = inputFormat.nextRecord(result);
assertNotNull("Expecting second record here", result);
- assertEquals(SECOND, result);
-
+ assertEquals(second, result);
+
assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result));
}
catch (Throwable t) {
@@ -142,70 +145,68 @@ public class TextInputFormatTest {
}
/**
- * This tests cases when line ends with \r\n and \n is used as delimiter, the last \r should be removed
+ * This tests cases when line ends with \r\n and \n is used as delimiter, the last \r should be removed.
*/
@Test
public void testRemovingTrailingCR() {
-
- testRemovingTrailingCR("\n","\n");
- testRemovingTrailingCR("\r\n","\n");
-
- testRemovingTrailingCR("|","|");
- testRemovingTrailingCR("|","\n");
+
+ testRemovingTrailingCR("\n", "\n");
+ testRemovingTrailingCR("\r\n", "\n");
+
+ testRemovingTrailingCR("|", "|");
+ testRemovingTrailingCR("|", "\n");
}
-
- private void testRemovingTrailingCR(String lineBreaker,String delimiter) {
- File tempFile=null;
-
- String FIRST = "First line";
- String SECOND = "Second line";
- String CONTENT = FIRST + lineBreaker + SECOND + lineBreaker;
-
+
+ private void testRemovingTrailingCR(String lineBreaker, String delimiter) {
+ File tempFile = null;
+
+ String first = "First line";
+ String second = "Second line";
+ String content = first + lineBreaker + second + lineBreaker;
+
try {
// create input file
tempFile = File.createTempFile("TextInputFormatTest", "tmp");
tempFile.deleteOnExit();
tempFile.setWritable(true);
-
+
OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
- wrt.write(CONTENT);
+ wrt.write(content);
wrt.close();
-
+
TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
inputFormat.setFilePath(tempFile.toURI().toString());
-
- Configuration parameters = new Configuration();
+
+ Configuration parameters = new Configuration();
inputFormat.configure(parameters);
-
+
inputFormat.setDelimiter(delimiter);
-
+
FileInputSplit[] splits = inputFormat.createInputSplits(1);
-
+
inputFormat.open(splits[0]);
-
String result = "";
- if ( (delimiter.equals("\n") && (lineBreaker.equals("\n") || lineBreaker.equals("\r\n") ) )
- || (lineBreaker.equals(delimiter)) ){
-
+ if ((delimiter.equals("\n") && (lineBreaker.equals("\n") || lineBreaker.equals("\r\n")))
+ || (lineBreaker.equals(delimiter))){
+
result = inputFormat.nextRecord("");
assertNotNull("Expecting first record here", result);
- assertEquals(FIRST, result);
-
+ assertEquals(first, result);
+
result = inputFormat.nextRecord(result);
assertNotNull("Expecting second record here", result);
- assertEquals(SECOND, result);
-
+ assertEquals(second, result);
+
result = inputFormat.nextRecord(result);
assertNull("The input file is over", result);
-
+
} else {
result = inputFormat.nextRecord("");
assertNotNull("Expecting first record here", result);
- assertEquals(CONTENT, result);
+ assertEquals(content, result);
}
-
-
+
}
catch (Throwable t) {
System.err.println("test failed with exception: " + t.getMessage());
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
index a119d59..676a7c4 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputView;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.runner.RunWith;
@@ -38,6 +39,9 @@ import org.junit.runners.Parameterized;
import java.io.IOException;
+/**
+ * Tests for type serialization format.
+ */
@RunWith(Parameterized.class)
public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<Integer, String>> {
@@ -50,7 +54,7 @@ public class TypeSerializerFormatTest extends SequentialFormatTestBase<Tuple2<In
public TypeSerializerFormatTest(int numberOfTuples, long blockSize, int parallelism) {
super(numberOfTuples, blockSize, parallelism);
- resultType = TypeExtractor.getForObject(getRecord(0));
+ resultType = TypeExtractor.getForObject(getRecord(0));
serializer = resultType.createSerializer(new ExecutionConfig());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/tools/maven/suppressions-java.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-java.xml b/tools/maven/suppressions-java.xml
index 6b63bb1..d7e42e5 100644
--- a/tools/maven/suppressions-java.xml
+++ b/tools/maven/suppressions-java.xml
@@ -23,13 +23,6 @@ under the License.
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
<suppressions>
- <suppress
- files="(.*)api[/\\]java[/\\]io[/\\](.*)"
- checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
- <!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
- <suppress
- files="(.*)test[/\\](.*)api[/\\]java[/\\]io[/\\](.*)"
- checks="UnusedImports|AvoidStarImport"/>
<suppress
files="(.*)api[/\\]java[/\\]aggregation[/\\](.*)"
[4/4] flink git commit: [FLINK-6732] [java] Activate strict checkstyle for flink-java
Posted by ch...@apache.org.
[FLINK-6732] [java] Activate strict checkstyle for flink-java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c42557f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c42557f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c42557f
Branch: refs/heads/master
Commit: 3c42557f3083182b0ab66d15cb6ec8452b59464c
Parents: 8e97536
Author: zentol <ch...@apache.org>
Authored: Mon Jul 31 17:03:48 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 31 17:05:39 2017 +0200
----------------------------------------------------------------------
docs/internals/ide_setup.md | 2 +-
flink-java/pom.xml | 29 --------------------
tools/maven/suppressions-java.xml | 50 ----------------------------------
3 files changed, 1 insertion(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3c42557f/docs/internals/ide_setup.md
----------------------------------------------------------------------
diff --git a/docs/internals/ide_setup.md b/docs/internals/ide_setup.md
index c070331..6781f44 100644
--- a/docs/internals/ide_setup.md
+++ b/docs/internals/ide_setup.md
@@ -107,7 +107,7 @@ You can scan an entire module by opening the Checkstyle tools window and
clicking the "Check Module" button. The scan should report no errors.
<span class="label label-info">Note</span> Some modules are not fully covered by checkstyle,
-which include `flink-core`, `flink-java`, `flink-optimizer`, and `flink-runtime`.
+which include `flink-core`, `flink-optimizer`, and `flink-runtime`.
Nevertheless please make sure that code you add/modify in these modules still conforms to the checkstyle rules.
## Eclipse
http://git-wip-us.apache.org/repos/asf/flink/blob/3c42557f/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 9d69ca8..2dc243e 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -85,35 +85,6 @@ under the License.
<build>
<plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <version>2.17</version>
- <dependencies>
- <dependency>
- <groupId>com.puppycrawl.tools</groupId>
- <artifactId>checkstyle</artifactId>
- <version>6.19</version>
- </dependency>
- </dependencies>
- <executions>
- <execution>
- <id>validate</id>
- <phase>validate</phase>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
-
- <configuration>
- <configLocation>/tools/maven/checkstyle.xml</configLocation>
- <suppressionsLocation>/tools/maven/suppressions-java.xml</suppressionsLocation>
- <includeTestSourceDirectory>true</includeTestSourceDirectory>
- <logViolationsToConsole>true</logViolationsToConsole>
- <failOnViolation>true</failOnViolation>
- </configuration>
- </plugin>
<!-- activate API compatibility checks -->
<plugin>
http://git-wip-us.apache.org/repos/asf/flink/blob/3c42557f/tools/maven/suppressions-java.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-java.xml b/tools/maven/suppressions-java.xml
deleted file mode 100644
index 3bb8556..0000000
--- a/tools/maven/suppressions-java.xml
+++ /dev/null
@@ -1,50 +0,0 @@
-<?xml version="1.0"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<!DOCTYPE suppressions PUBLIC
- "-//Puppy Crawl//DTD Suppressions 1.1//EN"
- "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
-
-<suppressions>
-
- <suppress
- files="(.*)api[/\\]java[/\\]aggregation[/\\](.*)"
- checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-
- <suppress
- files="(.*)api[/\\]java[/\\]operators[/\\]join[/\\](.*)"
- checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-
- <suppress
- files="(.*)api[/\\]java[/\\]operator[/\\]([^/\\]*\.java)"
- checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-
- <suppress
- files="(.*)test[/\\](.*)api[/\\]java[/\\]tuple[/\\](.*)"
- checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-
- <suppress
- files="(.*)test[/\\](.*)api[/\\]common[/\\]io[/\\](.*)"
- checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-
- <suppress
- files="(.*)test[/\\](.*)api[/\\]common[/\\]operators[/\\](.*)"
- checks="AvoidStarImport|NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-</suppressions>
[2/4] flink git commit: [FLINK-7185] Activate checkstyle flink-java/io
Posted by ch...@apache.org.
[FLINK-7185] Activate checkstyle flink-java/io
This closes #4340.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c9c9fb5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c9c9fb5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c9c9fb5
Branch: refs/heads/master
Commit: 0c9c9fb5cb7a8a27d444db5c725c8abd792ca761
Parents: d578810
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Fri Jul 14 10:31:34 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 31 16:37:52 2017 +0200
----------------------------------------------------------------------
.../api/java/io/CollectionInputFormat.java | 40 ++--
.../flink/api/java/io/CsvInputFormat.java | 17 +-
.../flink/api/java/io/CsvOutputFormat.java | 26 +--
.../org/apache/flink/api/java/io/CsvReader.java | 97 ++++----
.../api/java/io/DiscardingOutputFormat.java | 1 -
.../flink/api/java/io/IteratorInputFormat.java | 12 +-
.../java/io/LocalCollectionOutputFormat.java | 21 +-
.../java/io/ParallelIteratorInputFormat.java | 20 +-
.../flink/api/java/io/PojoCsvInputFormat.java | 5 +
.../flink/api/java/io/PrimitiveInputFormat.java | 5 +-
.../flink/api/java/io/PrintingOutputFormat.java | 30 +--
.../flink/api/java/io/RowCsvInputFormat.java | 3 +
.../flink/api/java/io/SplitDataProperties.java | 85 ++++---
.../flink/api/java/io/TextInputFormat.java | 50 ++--
.../flink/api/java/io/TextOutputFormat.java | 50 ++--
.../flink/api/java/io/TextValueInputFormat.java | 57 ++---
.../flink/api/java/io/TupleCsvInputFormat.java | 9 +-
.../api/java/io/TypeSerializerOutputFormat.java | 6 +-
.../apache/flink/api/java/io/CSVReaderTest.java | 90 ++++----
.../api/java/io/CollectionInputFormatTest.java | 67 +++---
.../flink/api/java/io/CsvInputFormatTest.java | 230 ++++++++++---------
.../flink/api/java/io/CsvOutputFormatTest.java | 7 +-
.../flink/api/java/io/FromElementsTest.java | 8 +-
.../api/java/io/PrimitiveInputFormatTest.java | 35 ++-
.../api/java/io/RowCsvInputFormatTest.java | 15 +-
.../flink/api/java/io/TextInputFormatTest.java | 129 +++++------
.../api/java/io/TypeSerializerFormatTest.java | 6 +-
tools/maven/suppressions-java.xml | 7 -
28 files changed, 587 insertions(+), 541 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
index 90e6712..eebe56f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java
@@ -18,14 +18,6 @@
package org.apache.flink.api.java.io;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.NonParallelInput;
@@ -34,6 +26,14 @@ import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
/**
* An input format that returns objects from a collection.
*/
@@ -55,7 +55,7 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
}
this.serializer = serializer;
-
+
this.dataSet = dataSet;
}
@@ -67,10 +67,10 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
@Override
public void open(GenericInputSplit split) throws IOException {
super.open(split);
-
+
this.iterator = this.dataSet.iterator();
}
-
+
@Override
public T nextRecord(T record) throws IOException {
return this.iterator.next();
@@ -80,10 +80,10 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
private void writeObject(ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
-
+
final int size = dataSet.size();
out.writeInt(size);
-
+
if (size > 0) {
DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(out);
for (T element : dataSet){
@@ -97,7 +97,7 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
int collectionLength = in.readInt();
List<T> list = new ArrayList<T>(collectionLength);
-
+
if (collectionLength > 0) {
try {
DataInputViewStreamWrapper wrapper = new DataInputViewStreamWrapper(in);
@@ -113,9 +113,9 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
dataSet = list;
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@@ -136,14 +136,14 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
sb.append(']');
return sb.toString();
}
-
+
// --------------------------------------------------------------------------------------------
-
+
public static <X> void checkCollection(Collection<X> elements, Class<X> viewedAs) {
if (elements == null || viewedAs == null) {
throw new NullPointerException();
}
-
+
for (X elem : elements) {
if (elem == null) {
throw new IllegalArgumentException("The collection must not contain null elements.");
@@ -157,7 +157,7 @@ public class CollectionInputFormat<T> extends GenericInputFormat<T> implements N
if (!viewedAs.isAssignableFrom(elem.getClass()) &&
!(elem.getClass().toString().equals("class scala.runtime.BoxedUnit") && viewedAs.equals(void.class))) {
- throw new IllegalArgumentException("The elements in the collection are not all subclasses of " +
+ throw new IllegalArgumentException("The elements in the collection are not all subclasses of " +
viewedAs.getCanonicalName());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index f1a16ea..0bd4e69 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -21,13 +21,18 @@ package org.apache.flink.api.java.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.GenericCsvInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
import java.io.IOException;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.StringUtils;
+/**
+ * InputFormat that reads csv files.
+ *
+ * @param <OUT>
+ */
@Internal
public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
@@ -38,7 +43,7 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
public static final String DEFAULT_FIELD_DELIMITER = ",";
protected transient Object[] parsedValues;
-
+
protected CsvInputFormat(Path filePath) {
super(filePath);
}
@@ -63,7 +68,7 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
// left to right evaluation makes access [0] okay
// this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
- if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
+ if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n') {
this.lineDelimiterIsLinebreak = true;
}
@@ -123,7 +128,7 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
protected static boolean[] createDefaultMask(int size) {
boolean[] includedMask = new boolean[size];
- for (int x=0; x<includedMask.length; x++) {
+ for (int x = 0; x < includedMask.length; x++) {
includedMask[x] = true;
}
return includedMask;
@@ -154,5 +159,5 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
public String toString() {
return "CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + getFilePath();
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
index c2fe13c..44fd679 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvOutputFormat.java
@@ -16,18 +16,10 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.io;
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -35,6 +27,15 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.StringValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
/**
* This is an OutputFormat to serialize {@link org.apache.flink.api.java.tuple.Tuple}s to text. The output is
* structured by record delimiters and field delimiters as common in CSV files.
@@ -121,8 +122,8 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
/**
* Configures the format to either allow null values (writing an empty field),
* or to throw an exception when encountering a null field.
- * <p>
- * by default, null values are disallowed.
+ *
+ * <p>by default, null values are disallowed.
*
* @param allowNulls Flag to indicate whether the output format should accept null values.
*/
@@ -144,8 +145,8 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
* Configures whether the output format should quote string values. String values are fields
* of type {@link java.lang.String} and {@link org.apache.flink.types.StringValue}, as well as
* all subclasses of the latter.
- * <p>
- * By default, strings are not quoted.
+ *
+ * <p>By default, strings are not quoted.
*
* @param quoteStrings Flag indicating whether string fields should be quoted.
*/
@@ -215,7 +216,6 @@ public class CsvOutputFormat<T extends Tuple> extends FileOutputFormat<T> implem
}
/**
- *
* The purpose of this method is solely to check whether the data type to be processed
* is in fact a tuple type.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index ce2f4fa..684911a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -23,9 +23,9 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.operators.DataSource;
-//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
+//CHECKSTYLE.OFF: AvoidStarImport|ImportOrder
import org.apache.flink.api.java.tuple.*;
-//CHECKSTYLE.ON: AvoidStarImport
+//CHECKSTYLE.ON: AvoidStarImport|ImportOrder
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -44,14 +44,13 @@ import java.util.Arrays;
public class CsvReader {
private final Path path;
-
+
private final ExecutionEnvironment executionContext;
-
-
+
protected boolean[] includedMask;
-
+
protected String lineDelimiter = CsvInputFormat.DEFAULT_LINE_DELIMITER;
-
+
protected String fieldDelimiter = CsvInputFormat.DEFAULT_FIELD_DELIMITER;
protected String commentPrefix = null; //default: no comments
@@ -61,35 +60,35 @@ public class CsvReader {
protected char quoteCharacter = '"';
protected boolean skipFirstLineAsHeader = false;
-
+
protected boolean ignoreInvalidLines = false;
private String charset = "UTF-8";
-
+
// --------------------------------------------------------------------------------------------
-
+
public CsvReader(Path filePath, ExecutionEnvironment executionContext) {
Preconditions.checkNotNull(filePath, "The file path may not be null.");
Preconditions.checkNotNull(executionContext, "The execution context may not be null.");
-
+
this.path = filePath;
this.executionContext = executionContext;
}
-
+
public CsvReader(String filePath, ExecutionEnvironment executionContext) {
this(new Path(Preconditions.checkNotNull(filePath, "The file path may not be null.")), executionContext);
}
-
+
public Path getFilePath() {
return this.path;
}
-
+
// --------------------------------------------------------------------------------------------
-
+
/**
* Configures the delimiter that separates the lines/rows. The linebreak character
* ({@code '\n'}) is used by default.
- *
+ *
* @param delimiter The delimiter that separates the rows.
* @return The CSV reader instance itself, to allow for fluent function chaining.
*/
@@ -97,15 +96,15 @@ public class CsvReader {
if (delimiter == null || delimiter.length() == 0) {
throw new IllegalArgumentException("The delimiter must not be null or an empty string");
}
-
+
this.lineDelimiter = delimiter;
return this;
}
-
+
/**
* Configures the delimiter that separates the fields within a row. The comma character
* ({@code ','}) is used by default.
- *
+ *
* @param delimiter The delimiter that separates the fields in one row.
* @return The CSV reader instance itself, to allow for fluent function chaining.
*
@@ -148,7 +147,7 @@ public class CsvReader {
* Configures the string that starts comments.
* By default comments will be treated as invalid lines.
* This function only recognizes comments which start at the beginning of the line!
- *
+ *
* @param commentPrefix The string that starts the comments.
* @return The CSV reader instance itself, to allow for fluent function chaining.
*/
@@ -156,7 +155,7 @@ public class CsvReader {
if (commentPrefix == null || commentPrefix.length() == 0) {
throw new IllegalArgumentException("The comment prefix must not be null or an empty string");
}
-
+
this.commentPrefix = commentPrefix;
return this;
}
@@ -172,7 +171,7 @@ public class CsvReader {
}
/**
- * Sets the charset of the reader
+ * Sets the charset of the reader.
*
* @param charset The character set to set.
*/
@@ -189,7 +188,7 @@ public class CsvReader {
* the boolean array is {@code true}.
* The number of fields in the result is consequently equal to the number of times that {@code true}
* occurs in the fields array.
- *
+ *
* @param fields The array of flags that describes which fields are to be included and which not.
* @return The CSV reader instance itself, to allow for fluent function chaining.
*/
@@ -197,14 +196,14 @@ public class CsvReader {
if (fields == null || fields.length == 0) {
throw new IllegalArgumentException("The set of included fields must not be null or empty.");
}
-
+
int lastTruePos = -1;
for (int i = 0; i < fields.length; i++) {
if (fields[i]) {
lastTruePos = i;
}
}
-
+
if (lastTruePos == -1) {
throw new IllegalArgumentException("The description of fields to parse excluded all fields. At least one fields must be included.");
}
@@ -225,13 +224,13 @@ public class CsvReader {
* in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value
* {@code false}). The result contains the fields where the corresponding position in
* the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}).
- *
+ *
* @param mask The string mask defining which fields to include and which to skip.
* @return The CSV reader instance itself, to allow for fluent function chaining.
*/
public CsvReader includeFields(String mask) {
boolean[] includedMask = new boolean[mask.length()];
-
+
for (int i = 0; i < mask.length(); i++) {
char c = mask.charAt(i);
if (c == '1' || c == 'T' || c == 't') {
@@ -240,10 +239,10 @@ public class CsvReader {
throw new IllegalArgumentException("Mask string may contain only '0' and '1'.");
}
}
-
+
return includeFields(includedMask);
}
-
+
/**
* Configures which fields of the CSV file should be included and which should be skipped. The
* bits in the value (read from least significant to most significant) define whether the field at
@@ -252,14 +251,14 @@ public class CsvReader {
* non-zero bit.
* The parser will skip over all fields where the character at the corresponding bit is zero, and
* include the fields where the corresponding bit is one.
- * <p>
- * Examples:
+ *
+ * <p>Examples:
* <ul>
* <li>A mask of {@code 0x7} would include the first three fields.</li>
* <li>A mask of {@code 0x26} (binary {@code 100110}) would skip the first field, include fields
* two and three, skip fields four and five, and include field six.</li>
* </ul>
- *
+ *
* @param mask The bit mask defining which fields to include and which to skip.
* @return The CSV reader instance itself, to allow for fluent function chaining.
*/
@@ -267,36 +266,36 @@ public class CsvReader {
if (mask == 0) {
throw new IllegalArgumentException("The description of fields to parse excluded all fields. At least one fields must be included.");
}
-
+
ArrayList<Boolean> fields = new ArrayList<Boolean>();
while (mask != 0) {
fields.add((mask & 0x1L) != 0);
mask >>>= 1;
}
-
+
boolean[] fieldsArray = new boolean[fields.size()];
for (int i = 0; i < fieldsArray.length; i++) {
fieldsArray[i] = fields.get(i);
}
-
+
return includeFields(fieldsArray);
}
/**
* Sets the CSV reader to ignore the first line. This is useful for files that contain a header line.
- *
+ *
* @return The CSV reader instance itself, to allow for fluent function chaining.
*/
public CsvReader ignoreFirstLine() {
skipFirstLineAsHeader = true;
return this;
}
-
+
/**
- * Sets the CSV reader to ignore any invalid lines.
+ * Sets the CSV reader to ignore any invalid lines.
* This is useful for files that contain an empty line at the end, multiple header lines or comments. This would throw an exception otherwise.
- *
+ *
* @return The CSV reader instance itself, to allow for fluent function chaining.
*/
public CsvReader ignoreInvalidLines(){
@@ -325,12 +324,12 @@ public class CsvReader {
return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
}
-
+
/**
* Configures the reader to read the CSV data and parse it to the given type. The type must be a subclass of
* {@link Tuple}. The type information for the fields is obtained from the type class. The type
* consequently needs to specify all generic field types of the tuple.
- *
+ *
* @param targetType The class of the target type, needs to be a subclass of Tuple.
* @return The DataSet representing the parsed CSV data.
*/
@@ -339,24 +338,24 @@ public class CsvReader {
if (!Tuple.class.isAssignableFrom(targetType)) {
throw new IllegalArgumentException("The target type must be a subclass of " + Tuple.class.getName());
}
-
+
@SuppressWarnings("unchecked")
TupleTypeInfo<T> typeInfo = (TupleTypeInfo<T>) TypeExtractor.createTypeInfo(targetType);
CsvInputFormat<T> inputFormat = new TupleCsvInputFormat<T>(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, this.includedMask);
-
+
Class<?>[] classes = new Class<?>[typeInfo.getArity()];
for (int i = 0; i < typeInfo.getArity(); i++) {
classes[i] = typeInfo.getTypeAt(i).getTypeClass();
}
-
+
configureInputFormat(inputFormat);
return new DataSource<T>(executionContext, inputFormat, typeInfo, Utils.getCallLocationName());
}
-
+
// --------------------------------------------------------------------------------------------
// Miscellaneous
// --------------------------------------------------------------------------------------------
-
+
private void configureInputFormat(CsvInputFormat<?> format) {
format.setCharset(this.charset);
format.setDelimiter(this.lineDelimiter);
@@ -368,10 +367,10 @@ public class CsvReader {
format.enableQuotedStringParsing(this.quoteCharacter);
}
}
-
- // --------------------------------------------------------------------------------------------
+
+ // --------------------------------------------------------------------------------------------
// The following lines are generated.
- // --------------------------------------------------------------------------------------------
+ // --------------------------------------------------------------------------------------------
// BEGIN_OF_TUPLE_DEPENDENT_CODE
// GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator.
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java
index f01d864..7358b14 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.io;
import org.apache.flink.annotation.Public;
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java
index cb8bd6a..05f3ccd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java
@@ -16,16 +16,15 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.io;
-import java.io.Serializable;
-import java.util.Iterator;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.NonParallelInput;
+import java.io.Serializable;
+import java.util.Iterator;
+
/**
* An input format that returns objects from an iterator.
*/
@@ -35,13 +34,12 @@ public class IteratorInputFormat<T> extends GenericInputFormat<T> implements Non
private static final long serialVersionUID = 1L;
private Iterator<T> iterator; // input data as serializable iterator
-
-
+
public IteratorInputFormat(Iterator<T> iterator) {
if (!(iterator instanceof Serializable)) {
throw new IllegalArgumentException("The data source iterator must be serializable.");
}
-
+
this.iterator = iterator;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
index 65ed6c3..bcb1cf1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java
@@ -18,13 +18,6 @@
package org.apache.flink.api.java.io;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.RichOutputFormat;
@@ -33,15 +26,21 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.Configuration;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
/**
- * An output format that writes record into collection
+ * An output format that adds records to a collection.
*/
@PublicEvolving
public class LocalCollectionOutputFormat<T> extends RichOutputFormat<T> implements InputTypeConfigurable {
private static final long serialVersionUID = 1L;
- private static Map<Integer,Collection<?>> RESULT_HOLDER = new HashMap<Integer, Collection<?>>();
+ private static final Map<Integer, Collection<?>> RESULT_HOLDER = new HashMap<Integer, Collection<?>>();
private transient ArrayList<T> taskResult;
@@ -67,7 +66,6 @@ public class LocalCollectionOutputFormat<T> extends RichOutputFormat<T> implemen
@Override
public void configure(Configuration parameters) {}
-
@Override
public void open(int taskNumber, int numTasks) throws IOException {
this.taskResult = new ArrayList<T>();
@@ -80,7 +78,6 @@ public class LocalCollectionOutputFormat<T> extends RichOutputFormat<T> implemen
this.taskResult.add(recordCopy);
}
-
@Override
public void close() throws IOException {
synchronized (RESULT_HOLDER) {
@@ -93,6 +90,6 @@ public class LocalCollectionOutputFormat<T> extends RichOutputFormat<T> implemen
@Override
@SuppressWarnings("unchecked")
public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
- this.typeSerializer = (TypeSerializer<T>)type.createSerializer(executionConfig);
+ this.typeSerializer = (TypeSerializer<T>) type.createSerializer(executionConfig);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java
index a6ac853..25e25b4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java
@@ -18,14 +18,13 @@
package org.apache.flink.api.java.io;
-import java.io.IOException;
-import java.util.Iterator;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.util.SplittableIterator;
+import java.io.IOException;
+import java.util.Iterator;
/**
* An input format that generates data in parallel through a {@link SplittableIterator}.
@@ -34,25 +33,22 @@ import org.apache.flink.util.SplittableIterator;
public class ParallelIteratorInputFormat<T> extends GenericInputFormat<T> {
private static final long serialVersionUID = 1L;
-
-
+
private final SplittableIterator<T> source;
-
+
private transient Iterator<T> splitIterator;
-
-
-
+
public ParallelIteratorInputFormat(SplittableIterator<T> iterator) {
this.source = iterator;
}
-
+
@Override
public void open(GenericInputSplit split) throws IOException {
super.open(split);
-
+
this.splitIterator = this.source.getSplit(split.getSplitNumber(), split.getTotalNumberOfSplits());
}
-
+
@Override
public boolean reachedEnd() {
return !this.splitIterator.hasNext();
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
index 990e9e6..804d02b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.java.io;
import org.apache.flink.annotation.Internal;
@@ -29,6 +30,10 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+/**
+ * Input format that reads csv into POJOs.
+ * @param <OUT> resulting POJO type
+ */
@Internal
public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index d454765..794703b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -44,7 +44,6 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
private transient FieldParser<OT> parser;
-
public PrimitiveInputFormat(Path filePath, Class<OT> primitiveClass) {
super(filePath, null);
this.primitiveClass = primitiveClass;
@@ -70,7 +69,7 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
public OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
// Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == NEW_LINE
- && offset+numBytes >= 1 && bytes[offset+numBytes-1] == CARRIAGE_RETURN){
+ && offset + numBytes >= 1 && bytes[offset + numBytes - 1] == CARRIAGE_RETURN) {
numBytes -= 1;
}
@@ -79,7 +78,7 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
return parser.getLastResult();
} else {
String s = new String(bytes, offset, numBytes, getCharset());
- throw new IOException("Could not parse value: \""+s+"\" as type "+primitiveClass.getSimpleName());
+ throw new IOException("Could not parse value: \"" + s + "\" as type " + primitiveClass.getSimpleName());
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
index a010fd8..0ab1abb 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
@@ -18,12 +18,16 @@
package org.apache.flink.api.java.io;
-import java.io.PrintStream;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
+import java.io.PrintStream;
+
+/**
+ * Output format that prints results into either stdout or stderr.
+ * @param <T> The type of the records that are printed.
+ */
@PublicEvolving
public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
@@ -37,19 +41,19 @@ public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
private boolean target;
private transient PrintStream stream;
-
+
private transient String prefix;
-
+
// --------------------------------------------------------------------------------------------
-
+
/**
* Instantiates a printing output format that prints to standard out.
*/
public PrintingOutputFormat() {}
-
+
/**
* Instantiates a printing output format that prints to either standard out or standard error.
- *
+ *
* @param stdErr True, if the format should print to standard error instead of standard out.
*/
public PrintingOutputFormat(boolean stdErr) {
@@ -65,20 +69,18 @@ public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
this(stdErr);
this.sinkIdentifier = sinkIdentifier;
}
-
+
public void setTargetToStandardOut() {
this.target = STD_OUT;
}
-
+
public void setTargetToStandardErr() {
this.target = STD_ERR;
}
-
-
+
@Override
public void configure(Configuration parameters) {}
-
@Override
public void open(int taskNumber, int numTasks) {
// get the target stream
@@ -116,9 +118,9 @@ public class PrintingOutputFormat<T> extends RichOutputFormat<T> {
this.prefix = null;
this.sinkIdentifier = null;
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public String toString() {
return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
index b752966..15ef90e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
@@ -29,6 +29,9 @@ import org.apache.flink.types.parser.FieldParser;
import java.util.Arrays;
+/**
+ * Input format that reads csv into {@link Row}.
+ */
@PublicEvolving
public class RowCsvInputFormat extends CsvInputFormat<Row> implements ResultTypeQueryable<Row> {
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
index db09380..c1487f6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
@@ -22,11 +22,11 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.common.operators.Keys;
import java.util.Arrays;
@@ -34,7 +34,7 @@ import java.util.Arrays;
* SplitDataProperties define data properties on {@link org.apache.flink.core.io.InputSplit}
* generated by the {@link org.apache.flink.api.common.io.InputFormat} of a {@link DataSource}.
*
- * InputSplits are units of input which are distributed among and assigned to parallel data source subtasks.
+ * <p>InputSplits are units of input which are distributed among and assigned to parallel data source subtasks.
* SplitDataProperties can define that the elements which are generated by the associated InputFormat
* are
* <ul>
@@ -46,7 +46,7 @@ import java.util.Arrays;
* are in the defined order.</li>
* </ul>
*
- * <b>IMPORTANT: SplitDataProperties can improve the execution of a program because certain
+ * <p><b>IMPORTANT: SplitDataProperties can improve the execution of a program because certain
* data reorganization steps such as shuffling or sorting can be avoided.
* HOWEVER, if SplitDataProperties are not correctly defined, the result of the program might be wrong!</b>
*
@@ -90,8 +90,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
/**
* Defines that data is partitioned across input splits on the fields defined by field positions.
* All records sharing the same key (combination) must be contained in a single input split.
- * <br>
- * <b>
+ *
+ * <p><b>
* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
* </b>
*
@@ -106,8 +106,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
* Defines that data is partitioned using a specific partitioning method
* across input splits on the fields defined by field positions.
* All records sharing the same key (combination) must be contained in a single input split.
- * <br>
- * <b>
+ *
+ * <p><b>
* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
* </b>
*
@@ -137,8 +137,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
* Defines that data is partitioned across input splits on the fields defined by field expressions.
* Multiple field expressions must be separated by the semicolon ';' character.
* All records sharing the same key (combination) must be contained in a single input split.
- * <br>
- * <b>
+ *
+ * <p><b>
* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
* </b>
*
@@ -154,8 +154,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
* across input splits on the fields defined by field expressions.
* Multiple field expressions must be separated by the semicolon ';' character.
* All records sharing the same key (combination) must be contained in a single input split.
- * <br>
- * <b>
+ *
+ * <p><b>
* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
* </b>
*
@@ -165,7 +165,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
*/
public SplitDataProperties<T> splitsPartitionedBy(String partitionMethodId, String partitionFields) {
- if(partitionFields == null) {
+ if (partitionFields == null) {
throw new InvalidProgramException("PartitionFields may not be null.");
}
@@ -175,7 +175,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
}
this.splitPartitionKeys = getAllFlatKeys(partitionKeysA);
- if(partitionMethodId != null) {
+ if (partitionMethodId != null) {
this.splitPartitioner = new SourcePartitionerMarker<>(partitionMethodId);
}
else {
@@ -189,8 +189,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
* Defines that the data within an input split is grouped on the fields defined by the field positions.
* All records sharing the same key (combination) must be subsequently emitted by the input
* format for each input split.
- * <br>
- * <b>
+ *
+ * <p><b>
* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
* </b>
*
@@ -199,13 +199,13 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
*/
public SplitDataProperties<T> splitsGroupedBy(int... groupFields) {
- if(groupFields == null) {
+ if (groupFields == null) {
throw new InvalidProgramException("GroupFields may not be null.");
} else if (groupFields.length == 0) {
throw new InvalidProgramException("GroupFields may not be empty.");
}
- if(this.splitOrdering != null) {
+ if (this.splitOrdering != null) {
throw new InvalidProgramException("DataSource may either be grouped or sorted.");
}
@@ -219,8 +219,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
* Multiple field expressions must be separated by the semicolon ';' character.
* All records sharing the same key (combination) must be subsequently emitted by the input
* format for each input split.
- * <br>
- * <b>
+ *
+ * <p><b>
* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
* </b>
*
@@ -229,7 +229,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
*/
public SplitDataProperties<T> splitsGroupedBy(String groupFields) {
- if(groupFields == null) {
+ if (groupFields == null) {
throw new InvalidProgramException("GroupFields may not be null.");
}
@@ -238,7 +238,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
throw new InvalidProgramException("GroupFields may not be empty.");
}
- if(this.splitOrdering != null) {
+ if (this.splitOrdering != null) {
throw new InvalidProgramException("DataSource may either be grouped or sorted.");
}
@@ -251,8 +251,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
* Defines that the data within an input split is sorted on the fields defined by the field positions
* in the specified orders.
* All records of an input split must be emitted by the input format in the defined order.
- * <br>
- * <b>
+ *
+ * <p><b>
* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
* </b>
*
@@ -262,7 +262,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
*/
public SplitDataProperties<T> splitsOrderedBy(int[] orderFields, Order[] orders) {
- if(orderFields == null || orders == null) {
+ if (orderFields == null || orders == null) {
throw new InvalidProgramException("OrderFields or Orders may not be null.");
} else if (orderFields.length == 0) {
throw new InvalidProgramException("OrderFields may not be empty.");
@@ -272,17 +272,17 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
throw new InvalidProgramException("Number of OrderFields and Orders must match.");
}
- if(this.splitGroupKeys != null) {
+ if (this.splitGroupKeys != null) {
throw new InvalidProgramException("DataSource may either be grouped or sorted.");
}
this.splitOrdering = new Ordering();
- for(int i=0; i<orderFields.length; i++) {
+ for (int i = 0; i < orderFields.length; i++) {
int pos = orderFields[i];
int[] flatKeys = this.getAllFlatKeys(new int[]{pos});
- for(int key : flatKeys) {
+ for (int key : flatKeys) {
// check for duplicates
for (int okey : splitOrdering.getFieldPositions()) {
if (key == okey) {
@@ -290,7 +290,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
}
}
// append key
- this.splitOrdering.appendOrdering(key, null, orders[i] );
+ this.splitOrdering.appendOrdering(key, null, orders[i]);
}
}
return this;
@@ -300,8 +300,8 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
* Defines that the data within an input split is sorted on the fields defined by the field expressions
* in the specified orders. Multiple field expressions must be separated by the semicolon ';' character.
* All records of an input split must be emitted by the input format in the defined order.
- * <br>
- * <b>
+ *
+ * <p><b>
* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results!
* </b>
*
@@ -311,7 +311,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
*/
public SplitDataProperties<T> splitsOrderedBy(String orderFields, Order[] orders) {
- if(orderFields == null || orders == null) {
+ if (orderFields == null || orders == null) {
throw new InvalidProgramException("OrderFields or Orders may not be null.");
}
@@ -324,18 +324,18 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
throw new InvalidProgramException("Number of OrderFields and Orders must match.");
}
- if(this.splitGroupKeys != null) {
+ if (this.splitGroupKeys != null) {
throw new InvalidProgramException("DataSource may either be grouped or sorted.");
}
this.splitOrdering = new Ordering();
- for(int i=0; i<orderKeysA.length; i++) {
+ for (int i = 0; i < orderKeysA.length; i++) {
String keyExp = orderKeysA[i];
Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(keyExp, this.type);
int[] flatKeys = ek.computeLogicalKeyPositions();
- for(int key : flatKeys) {
+ for (int key : flatKeys) {
// check for duplicates
for (int okey : splitOrdering.getFieldPositions()) {
if (key == okey) {
@@ -343,7 +343,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
}
}
// append key
- this.splitOrdering.appendOrdering(key, null, orders[i] );
+ this.splitOrdering.appendOrdering(key, null, orders[i]);
}
}
return this;
@@ -365,25 +365,24 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
return this.splitOrdering;
}
-
/////////////////////// FLAT FIELD EXTRACTION METHODS
private int[] getAllFlatKeys(String[] fieldExpressions) {
int[] allKeys = null;
- for(String keyExp : fieldExpressions) {
+ for (String keyExp : fieldExpressions) {
Keys.ExpressionKeys<T> ek = new Keys.ExpressionKeys<>(keyExp, this.type);
int[] flatKeys = ek.computeLogicalKeyPositions();
- if(allKeys == null) {
+ if (allKeys == null) {
allKeys = flatKeys;
} else {
// check for duplicates
- for(int key1 : flatKeys) {
- for(int key2 : allKeys) {
- if(key1 == key2) {
- throw new InvalidProgramException("Duplicate fields in field expression "+keyExp);
+ for (int key1 : flatKeys) {
+ for (int key2 : allKeys) {
+ if (key1 == key2) {
+ throw new InvalidProgramException("Duplicate fields in field expression " + keyExp);
}
}
}
@@ -425,7 +424,7 @@ public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataPr
@Override
public boolean equals(Object o) {
- if(o instanceof SourcePartitionerMarker) {
+ if (o instanceof SourcePartitionerMarker) {
return this.partitionMarker.equals(((SourcePartitionerMarker<?>) o).partitionMarker);
} else {
return false;
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
index b2554bf..7d4cf9c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
@@ -18,61 +18,63 @@
package org.apache.flink.api.java.io;
-import java.io.IOException;
-import java.nio.charset.Charset;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+/**
+ * Input Format that reads text files. Each line results in another element.
+ */
@PublicEvolving
public class TextInputFormat extends DelimitedInputFormat<String> {
-
+
private static final long serialVersionUID = 1L;
-
+
/**
- * Code of \r, used to remove \r from a line when the line ends with \r\n
+ * Code of \r, used to remove \r from a line when the line ends with \r\n.
*/
private static final byte CARRIAGE_RETURN = (byte) '\r';
/**
- * Code of \n, used to identify if \n is used as delimiter
+ * Code of \n, used to identify if \n is used as delimiter.
*/
private static final byte NEW_LINE = (byte) '\n';
-
-
+
/**
* The name of the charset to use for decoding.
*/
private String charsetName = "UTF-8";
-
+
// --------------------------------------------------------------------------------------------
-
+
public TextInputFormat(Path filePath) {
super(filePath, null);
}
-
- // --------------------------------------------------------------------------------------------
-
+
+ // --------------------------------------------------------------------------------------------
+
public String getCharsetName() {
return charsetName;
}
-
+
public void setCharsetName(String charsetName) {
if (charsetName == null) {
throw new IllegalArgumentException("Charset must not be null.");
}
-
+
this.charsetName = charsetName;
}
-
+
// --------------------------------------------------------------------------------------------
@Override
public void configure(Configuration parameters) {
super.configure(parameters);
-
+
if (charsetName == null || !Charset.isSupported(charsetName)) {
throw new RuntimeException("Unsupported charset: " + charsetName);
}
@@ -83,17 +85,17 @@ public class TextInputFormat extends DelimitedInputFormat<String> {
@Override
public String readRecord(String reusable, byte[] bytes, int offset, int numBytes) throws IOException {
//Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
- if (this.getDelimiter() != null && this.getDelimiter().length == 1
- && this.getDelimiter()[0] == NEW_LINE && offset+numBytes >= 1
- && bytes[offset+numBytes-1] == CARRIAGE_RETURN){
+ if (this.getDelimiter() != null && this.getDelimiter().length == 1
+ && this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 1
+ && bytes[offset + numBytes - 1] == CARRIAGE_RETURN){
numBytes -= 1;
}
-
+
return new String(bytes, offset, numBytes, this.charsetName);
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public String toString() {
return "TextInputFormat (" + getFilePath() + ") - " + this.charsetName;
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
index d466082..006b571 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java
@@ -18,65 +18,75 @@
package org.apache.flink.api.java.io;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.core.fs.Path;
+
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.core.fs.Path;
-
+/**
+ * A {@link FileOutputFormat} that writes objects to a text file.
+ *
+ * <p>Objects are converted to Strings using either {@link Object#toString()} or a {@link TextFormatter}.
+ * @param <T> type of elements
+ */
@PublicEvolving
public class TextOutputFormat<T> extends FileOutputFormat<T> {
private static final long serialVersionUID = 1L;
-
+
private static final int NEWLINE = '\n';
private String charsetName;
-
+
private transient Charset charset;
// --------------------------------------------------------------------------------------------
- public static interface TextFormatter<IN> extends Serializable {
- public String format(IN value);
+
+ /**
+ * Formatter that transforms values into their {@link String} representations.
+ * @param <IN> type of input elements
+ */
+ public interface TextFormatter<IN> extends Serializable {
+ String format(IN value);
}
public TextOutputFormat(Path outputPath) {
this(outputPath, "UTF-8");
}
-
+
public TextOutputFormat(Path outputPath, String charset) {
super(outputPath);
this.charsetName = charset;
}
-
-
+
public String getCharsetName() {
return charsetName;
}
-
+
public void setCharsetName(String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
if (charsetName == null) {
throw new NullPointerException();
}
-
+
if (!Charset.isSupported(charsetName)) {
throw new UnsupportedCharsetException("The charset " + charsetName + " is not supported.");
}
-
+
this.charsetName = charsetName;
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
-
+
try {
this.charset = Charset.forName(charsetName);
}
@@ -87,16 +97,16 @@ public class TextOutputFormat<T> extends FileOutputFormat<T> {
throw new IOException("The charset " + charsetName + " is not supported.", e);
}
}
-
+
@Override
public void writeRecord(T record) throws IOException {
byte[] bytes = record.toString().getBytes(charset);
this.stream.write(bytes);
this.stream.write(NEWLINE);
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public String toString() {
return "TextOutputFormat (" + getOutputFilePath() + ") - " + this.charsetName;
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
index 45a2e3e..4721439 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
@@ -18,6 +18,12 @@
package org.apache.flink.api.java.io;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.StringValue;
+
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
@@ -26,61 +32,58 @@ import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.io.DelimitedInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.StringValue;
-
+/**
+ * Input format that reads text files.
+ */
@PublicEvolving
public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
-
+
private static final long serialVersionUID = 1L;
-
+
private String charsetName = "UTF-8";
-
+
private boolean skipInvalidLines;
-
+
private transient CharsetDecoder decoder;
-
+
private transient ByteBuffer byteWrapper;
-
+
private transient boolean ascii;
-
+
// --------------------------------------------------------------------------------------------
-
+
public TextValueInputFormat(Path filePath) {
super(filePath, null);
}
-
- // --------------------------------------------------------------------------------------------
-
+
+ // --------------------------------------------------------------------------------------------
+
public String getCharsetName() {
return charsetName;
}
-
+
public void setCharsetName(String charsetName) {
if (charsetName == null) {
throw new IllegalArgumentException("The charset name may not be null.");
}
-
+
this.charsetName = charsetName;
}
-
+
public boolean isSkipInvalidLines() {
return skipInvalidLines;
}
-
+
public void setSkipInvalidLines(boolean skipInvalidLines) {
this.skipInvalidLines = skipInvalidLines;
}
-
+
// --------------------------------------------------------------------------------------------
@Override
public void configure(Configuration parameters) {
super.configure(parameters);
-
+
if (charsetName == null || !Charset.isSupported(charsetName)) {
throw new RuntimeException("Unsupported charset: " + charsetName);
}
@@ -88,7 +91,7 @@ public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
if (charsetName.equalsIgnoreCase(StandardCharsets.US_ASCII.name())) {
ascii = true;
}
-
+
this.decoder = Charset.forName(charsetName).newDecoder();
this.byteWrapper = ByteBuffer.allocate(1);
}
@@ -109,7 +112,7 @@ public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
}
byteWrapper.limit(offset + numBytes);
byteWrapper.position(offset);
-
+
try {
CharBuffer result = this.decoder.decode(byteWrapper);
reuse.setValue(result);
@@ -126,9 +129,9 @@ public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
}
}
}
-
+
// --------------------------------------------------------------------------------------------
-
+
@Override
public String toString() {
return "TextValueInputFormat (" + getFilePath() + ") - " + this.charsetName + (this.skipInvalidLines ? "(skipping invalid lines)" : "");
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
index 6efd566..887620a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
@@ -15,15 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.java.io;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.fs.Path;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+import org.apache.flink.core.fs.Path;
+/**
+ * Input format that reads csv into tuples.
+ */
@Internal
public class TupleCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
@@ -59,7 +62,7 @@ public class TupleCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
super(filePath);
configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
}
-
+
private void configure(String lineDelimiter, String fieldDelimiter,
TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
index 81a142e..108448d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java
@@ -36,16 +36,16 @@ import java.io.IOException;
public class TypeSerializerOutputFormat<T> extends BinaryOutputFormat<T> implements InputTypeConfigurable {
private static final long serialVersionUID = -6653022644629315158L;
-
+
private TypeSerializer<T> serializer;
@Override
protected void serialize(T record, DataOutputView dataOutput) throws IOException {
- if(serializer == null){
+ if (serializer == null){
throw new RuntimeException("TypeSerializerOutputFormat requires a type serializer to " +
"be defined.");
}
-
+
serializer.serialize(record, dataOutput);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
index de57e5c..5622930 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.types.LongValue;
import org.apache.flink.types.ShortValue;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
+
import org.junit.Assert;
import org.junit.Test;
@@ -59,7 +60,7 @@ public class CSVReaderTest {
reader.ignoreFirstLine();
Assert.assertTrue(reader.skipFirstLineAsHeader);
}
-
+
@Test
public void testIgnoreInvalidLinesConfigure() {
CsvReader reader = getCsvReader();
@@ -67,7 +68,7 @@ public class CSVReaderTest {
reader.ignoreInvalidLines();
Assert.assertTrue(reader.ignoreInvalidLines);
}
-
+
@Test
public void testIgnoreComments() {
CsvReader reader = getCsvReader();
@@ -89,38 +90,38 @@ public class CSVReaderTest {
CsvReader reader = getCsvReader();
reader.includeFields(true, true, true);
Assert.assertTrue(Arrays.equals(new boolean[] {true, true, true}, reader.includedMask));
-
+
reader = getCsvReader();
reader.includeFields("ttt");
Assert.assertTrue(Arrays.equals(new boolean[] {true, true, true}, reader.includedMask));
-
+
reader = getCsvReader();
reader.includeFields("TTT");
Assert.assertTrue(Arrays.equals(new boolean[] {true, true, true}, reader.includedMask));
-
+
reader = getCsvReader();
reader.includeFields("111");
Assert.assertTrue(Arrays.equals(new boolean[] {true, true, true}, reader.includedMask));
-
+
reader = getCsvReader();
reader.includeFields(0x7L);
Assert.assertTrue(Arrays.equals(new boolean[] {true, true, true}, reader.includedMask));
}
-
+
@Test
public void testIncludeFieldsSparse() {
CsvReader reader = getCsvReader();
reader.includeFields(false, true, true, false, false, true, false, false);
Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
-
+
reader = getCsvReader();
reader.includeFields("fttfftff");
Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
-
+
reader = getCsvReader();
reader.includeFields("FTTFFTFF");
Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
-
+
reader = getCsvReader();
reader.includeFields("01100100");
Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
@@ -128,16 +129,16 @@ public class CSVReaderTest {
reader = getCsvReader();
reader.includeFields("0t1f0TFF");
Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
-
+
reader = getCsvReader();
reader.includeFields(0x26L);
Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask));
}
-
+
@Test
public void testIllegalCharInStringMask() {
CsvReader reader = getCsvReader();
-
+
try {
reader.includeFields("1t0Tfht");
Assert.fail("Reader accepted an invalid mask string");
@@ -146,12 +147,11 @@ public class CSVReaderTest {
// expected
}
}
-
-
+
@Test
public void testIncludeFieldsErrorWhenExcludingAll() {
CsvReader reader = getCsvReader();
-
+
try {
reader.includeFields(false, false, false, false, false, false);
Assert.fail("The reader accepted a fields configuration that excludes all fields.");
@@ -159,7 +159,7 @@ public class CSVReaderTest {
catch (IllegalArgumentException e) {
// all good
}
-
+
try {
reader.includeFields(0);
Assert.fail("The reader accepted a fields configuration that excludes all fields.");
@@ -167,7 +167,7 @@ public class CSVReaderTest {
catch (IllegalArgumentException e) {
// all good
}
-
+
try {
reader.includeFields("ffffffffffffff");
Assert.fail("The reader accepted a fields configuration that excludes all fields.");
@@ -175,7 +175,7 @@ public class CSVReaderTest {
catch (IllegalArgumentException e) {
// all good
}
-
+
try {
reader.includeFields("00000000000000000");
Assert.fail("The reader accepted a fields configuration that excludes all fields.");
@@ -191,12 +191,12 @@ public class CSVReaderTest {
DataSource<Item> items = reader.tupleType(Item.class);
Assert.assertTrue(items.getType().getTypeClass() == Item.class);
}
-
+
@Test
public void testFieldTypes() throws Exception {
CsvReader reader = getCsvReader();
DataSource<Item> items = reader.tupleType(Item.class);
-
+
TypeInformation<?> info = items.getType();
if (!info.isTupleType()) {
Assert.fail();
@@ -208,44 +208,44 @@ public class CSVReaderTest {
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(3));
}
-
+
CsvInputFormat<?> inputFormat = (CsvInputFormat<?>) items.getInputFormat();
Assert.assertArrayEquals(new Class<?>[]{Integer.class, String.class, Double.class, String.class}, inputFormat.getFieldTypes());
}
-
+
@Test
public void testSubClass() throws Exception {
CsvReader reader = getCsvReader();
DataSource<SubItem> sitems = reader.tupleType(SubItem.class);
TypeInformation<?> info = sitems.getType();
-
+
Assert.assertEquals(true, info.isTupleType());
Assert.assertEquals(SubItem.class, info.getTypeClass());
-
+
@SuppressWarnings("unchecked")
TupleTypeInfo<SubItem> tinfo = (TupleTypeInfo<SubItem>) info;
-
+
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tinfo.getTypeAt(0));
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(1));
Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tinfo.getTypeAt(2));
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(3));
-
+
CsvInputFormat<?> inputFormat = (CsvInputFormat<?>) sitems.getInputFormat();
Assert.assertArrayEquals(new Class<?>[]{Integer.class, String.class, Double.class, String.class}, inputFormat.getFieldTypes());
}
-
+
@Test
public void testSubClassWithPartialsInHierarchie() throws Exception {
CsvReader reader = getCsvReader();
DataSource<FinalItem> sitems = reader.tupleType(FinalItem.class);
TypeInformation<?> info = sitems.getType();
-
+
Assert.assertEquals(true, info.isTupleType());
Assert.assertEquals(FinalItem.class, info.getTypeClass());
-
+
@SuppressWarnings("unchecked")
TupleTypeInfo<SubItem> tinfo = (TupleTypeInfo<SubItem>) info;
-
+
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tinfo.getTypeAt(0));
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(1));
Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tinfo.getTypeAt(2));
@@ -253,15 +253,15 @@ public class CSVReaderTest {
Assert.assertEquals(ValueTypeInfo.class, tinfo.getTypeAt(4).getClass());
Assert.assertEquals(StringValue.class, ((ValueTypeInfo<?>) tinfo.getTypeAt(3)).getTypeClass());
Assert.assertEquals(LongValue.class, ((ValueTypeInfo<?>) tinfo.getTypeAt(4)).getTypeClass());
-
+
CsvInputFormat<?> inputFormat = (CsvInputFormat<?>) sitems.getInputFormat();
Assert.assertArrayEquals(new Class<?>[] {Integer.class, String.class, Double.class, StringValue.class, LongValue.class}, inputFormat.getFieldTypes());
}
-
+
@Test
public void testUnsupportedPartialitem() throws Exception {
CsvReader reader = getCsvReader();
-
+
try {
reader.tupleType(PartialItem.class);
Assert.fail("tupleType() accepted an underspecified generic class.");
@@ -295,32 +295,32 @@ public class CSVReaderTest {
// CsvReader doesn't support custom Value type
reader.types(ValueItem.class);
}
-
+
private static CsvReader getCsvReader() {
return new CsvReader("/some/none/existing/path", ExecutionEnvironment.createLocalEnvironment(1));
}
-
+
// --------------------------------------------------------------------------------------------
// Custom types for testing
// --------------------------------------------------------------------------------------------
-
- public static class Item extends Tuple4<Integer, String, Double, String> {
+
+ private static class Item extends Tuple4<Integer, String, Double, String> {
private static final long serialVersionUID = -7444437337392053502L;
}
-
- public static class SubItem extends Item {
+
+ private static class SubItem extends Item {
private static final long serialVersionUID = 1L;
}
-
- public static class PartialItem<A, B, C> extends Tuple5<Integer, A, Double, B, C> {
+
+ private static class PartialItem<A, B, C> extends Tuple5<Integer, A, Double, B, C> {
private static final long serialVersionUID = 1L;
}
-
- public static class FinalItem extends PartialItem<String, StringValue, LongValue> {
+
+ private static class FinalItem extends PartialItem<String, StringValue, LongValue> {
private static final long serialVersionUID = 1L;
}
- public static class ValueItem implements Value {
+ private static class ValueItem implements Value {
private int v1;
public int getV1() {
http://git-wip-us.apache.org/repos/asf/flink/blob/0c9c9fb5/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index 4dabaca..77945cc 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -18,11 +18,6 @@
package org.apache.flink.api.java.io;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -33,6 +28,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+
import org.junit.Test;
import java.io.ByteArrayInputStream;
@@ -47,9 +43,17 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link CollectionInputFormat}.
+ */
public class CollectionInputFormatTest {
-
- public static class ElementType {
+
+ private static class ElementType {
private final int id;
public ElementType(){
@@ -73,7 +77,7 @@ public class CollectionInputFormatTest {
return false;
}
}
-
+
@Override
public int hashCode() {
return id;
@@ -90,7 +94,8 @@ public class CollectionInputFormatTest {
@Test
public void testSerializability() {
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(buffer)) {
+ ObjectOutputStream out = new ObjectOutputStream(buffer)) {
+
Collection<ElementType> inputCollection = new ArrayList<ElementType>();
ElementType element1 = new ElementType(1);
ElementType element2 = new ElementType(2);
@@ -98,10 +103,10 @@ public class CollectionInputFormatTest {
inputCollection.add(element1);
inputCollection.add(element2);
inputCollection.add(element3);
-
+
@SuppressWarnings("unchecked")
TypeInformation<ElementType> info = (TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class);
-
+
CollectionInputFormat<ElementType> inputFormat = new CollectionInputFormat<ElementType>(inputCollection,
info.createSerializer(new ExecutionConfig()));
@@ -121,23 +126,23 @@ public class CollectionInputFormatTest {
inputFormat.open(inputSplit);
result.open(inputSplit);
- while(!inputFormat.reachedEnd() && !result.reachedEnd()){
+ while (!inputFormat.reachedEnd() && !result.reachedEnd()){
ElementType expectedElement = inputFormat.nextRecord(null);
ElementType actualElement = result.nextRecord(null);
assertEquals(expectedElement, actualElement);
}
}
- catch(Exception e) {
+ catch (Exception e) {
e.printStackTrace();
fail(e.toString());
}
}
-
+
@Test
public void testSerializabilityStrings() {
-
+
final String[] data = new String[] {
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
@@ -175,33 +180,33 @@ public class CollectionInputFormatTest {
"The fair Ophelia!--Nymph, in thy orisons",
"Be all my sins remember'd."
};
-
+
try {
List<String> inputCollection = Arrays.asList(data);
CollectionInputFormat<String> inputFormat = new CollectionInputFormat<String>(inputCollection, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()));
-
+
// serialize
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(inputFormat);
oos.close();
-
+
// deserialize
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
ObjectInputStream ois = new ObjectInputStream(bais);
Object result = ois.readObject();
-
+
assertTrue(result instanceof CollectionInputFormat);
-
+
int i = 0;
@SuppressWarnings("unchecked")
CollectionInputFormat<String> in = (CollectionInputFormat<String>) result;
in.open(new GenericInputSplit(0, 1));
-
+
while (!in.reachedEnd()) {
assertEquals(data[i++], in.nextRecord(""));
}
-
+
assertEquals(data.length, i);
}
catch (Exception e) {
@@ -209,7 +214,7 @@ public class CollectionInputFormatTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testSerializationFailure() {
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
@@ -217,7 +222,7 @@ public class CollectionInputFormatTest {
// a mock serializer that fails when writing
CollectionInputFormat<ElementType> inFormat = new CollectionInputFormat<ElementType>(
Collections.singleton(new ElementType()), new TestSerializer(false, true));
-
+
try {
out.writeObject(inFormat);
fail("should throw an exception");
@@ -234,21 +239,21 @@ public class CollectionInputFormatTest {
fail(e.getMessage());
}
}
-
+
@Test
public void testDeserializationFailure() {
try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(buffer)) {
+ ObjectOutputStream out = new ObjectOutputStream(buffer)) {
// a mock serializer that fails when reading
CollectionInputFormat<ElementType> inFormat = new CollectionInputFormat<ElementType>(
Collections.singleton(new ElementType()), new TestSerializer(true, false));
out.writeObject(inFormat);
out.close();
-
+
ByteArrayInputStream bais = new ByteArrayInputStream(buffer.toByteArray());
ObjectInputStream in = new ObjectInputStream(bais);
-
+
try {
in.readObject();
fail("should throw an exception");
@@ -296,14 +301,14 @@ public class CollectionInputFormatTest {
private static class TestException extends IOException{
private static final long serialVersionUID = 1L;
}
-
+
private static class TestSerializer extends TypeSerializer<ElementType> {
private static final long serialVersionUID = 1L;
-
+
private final boolean failOnRead;
private final boolean failOnWrite;
-
+
public TestSerializer(boolean failOnRead, boolean failOnWrite) {
this.failOnRead = failOnRead;
this.failOnWrite = failOnWrite;
[3/4] flink git commit: [FLINK-7191] Activate checkstyle
flink-java/operators/translation
Posted by ch...@apache.org.
[FLINK-7191] Activate checkstyle flink-java/operators/translation
This closes #4334.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e975362
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e975362
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e975362
Branch: refs/heads/master
Commit: 8e975362312c727fd602429778bc1c3628b95619
Parents: 0c9c9fb
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Fri Jul 14 10:42:57 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 31 17:04:26 2017 +0200
----------------------------------------------------------------------
.../CombineToGroupCombineWrapper.java | 1 +
.../translation/KeyExtractingMapper.java | 23 +--
.../translation/KeyRemovingMapper.java | 9 +-
.../PlanBothUnwrappingCoGroupOperator.java | 22 +--
.../translation/PlanFilterOperator.java | 17 +-
.../PlanLeftUnwrappingCoGroupOperator.java | 15 +-
.../translation/PlanProjectOperator.java | 25 +--
.../PlanRightUnwrappingCoGroupOperator.java | 11 +-
.../PlanUnwrappingGroupCombineOperator.java | 29 ++--
.../PlanUnwrappingReduceGroupOperator.java | 47 +++---
.../PlanUnwrappingReduceOperator.java | 12 +-
...lanUnwrappingSortedGroupCombineOperator.java | 15 +-
...PlanUnwrappingSortedReduceGroupOperator.java | 23 ++-
.../RichCombineToGroupCombineWrapper.java | 3 +-
.../translation/Tuple3UnwrappingIterator.java | 4 +-
.../translation/Tuple3WrappingCollector.java | 3 +-
.../translation/TupleLeftUnwrappingJoiner.java | 8 +
.../translation/TupleRightUnwrappingJoiner.java | 8 +
.../translation/TupleUnwrappingIterator.java | 14 +-
.../translation/TupleUnwrappingJoiner.java | 8 +
.../translation/TupleWrappingCollector.java | 13 +-
.../translation/TwoKeyExtractingMapper.java | 10 +-
.../operators/translation/WrappingFunction.java | 13 +-
.../translation/AggregateTranslationTest.java | 35 ++--
.../translation/CoGroupSortTranslationTest.java | 53 +++---
.../DeltaIterationTranslationTest.java | 160 ++++++++++---------
.../translation/DistinctTranslationTest.java | 15 +-
.../translation/ReduceTranslationTests.java | 98 ++++++------
tools/maven/suppressions-java.xml | 8 -
29 files changed, 375 insertions(+), 327 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java
index 408d4b3..f574218 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/CombineToGroupCombineWrapper.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
index f35b950..1f0cc1d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyExtractingMapper.java
@@ -19,34 +19,37 @@
package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
+/**
+ * Mapper that extracts keys.
+ * @param <T> type of value
+ * @param <K> type of key
+ */
@Internal
@ForwardedFields("*->1")
public final class KeyExtractingMapper<T, K> extends RichMapFunction<T, Tuple2<K, T>> {
-
+
private static final long serialVersionUID = 1L;
-
+
private final KeySelector<T, K> keySelector;
-
+
private final Tuple2<K, T> tuple = new Tuple2<K, T>();
-
-
+
public KeyExtractingMapper(KeySelector<T, K> keySelector) {
this.keySelector = keySelector;
}
-
-
+
@Override
public Tuple2<K, T> map(T value) throws Exception {
-
+
K key = keySelector.getKey(value);
tuple.f0 = key;
tuple.f1 = value;
-
+
return tuple;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
index 5f0de32..920f893 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/KeyRemovingMapper.java
@@ -23,12 +23,17 @@ import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;
+/**
+ * Mapper that removes keys.
+ * @param <T> type of values
+ * @param <K> type of keys
+ */
@Internal
@ForwardedFields("1->*")
public final class KeyRemovingMapper<T, K> extends RichMapFunction<Tuple2<K, T>, T> {
-
+
private static final long serialVersionUID = 1L;
-
+
@Override
public T map(Tuple2<K, T> value) {
return value.f1;
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
index 1814329..6ccf0f3 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanBothUnwrappingCoGroupOperator.java
@@ -21,16 +21,18 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
+/**
+ * A co group operator that applies the operation only on the unwrapped values.
+ */
@Internal
public class PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K>
- extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>>
-{
+ extends CoGroupOperatorBase<Tuple2<K, I1>, Tuple2<K, I2>, OUT, CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>> {
public PlanBothUnwrappingCoGroupOperator(
CoGroupFunction<I1, I2, OUT> udf,
@@ -52,23 +54,21 @@ public class PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K>
name);
}
- public static final class TupleBothUnwrappingCoGrouper<I1, I2, OUT, K>
+ private static final class TupleBothUnwrappingCoGrouper<I1, I2, OUT, K>
extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
- implements CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT>
- {
+ implements CoGroupFunction<Tuple2<K, I1>, Tuple2<K, I2>, OUT> {
private static final long serialVersionUID = 1L;
-
+
private final TupleUnwrappingIterator<I1, K> iter1;
private final TupleUnwrappingIterator<I2, K> iter2;
-
+
private TupleBothUnwrappingCoGrouper(CoGroupFunction<I1, I2, OUT> wrapped) {
super(wrapped);
-
+
this.iter1 = new TupleUnwrappingIterator<I1, K>();
this.iter2 = new TupleUnwrappingIterator<I2, K>();
}
-
@Override
public void coGroup(
Iterable<Tuple2<K, I1>> records1,
@@ -79,6 +79,6 @@ public class PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K>
iter2.set(records2.iterator());
this.wrappedFunction.coGroup(iter1, iter2, out);
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
index c93191f..ecf1aac 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanFilterOperator.java
@@ -27,26 +27,33 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.util.Collector;
+/**
+ * @see FilterOperatorBase
+ * @param <T>
+ */
@Internal
@ForwardedFields("*")
public class PlanFilterOperator<T> extends FilterOperatorBase<T, FlatMapFunction<T, T>> {
-
+
public PlanFilterOperator(FilterFunction<T> udf, String name, TypeInformation<T> type) {
super(new FlatMapFilter<T>(udf), new UnaryOperatorInformation<T, T>(type, type), name);
}
+ /**
+ * @see FlatMapFunction
+ * @param <T>
+ */
public static final class FlatMapFilter<T> extends WrappingFunction<FilterFunction<T>>
- implements FlatMapFunction<T, T>
- {
+ implements FlatMapFunction<T, T> {
private static final long serialVersionUID = 1L;
-
+
private FlatMapFilter(FilterFunction<T> wrapped) {
super(wrapped);
}
@Override
- public final void flatMap(T value, Collector<T> out) throws Exception {
+ public void flatMap(T value, Collector<T> out) throws Exception {
if (this.wrappedFunction.filter(value)) {
out.collect(value);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
index 78840ce..b2a6937 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanLeftUnwrappingCoGroupOperator.java
@@ -21,16 +21,18 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
+/**
+ * A co group operator that applies the operation only on the unwrapped values on the left.
+ */
@Internal
public class PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K>
- extends CoGroupOperatorBase<Tuple2<K, I1>, I2, OUT, CoGroupFunction<Tuple2<K, I1>, I2, OUT>>
-{
+ extends CoGroupOperatorBase<Tuple2<K, I1>, I2, OUT, CoGroupFunction<Tuple2<K, I1>, I2, OUT>> {
public PlanLeftUnwrappingCoGroupOperator(
CoGroupFunction<I1, I2, OUT> udf,
@@ -52,21 +54,20 @@ public class PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K>
name);
}
- public static final class TupleLeftUnwrappingCoGrouper<I1, I2, OUT, K>
+ private static final class TupleLeftUnwrappingCoGrouper<I1, I2, OUT, K>
extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
implements CoGroupFunction<Tuple2<K, I1>, I2, OUT> {
private static final long serialVersionUID = 1L;
-
+
private final TupleUnwrappingIterator<I1, K> iter1;
private TupleLeftUnwrappingCoGrouper(CoGroupFunction<I1, I2, OUT> wrapped) {
super(wrapped);
-
+
this.iter1 = new TupleUnwrappingIterator<I1, K>();
}
-
@Override
public void coGroup(
Iterable<Tuple2<K, I1>> records1,
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
index fe981a5..3960807 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanProjectOperator.java
@@ -27,30 +27,33 @@ import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
+/**
+ * A map operator that retains a subset of fields from incoming tuples.
+ *
+ * @param <T> Input tuple type
+ * @param <R> Output tuple type
+ */
@Internal
public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T, R, MapFunction<T, R>> {
public PlanProjectOperator(int[] fields, String name,
TypeInformation<T> inType, TypeInformation<R> outType,
- ExecutionConfig executionConfig)
- {
+ ExecutionConfig executionConfig) {
super(PlanProjectOperator.<T, R, Tuple>createTypedProjector(fields), new UnaryOperatorInformation<T, R>(inType, outType), name);
}
-
+
@SuppressWarnings("unchecked")
private static <T, R extends Tuple, X extends Tuple> MapFunction<T, R> createTypedProjector(int[] fields) {
return (MapFunction<T, R>) new MapProjector<X, R>(fields);
}
-
-
- public static final class MapProjector<T extends Tuple, R extends Tuple>
- extends AbstractRichFunction implements MapFunction<T, R>
- {
+
+ private static final class MapProjector<T extends Tuple, R extends Tuple>
+ extends AbstractRichFunction implements MapFunction<T, R> {
private static final long serialVersionUID = 1L;
-
+
private final int[] fields;
private final Tuple outTuple;
-
+
private MapProjector(int[] fields) {
this.fields = fields;
try {
@@ -69,7 +72,7 @@ public class PlanProjectOperator<T, R extends Tuple> extends MapOperatorBase<T,
for (int i = 0; i < fields.length; i++) {
outTuple.setField(inTuple.getField(fields[i]), i);
}
-
+
return (R) outTuple;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
index faeca4e..f86deb7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanRightUnwrappingCoGroupOperator.java
@@ -21,16 +21,18 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
+/**
+ * A co group operator that applies the operation only on the unwrapped values on the right.
+ */
@Internal
public class PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K>
- extends CoGroupOperatorBase<I1, Tuple2<K, I2>, OUT, CoGroupFunction<I1, Tuple2<K, I2>, OUT>>
-{
+ extends CoGroupOperatorBase<I1, Tuple2<K, I2>, OUT, CoGroupFunction<I1, Tuple2<K, I2>, OUT>> {
public PlanRightUnwrappingCoGroupOperator(
CoGroupFunction<I1, I2, OUT> udf,
@@ -52,7 +54,7 @@ public class PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K>
name);
}
- public static final class TupleRightUnwrappingCoGrouper<I1, I2, OUT, K>
+ private static final class TupleRightUnwrappingCoGrouper<I1, I2, OUT, K>
extends WrappingFunction<CoGroupFunction<I1, I2, OUT>>
implements CoGroupFunction<I1, Tuple2<K, I2>, OUT> {
@@ -66,7 +68,6 @@ public class PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K>
this.iter2 = new TupleUnwrappingIterator<I2, K>();
}
-
@Override
public void coGroup(
Iterable<I1> records1,
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
index e9feb61..6e6f226 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingGroupCombineOperator.java
@@ -20,10 +20,10 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
@@ -35,35 +35,32 @@ import org.apache.flink.util.Collector;
public class PlanUnwrappingGroupCombineOperator<IN, OUT, K> extends GroupCombineOperatorBase<Tuple2<K, IN>, OUT, GroupCombineFunction<Tuple2<K, IN>, OUT>> {
public PlanUnwrappingGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
- TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey)
- {
+ TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey) {
super(new TupleUnwrappingGroupCombiner<IN, OUT, K>(udf),
new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
-
+
}
-
+
// --------------------------------------------------------------------------------------------
-
- public static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
- implements GroupCombineFunction<Tuple2<K, IN>, OUT>
- {
-
+
+ private static final class TupleUnwrappingGroupCombiner<IN, OUT, K> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
+ implements GroupCombineFunction<Tuple2<K, IN>, OUT> {
+
private static final long serialVersionUID = 1L;
-
- private final TupleUnwrappingIterator<IN, K> iter;
-
+
+ private final TupleUnwrappingIterator<IN, K> iter;
+
private TupleUnwrappingGroupCombiner(GroupCombineFunction<IN, OUT> wrapped) {
super(wrapped);
this.iter = new TupleUnwrappingIterator<IN, K>();
}
-
-
+
@Override
public void combine(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
iter.set(values.iterator());
this.wrappedFunction.combine(iter, out);
}
-
+
@Override
public String toString() {
return this.wrappedFunction.toString();
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
index 8568659..33c527d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceGroupOperator.java
@@ -21,10 +21,10 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
* on the unwrapped values.
*/
@Internal
-public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GroupReduceFunction<Tuple2<K, IN>,OUT>> {
+public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOperatorBase<Tuple2<K, IN>, OUT, GroupReduceFunction<Tuple2<K, IN>, OUT>> {
public PlanUnwrappingReduceGroupOperator(
GroupReduceFunction<IN, OUT> udf,
@@ -41,32 +41,30 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
String name,
TypeInformation<OUT> outType,
TypeInformation<Tuple2<K, IN>> typeInfoWithKey,
- boolean combinable)
- {
+ boolean combinable) {
super(
combinable ?
new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K>(udf) :
new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
new UnaryOperatorInformation<>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
-
+
super.setCombinable(combinable);
}
-
+
// --------------------------------------------------------------------------------------------
-
- public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
- implements GroupReduceFunction<Tuple2<K, IN>, OUT>, GroupCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>>
- {
+
+ private static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
+ implements GroupReduceFunction<Tuple2<K, IN>, OUT>, GroupCombineFunction<Tuple2<K, IN>, Tuple2<K, IN>> {
private static final long serialVersionUID = 1L;
-
+
private TupleUnwrappingIterator<IN, K> iter;
private TupleWrappingCollector<IN, K> coll;
private TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
super(wrapped);
- if(!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
+ if (!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
throw new IllegalArgumentException("Wrapped reduce function does not implement the GroupCombineFunction interface.");
}
@@ -74,7 +72,6 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
this.coll = new TupleWrappingCollector<>(this.iter);
}
-
@Override
public void reduce(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
iter.set(values.iterator());
@@ -87,35 +84,33 @@ public class PlanUnwrappingReduceGroupOperator<IN, OUT, K> extends GroupReduceOp
iter.set(values.iterator());
coll.set(out);
- ((GroupCombineFunction<IN, IN>)this.wrappedFunction).combine(iter, coll);
+ ((GroupCombineFunction<IN, IN>) this.wrappedFunction).combine(iter, coll);
}
-
+
@Override
public String toString() {
return this.wrappedFunction.toString();
}
}
-
- public static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
- implements GroupReduceFunction<Tuple2<K, IN>, OUT>
- {
-
+
+ private static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
+ implements GroupReduceFunction<Tuple2<K, IN>, OUT> {
+
private static final long serialVersionUID = 1L;
-
- private final TupleUnwrappingIterator<IN, K> iter;
-
+
+ private final TupleUnwrappingIterator<IN, K> iter;
+
private TupleUnwrappingNonCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
super(wrapped);
this.iter = new TupleUnwrappingIterator<>();
}
-
-
+
@Override
public void reduce(Iterable<Tuple2<K, IN>> values, Collector<OUT> out) throws Exception {
iter.set(values.iterator());
this.wrappedFunction.reduce(iter, out);
}
-
+
@Override
public String toString() {
return this.wrappedFunction.toString();
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
index 72dc41a..b2e614e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingReduceOperator.java
@@ -20,13 +20,12 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
-
/**
* A reduce operator that takes 2-tuples (key-value pairs), and applies the reduce operation only
* on the unwrapped values.
@@ -35,16 +34,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
public class PlanUnwrappingReduceOperator<T, K> extends ReduceOperatorBase<Tuple2<K, T>, ReduceFunction<Tuple2<K, T>>> {
public PlanUnwrappingReduceOperator(ReduceFunction<T> udf, Keys.SelectorFunctionKeys<T, K> key, String name,
- TypeInformation<T> type, TypeInformation<Tuple2<K, T>> typeInfoWithKey)
- {
+ TypeInformation<T> type, TypeInformation<Tuple2<K, T>> typeInfoWithKey) {
super(new ReduceWrapper<T, K>(udf), new UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>>(typeInfoWithKey, typeInfoWithKey), key.computeLogicalKeyPositions(), name);
}
- public static final class ReduceWrapper<T, K> extends WrappingFunction<ReduceFunction<T>>
- implements ReduceFunction<Tuple2<K, T>>
- {
+ private static final class ReduceWrapper<T, K> extends WrappingFunction<ReduceFunction<T>>
+ implements ReduceFunction<Tuple2<K, T>> {
private static final long serialVersionUID = 1L;
-
private ReduceWrapper(ReduceFunction<T> wrapped) {
super(wrapped);
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
index f65f169..a2a9010 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedGroupCombineOperator.java
@@ -20,10 +20,10 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
@@ -32,21 +32,19 @@ import org.apache.flink.util.Collector;
* operation only on the unwrapped values.
*/
@Internal
-public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupCombineFunction<Tuple3<K1, K2, IN>,OUT>> {
+public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends GroupCombineOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupCombineFunction<Tuple3<K1, K2, IN>, OUT>> {
public PlanUnwrappingSortedGroupCombineOperator(GroupCombineFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K1> groupingKey, Keys.SelectorFunctionKeys<IN, K2> sortingKey, String name,
- TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey)
- {
+ TypeInformation<OUT> outType, TypeInformation<Tuple3<K1, K2, IN>> typeInfoWithKey) {
super(new TupleUnwrappingGroupReducer<IN, OUT, K1, K2>(udf),
new UnaryOperatorInformation<Tuple3<K1, K2, IN>, OUT>(typeInfoWithKey, outType),
- groupingKey.computeLogicalKeyPositions(),
+ groupingKey.computeLogicalKeyPositions(),
name);
}
- public static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
- implements GroupCombineFunction<Tuple3<K1, K2, IN>, OUT>
- {
+ private static final class TupleUnwrappingGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupCombineFunction<IN, OUT>>
+ implements GroupCombineFunction<Tuple3<K1, K2, IN>, OUT> {
private static final long serialVersionUID = 1L;
@@ -57,7 +55,6 @@ public class PlanUnwrappingSortedGroupCombineOperator<IN, OUT, K1, K2> extends G
this.iter = new Tuple3UnwrappingIterator<IN, K1, K2>();
}
-
@Override
public void combine(Iterable<Tuple3<K1, K2, IN>> values, Collector<OUT> out) throws Exception {
iter.set(values.iterator());
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
index 8080477..7f81fee 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanUnwrappingSortedReduceGroupOperator.java
@@ -21,10 +21,10 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
* operation only on the unwrapped values.
*/
@Internal
-public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends GroupReduceOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupReduceFunction<Tuple3<K1, K2, IN>,OUT>> {
+public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends GroupReduceOperatorBase<Tuple3<K1, K2, IN>, OUT, GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>> {
public PlanUnwrappingSortedReduceGroupOperator(
GroupReduceFunction<IN, OUT> udf,
@@ -42,8 +42,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
String name,
TypeInformation<OUT> outType,
TypeInformation<Tuple3<K1, K2, IN>>
- typeInfoWithKey, boolean combinable)
- {
+ typeInfoWithKey, boolean combinable) {
super(
combinable ?
new TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2>(udf) :
@@ -55,9 +54,8 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
// --------------------------------------------------------------------------------------------
- public static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
- implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>>
- {
+ private static final class TupleUnwrappingGroupCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
+ implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>, GroupCombineFunction<Tuple3<K1, K2, IN>, Tuple3<K1, K2, IN>> {
private static final long serialVersionUID = 1L;
@@ -67,7 +65,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
private TupleUnwrappingGroupCombinableGroupReducer(GroupReduceFunction<IN, OUT> wrapped) {
super(wrapped);
- if(!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
+ if (!GroupCombineFunction.class.isAssignableFrom(wrappedFunction.getClass())) {
throw new IllegalArgumentException("Wrapped reduce function does not implement the GroupCombineFunction interface.");
}
@@ -75,7 +73,6 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
this.coll = new Tuple3WrappingCollector<>(this.iter);
}
-
@Override
public void reduce(Iterable<Tuple3<K1, K2, IN>> values, Collector<OUT> out) throws Exception {
iter.set(values.iterator());
@@ -87,7 +84,7 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
public void combine(Iterable<Tuple3<K1, K2, IN>> values, Collector<Tuple3<K1, K2, IN>> out) throws Exception {
iter.set(values.iterator());
coll.set(out);
- ((GroupCombineFunction<IN, IN>)this.wrappedFunction).combine(iter, coll);
+ ((GroupCombineFunction<IN, IN>) this.wrappedFunction).combine(iter, coll);
}
@Override
@@ -96,9 +93,8 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
}
}
- public static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
- implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT>
- {
+ private static final class TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K1, K2> extends WrappingFunction<GroupReduceFunction<IN, OUT>>
+ implements GroupReduceFunction<Tuple3<K1, K2, IN>, OUT> {
private static final long serialVersionUID = 1L;
@@ -109,7 +105,6 @@ public class PlanUnwrappingSortedReduceGroupOperator<IN, OUT, K1, K2> extends Gr
this.iter = new Tuple3UnwrappingIterator<>();
}
-
@Override
public void reduce(Iterable<Tuple3<K1, K2, IN>> values, Collector<OUT> out) throws Exception {
iter.set(values.iterator());
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
index d8c54d6..3f6463a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/RichCombineToGroupCombineWrapper.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.java.operators.translation;
import org.apache.flink.api.common.functions.CombineFunction;
@@ -31,7 +32,7 @@ import org.apache.flink.util.Preconditions;
* and makes it look like a function that implements {@link GroupCombineFunction} and {@link GroupReduceFunction} to the runtime.
*/
public class RichCombineToGroupCombineWrapper<IN, OUT, F extends RichGroupReduceFunction<IN, OUT> & CombineFunction<IN, IN>>
- extends RichGroupCombineFunction<IN,IN> implements GroupReduceFunction<IN, OUT> {
+ extends RichGroupCombineFunction<IN, IN> implements GroupReduceFunction<IN, OUT> {
private final F wrappedFunction;
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
index fd3b4f6..b697ac9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
@@ -18,12 +18,12 @@
package org.apache.flink.api.java.operators.translation;
-import java.util.Iterator;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.TraversableOnceException;
+import java.util.Iterator;
+
/**
* An iterator that reads 3-tuples (groupKey, sortKey, value) and returns only the values (third field).
* The iterator also tracks the groupKeys, as the triples flow through it.
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java
index 189dcdb..57b6bc7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3WrappingCollector.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
/**
- * Needed to wrap tuples to {@code Tuple3<groupKey, sortKey, value>} for combine method of group reduce with key selector sorting
+ * Needed to wrap tuples to {@code Tuple3<groupKey, sortKey, value>} for combine method of group reduce with key selector sorting.
*/
@Internal
public class Tuple3WrappingCollector<IN, K1, K2> implements Collector<IN>, java.io.Serializable {
@@ -35,7 +35,6 @@ public class Tuple3WrappingCollector<IN, K1, K2> implements Collector<IN>, java.
private Collector<Tuple3<K1, K2, IN>> wrappedCollector;
-
public Tuple3WrappingCollector(Tuple3UnwrappingIterator<IN, K1, K2> tui) {
this.tui = tui;
this.outTuple = new Tuple3<K1, K2, IN>();
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
index 2ff73ef..e39ec47 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleLeftUnwrappingJoiner.java
@@ -23,6 +23,14 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
+/**
+ * Joiner that unwraps values from the left set before applying the join operation.
+ *
+ * @param <I1> type of values in the left set
+ * @param <I2> type of values in the right set
+ * @param <OUT> type of resulting values
+ * @param <K> type of key
+ */
@Internal
public final class TupleLeftUnwrappingJoiner<I1, I2, OUT, K>
extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
index c9b9c27..847acd7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleRightUnwrappingJoiner.java
@@ -23,6 +23,14 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
+/**
+ * Joiner that unwraps values from the right set before applying the join operation.
+ *
+ * @param <I1> type of values in the left set
+ * @param <I2> type of values in the right set
+ * @param <OUT> type of resulting values
+ * @param <K> type of key
+ */
@Internal
public final class TupleRightUnwrappingJoiner<I1, I2, OUT, K>
extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
index 5dbe266..16ebef8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingIterator.java
@@ -18,12 +18,12 @@
package org.apache.flink.api.java.operators.translation;
-import java.util.Iterator;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.TraversableOnceException;
+import java.util.Iterator;
+
/**
* An iterator that reads 2-tuples (key value pairs) and returns only the values (second field).
* The iterator also tracks the keys, as the pairs flow through it.
@@ -32,16 +32,16 @@ import org.apache.flink.util.TraversableOnceException;
public class TupleUnwrappingIterator<T, K> implements Iterator<T>, Iterable<T>, java.io.Serializable {
private static final long serialVersionUID = 1L;
-
- private K lastKey;
+
+ private K lastKey;
private Iterator<Tuple2<K, T>> iterator;
private boolean iteratorAvailable;
-
+
public void set(Iterator<Tuple2<K, T>> iterator) {
this.iterator = iterator;
this.iteratorAvailable = true;
}
-
+
public K getLastKey() {
return lastKey;
}
@@ -53,7 +53,7 @@ public class TupleUnwrappingIterator<T, K> implements Iterator<T>, Iterable<T>,
@Override
public T next() {
- Tuple2<K, T> t = iterator.next();
+ Tuple2<K, T> t = iterator.next();
this.lastKey = t.f0;
return t.f1;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
index e0ee3b3..1e56dac 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleUnwrappingJoiner.java
@@ -23,6 +23,14 @@ import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
+/**
+ * Joiner that unwraps both values before applying the join operation.
+ *
+ * @param <I1> type of values in the left set
+ * @param <I2> type of values in the right set
+ * @param <OUT> type of resulting values
+ * @param <K> type of key
+ */
@Internal
public final class TupleUnwrappingJoiner<I1, I2, OUT, K>
extends WrappingFunction<FlatJoinFunction<I1, I2, OUT>>
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java
index 4581bf2..7369804 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TupleWrappingCollector.java
@@ -23,28 +23,27 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
- * Needed to wrap tuples to {@code Tuple2<key, value>} pairs for combine method of group reduce with key selector function
+ * Needed to wrap tuples to {@code Tuple2<key, value>} pairs for combine method of group reduce with key selector function.
*/
@Internal
public class TupleWrappingCollector<IN, K> implements Collector<IN>, java.io.Serializable {
-
+
private static final long serialVersionUID = 1L;
private final TupleUnwrappingIterator<IN, K> tui;
private final Tuple2<K, IN> outTuple;
-
+
private Collector<Tuple2<K, IN>> wrappedCollector;
-
-
+
public TupleWrappingCollector(TupleUnwrappingIterator<IN, K> tui) {
this.tui = tui;
this.outTuple = new Tuple2<K, IN>();
}
-
+
public void set(Collector<Tuple2<K, IN>> wrappedCollector) {
this.wrappedCollector = wrappedCollector;
}
-
+
@Override
public void close() {
this.wrappedCollector.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
index 7d5e39b..9449237 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/TwoKeyExtractingMapper.java
@@ -19,11 +19,17 @@
package org.apache.flink.api.java.operators.translation;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
+/**
+ * Mapper that extracts two keys of a value.
+ * @param <T> type of the values
+ * @param <K1> type of the first key
+ * @param <K2> type of the second key
+ */
@Internal
@ForwardedFields("*->2")
public final class TwoKeyExtractingMapper<T, K1, K2> extends RichMapFunction<T, Tuple3<K1, K2, T>> {
@@ -36,13 +42,11 @@ public final class TwoKeyExtractingMapper<T, K1, K2> extends RichMapFunction<T,
private final Tuple3<K1, K2, T> tuple = new Tuple3<K1, K2, T>();
-
public TwoKeyExtractingMapper(KeySelector<T, K1> keySelector1, KeySelector<T, K2> keySelector2) {
this.keySelector1 = keySelector1;
this.keySelector2 = keySelector2;
}
-
@Override
public Tuple3<K1, K2, T> map(T value) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
index 9851c42..6c52ace 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
@@ -25,9 +25,13 @@ import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
+/**
+ * Wrapper around {@link Function}.
+ * @param <T> type of the wrapped function
+ */
@Internal
public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction {
-
+
private static final long serialVersionUID = 1L;
protected T wrappedFunction;
@@ -36,21 +40,20 @@ public abstract class WrappingFunction<T extends Function> extends AbstractRichF
this.wrappedFunction = wrappedFunction;
}
-
@Override
public void open(Configuration parameters) throws Exception {
FunctionUtils.openFunction(this.wrappedFunction, parameters);
}
-
+
@Override
public void close() throws Exception {
FunctionUtils.closeFunction(this.wrappedFunction);
}
-
+
@Override
public void setRuntimeContext(RuntimeContext t) {
super.setRuntimeContext(t);
-
+
FunctionUtils.setFunctionRuntimeContext(this.wrappedFunction, t);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
index 0ce79e3..2f74288 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/AggregateTranslationTest.java
@@ -16,25 +16,28 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.operators.translation;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.types.StringValue;
+
import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of aggregations.
+ */
public class AggregateTranslationTest {
@Test
@@ -42,26 +45,26 @@ public class AggregateTranslationTest {
try {
final int parallelism = 8;
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
-
+
@SuppressWarnings("unchecked")
- DataSet<Tuple3<Double, StringValue, Long>> initialData =
+ DataSet<Tuple3<Double, StringValue, Long>> initialData =
env.fromElements(new Tuple3<Double, StringValue, Long>(3.141592, new StringValue("foobar"), Long.valueOf(77)));
-
+
initialData.groupBy(0).aggregate(Aggregations.MIN, 1).and(Aggregations.SUM, 2).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, Long>>());
-
+
Plan p = env.createProgramPlan();
-
+
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
-
+
GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
-
+
// check keys
assertEquals(1, reducer.getKeyColumns(0).length);
assertEquals(0, reducer.getKeyColumns(0)[0]);
-
+
assertEquals(-1, reducer.getParallelism());
assertTrue(reducer.isCombinable());
-
+
assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
index 887173d..9c67e60 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.api.java.operators.translation;
-import static org.junit.Assert.*;
-
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
@@ -31,8 +29,16 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
+
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of co-group sort.
+ */
@SuppressWarnings({"serial", "unchecked"})
public class CoGroupSortTranslationTest implements java.io.Serializable {
@@ -40,35 +46,35 @@ public class CoGroupSortTranslationTest implements java.io.Serializable {
public void testGroupSortTuples() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
-
+
input1.coGroup(input2)
.where(1).equalTo(2)
.sortFirstGroup(0, Order.DESCENDING)
.sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING)
-
+
.with(new CoGroupFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>, Long>() {
@Override
public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<Tuple3<Long, Long, Long>> second,
Collector<Long> out) {}
})
-
+
.output(new DiscardingOutputFormat<Long>());
-
+
Plan p = env.createProgramPlan();
-
+
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
CoGroupOperatorBase<?, ?, ?, ?> coGroup = (CoGroupOperatorBase<?, ?, ?, ?>) sink.getInput();
-
+
assertNotNull(coGroup.getGroupOrderForInputOne());
assertNotNull(coGroup.getGroupOrderForInputTwo());
-
+
assertEquals(1, coGroup.getGroupOrderForInputOne().getNumberOfFields());
assertEquals(0, coGroup.getGroupOrderForInputOne().getFieldNumber(0).intValue());
assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne().getOrder(0));
-
+
assertEquals(2, coGroup.getGroupOrderForInputTwo().getNumberOfFields());
assertEquals(1, coGroup.getGroupOrderForInputTwo().getFieldNumber(0).intValue());
assertEquals(0, coGroup.getGroupOrderForInputTwo().getFieldNumber(1).intValue());
@@ -80,39 +86,39 @@ public class CoGroupSortTranslationTest implements java.io.Serializable {
fail(e.getMessage());
}
}
-
+
@Test
public void testSortTuplesAndPojos() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
DataSet<TestPoJo> input2 = env.fromElements(new TestPoJo());
-
+
input1.coGroup(input2)
.where(1).equalTo("b")
.sortFirstGroup(0, Order.DESCENDING)
.sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING)
-
+
.with(new CoGroupFunction<Tuple2<Long, Long>, TestPoJo, Long>() {
@Override
public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPoJo> second, Collector<Long> out) {}
})
-
+
.output(new DiscardingOutputFormat<Long>());
-
+
Plan p = env.createProgramPlan();
-
+
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
CoGroupOperatorBase<?, ?, ?, ?> coGroup = (CoGroupOperatorBase<?, ?, ?, ?>) sink.getInput();
-
+
assertNotNull(coGroup.getGroupOrderForInputOne());
assertNotNull(coGroup.getGroupOrderForInputTwo());
-
+
assertEquals(1, coGroup.getGroupOrderForInputOne().getNumberOfFields());
assertEquals(0, coGroup.getGroupOrderForInputOne().getFieldNumber(0).intValue());
assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne().getOrder(0));
-
+
assertEquals(2, coGroup.getGroupOrderForInputTwo().getNumberOfFields());
assertEquals(2, coGroup.getGroupOrderForInputTwo().getFieldNumber(0).intValue());
assertEquals(0, coGroup.getGroupOrderForInputTwo().getFieldNumber(1).intValue());
@@ -124,7 +130,10 @@ public class CoGroupSortTranslationTest implements java.io.Serializable {
fail(e.getMessage());
}
}
-
+
+ /**
+ * Sample test pojo.
+ */
public static class TestPoJo {
public long a;
public long b;
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index fd60bc6..e4cb8c4 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -16,93 +16,95 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.operators.translation;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.Iterator;
-
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.base.DeltaIterationBase;
import org.apache.flink.api.common.operators.base.InnerJoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
+
import org.junit.Test;
+import java.util.Iterator;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of delta iterations.
+ */
@SuppressWarnings("serial")
public class DeltaIterationTranslationTest implements java.io.Serializable {
@Test
public void testCorrectTranslation() {
try {
- final String JOB_NAME = "Test JobName";
- final String ITERATION_NAME = "Test Name";
-
- final String BEFORE_NEXT_WORKSET_MAP = "Some Mapper";
-
- final String AGGREGATOR_NAME = "AggregatorName";
-
- final int[] ITERATION_KEYS = new int[] {2};
- final int NUM_ITERATIONS = 13;
-
- final int DEFAULT_parallelism= 133;
- final int ITERATION_parallelism = 77;
-
+ final String jobName = "Test JobName";
+ final String iterationName = "Test Name";
+
+ final String beforeNextWorksetMap = "Some Mapper";
+
+ final String aggregatorName = "AggregatorName";
+
+ final int[] iterationKeys = new int[] {2};
+ final int numIterations = 13;
+
+ final int defaultParallelism = 133;
+ final int iterationParallelism = 77;
+
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
// ------------ construct the test program ------------------
{
- env.setParallelism(DEFAULT_parallelism);
-
+ env.setParallelism(defaultParallelism);
+
@SuppressWarnings("unchecked")
DataSet<Tuple3<Double, Long, String>> initialSolutionSet = env.fromElements(new Tuple3<Double, Long, String>(3.44, 5L, "abc"));
-
+
@SuppressWarnings("unchecked")
DataSet<Tuple2<Double, String>> initialWorkSet = env.fromElements(new Tuple2<Double, String>(1.23, "abc"));
-
- DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, NUM_ITERATIONS, ITERATION_KEYS);
- iteration.name(ITERATION_NAME).parallelism(ITERATION_parallelism);
-
- iteration.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
-
+
+ DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, numIterations, iterationKeys);
+ iteration.name(iterationName).parallelism(iterationParallelism);
+
+ iteration.registerAggregator(aggregatorName, new LongSumAggregator());
+
// test that multiple workset consumers are supported
- DataSet<Tuple2<Double, String>> worksetSelfJoin =
+ DataSet<Tuple2<Double, String>> worksetSelfJoin =
iteration.getWorkset()
- .map(new IdentityMapper<Tuple2<Double,String>>())
+ .map(new IdentityMapper<Tuple2<Double, String>>())
.join(iteration.getWorkset()).where(1).equalTo(1).projectFirst(0, 1);
-
+
DataSet<Tuple3<Double, Long, String>> joined = worksetSelfJoin.join(iteration.getSolutionSet()).where(1).equalTo(2).with(new SolutionWorksetJoin());
DataSet<Tuple3<Double, Long, String>> result = iteration.closeWith(
joined,
- joined.map(new NextWorksetMapper()).name(BEFORE_NEXT_WORKSET_MAP));
-
+ joined.map(new NextWorksetMapper()).name(beforeNextWorksetMap));
+
result.output(new DiscardingOutputFormat<Tuple3<Double, Long, String>>());
result.writeAsText("/dev/null");
}
-
-
- Plan p = env.createProgramPlan(JOB_NAME);
-
+
+ Plan p = env.createProgramPlan(jobName);
+
// ------------- validate the plan ----------------
- assertEquals(JOB_NAME, p.getJobName());
- assertEquals(DEFAULT_parallelism, p.getDefaultParallelism());
-
+ assertEquals(jobName, p.getJobName());
+ assertEquals(defaultParallelism, p.getDefaultParallelism());
+
// validate the iteration
GenericDataSinkBase<?> sink1, sink2;
{
@@ -110,23 +112,23 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
sink1 = sinks.next();
sink2 = sinks.next();
}
-
+
DeltaIterationBase<?, ?> iteration = (DeltaIterationBase<?, ?>) sink1.getInput();
-
+
// check that multi consumer translation works for iterations
assertEquals(iteration, sink2.getInput());
-
+
// check the basic iteration properties
- assertEquals(NUM_ITERATIONS, iteration.getMaximumNumberOfIterations());
- assertArrayEquals(ITERATION_KEYS, iteration.getSolutionSetKeyFields());
- assertEquals(ITERATION_parallelism, iteration.getParallelism());
- assertEquals(ITERATION_NAME, iteration.getName());
-
+ assertEquals(numIterations, iteration.getMaximumNumberOfIterations());
+ assertArrayEquals(iterationKeys, iteration.getSolutionSetKeyFields());
+ assertEquals(iterationParallelism, iteration.getParallelism());
+ assertEquals(iterationName, iteration.getName());
+
MapOperatorBase<?, ?, ?> nextWorksetMapper = (MapOperatorBase<?, ?, ?>) iteration.getNextWorkset();
InnerJoinOperatorBase<?, ?, ?, ?> solutionSetJoin = (InnerJoinOperatorBase<?, ?, ?, ?>) iteration.getSolutionSetDelta();
InnerJoinOperatorBase<?, ?, ?, ?> worksetSelfJoin = (InnerJoinOperatorBase<?, ?, ?, ?>) solutionSetJoin.getFirstInput();
MapOperatorBase<?, ?, ?> worksetMapper = (MapOperatorBase<?, ?, ?>) worksetSelfJoin.getFirstInput();
-
+
assertEquals(IdentityMapper.class, worksetMapper.getUserCodeWrapper().getUserCodeClass());
assertEquals(NextWorksetMapper.class, nextWorksetMapper.getUserCodeWrapper().getUserCodeClass());
if (solutionSetJoin.getUserCodeWrapper().getUserCodeObject() instanceof WrappingFunction) {
@@ -137,9 +139,9 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
assertEquals(SolutionWorksetJoin.class, solutionSetJoin.getUserCodeWrapper().getUserCodeClass());
}
- assertEquals(BEFORE_NEXT_WORKSET_MAP, nextWorksetMapper.getName());
-
- assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+ assertEquals(beforeNextWorksetMap, nextWorksetMapper.getName());
+
+ assertEquals(aggregatorName, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
}
catch (Exception e) {
System.err.println(e.getMessage());
@@ -147,20 +149,20 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
fail(e.getMessage());
}
}
-
+
@Test
public void testRejectWhenSolutionSetKeysDontMatchJoin() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
@SuppressWarnings("unchecked")
DataSet<Tuple3<Double, Long, String>> initialSolutionSet = env.fromElements(new Tuple3<Double, Long, String>(3.44, 5L, "abc"));
@SuppressWarnings("unchecked")
DataSet<Tuple2<Double, String>> initialWorkSet = env.fromElements(new Tuple2<Double, String>(1.23, "abc"));
-
+
DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, 10, 1);
-
+
try {
iteration.getWorkset().join(iteration.getSolutionSet()).where(1).equalTo(2);
fail("Accepted invalid program.");
@@ -168,7 +170,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
catch (InvalidProgramException e) {
// all good!
}
-
+
try {
iteration.getSolutionSet().join(iteration.getWorkset()).where(2).equalTo(1);
fail("Accepted invalid program.");
@@ -183,20 +185,20 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
fail(e.getMessage());
}
}
-
+
@Test
public void testRejectWhenSolutionSetKeysDontMatchCoGroup() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
+
@SuppressWarnings("unchecked")
DataSet<Tuple3<Double, Long, String>> initialSolutionSet = env.fromElements(new Tuple3<Double, Long, String>(3.44, 5L, "abc"));
@SuppressWarnings("unchecked")
DataSet<Tuple2<Double, String>> initialWorkSet = env.fromElements(new Tuple2<Double, String>(1.23, "abc"));
-
+
DeltaIteration<Tuple3<Double, Long, String>, Tuple2<Double, String>> iteration = initialSolutionSet.iterateDelta(initialWorkSet, 10, 1);
-
+
try {
iteration.getWorkset().coGroup(iteration.getSolutionSet()).where(1).equalTo(2).with(new SolutionWorksetCoGroup1());
fail("Accepted invalid program.");
@@ -204,7 +206,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
catch (InvalidProgramException e) {
// all good!
}
-
+
try {
iteration.getSolutionSet().coGroup(iteration.getWorkset()).where(2).equalTo(1).with(new SolutionWorksetCoGroup2());
fail("Accepted invalid program.");
@@ -219,40 +221,40 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
fail(e.getMessage());
}
}
-
+
// --------------------------------------------------------------------------------------------
-
- public static class SolutionWorksetJoin extends RichJoinFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
+
+ private static class SolutionWorksetJoin extends RichJoinFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
@Override
public Tuple3<Double, Long, String> join(Tuple2<Double, String> first, Tuple3<Double, Long, String> second){
return null;
}
}
-
- public static class NextWorksetMapper extends RichMapFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>> {
+
+ private static class NextWorksetMapper extends RichMapFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>> {
@Override
public Tuple2<Double, String> map(Tuple3<Double, Long, String> value) {
return null;
}
}
-
- public static class IdentityMapper<T> extends RichMapFunction<T, T> {
+
+ private static class IdentityMapper<T> extends RichMapFunction<T, T> {
@Override
public T map(T value) throws Exception {
return value;
}
}
-
- public static class SolutionWorksetCoGroup1 extends RichCoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
+
+ private static class SolutionWorksetCoGroup1 extends RichCoGroupFunction<Tuple2<Double, String>, Tuple3<Double, Long, String>, Tuple3<Double, Long, String>> {
@Override
public void coGroup(Iterable<Tuple2<Double, String>> first, Iterable<Tuple3<Double, Long, String>> second,
Collector<Tuple3<Double, Long, String>> out) {
}
}
-
- public static class SolutionWorksetCoGroup2 extends RichCoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> {
+
+ private static class SolutionWorksetCoGroup2 extends RichCoGroupFunction<Tuple3<Double, Long, String>, Tuple2<Double, String>, Tuple3<Double, Long, String>> {
@Override
public void coGroup(Iterable<Tuple3<Double, Long, String>> second, Iterable<Tuple2<Double, String>> first,
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
index 27c7b2f..6a98a8d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.StringValue;
+
import org.junit.Test;
import java.io.Serializable;
@@ -45,6 +46,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+/**
+ * Tests for translation of distinct operation.
+ */
@SuppressWarnings("serial")
public class DistinctTranslationTest {
@@ -164,7 +168,7 @@ public class DistinctTranslationTest {
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
- initialData.distinct(new KeySelector<Tuple3<Double,StringValue,LongValue>, StringValue>() {
+ initialData.distinct(new KeySelector<Tuple3<Double, StringValue, LongValue>, StringValue>() {
public StringValue getKey(Tuple3<Double, StringValue, LongValue> value) {
return value.f1;
}
@@ -183,7 +187,7 @@ public class DistinctTranslationTest {
assertEquals(4, reducer.getParallelism());
// check types
- TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>(
+ TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double, StringValue, LongValue>>>(
new ValueTypeInfo<StringValue>(StringValue.class),
initialData.getType());
@@ -245,7 +249,7 @@ public class DistinctTranslationTest {
}
@SuppressWarnings("unchecked")
- private static final DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
+ private static DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
return env.fromElements(new Tuple3<Double, StringValue, LongValue>(3.141592, new StringValue("foobar"), new LongValue(77)))
.setParallelism(1);
}
@@ -256,6 +260,9 @@ public class DistinctTranslationTest {
return env.fromCollection(data);
}
+ /**
+ * Custom data type, for testing purposes.
+ */
public static class CustomType implements Serializable {
private static final long serialVersionUID = 1L;
@@ -269,7 +276,7 @@ public class DistinctTranslationTest {
@Override
public String toString() {
- return ""+myInt;
+ return "" + myInt;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
index 3adbbb8..486cad4 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/ReduceTranslationTests.java
@@ -16,20 +16,19 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.operators.translation;
-import static org.junit.Assert.*;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -37,10 +36,17 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.StringValue;
+
import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for translation of reduce operation.
+ */
@SuppressWarnings("serial")
public class ReduceTranslationTests implements java.io.Serializable {
@@ -49,31 +55,31 @@ public class ReduceTranslationTests implements java.io.Serializable {
try {
final int parallelism = 8;
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
-
+
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
-
- initialData.reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+
+ initialData.reduce(new RichReduceFunction<Tuple3<Double, StringValue, LongValue>>() {
public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
return value1;
}
}).output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
-
+
Plan p = env.createProgramPlan();
-
+
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
-
+
ReduceOperatorBase<?, ?> reducer = (ReduceOperatorBase<?, ?>) sink.getInput();
-
+
// check types
assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
-
+
// check keys
assertTrue(reducer.getKeyColumns(0) == null || reducer.getKeyColumns(0).length == 0);
-
+
// parallelism was not configured on the operator
assertTrue(reducer.getParallelism() == 1 || reducer.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT);
-
+
assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
}
catch (Exception e) {
@@ -82,40 +88,40 @@ public class ReduceTranslationTests implements java.io.Serializable {
fail("Test caused an error: " + e.getMessage());
}
}
-
+
@Test
public void translateGroupedReduceNoMapper() {
try {
final int parallelism = 8;
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
-
+
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
-
+
initialData
.groupBy(2)
- .reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+ .reduce(new RichReduceFunction<Tuple3<Double, StringValue, LongValue>>() {
public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
return value1;
}
})
.output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
-
+
Plan p = env.createProgramPlan();
-
+
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
-
+
ReduceOperatorBase<?, ?> reducer = (ReduceOperatorBase<?, ?>) sink.getInput();
-
+
// check types
assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
-
+
// parallelism was not configured on the operator
assertTrue(reducer.getParallelism() == parallelism || reducer.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT);
-
+
// check keys
assertArrayEquals(new int[] {2}, reducer.getKeyColumns(0));
-
+
assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
}
catch (Exception e) {
@@ -124,60 +130,58 @@ public class ReduceTranslationTests implements java.io.Serializable {
fail("Test caused an error: " + e.getMessage());
}
}
-
-
+
@Test
public void translateGroupedReduceWithkeyExtractor() {
try {
final int parallelism = 8;
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
-
+
DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
-
+
initialData
- .groupBy(new KeySelector<Tuple3<Double,StringValue,LongValue>, StringValue>() {
+ .groupBy(new KeySelector<Tuple3<Double, StringValue, LongValue>, StringValue>() {
public StringValue getKey(Tuple3<Double, StringValue, LongValue> value) {
return value.f1;
}
})
- .reduce(new RichReduceFunction<Tuple3<Double,StringValue,LongValue>>() {
+ .reduce(new RichReduceFunction<Tuple3<Double, StringValue, LongValue>>() {
public Tuple3<Double, StringValue, LongValue> reduce(Tuple3<Double, StringValue, LongValue> value1, Tuple3<Double, StringValue, LongValue> value2) {
return value1;
}
}).setParallelism(4)
.output(new DiscardingOutputFormat<Tuple3<Double, StringValue, LongValue>>());
-
+
Plan p = env.createProgramPlan();
-
+
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
-
-
+
MapOperatorBase<?, ?, ?> keyProjector = (MapOperatorBase<?, ?, ?>) sink.getInput();
PlanUnwrappingReduceOperator<?, ?> reducer = (PlanUnwrappingReduceOperator<?, ?>) keyProjector.getInput();
MapOperatorBase<?, ?, ?> keyExtractor = (MapOperatorBase<?, ?, ?>) reducer.getInput();
-
+
// check the parallelisms
assertEquals(1, keyExtractor.getParallelism());
assertEquals(4, reducer.getParallelism());
assertEquals(4, keyProjector.getParallelism());
-
+
// check types
- TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>(
+ TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double, StringValue, LongValue>>>(
new ValueTypeInfo<StringValue>(StringValue.class),
initialData.getType());
-
+
assertEquals(initialData.getType(), keyExtractor.getOperatorInfo().getInputType());
assertEquals(keyValueInfo, keyExtractor.getOperatorInfo().getOutputType());
-
+
assertEquals(keyValueInfo, reducer.getOperatorInfo().getInputType());
assertEquals(keyValueInfo, reducer.getOperatorInfo().getOutputType());
-
+
assertEquals(keyValueInfo, keyProjector.getOperatorInfo().getInputType());
assertEquals(initialData.getType(), keyProjector.getOperatorInfo().getOutputType());
-
+
// check keys
assertEquals(KeyExtractingMapper.class, keyExtractor.getUserCodeWrapper().getUserCodeClass());
-
+
assertTrue(keyExtractor.getInput() instanceof GenericDataSourceBase<?, ?>);
}
catch (Exception e) {
@@ -186,9 +190,9 @@ public class ReduceTranslationTests implements java.io.Serializable {
fail("Test caused an error: " + e.getMessage());
}
}
-
+
@SuppressWarnings("unchecked")
- private static final DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
+ private static DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
return env.fromElements(new Tuple3<Double, StringValue, LongValue>(3.141592, new StringValue("foobar"), new LongValue(77)))
.setParallelism(1);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8e975362/tools/maven/suppressions-java.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-java.xml b/tools/maven/suppressions-java.xml
index d7e42e5..3bb8556 100644
--- a/tools/maven/suppressions-java.xml
+++ b/tools/maven/suppressions-java.xml
@@ -33,14 +33,6 @@ under the License.
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
<suppress
- files="(.*)api[/\\]java[/\\]operators[/\\]translation[/\\](.*)"
- checks="AvoidStarImport|NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
- <!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
- <suppress
- files="(.*)test[/\\](.*)api[/\\]java[/\\]operators[/\\]translation[/\\](.*)"
- checks="AvoidStarImport"/>
-
- <suppress
files="(.*)api[/\\]java[/\\]operator[/\\]([^/\\]*\.java)"
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>