You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/03/29 13:50:40 UTC
[11/12] flink git commit: [FLINK-947] Add parser to Expression API
for exposing it to Java
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java
new file mode 100644
index 0000000..89ec2e5
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.expressions.test;
+
+import org.apache.flink.api.expressions.ExpressionException;
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class SelectITCase extends MultipleProgramsTestBase {
+
+
+ public SelectITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expected = "";
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception {
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception {
+ compareResultsByLinesInMemory(expected, resultPath);
+ }
+
+ @Test
+ public void testSimpleSelectAllWithAs() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+ ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a,b,c");
+
+ ExpressionOperation<JavaBatchTranslator> result = in
+ .select("a, b, c");
+
+ DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class);
+ resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+
+ expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+ "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+ "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+ "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+ "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+ "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n";
+
+ }
+
+ @Test
+ public void testSimpleSelectWithNaming() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+ ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds);
+
+ ExpressionOperation<JavaBatchTranslator> result = in
+ .select("f0 as a, f1 as b")
+ .select("a, b");
+
+ DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class);
+ resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+
+ expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+ "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+ "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+ }
+
+ @Test(expected = ExpressionException.class)
+ public void testAsWithToFewFields() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+ ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b");
+
+ DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class);
+ resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+
+ expected = " sorry dude ";
+ }
+
+ @Test(expected = ExpressionException.class)
+ public void testAsWithToManyFields() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+ ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b, c, d");
+
+ DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class);
+ resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+
+ expected = " sorry dude ";
+ }
+
+ @Test(expected = ExpressionException.class)
+ public void testAsWithAmbiguousFields() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+ ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b, c, b");
+
+ DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class);
+ resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+
+ expected = " today's not your day ";
+ }
+
+ @Test(expected = ExpressionException.class)
+ public void testOnlyFieldRefInAs() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+ ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b as c, d");
+
+ DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class);
+ resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+
+ expected = "sorry bro";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java
new file mode 100644
index 0000000..f9f1c6b
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.expressions.test;
+
+import org.apache.flink.api.expressions.ExpressionException;
+import org.apache.flink.api.expressions.ExpressionOperation;
+import org.apache.flink.api.expressions.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.expressions.ExpressionUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class StringExpressionsITCase extends MultipleProgramsTestBase {
+
+
+ public StringExpressionsITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ private String resultPath;
+ private String expected = "";
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Before
+ public void before() throws Exception {
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+
+ @After
+ public void after() throws Exception {
+ compareResultsByLinesInMemory(expected, resultPath);
+ }
+
+ @Test
+ public void testSubstring() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+ new Tuple2<String, Integer>("AAAA", 2),
+ new Tuple2<String, Integer>("BBBB", 1));
+
+ ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b");
+
+ ExpressionOperation<JavaBatchTranslator> result = in
+ .select("a.substring(0, b)");
+
+ DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class);
+ resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+
+ expected = "AA\nB";
+ }
+
+ @Test
+ public void testSubstringWithMaxEnd() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple2<String, Integer>> ds = env.fromElements(
+ new Tuple2<String, Integer>("ABCD", 2),
+ new Tuple2<String, Integer>("ABCD", 1));
+
+ ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b");
+
+ ExpressionOperation<JavaBatchTranslator> result = in
+ .select("a.substring(b)");
+
+ DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class);
+ resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+
+ expected = "CD\nBCD";
+ }
+
+ @Test(expected = ExpressionException.class)
+ public void testNonWorkingSubstring1() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple2<String, Float>> ds = env.fromElements(
+ new Tuple2<String, Float>("ABCD", 2.0f),
+ new Tuple2<String, Float>("ABCD", 1.0f));
+
+ ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b");
+
+ ExpressionOperation<JavaBatchTranslator> result = in
+ .select("a.substring(0, b)");
+
+ DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class);
+ resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+
+ expected = "";
+ }
+
+ @Test(expected = ExpressionException.class)
+ public void testNonWorkingSubstring2() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple2<String, String>> ds = env.fromElements(
+ new Tuple2<String, String>("ABCD", "a"),
+ new Tuple2<String, String>("ABCD", "b"));
+
+ ExpressionOperation<JavaBatchTranslator> in = ExpressionUtil.from(ds, "a, b");
+
+ ExpressionOperation<JavaBatchTranslator> result = in
+ .select("a.substring(b, 15)");
+
+ DataSet<Row> resultSet = ExpressionUtil.toSet(result, Row.class);
+ resultSet.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+ env.execute();
+
+ expected = "";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java
new file mode 100644
index 0000000..b75a2ee
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java
@@ -0,0 +1,100 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.api.scala.expressions.test;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.examples.scala.PageRankExpression;
+import org.apache.flink.test.testdata.PageRankData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+/**
+ * Runs the {@code PageRankExpression} example end to end and compares the ranks it
+ * writes against the reference values in {@code PageRankData}.
+ *
+ * <p>The test is parameterized over a {@code "ProgramId"} configuration entry; see
+ * {@link #runProgram(int)} for what each id runs.
+ */
+@RunWith(Parameterized.class)
+public class PageRankExpressionITCase extends JavaProgramTestBase {
+
+    /** Number of program variants; ids run from 1 to this value inclusive. */
+    private static final int NUM_PROGRAMS = 2;
+
+    // Read from the inherited 'config' field (presumably the Configuration handed to
+    // super(config) -- confirm in JavaProgramTestBase). The default -1 marks a missing
+    // "ProgramId" and is rejected by runProgram().
+    private int curProgId = config.getInteger("ProgramId", -1);
+
+    private String verticesPath;
+    private String edgesPath;
+    private String resultPath;
+    private String expectedResult;
+
+    public PageRankExpressionITCase(Configuration config) {
+        super(config);
+    }
+
+    /** Creates the result directory path and the vertex and edge input files. */
+    @Override
+    protected void preSubmit() throws Exception {
+        resultPath = getTempDirPath("result");
+        verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES);
+        edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
+    }
+
+    /** Runs the program variant selected by the configuration and keeps its expected ranks. */
+    @Override
+    protected void testProgram() throws Exception {
+        expectedResult = runProgram(curProgId);
+    }
+
+    /** Compares the written key/value pairs with the expected ranks, using delta 0.01. */
+    @Override
+    protected void postSubmit() throws Exception {
+        compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01);
+    }
+
+    /**
+     * Builds one {@code Configuration} per program id in {@code 1..NUM_PROGRAMS}.
+     *
+     * @return the parameter list consumed by the {@code Parameterized} runner
+     */
+    @Parameters
+    public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+
+        LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+        for (int i = 1; i <= NUM_PROGRAMS; i++) {
+            Configuration config = new Configuration();
+            config.setInteger("ProgramId", i);
+            tConfigs.add(config);
+        }
+
+        return toParameterList(tConfigs);
+    }
+
+    /**
+     * Runs {@code PageRankExpression} for the given variant.
+     *
+     * @param progId 1 for a fixed three iterations, 2 for an iteration limit of 1000
+     * @return the expected ranks for that variant
+     * @throws IllegalArgumentException if {@code progId} is neither 1 nor 2
+     * @throws Exception if the example program fails
+     */
+    public String runProgram(int progId) throws Exception {
+
+        switch (progId) {
+            case 1: {
+                PageRankExpression.main(new String[] {
+                    verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES + "", "3"});
+                return PageRankData.RANKS_AFTER_3_ITERATIONS;
+            }
+            case 2: {
+                // Start with a very high iteration count so that the dynamic convergence
+                // criterion has to handle termination.
+                PageRankExpression.main(new String[] {
+                    verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES + "", "1000"});
+                return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
+            }
+
+            default:
+                throw new IllegalArgumentException("Invalid program id");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AggregationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AggregationsITCase.scala
deleted file mode 100644
index fee7ac8..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AggregationsITCase.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
- private var expected: String = ""
- private val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder = _tempFolder
-
- @Before
- def before(): Unit = {
- resultPath = tempFolder.newFile().toURI.toString
- }
-
- @After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
- }
-
- @Test
- def testAggregationTypes: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).toExpression
- .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "231,1,21,21,11"
- }
-
- @Test(expected = classOf[ExpressionException])
- def testAggregationOnNonExistingField: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).toExpression
- .select('foo.avg)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = ""
- }
-
- @Test
- def testWorkingAggregationDataTypes: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = env.fromElements(
- (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
- (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toExpression
- .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1,1,1,1,1.5,1.5,2"
- }
-
- @Test
- def testAggregationWithArithmetic: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toExpression
- .select(('_1 + 2).avg + 2, '_2.count + " THE COUNT")
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "5.5,2 THE COUNT"
- }
-
- @Test(expected = classOf[ExpressionException])
- def testNonWorkingAggregationDataTypes: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = env.fromElements(("Hello", 1)).toExpression
- .select('_1.sum)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = ""
- }
-
- @Test(expected = classOf[ExpressionException])
- def testNoNestedAggregations: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = env.fromElements(("Hello", 1)).toExpression
- .select('_2.sum.sum)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = ""
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AsITCase.scala
deleted file mode 100644
index 8921e89..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/AsITCase.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
- private var expected: String = ""
- private val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder = _tempFolder
-
- @Before
- def before(): Unit = {
- resultPath = tempFolder.newFile().toURI.toString
- }
-
- @After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
- }
-
- @Test
- def testAs: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
- "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
- "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
- "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
- "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
- "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- }
-
- @Test(expected = classOf[ExpressionException])
- def testAsWithToFewFields: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "no"
- }
-
- @Test(expected = classOf[ExpressionException])
- def testAsWithToManyFields: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "no"
- }
-
- @Test(expected = classOf[ExpressionException])
- def testAsWithAmbiguousFields: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "no"
- }
-
- @Test(expected = classOf[ExpressionException])
- def testAsWithNonFieldReference1: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- // as can only have field references
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "no"
- }
-
- @Test(expected = classOf[ExpressionException])
- def testAsWithNonFieldReference2: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- // as can only have field references
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "no"
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/CastingITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/CastingITCase.scala
deleted file mode 100644
index b3f8ef3..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/CastingITCase.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
- private var expected: String = ""
- private val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder = _tempFolder
-
- @Before
- def before(): Unit = {
- resultPath = tempFolder.newFile().toURI.toString
- }
-
- @After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
- }
-
- @Test
- def testAutoCastToString: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toExpression
- .select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d")
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1b,1s,1i,1L,1.0f,1.0d"
- }
-
- @Test
- def testNumericAutoCastInArithmetic: Unit = {
-
- // don't test everything, just some common cast directions
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toExpression
- .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "2,2,2,2.0,2.0,2.0"
- }
-
- @Test
- def testNumericAutoCastInComparison: Unit = {
-
- // don't test everything, just some common cast directions
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = env.fromElements(
- (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d),
- (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f)
- .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d && 'f > 1)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "2,2,2,2,2.0,2.0"
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/ExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/ExpressionsITCase.scala
deleted file mode 100644
index de41f65..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/ExpressionsITCase.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
- private var expected: String = ""
- private val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder = _tempFolder
-
- @Before
- def before(): Unit = {
- resultPath = tempFolder.newFile().toURI.toString
- }
-
- @After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
- }
-
- @Test
- def testArithmetic: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = env.fromElements((5, 10)).as('a, 'b)
- .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "0,10,2,10,1,-5"
- }
-
- @Test
- def testLogic: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = env.fromElements((5, true)).as('a, 'b)
- .select('b && true, 'b && false, 'b || false, !'b)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "true,false,true,false"
- }
-
- @Test
- def testComparisons: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
- .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "true,true,false,false,true"
- }
-
- @Test
- def testBitwiseOperations: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val ds = env.fromElements((3.toByte, 5.toByte)).as('a, 'b)
- .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1,7,6,-4"
- }
-
- @Test
- def testBitwiseWithAutocast: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val ds = env.fromElements((3, 5.toByte)).as('a, 'b)
- .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1,7,6,-4"
- }
-
- @Test(expected = classOf[ExpressionException])
- def testBitwiseWithNonWorkingAutocast: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val ds = env.fromElements((3.0, 5)).as('a, 'b)
- .select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1,7,6,-4"
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/FilterITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/FilterITCase.scala
deleted file mode 100644
index 4b46458..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/FilterITCase.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions.tree.Literal
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit._
-
-
-@RunWith(classOf[Parameterized])
-class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
- private var expected: String = null
- private val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder = _tempFolder
-
- @Before
- def before(): Unit = {
- resultPath = tempFolder.newFile().toURI.toString
- }
-
- @After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
- }
-
- @Test
- def testAllRejectingFilter: Unit = {
- /*
- * Test all-rejecting filter.
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-
- val filterDs = ds.filter( Literal(false) )
-
- filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
- expected = "\n"
- }
-
- @Test
- def testAllPassingFilter: Unit = {
- /*
- * Test all-passing filter.
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-
- val filterDs = ds.filter( Literal(true) )
-
- filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
- expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
- "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
- "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
- "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
- "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
- "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- }
-
- @Test
- def testFilterOnStringTupleField: Unit = {
- /*
- * Test filter on String tuple field.
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env)
- val filterDs = ds.filter( _._3.contains("world") )
- filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
- expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
- }
-
- @Test
- def testFilterOnIntegerTupleField: Unit = {
- /*
- * Test filter on Integer tuple field.
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
-
- val filterDs = ds.filter( 'a % 2 === 0 )
-
- filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
- expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
- "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
- "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
- }
-
- // These two not yet done, but are planned
-
- @Ignore
- @Test
- def testFilterBasicType: Unit = {
- /*
- * Test filter on basic type
- */
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.getStringDataSet(env)
-
- val filterDs = ds.filter( _.startsWith("H") )
-
- filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
- }
-
- @Ignore
- @Test
- def testFilterOnCustomType: Unit = {
- /*
- * Test filter on custom type
- */
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.getCustomTypeDataSet(env)
- val filterDs = ds.filter( _.myString.contains("a") )
- filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/GroupedAggreagationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/GroupedAggreagationsITCase.scala
deleted file mode 100644
index 06f61db..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/GroupedAggreagationsITCase.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
- private var expected: String = ""
- private val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder = _tempFolder
-
- @Before
- def before(): Unit = {
- resultPath = tempFolder.newFile().toURI.toString
- }
-
- @After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
- }
-
- @Test(expected = classOf[ExpressionException])
- def testGroupingOnNonExistentField: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
- .groupBy('_foo)
- .select('a.avg)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = ""
- }
-
- @Test
- def testGroupedAggregate: Unit = {
-
- // the grouping key needs to be forwarded to the intermediate DataSet, even
- // if we don't want the key in the output
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
- .groupBy('b)
- .select('b, 'a.sum)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
- }
-
- @Test
- def testGroupingKeyForwardIfNotUsed: Unit = {
-
- // the grouping key needs to be forwarded to the intermediate DataSet, even
- // if we don't want the key in the output
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
- .groupBy('b)
- .select('a.sum)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
- }
-
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/JoinITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/JoinITCase.scala
deleted file mode 100644
index d52acf6..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/JoinITCase.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
- private var expected: String = ""
- private val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder = _tempFolder
-
- @Before
- def before(): Unit = {
- resultPath = tempFolder.newFile().toURI.toString
- }
-
- @After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
- }
-
- @Test
- def testJoin: Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
- val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g)
-
- joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
- expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
- }
-
- @Test
- def testJoinWithFilter: Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
- val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
-
- joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
- expected = "Hi,Hallo\n"
- }
-
- @Test
- def testJoinWithMultipleKeys: Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
- val joinDs = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
-
- joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
- expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
- "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
- }
-
- @Test(expected = classOf[ExpressionException])
- def testJoinNonExistingKey: Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
- val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g)
-
- joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
- expected = ""
- }
-
- @Test(expected = classOf[ExpressionException])
- def testJoinWithNonMatchingKeyTypes: Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
- val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g)
-
- joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
- expected = ""
- }
-
- @Test
- def testJoinWithAggregation: Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
- val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
-
- val joinDs = ds1.join(ds2).where('a === 'd).select('g.count)
-
- joinDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
- env.execute()
- expected = "6"
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/PageRankExpressionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/PageRankExpressionITCase.java b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/PageRankExpressionITCase.java
deleted file mode 100644
index aefc8cf..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/PageRankExpressionITCase.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.api.scala.expressions;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.examples.scala.PageRankExpression;
-import org.apache.flink.test.testdata.PageRankData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
-@RunWith(Parameterized.class)
-public class PageRankExpressionITCase extends JavaProgramTestBase {
-
- private static int NUM_PROGRAMS = 2;
-
- private int curProgId = config.getInteger("ProgramId", -1);
-
- private String verticesPath;
- private String edgesPath;
- private String resultPath;
- private String expectedResult;
-
- public PageRankExpressionITCase(Configuration config) {
- super(config);
- }
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result");
- verticesPath = createTempFile("vertices.txt", PageRankData.VERTICES);
- edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
- }
-
- @Override
- protected void testProgram() throws Exception {
- expectedResult = runProgram(curProgId);
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 0.01);
- }
-
- @Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
-
- LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
-
- for(int i=1; i <= NUM_PROGRAMS; i++) {
- Configuration config = new Configuration();
- config.setInteger("ProgramId", i);
- tConfigs.add(config);
- }
-
- return toParameterList(tConfigs);
- }
-
-
- public String runProgram(int progId) throws Exception {
-
- switch(progId) {
- case 1: {
- PageRankExpression.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData
- .NUM_VERTICES + "", "3"});
- return PageRankData.RANKS_AFTER_3_ITERATIONS;
- }
- case 2: {
- // start with a very high number of iteration such that the dynamic convergence criterion must handle termination
- PageRankExpression.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
- return PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
- }
-
- default:
- throw new IllegalArgumentException("Invalid program id");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/SelectITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/SelectITCase.scala
deleted file mode 100644
index b286421..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/SelectITCase.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
- private var expected: String = ""
- private val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder = _tempFolder
-
- @Before
- def before(): Unit = {
- resultPath = tempFolder.newFile().toURI.toString
- }
-
- @After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
- }
-
- @Test
- def testSimpleSelectAll: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).toExpression.select('_1, '_2, '_3)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
- "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
- "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
- "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
- "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
- "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- }
-
- @Test
- def testSimpleSelectAllWithAs: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
- "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
- "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
- "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
- "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
- "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
- }
-
- @Test
- def testSimpleSelectWithNaming: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).toExpression
- .select('_1 as 'a, '_2 as 'b)
- .select('a, 'b)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
- "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
- "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
- }
-
- @Test(expected = classOf[ExpressionException])
- def testAsWithToFewFields: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "no"
- }
-
- @Test(expected = classOf[ExpressionException])
- def testAsWithToManyFields: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "no"
- }
-
- @Test(expected = classOf[ExpressionException])
- def testAsWithAmbiguousFields: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c as 'b)
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "no"
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/StringExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/StringExpressionsITCase.scala
deleted file mode 100644
index 3a7ad02..0000000
--- a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/StringExpressionsITCase.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expressions
-
-import org.apache.flink.api.expressions.ExpressionException
-import org.apache.flink.api.scala._
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-@RunWith(classOf[Parameterized])
-class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
- private var resultPath: String = null
- private var expected: String = ""
- private val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder = _tempFolder
-
- @Before
- def before(): Unit = {
- resultPath = tempFolder.newFile().toURI.toString
- }
-
- @After
- def after: Unit = {
- compareResultsByLinesInMemory(expected, resultPath)
- }
-
- @Test
- def testSubstring: Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
- .select('a.substring(0, 'b))
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "AA\nB"
- }
-
- @Test
- def testSubstringWithMaxEnd: Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
- .select('a.substring('b))
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "CD\nBCD"
- }
-
- @Test(expected = classOf[ExpressionException])
- def testNonWorkingSubstring1: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
- .select('a.substring(0, 'b))
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "AAA\nBB"
- }
-
- @Test(expected = classOf[ExpressionException])
- def testNonWorkingSubstring2: Unit = {
-
- val env = ExecutionEnvironment.getExecutionEnvironment
- val ds = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
- .select('a.substring('b, 15))
-
- ds.writeAsText(resultPath, WriteMode.OVERWRITE)
- env.execute()
- expected = "AAA\nBB"
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala
new file mode 100644
index 0000000..4a358bc
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AggregationsITCase.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/** Integration tests for aggregation expressions (sum, min, max, count, avg) of the Scala
+  * expression API; `after` checks the result file against the expected text. */
+@RunWith(classOf[Parameterized])
+class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private val folderRule = new TemporaryFolder()
+  private var outputPath: String = null
+  private var expectation: String = ""
+
+  @Rule
+  def tempFolder: TemporaryFolder = folderRule
+
+  @Before
+  def before(): Unit = {
+    outputPath = tempFolder.newFile().toURI.toString
+  }
+
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expectation, outputPath)
+  }
+
+  @Test
+  def testAggregationTypes: Unit = {
+    // sum, min, max, count and avg over the first field of the 3-tuple data set
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val input = CollectionDataSets.get3TupleDataSet(env).toExpression
+    val result = input.select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
+
+    result.writeAsText(outputPath, WriteMode.OVERWRITE)
+    env.execute()
+    expectation = "231,1,21,21,11"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAggregationOnNonExistingField: Unit = {
+    // aggregating over the unknown field 'foo must raise an ExpressionException
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val input = CollectionDataSets.get3TupleDataSet(env).toExpression
+    val result = input.select('foo.avg)
+
+    result.writeAsText(outputPath, WriteMode.OVERWRITE)
+    env.execute()
+    expectation = ""
+  }
+
+  @Test
+  def testWorkingAggregationDataTypes: Unit = {
+    // avg over byte, short, int, long, float and double fields, count over a string field
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val input = env.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toExpression
+    val result = input.select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
+
+    result.writeAsText(outputPath, WriteMode.OVERWRITE)
+    env.execute()
+    expectation = "1,1,1,1,1.5,1.5,2"
+  }
+
+  @Test
+  def testAggregationWithArithmetic: Unit = {
+    // arithmetic inside and around an aggregation, and a string appended to a count
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val input = env.fromElements((1f, "Hello"), (2f, "Ciao")).toExpression
+    val result = input.select(('_1 + 2).avg + 2, '_2.count + " THE COUNT")
+
+    result.writeAsText(outputPath, WriteMode.OVERWRITE)
+    env.execute()
+    expectation = "5.5,2 THE COUNT"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testNonWorkingAggregationDataTypes: Unit = {
+    // sum over a string field must raise an ExpressionException
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val input = env.fromElements(("Hello", 1)).toExpression
+    val result = input.select('_1.sum)
+
+    result.writeAsText(outputPath, WriteMode.OVERWRITE)
+    env.execute()
+    expectation = ""
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testNoNestedAggregations: Unit = {
+    // an aggregation applied to another aggregation must raise an ExpressionException
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val input = env.fromElements(("Hello", 1)).toExpression
+    val result = input.select('_2.sum.sum)
+
+    result.writeAsText(outputPath, WriteMode.OVERWRITE)
+    env.execute()
+    expectation = ""
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala
new file mode 100644
index 0000000..18d7b09
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/AsITCase.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/** Integration tests for naming the fields of a data set with `as` in the Scala expression
+  * API; `after` checks the result file against the expected text. */
+@RunWith(classOf[Parameterized])
+class AsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
+
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  def testAs: Unit = {
+    // naming all three fields leaves the 21 rows of the data set unchanged
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithToFewFields: Unit = {
+    // only two names for a data set with three fields
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithToManyFields: Unit = {
+    // four names for a data set with three fields
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithAmbiguousFields: Unit = {
+    // the name 'b is given to two different fields
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithNonFieldReference1: Unit = {
+    // as can only have field references; names are distinct so only 'a + 1 is at fault
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'c)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithNonFieldReference2: Unit = {
+    // as can only have field references; names are distinct so only 'a as 'foo is at fault
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'c)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala
new file mode 100644
index 0000000..599ef6b
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/CastingITCase.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/** Integration tests for automatic casts in the Scala expression API. */
+@RunWith(classOf[Parameterized])
+class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private val folderRule = new TemporaryFolder()
+  private var outputPath: String = null
+  private var expectation: String = ""
+
+  @Rule
+  def tempFolder: TemporaryFolder = folderRule
+
+  @Before
+  def before(): Unit = {
+    outputPath = tempFolder.newFile().toURI.toString
+  }
+
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expectation, outputPath)
+  }
+
+  @Test
+  def testAutoCastToString: Unit = {
+    // every numeric field gets a string literal appended
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val input = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toExpression
+    val result = input.select('_1 + "b", '_2 + "s", '_3 + "i", '_4 + "L", '_5 + "f", '_6 + "d")
+
+    result.writeAsText(outputPath, WriteMode.OVERWRITE)
+    env.execute()
+    expectation = "1b,1s,1i,1L,1.0f,1.0d"
+  }
+
+  @Test
+  def testNumericAutoCastInArithmetic: Unit = {
+    // don't test everything, just some common cast directions:
+    // each numeric field is added to a numeric literal, mostly of a different type
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val input = env.fromElements((1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d)).toExpression
+    val result = input.select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f, '_5 + 1.0d, '_6 + 1)
+
+    result.writeAsText(outputPath, WriteMode.OVERWRITE)
+    env.execute()
+    expectation = "2,2,2,2.0,2.0,2.0"
+  }
+
+  @Test
+  def testNumericAutoCastInComparison: Unit = {
+    // don't test everything, just some common cast directions:
+    // only the second row has every field greater than 1, so only it passes the filter
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val input = env.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d)).as('a, 'b, 'c, 'd, 'e, 'f)
+    val result = input.filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d && 'f > 1)
+
+    result.writeAsText(outputPath, WriteMode.OVERWRITE)
+    env.execute()
+    expectation = "2,2,2,2,2.0,2.0"
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala
new file mode 100644
index 0000000..9d37f70
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/ExpressionsITCase.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/** Integration tests for arithmetic, logic, comparison and bitwise expressions of the Scala
+  * expression API; `after` checks the result file against the expected text. */
+@RunWith(classOf[Parameterized])
+class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private val folderRule = new TemporaryFolder()
+  private var outputPath: String = null
+  private var expectation: String = ""
+
+  @Rule
+  def tempFolder: TemporaryFolder = folderRule
+
+  @Before
+  def before(): Unit = {
+    outputPath = tempFolder.newFile().toURI.toString
+  }
+
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expectation, outputPath)
+  }
+
+  @Test
+  def testArithmetic: Unit = {
+    // binary -, +, /, *, % and unary minus on an int field (5 / 2 yields 2)
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val input = env.fromElements((5, 10)).as('a, 'b)
+    val result = input.select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a)
+
+    result.writeAsText(outputPath, WriteMode.OVERWRITE)
+    env.execute()
+    expectation = "0,10,2,10,1,-5"
+  }
+
+  @Test
+  def testLogic: Unit = {
+    // &&, || and ! on a boolean field
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val input = env.fromElements((5, true)).as('a, 'b)
+    val result = input.select('b && true, 'b && false, 'b || false, !'b)
+
+    result.writeAsText(outputPath, WriteMode.OVERWRITE)
+    env.execute()
+    expectation = "true,false,true,false"
+  }
+
+  @Test
+  def testComparisons: Unit = {
+    // >, >=, <, isNull and isNotNull on int fields
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val input = env.fromElements((5, 5, 4)).as('a, 'b, 'c)
+    val result = input.select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull)
+
+    result.writeAsText(outputPath, WriteMode.OVERWRITE)
+    env.execute()
+    expectation = "true,true,false,false,true"
+  }
+
+  @Test
+  def testBitwiseOperations: Unit = {
+    // &, |, ^ and ~ on two byte fields
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val input = env.fromElements((3.toByte, 5.toByte)).as('a, 'b)
+    val result = input.select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
+
+    result.writeAsText(outputPath, WriteMode.OVERWRITE)
+    env.execute()
+    expectation = "1,7,6,-4"
+  }
+
+  @Test
+  def testBitwiseWithAutocast: Unit = {
+    // the same operators, mixing an int field with a byte field
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val input = env.fromElements((3, 5.toByte)).as('a, 'b)
+    val result = input.select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
+
+    result.writeAsText(outputPath, WriteMode.OVERWRITE)
+    env.execute()
+    expectation = "1,7,6,-4"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testBitwiseWithNonWorkingAutocast: Unit = {
+    // the same operators with a double field must raise an ExpressionException
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val input = env.fromElements((3.0, 5)).as('a, 'b)
+    val result = input.select('a & 'b, 'a | 'b, 'a ^ 'b, ~'a)
+
+    result.writeAsText(outputPath, WriteMode.OVERWRITE)
+    env.execute()
+    expectation = "1,7,6,-4"
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala
new file mode 100644
index 0000000..2841534
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/FilterITCase.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.tree.Literal
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+
+/** Integration tests for filtering data sets in the Scala expression API. */
+@RunWith(classOf[Parameterized])
+class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = null
+  // "" rather than null, so `after` never hands null to the comparison if a test aborts early
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
+
+  @After
+  def after: Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  def testAllRejectingFilter: Unit = {
+    /*
+     * Test all-rejecting filter: the predicate is the constant Literal(false).
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(false) )
+
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "\n"
+  }
+
+  @Test
+  def testAllPassingFilter: Unit = {
+    /*
+     * Test all-passing filter: the predicate is the constant Literal(true).
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(true) )
+
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+  }
+
+  @Test
+  def testFilterOnStringTupleField: Unit = {
+    /*
+     * Test filter on String tuple field (a plain Scala lambda on the DataSet, no expression).
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    val filterDs = ds.filter( _._3.contains("world") )
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
+  }
+
+  @Test
+  def testFilterOnIntegerTupleField: Unit = {
+    /*
+     * Test filter on Integer tuple field: keeps the rows whose first field is even.
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a % 2 === 0 )
+
+    filterDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+      "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
+  }
+
+  // These two not yet done, but are planned
+
+  @Ignore
+  @Test
+  def testFilterBasicType: Unit = {
+    /*
+     * Test filter on basic type (currently a Scala lambda on the string data set).
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getStringDataSet(env)
+
+    val filterDs = ds.filter( _.startsWith("H") )
+
+    filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
+  }
+
+  @Ignore
+  @Test
+  def testFilterOnCustomType: Unit = {
+    /*
+     * Test filter on custom type (currently a Scala lambda on the custom type data set).
+     */
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getCustomTypeDataSet(env)
+    val filterDs = ds.filter( _.myString.contains("a") )
+    filterDs.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
+  }
+}