You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2015/10/06 14:30:27 UTC

flink git commit: [FLINK-2642] [table] Scala Table API crashes when executing word count example

Repository: flink
Updated Branches:
  refs/heads/master 4c5d43b6f -> 4938ff09f


[FLINK-2642] [table] Scala Table API crashes when executing word count example

This closes #1209.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4938ff09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4938ff09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4938ff09

Branch: refs/heads/master
Commit: 4938ff09f692d7b8b1c3af16125d2216eb5c623c
Parents: 4c5d43b
Author: twalthr <tw...@apache.org>
Authored: Fri Oct 2 11:12:01 2015 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Oct 6 14:26:27 2015 +0200

----------------------------------------------------------------------
 .../flink/api/table/plan/PlanTranslator.scala   |  3 +-
 .../flink/examples/scala/WordCountTable.scala   | 45 ++++++++++++++++++++
 .../scala/table/test/TypeExceptionTest.scala    | 42 ++++++++++++++++++
 3 files changed, 89 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4938ff09/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
index 354c7d4..ba8aba4 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
@@ -110,7 +110,8 @@ abstract class PlanTranslator {
     }
 
     val clazz = repr.getType().getTypeClass
-    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
+    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers))
+        || clazz.getCanonicalName() == null) {
       throw new ExpressionException("Cannot create Table from DataSet or DataStream of type " +
         clazz.getName + ". Only top-level classes or static members classes " +
         " are supported.")

http://git-wip-us.apache.org/repos/asf/flink/blob/4938ff09/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
new file mode 100644
index 0000000..cac9590
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+
+/**
+ * Simple example for demonstrating the use of the Table API for a Word Count.
+ */
+object WordCountTable {
+
+  case class WC(word: String, count: Int)
+
+  def main(args: Array[String]): Unit = {
+
+    // set up execution environment
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
+    val expr = input.toTable
+    val result = expr
+      .groupBy('word)
+      .select('word, 'count.sum as 'count)
+      .toDataSet[WC]
+
+    result.print()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4938ff09/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala b/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala
new file mode 100644
index 0000000..acb7ded
--- /dev/null
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala
@@ -0,0 +1,42 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.ExpressionException
+import org.junit.Test
+
+class TypeExceptionTest {
+
+  @Test(expected = classOf[ExpressionException])
+  def testInnerCaseClassException(): Unit = {
+    case class WC(word: String, count: Int)
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
+    val expr = input.toTable // this should fail
+    val result = expr
+      .groupBy('word)
+      .select('word, 'count.sum as 'count)
+      .toDataSet[WC]
+
+    result.print()
+  }
+}