You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/09 04:49:47 UTC

[GitHub] [flink] hequn8128 commented on a change in pull request #12009: [FLINK-17303][python]Return TableResult for Python TableEnvironment

hequn8128 commented on a change in pull request #12009:
URL: https://github.com/apache/flink/pull/12009#discussion_r422449259



##########
File path: flink-python/pyflink/common/__init__.py
##########
@@ -37,4 +40,7 @@
     'JobExecutionResult',
     'RestartStrategies',
     'RestartStrategyConfiguration',
+    'CompletableFuture',

Review comment:
       Alphabetize the classes.

##########
File path: flink-python/pyflink/table/tests/test_sql.py
##########
@@ -58,6 +56,40 @@ def test_sql_query(self):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assertEqual(table_result.get_table_schema().get_field_count(), 1)
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)

Review comment:
       These tests are too weak. Maybe test more details of the result?

##########
File path: flink-python/pyflink/table/result_kind.py
##########
@@ -0,0 +1,61 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.java_gateway import get_gateway
+
+__all__ = ['ResultKind']
+
+
+class ResultKind(object):
+    """
+    ResultKind defines the types of the result.
+
+    :data:`SUCCESS`:
+
+    The statement (e.g. DDL, USE) executes successfully, and the result only contains a simple "OK".
+
+    :data:`SUCCESS_WITH_CONTENT`:
+
+    The statement (e.g. DML, DQL, SHOW) executes successfully, and the result contains important
+    content.
+    """
+
+    SUCCESS = 0
+    SUCCESS_WITH_CONTENT = 1
+
+    @staticmethod
+    def _from_j_result_kind(j_result_kind):
+        gateway = get_gateway()
+        JResultKind = gateway.jvm.org.apache.flink.table.api.ResultKind
+        if j_result_kind == JResultKind.SUCCESS:
+            return ResultKind.SUCCESS
+        elif j_result_kind == JResultKind.SUCCESS_WITH_CONTENT:
+            return ResultKind.SUCCESS_WITH_CONTENT
+        else:
+            raise Exception("Unsupported Java result kind: %s" % j_result_kind)
+
+    @staticmethod
+    def _to_j_result_kind(result_kind):

Review comment:
       Remove this method? It is not used anywhere.

##########
File path: flink-python/pyflink/table/tests/test_sql.py
##########
@@ -58,6 +56,40 @@ def test_sql_query(self):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assertEqual(table_result.get_table_schema().get_field_count(), 1)
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        table_result = t_env.execute_sql("alter table tbl set ('k1' = 'a', 'k2' = 'b')")
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assertEqual(table_result.get_table_schema().get_field_count(), 1)
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()
+
+        table_result = t_env.execute_sql("drop table tbl")
+        self.assertIsNone(table_result.get_job_client())

Review comment:
       The result is always None here, because the Java side has not implemented `getJobClient()` yet. We can either:
   - Merge this PR after FLINK-16367. `getJobClient()` is implemented in that task, and it seems its PR will be merged soon.
   - Remove support for `getJobClient()` from the Python side in this PR for now, and add it in a separate task.
   
   Which would you prefer? I would be OK with either.

##########
File path: flink-python/pyflink/table/tests/test_sql.py
##########
@@ -58,6 +56,40 @@ def test_sql_query(self):
         expected = ['2,Hi,Hello', '3,Hello,Hello']
         self.assert_equals(actual, expected)
 
+    def test_execute_sql(self):
+        t_env = self.t_env
+        table_result = t_env.execute_sql("create table tbl"
+                                         "("
+                                         "   a bigint,"
+                                         "   b int,"
+                                         "   c varchar"
+                                         ") with ("
+                                         "  'connector' = 'COLLECTION',"
+                                         "   'is-bounded' = 'false'"
+                                         ")")
+        self.assertIsNone(table_result.get_job_client())
+        self.assertIsNotNone(table_result.get_table_schema())
+        self.assertEqual(table_result.get_table_schema().get_field_count(), 1)
+        self.assertIsNotNone(table_result.get_result_kind())
+        self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS)
+        table_result.print()

Review comment:
       Verify the results here instead of only printing them.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org