You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2015/10/06 14:30:27 UTC
flink git commit: [FLINK-2642] [table] Scala Table API crashes when
executing word count example
Repository: flink
Updated Branches:
refs/heads/master 4c5d43b6f -> 4938ff09f
[FLINK-2642] [table] Scala Table API crashes when executing word count example
This closes #1209.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4938ff09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4938ff09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4938ff09
Branch: refs/heads/master
Commit: 4938ff09f692d7b8b1c3af16125d2216eb5c623c
Parents: 4c5d43b
Author: twalthr <tw...@apache.org>
Authored: Fri Oct 2 11:12:01 2015 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Oct 6 14:26:27 2015 +0200
----------------------------------------------------------------------
.../flink/api/table/plan/PlanTranslator.scala | 3 +-
.../flink/examples/scala/WordCountTable.scala | 45 ++++++++++++++++++++
.../scala/table/test/TypeExceptionTest.scala | 42 ++++++++++++++++++
3 files changed, 89 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4938ff09/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
index 354c7d4..ba8aba4 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
@@ -110,7 +110,8 @@ abstract class PlanTranslator {
}
val clazz = repr.getType().getTypeClass
- if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
+ if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers))
+ || clazz.getCanonicalName() == null) {
throw new ExpressionException("Cannot create Table from DataSet or DataStream of type " +
clazz.getName + ". Only top-level classes or static members classes " +
" are supported.")
http://git-wip-us.apache.org/repos/asf/flink/blob/4938ff09/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
new file mode 100644
index 0000000..cac9590
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+
+/**
+ * Simple example for demonstrating the use of the Table API for a Word Count.
+ */
+object WordCountTable {
+ // Record type used for both the input and the result: a word and its count.
+ case class WC(word: String, count: Int)
+ // Entry point of the example; `args` is not read.
+ def main(args: Array[String]): Unit = {
+
+ // set up execution environment
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ // three hard-coded records; "hello" appears twice
+ val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
+ val expr = input.toTable // convert the DataSet to a Table
+ val result = expr
+ .groupBy('word) // group rows by the 'word field
+ .select('word, 'count.sum as 'count) // sum 'count per group, named 'count again
+ .toDataSet[WC] // convert the result back to a DataSet of WC
+
+ result.print()
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4938ff09/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala b/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala
new file mode 100644
index 0000000..acb7ded
--- /dev/null
+++ b/flink-staging/flink-table/src/test/java/org/apache/flink/api/scala/table/test/TypeExceptionTest.scala
@@ -0,0 +1,42 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.api.scala.table.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.ExpressionException
+import org.junit.Test
+
+class TypeExceptionTest {
+ // Passes only if the test body throws an ExpressionException.
+ @Test(expected = classOf[ExpressionException])
+ def testInnerCaseClassException(): Unit = {
+ case class WC(word: String, count: Int) // declared inside the method, not at top level
+ // Word-count pipeline over the method-local WC type.
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
+ val expr = input.toTable // this should fail: WC is a method-local case class
+ val result = expr
+ .groupBy('word)
+ .select('word, 'count.sum as 'count)
+ .toDataSet[WC]
+ // Not expected to be reached if toTable throws as intended.
+ result.print()
+ }
+}