You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/03/29 13:50:39 UTC

[10/12] flink git commit: [FLINK-947] Add parser to Expression API for exposing it to Java

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/GroupedAggreagationsITCase.scala
new file mode 100644
index 0000000..f2e0286
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/GroupedAggreagationsITCase.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/** Integration tests for grouped aggregations expressed with the Scala expression API. */
+@RunWith(classOf[Parameterized])
+class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = _
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  /** Points `resultPath` at a newly created temporary file before every test. */
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
+
+  /** Compares the content found at `resultPath` with the value the test left in `expected`. */
+  @After
+  def after(): Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testGroupingOnNonExistentField(): Unit = {
+    // '_foo is none of the declared fields 'a, 'b, 'c, so an ExpressionException is expected
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val result = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+      .groupBy('_foo)
+      .select('a.avg)
+
+    result.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = ""
+  }
+
+  @Test
+  def testGroupedAggregate(): Unit = {
+    // the grouping key 'b is part of the output here, next to the per-group sum of 'a
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val result = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+      .groupBy('b)
+      .select('b, 'a.sum)
+
+    result.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
+  }
+
+  @Test
+  def testGroupingKeyForwardIfNotUsed(): Unit = {
+    // the grouping key needs to be forwarded to the intermediate DataSet, even
+    // if we don't want the key in the output: only the per-group sum of 'a is
+    // selected below
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val result = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+      .groupBy('b)
+      .select('a.sum)
+
+    result.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/JoinITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/JoinITCase.scala
new file mode 100644
index 0000000..91b3b19
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/JoinITCase.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/** Integration tests for joins expressed with the Scala expression API. */
+@RunWith(classOf[Parameterized])
+class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = _
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
+
+  /** Compares the content found at `resultPath` with the value the test left in `expected`. */
+  @After
+  def after(): Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  def testJoin(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    // equi-join on 'b === 'e, keeping one field from each input
+    val joined = left.join(right).where('b === 'e).select('c, 'g)
+    joined.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
+  }
+
+  @Test
+  def testJoinWithFilter(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    // the join predicate combines the key equality with a restriction on 'b
+    val joined = left.join(right).where('b === 'e && 'b < 2).select('c, 'g)
+    joined.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "Hi,Hallo\n"
+  }
+
+  @Test
+  def testJoinWithMultipleKeys(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val left = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    // two equality conditions, passed through filter() instead of where()
+    val joined = left.join(right).filter('a === 'd && 'b === 'h).select('c, 'g)
+    joined.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
+      "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testJoinNonExistingKey(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    // 'foo is not declared on either input, so an ExpressionException is expected
+    val joined = left.join(right).where('foo === 'e).select('c, 'g)
+    joined.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = ""
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testJoinWithNonMatchingKeyTypes(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    // NOTE(review): presumably 'a and 'g have different types (see test name) - confirm
+    val joined = left.join(right).where('a === 'g).select('c, 'g)
+    joined.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = ""
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testJoinWithAmbiguousFields(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'c)
+
+    // both inputs declare a field named 'c, so an ExpressionException is expected
+    val joined = left.join(right).where('a === 'd).select('c, 'g)
+    joined.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = ""
+  }
+
+  @Test
+  def testJoinWithAggregation(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 'f, 'g, 'h)
+
+    // the aggregation 'g.count is applied directly to the join result
+    val joined = left.join(right).where('a === 'd).select('g.count)
+    joined.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "6"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/SelectITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/SelectITCase.scala
new file mode 100644
index 0000000..a799b60
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/SelectITCase.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/** Integration tests for `select` and `as` in the Scala expression API. */
+@RunWith(classOf[Parameterized])
+class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = _
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
+
+  /** Compares the content found at `resultPath` with the value the test left in `expected`. */
+  @After
+  def after(): Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  def testSimpleSelectAll(): Unit = {
+    // selects all three fields through their tuple names '_1, '_2, '_3
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val result = CollectionDataSets.get3TupleDataSet(env).toExpression.select('_1, '_2, '_3)
+
+    result.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+  }
+
+  @Test
+  def testSimpleSelectAllWithAs(): Unit = {
+    // names the fields 'a, 'b, 'c via as() and selects all of them
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val result = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c).select('a, 'b, 'c)
+
+    result.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+  }
+
+  @Test
+  def testSimpleSelectWithNaming(): Unit = {
+    // the second select refers to the names introduced by the first one
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val result = CollectionDataSets.get3TupleDataSet(env).toExpression
+      .select('_1 as 'a, '_2 as 'b)
+      .select('a, 'b)
+    result.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithToFewFields(): Unit = {
+    // only two names are given for the 3-tuple data set
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val result = CollectionDataSets.get3TupleDataSet(env).as('a, 'b)
+
+    result.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithToManyFields(): Unit = {
+    // four names are given for the 3-tuple data set
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val result = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd)
+
+    result.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testAsWithAmbiguousFields(): Unit = {
+    // the name 'b is assigned to two different fields
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val result = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b)
+
+    result.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testOnlyFieldRefInAs(): Unit = {
+    // 'b as 'c is an alias expression rather than a plain field reference
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val result = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd)
+
+    result.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "no"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7d9b639/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/StringExpressionsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/StringExpressionsITCase.scala b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/StringExpressionsITCase.scala
new file mode 100644
index 0000000..c6c1113
--- /dev/null
+++ b/flink-staging/flink-expressions/src/test/scala/org/apache/flink/api/scala/expressions/test/StringExpressionsITCase.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.expressions.test
+
+import org.apache.flink.api.expressions.ExpressionException
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.expressions._
+import org.apache.flink.core.fs.FileSystem.WriteMode
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit._
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/** Integration tests for string expressions (`substring`) in the Scala expression API. */
+@RunWith(classOf[Parameterized])
+class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+  private var resultPath: String = _
+  private var expected: String = ""
+  private val _tempFolder = new TemporaryFolder()
+
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    resultPath = tempFolder.newFile().toURI.toString
+  }
+
+  /** Compares the content found at `resultPath` with the value the test left in `expected`. */
+  @After
+  def after(): Unit = {
+    compareResultsByLinesInMemory(expected, resultPath)
+  }
+
+  @Test
+  def testSubstring(): Unit = {
+    // substring(0, 'b): the end index is read from field 'b of each record
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val result = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b)
+      .select('a.substring(0, 'b))
+    result.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "AA\nB"
+  }
+
+  @Test
+  def testSubstringWithMaxEnd(): Unit = {
+    // substring('b): only the start index is given, the rest of the string is kept
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val result = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b)
+      .select('a.substring('b))
+    result.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "CD\nBCD"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testNonWorkingSubstring1(): Unit = {
+    // the end index 'b is a Double field here; an ExpressionException is
+    // expected, so the assignment to `expected` below is normally not reached
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val result = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b)
+      .select('a.substring(0, 'b))
+    result.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "AAA\nBB"
+  }
+
+  @Test(expected = classOf[ExpressionException])
+  def testNonWorkingSubstring2(): Unit = {
+    // the start index 'b is a String field here; an ExpressionException is
+    // expected, so the assignment to `expected` below is normally not reached
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val result = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b)
+      .select('a.substring('b, 15))
+    result.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "AAA\nBB"
+  }
+}