Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/20 14:59:44 UTC

[GitHub] [flink] dianfu opened a new pull request #11832: [FLINK-17148][python] Support converting pandas dataframe to flink table

dianfu opened a new pull request #11832:
URL: https://github.com/apache/flink/pull/11832


   
   ## What is the purpose of the change
   
   *This pull request adds support for converting a pandas DataFrame to a Flink Table.*
   
   ## Brief change log
   
     - *Introduce ArrowSourceFunction and ArrowTableSource, which take serialized Arrow record batches (byte arrays) as the source data*
     - *Add TableEnvironment.from_pandas, which converts a pandas DataFrame to a Flink Table*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Added Java tests ArrowSourceFunctionTest and RowArrowSourceFunctionTest*
     - *Added Python tests test_pandas_conversion.py*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on issue #11832: [FLINK-17148][python] Support converting pandas dataframe to flink table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-616617636


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7796",
       "triggerID" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161065718",
       "triggerID" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c2105e0170d56fb95b849c4b830ca7a7b9d48fd3",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c2105e0170d56fb95b849c4b830ca7a7b9d48fd3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1e4aee0d61292ed92724bcabb97ce307833366da",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26",
       "triggerID" : "1e4aee0d61292ed92724bcabb97ce307833366da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1e4aee0d61292ed92724bcabb97ce307833366da",
       "status" : "CANCELED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161369936",
       "triggerID" : "1e4aee0d61292ed92724bcabb97ce307833366da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f427a2215c9bb6ecb5458e8693887cafffbef491",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32",
       "triggerID" : "f427a2215c9bb6ecb5458e8693887cafffbef491",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f427a2215c9bb6ecb5458e8693887cafffbef491",
       "status" : "PENDING",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161375441",
       "triggerID" : "f427a2215c9bb6ecb5458e8693887cafffbef491",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c2105e0170d56fb95b849c4b830ca7a7b9d48fd3 UNKNOWN
   * 1e4aee0d61292ed92724bcabb97ce307833366da Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/161369936) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26) 
   * f427a2215c9bb6ecb5458e8693887cafffbef491 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/161375441) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on issue #11832: [FLINK-17148][python] Support converting pandas dataframe to flink table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-616617636


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7796",
       "triggerID" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161065718",
       "triggerID" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c2105e0170d56fb95b849c4b830ca7a7b9d48fd3",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c2105e0170d56fb95b849c4b830ca7a7b9d48fd3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1e4aee0d61292ed92724bcabb97ce307833366da",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26",
       "triggerID" : "1e4aee0d61292ed92724bcabb97ce307833366da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1e4aee0d61292ed92724bcabb97ce307833366da",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161369936",
       "triggerID" : "1e4aee0d61292ed92724bcabb97ce307833366da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f427a2215c9bb6ecb5458e8693887cafffbef491",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32",
       "triggerID" : "f427a2215c9bb6ecb5458e8693887cafffbef491",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f427a2215c9bb6ecb5458e8693887cafffbef491",
       "status" : "SUCCESS",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161375441",
       "triggerID" : "f427a2215c9bb6ecb5458e8693887cafffbef491",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c2105e0170d56fb95b849c4b830ca7a7b9d48fd3 UNKNOWN
   * f427a2215c9bb6ecb5458e8693887cafffbef491 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161375441) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] dianfu commented on pull request #11832: [FLINK-17148][python] Support converting pandas DataFrame to Flink Table

Posted by GitBox <gi...@apache.org>.
dianfu commented on pull request #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-621200089


   @hequn8128 Thanks for the review. Updated.





[GitHub] [flink] flinkbot commented on issue #11832: [FLINK-17148][python] Support converting pandas dataframe to flink table

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-616612757


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit b463662a390b45757a682cee7ea89ed52e1fa6f6 (Mon Apr 20 15:04:35 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] hequn8128 commented on a change in pull request #11832: [FLINK-17148][python] Support converting pandas DataFrame to Flink Table

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11832:
URL: https://github.com/apache/flink/pull/11832#discussion_r415254771



##########
File path: flink-python/pyflink/table/tests/test_pandas_conversion.py
##########
@@ -0,0 +1,147 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import datetime
+import decimal
+
+from pyflink.table.types import DataTypes, Row
+from pyflink.testing import source_sink_utils
+from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase, \
+    PyFlinkBlinkStreamTableTestCase, PyFlinkStreamTableTestCase
+
+
+class PandasConversionTestBase(object):
+
+    @classmethod
+    def setUpClass(cls):
+        super(PandasConversionTestBase, cls).setUpClass()
+        cls.data = [(1, 1, 1, 1, True, 1.1, 1.2, 'hello', bytearray(b"aaa"),
+                     decimal.Decimal('1000000000000000000.01'), datetime.date(2014, 9, 13),
+                     datetime.time(hour=1, minute=0, second=1),
+                     datetime.datetime(1970, 1, 1, 0, 0, 0, 123000), ['hello', '中文'],
+                     Row(a=1, b='hello', c=datetime.datetime(1970, 1, 1, 0, 0, 0, 123000),
+                         d=[1, 2])),
+                    (2, 2, 2, 2, False, 2.1, 2.2, 'world', bytearray(b"bbb"),
+                     decimal.Decimal('1000000000000000000.02'), datetime.date(2014, 9, 13),
+                     datetime.time(hour=1, minute=0, second=1),
+                     datetime.datetime(1970, 1, 1, 0, 0, 0, 123000), ['hello', '中文'],
+                     Row(a=1, b='hello', c=datetime.datetime(1970, 1, 1, 0, 0, 0, 123000),
+                         d=[1, 2]))]
+        cls.data_type = DataTypes.ROW(
+            [DataTypes.FIELD("f1", DataTypes.TINYINT()),
+             DataTypes.FIELD("f2", DataTypes.SMALLINT()),
+             DataTypes.FIELD("f3", DataTypes.INT()),
+             DataTypes.FIELD("f4", DataTypes.BIGINT()),
+             DataTypes.FIELD("f5", DataTypes.BOOLEAN()),
+             DataTypes.FIELD("f6", DataTypes.FLOAT()),
+             DataTypes.FIELD("f7", DataTypes.DOUBLE()),
+             DataTypes.FIELD("f8", DataTypes.STRING()),
+             DataTypes.FIELD("f9", DataTypes.BYTES()),
+             DataTypes.FIELD("f10", DataTypes.DECIMAL(38, 18)),
+             DataTypes.FIELD("f11", DataTypes.DATE()),
+             DataTypes.FIELD("f12", DataTypes.TIME()),
+             DataTypes.FIELD("f13", DataTypes.TIMESTAMP(3)),
+             DataTypes.FIELD("f14", DataTypes.ARRAY(DataTypes.STRING())),
+             DataTypes.FIELD("f15", DataTypes.ROW(
+                 [DataTypes.FIELD("a", DataTypes.INT()),
+                  DataTypes.FIELD("b", DataTypes.STRING()),
+                  DataTypes.FIELD("c", DataTypes.TIMESTAMP(3)),
+                  DataTypes.FIELD("d", DataTypes.ARRAY(DataTypes.INT()))]))])
+        cls.pdf = cls.create_pandas_data_frame()
+
+    @classmethod
+    def create_pandas_data_frame(cls):
+        data_dict = {}
+        for j, name in enumerate(cls.data_type.names):
+            data_dict[name] = [cls.data[i][j] for i in range(len(cls.data))]
+        # need convert to numpy types
+        import numpy as np
+        data_dict["f1"] = np.int8(data_dict["f1"])
+        data_dict["f2"] = np.int16(data_dict["f2"])
+        data_dict["f3"] = np.int32(data_dict["f3"])
+        data_dict["f4"] = np.int64(data_dict["f4"])
+        data_dict["f6"] = np.float32(data_dict["f6"])
+        data_dict["f7"] = np.float64(data_dict["f7"])
+        data_dict["f15"] = [row.as_dict() for row in data_dict["f15"]]
+        import pandas as pd
+        return pd.DataFrame(data=data_dict,
+                            columns=['f1', 'f2', 'f3', 'f4', 'f5', 'f6', 'f7', 'f8', 'f9',
+                                     'f10', 'f11', 'f12', 'f13', 'f14', 'f15'])
+
+
+class PandasConversionTests(PandasConversionTestBase):
+
+    def test_from_pandas_with_incorrect_schema(self):
+        fields = self.data_type.fields.copy()
+        fields[0], fields[7] = fields[7], fields[0]  # swap str with tinyint
+        wrong_schema = DataTypes.ROW(fields)  # should be DataTypes.STRING()
+        with self.assertRaisesRegex(Exception, "Expected a string.*got int8"):
+            self.t_env.from_pandas(self.pdf, schema=wrong_schema)
+
+    def test_from_pandas_with_names(self):
+        # skip decimal as currently only decimal(38, 18) is supported
+        pdf = self.pdf.drop(['f10', 'f11', 'f12', 'f13', 'f14', 'f15'], axis=1)
+        new_names = list(map(str, range(len(pdf.columns))))
+        table = self.t_env.from_pandas(pdf, schema=new_names)
+        self.assertEqual(new_names, table.get_schema().get_field_names())
+        table = self.t_env.from_pandas(pdf, schema=tuple(new_names))
+        self.assertEqual(new_names, table.get_schema().get_field_names())
+
+    def test_from_pandas_with_types(self):
+        new_types = self.data_type.field_types()
+        new_types[0] = DataTypes.BIGINT()
+        table = self.t_env.from_pandas(self.pdf, schema=new_types)
+        self.assertEqual(new_types, table.get_schema().get_field_data_types())
+        table = self.t_env.from_pandas(self.pdf, schema=tuple(new_types))
+        self.assertEqual(new_types, table.get_schema().get_field_data_types())
+
+
+class PandasConversionITTests(PandasConversionTestBase):
+
+    def test_from_pandas(self):
+        table = self.t_env.from_pandas(self.pdf, self.data_type, 5)
+        self.assertEqual(self.data_type, table.get_schema().to_row_data_type())
+
+        table = table.filter("f1 < 2")
+        table_sink = source_sink_utils.TestAppendSink(
+            self.data_type.field_names(),
+            self.data_type.field_types())
+        self.t_env.register_table_sink("Results", table_sink)
+        table.insert_into("Results")
+        self.t_env.execute("test")
+        actual = source_sink_utils.results()
+        self.assert_equals(actual,
+                           ["1,1,1,1,true,1.1,1.2,hello,[97, 97, 97],"
+                            "1000000000000000000.010000000000000000,2014-09-13,01:00:01,"
+                            "1970-01-01 00:00:00.123,[hello, 中文],1,hello,"
+                            "1970-01-01 00:00:00.123,[1, 2]"])
+
+
+class StreamPandasConversionTests(PandasConversionITTests,

Review comment:
       Can we also cover the batch mode for the old planner? 
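
For readers checking the expected row in `test_from_pandas` above: two of its fields look different from the Python input values. Here is a minimal standard-library sketch of why, assuming the sink prints a DECIMAL(38, 18) value at its full scale and a BYTES value as a list of byte values. Both assumptions are inferred from the expected string, not from the sink's code.

```python
import decimal

# f10 is declared as DECIMAL(38, 18): up to 38 significant digits,
# 18 of them after the decimal point.
PRECISION, SCALE = 38, 18

value = decimal.Decimal("1000000000000000000.01")
with decimal.localcontext() as ctx:
    # The default context allows 28 digits, which is too narrow for this value.
    ctx.prec = PRECISION
    padded = value.quantize(decimal.Decimal(1).scaleb(-SCALE))

# f9 is BYTES; the expected row shows the individual byte values.
byte_values = list(bytearray(b"aaa"))

print(padded)
print(byte_values)
```

The padded decimal and the byte list correspond to the `1000000000000000000.010000000000000000` and `[97, 97, 97]` fragments of the expected row.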

##########
File path: flink-python/pyflink/table/tests/test_pandas_conversion.py
##########
@@ -0,0 +1,147 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import datetime
+import decimal
+
+from pyflink.table.types import DataTypes, Row
+from pyflink.testing import source_sink_utils
+from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase, \
+    PyFlinkBlinkStreamTableTestCase, PyFlinkStreamTableTestCase
+
+
+class PandasConversionTestBase(object):
+
+    @classmethod
+    def setUpClass(cls):

Review comment:
       I noticed that these test methods should use lowercase names. That is unrelated to this PR, though; maybe we can create a separate JIRA issue to address it.

##########
File path: flink-python/pyflink/table/tests/test_pandas_conversion.py
##########
@@ -0,0 +1,147 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import datetime
+import decimal
+
+from pyflink.table.types import DataTypes, Row
+from pyflink.testing import source_sink_utils
+from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase, \
+    PyFlinkBlinkStreamTableTestCase, PyFlinkStreamTableTestCase
+
+
+class PandasConversionTestBase(object):
+
+    @classmethod
+    def setUpClass(cls):
+        super(PandasConversionTestBase, cls).setUpClass()
+        cls.data = [(1, 1, 1, 1, True, 1.1, 1.2, 'hello', bytearray(b"aaa"),
+                     decimal.Decimal('1000000000000000000.01'), datetime.date(2014, 9, 13),
+                     datetime.time(hour=1, minute=0, second=1),
+                     datetime.datetime(1970, 1, 1, 0, 0, 0, 123000), ['hello', '中文'],
+                     Row(a=1, b='hello', c=datetime.datetime(1970, 1, 1, 0, 0, 0, 123000),
+                         d=[1, 2])),
+                    (2, 2, 2, 2, False, 2.1, 2.2, 'world', bytearray(b"bbb"),
+                     decimal.Decimal('1000000000000000000.02'), datetime.date(2014, 9, 13),
+                     datetime.time(hour=1, minute=0, second=1),
+                     datetime.datetime(1970, 1, 1, 0, 0, 0, 123000), ['hello', '中文'],
+                     Row(a=1, b='hello', c=datetime.datetime(1970, 1, 1, 0, 0, 0, 123000),
+                         d=[1, 2]))]
+        cls.data_type = DataTypes.ROW(
+            [DataTypes.FIELD("f1", DataTypes.TINYINT()),
+             DataTypes.FIELD("f2", DataTypes.SMALLINT()),
+             DataTypes.FIELD("f3", DataTypes.INT()),
+             DataTypes.FIELD("f4", DataTypes.BIGINT()),
+             DataTypes.FIELD("f5", DataTypes.BOOLEAN()),
+             DataTypes.FIELD("f6", DataTypes.FLOAT()),
+             DataTypes.FIELD("f7", DataTypes.DOUBLE()),
+             DataTypes.FIELD("f8", DataTypes.STRING()),
+             DataTypes.FIELD("f9", DataTypes.BYTES()),
+             DataTypes.FIELD("f10", DataTypes.DECIMAL(38, 18)),
+             DataTypes.FIELD("f11", DataTypes.DATE()),
+             DataTypes.FIELD("f12", DataTypes.TIME()),
+             DataTypes.FIELD("f13", DataTypes.TIMESTAMP(3)),
+             DataTypes.FIELD("f14", DataTypes.ARRAY(DataTypes.STRING())),
+             DataTypes.FIELD("f15", DataTypes.ROW(
+                 [DataTypes.FIELD("a", DataTypes.INT()),
+                  DataTypes.FIELD("b", DataTypes.STRING()),
+                  DataTypes.FIELD("c", DataTypes.TIMESTAMP(3)),
+                  DataTypes.FIELD("d", DataTypes.ARRAY(DataTypes.INT()))]))])
+        cls.pdf = cls.create_pandas_data_frame()
+
+    @classmethod
+    def create_pandas_data_frame(cls):
+        data_dict = {}
+        for j, name in enumerate(cls.data_type.names):
+            data_dict[name] = [cls.data[i][j] for i in range(len(cls.data))]
+        # need convert to numpy types

Review comment:
       Why do we need to convert to NumPy types?
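
A possible answer, offered as an assumption rather than something confirmed in this thread: plain Python numbers carry no width, so the columns would otherwise get NumPy's wide default dtypes and would no longer line up with the TINYINT, SMALLINT, INT and FLOAT fields declared in `data_type`. The sketch below only shows the dtype effect. The mapping from dtype to Flink type is assumed, not taken from the PR.

```python
import numpy as np

# Plain Python floats carry no width information, so NumPy picks float64.
default_floats = np.array([1.1, 2.1])

# Wrapping the lists in explicit NumPy scalar types pins the width, which is
# what create_pandas_data_frame does for columns f1-f4, f6 and f7.
tiny = np.int8([1, 2])
small = np.int16([1, 2])
single = np.float32([1.1, 2.1])

dtype_names = {
    "default_floats": default_floats.dtype.name,
    "tiny": tiny.dtype.name,
    "small": small.dtype.name,
    "single": single.dtype.name,
}
print(dtype_names)
```

The default integer dtype is platform dependent, which is one more reason to pin the widths explicitly in a test.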

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/AbstractArrowSourceFunction.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow.sources;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.runtime.arrow.ArrowReader;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+/**
+ * An Arrow {@link SourceFunction} which takes the serialized arrow record batch data as input.
+ *
+ * @param <OUT> The type of the records produced by this source.
+ */
+@Internal
+public abstract class AbstractArrowSourceFunction<OUT>
+		extends RichParallelSourceFunction<OUT>
+		implements ResultTypeQueryable<OUT>, CheckpointedFunction {
+
+	private static final long serialVersionUID = 1L;
+
+	static {
+		ArrowUtils.checkArrowUsable();
+	}
+
+	/**
+	 * The type of the records produced by this source.
+	 */
+	final DataType dataType;
+
+	/**
+	 * The array of byte array of the source data. Each element is an array
+	 * representing an arrow batch.
+	 */
+	private final byte[][] arrowData;
+
+	/**
+	 * Allocator which is used for byte buffer allocation.
+	 */
+	private transient BufferAllocator allocator;
+
+	/**
+	 * Container that holds a set of vectors for the source data to emit.
+	 */
+	private transient VectorSchemaRoot root;
+
+	private transient volatile boolean running;
+
+	/**
+	 * The indexes of the collection of source data to emit. Each element is a tuple of
+	 * the index of the arrow batch and the starting index inside the arrow batch.
+	 */
+	private transient Deque<Tuple2<Integer, Integer>> indexesToEmit;
+
+	/**
+	 * The indexes of the source data which have not been emitted.
+	 */
+	private transient ListState<Tuple2<Integer, Integer>> checkpointedState;
+
+	AbstractArrowSourceFunction(DataType dataType, byte[][] arrowData) {
+		this.dataType = Preconditions.checkNotNull(dataType);
+		this.arrowData = Preconditions.checkNotNull(arrowData);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		allocator = ArrowUtils.getRootAllocator().newChildAllocator("ArrowSourceFunction", 0, Long.MAX_VALUE);
+		root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema((RowType) dataType.getLogicalType()), allocator);
+		running = true;
+	}
+
+	@Override
+	public void close() throws Exception {
+		try {
+			super.close();
+		} finally {
+			if (root != null) {
+				root.close();
+				root = null;
+			}
+			if (allocator != null) {
+				allocator.close();
+				allocator = null;
+			}
+		}
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {
+		Preconditions.checkState(this.checkpointedState == null,
+			"The " + getClass().getSimpleName() + " has already been initialized.");
+
+		this.checkpointedState = context.getOperatorStateStore().getListState(
+			new ListStateDescriptor<>(
+				"arrow-source-state",
+				new TupleSerializer<>(
+					(Class<Tuple2<Integer, Integer>>) (Class<?>) Tuple2.class,
+					new TypeSerializer[]{IntSerializer.INSTANCE, IntSerializer.INSTANCE})
+			)
+		);
+
+		this.indexesToEmit = new ArrayDeque<>();
+		if (context.isRestored()) {
+			// upon restoring
+			for (Tuple2<Integer, Integer> v : this.checkpointedState.get()) {
+				this.indexesToEmit.add(v);
+			}
+		} else {
+			// the first time the job is executed
+			final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
+			final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+			for (int i = taskIdx; i < arrowData.length; i += stepSize) {
+				this.indexesToEmit.add(Tuple2.of(i, 0));
+			}
+		}
+	}
+
+	@Override
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		Preconditions.checkState(this.checkpointedState != null,
+			"The " + getClass().getSimpleName() + " state has not been properly initialized.");
+
+		this.checkpointedState.clear();
+		for (Tuple2<Integer, Integer> v : indexesToEmit) {
+			this.checkpointedState.add(v);
+		}
+	}
+
+	@Override
+	public void run(SourceContext<OUT> ctx) throws Exception {
+		VectorLoader vectorLoader = new VectorLoader(root);
+		while (running && !indexesToEmit.isEmpty()) {
+			Tuple2<Integer, Integer> indexToEmit = indexesToEmit.peek();
+			ArrowRecordBatch arrowRecordBatch = loadBatch(indexToEmit.f0);
+			vectorLoader.load(arrowRecordBatch);
+			arrowRecordBatch.close();
+
+			ArrowReader<OUT> arrowReader = createArrowReader(root);
+			int rowCount = root.getRowCount();
+			int nextRowId = indexToEmit.f1;
+			while (nextRowId < rowCount) {
+				OUT element = arrowReader.read(nextRowId);
+				synchronized (ctx.getCheckpointLock()) {
+					ctx.collect(element);
+					indexToEmit.setField(++nextRowId, 1);
+				}
+			}
+
+			synchronized (ctx.getCheckpointLock()) {
+				indexesToEmit.pop();
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		running = false;
+	}
+
+	public abstract ArrowReader<OUT> createArrowReader(VectorSchemaRoot root);

Review comment:
       protected
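
To make the state handling in `initializeState`, `snapshotState` and `run` above easier to follow, here is a simplified Python model of it. The function names `assign_batches` and `emit` and the `max_rows` parameter are hypothetical and exist only for this illustration. The real source reads Arrow record batches, not Python lists, and takes a checkpoint lock that this sketch omits.

```python
from collections import deque


def assign_batches(num_batches, parallelism, subtask_index):
    """Round-robin: batch indexes owned by one subtask, each starting at row 0."""
    return deque((i, 0) for i in range(subtask_index, num_batches, parallelism))


def emit(batches, pending, max_rows=None):
    """Emit rows for the pending (batch index, next row) pairs.

    `pending` is updated in place after every row, so at any moment it
    describes exactly what is left to emit; that is the state a checkpoint
    would snapshot. `max_rows` stops early to mimic an interruption.
    """
    emitted = []
    while pending:
        batch_idx, next_row = pending[0]
        batch = batches[batch_idx]
        while next_row < len(batch):
            if max_rows is not None and len(emitted) >= max_rows:
                return emitted
            emitted.append(batch[next_row])
            next_row += 1
            pending[0] = (batch_idx, next_row)
        pending.popleft()
    return emitted


batches = [["a0", "a1"], ["b0", "b1"], ["c0", "c1"]]

# Subtask 0 of 2 owns batches 0 and 2.
pending = assign_batches(len(batches), parallelism=2, subtask_index=0)
first = emit(batches, pending, max_rows=3)

# What a checkpoint taken at this point would store.
snapshot = list(pending)

# After a restore, emission resumes from the stored row offset.
rest = emit(batches, deque(snapshot))
print(first, snapshot, rest)
```

Storing the row offset next to the batch index is what lets a restored task continue in the middle of a batch instead of replaying the whole batch.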

##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -1107,6 +1107,63 @@ def _from_elements(self, elements, schema):
         finally:
             os.unlink(temp_file.name)
 
+    def from_pandas(self, pdf,

Review comment:
       Add detailed Python docs for the API.
   BTW, do we plan to add Flink documentation for this API in another PR? If so, we can first create a JIRA ticket for it under FLINK-17146.

##########
File path: flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/ArrowSourceFunctionTestBase.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow.sources;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.arrow.ArrowWriter;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Abstract test base for the Arrow source function processing.
+ */
+public abstract class ArrowSourceFunctionTestBase<T> {
+
+	static DataType dataType;
+	private static BufferAllocator allocator;
+
+	@BeforeClass
+	public static void init() {
+		dataType = DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.STRING()));
+		allocator = ArrowUtils.getRootAllocator().newChildAllocator("stdout", 0, Long.MAX_VALUE);
+	}
+
+	@Test
+	public void testRestore() throws Exception {
+		Tuple2<List<T>, Integer> testData = getTestData();
+		final AbstractArrowSourceFunction<T> arrowSourceFunction =
+			createTestArrowSourceFunction(testData.f0, testData.f1);
+
+		final AbstractStreamOperatorTestHarness<T> testHarness =
+			new AbstractStreamOperatorTestHarness<>(new StreamSource<>(arrowSourceFunction), 1, 1, 0);
+		testHarness.open();
+
+		final Throwable[] error = new Throwable[1];
+		final MultiShotLatch latch = new MultiShotLatch();
+		final AtomicInteger numOfEmittedElements = new AtomicInteger(0);
+
+		final DummySourceContext<T> sourceContext = new DummySourceContext<T>() {
+			@Override
+			public void collect(T element) {
+				if (numOfEmittedElements.get() == 2) {
+					latch.trigger();
+					// fail the source function at the second element
+					throw new RuntimeException("Fail the arrow source");
+				}
+				numOfEmittedElements.incrementAndGet();
+			}
+		};
+
+		// run the source asynchronously
+		Thread runner = new Thread(() -> {
+			try {
+				arrowSourceFunction.run(sourceContext);
+			} catch (Throwable t) {
+				if (!t.getMessage().equals("Fail the arrow source")) {
+					error[0] = t;
+				}
+			}
+		});
+		runner.start();
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+
+		OperatorSubtaskState snapshot;
+		synchronized (sourceContext.getCheckpointLock()) {
+			snapshot = testHarness.snapshot(0, 0);
+		}
+
+		runner.join();
+		testHarness.close();
+
+		final AbstractArrowSourceFunction<T> arrowSourceFunction2 =
+			createTestArrowSourceFunction(testData.f0, testData.f1);
+		AbstractStreamOperatorTestHarness<T> testHarnessCopy =
+			new AbstractStreamOperatorTestHarness<>(new StreamSource<>(arrowSourceFunction2), 1, 1, 0);
+		testHarnessCopy.initializeState(snapshot);
+		testHarnessCopy.open();
+
+		// run the source asynchronously
+		Thread runner2 = new Thread(() -> {
+			try {
+				arrowSourceFunction2.run(new DummySourceContext<T>() {
+					@Override
+					public void collect(T element) {
+						if (numOfEmittedElements.incrementAndGet() == testData.f0.size()) {
+							latch.trigger();
+						}
+					}
+				});
+			} catch (Throwable t) {
+				error[0] = t;
+			}
+		});
+		runner2.start();
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+		runner2.join();
+
+		Assert.assertNull(error[0]);
+		Assert.assertEquals(testData.f0.size(), numOfEmittedElements.get());

Review comment:
       Also verify the content of the data?
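To make the suggestion concrete, here is a minimal sketch of a restore test that checks content rather than only counts. It is a pure-Python stand-in for the Java test, not the PR's code: `run_source`, `snapshot`, `restore` and `InjectedFailure` are hypothetical names, and the only thing taken from the hunk above is the idea of tracking progress as (batch index, next row) pairs that advance after each successful `collect`.

```python
from collections import deque


class InjectedFailure(Exception):
    """Raised by the test sink to simulate a crash in the middle of a batch."""


def run_source(batches, pending, collect):
    """Emit rows from `batches`, tracking progress in `pending`.

    `pending` is a deque of [batch_index, next_row] entries. The offset is
    advanced only after collect() returns, so a failure leaves it pointing
    at the row that was not emitted.
    """
    while pending:
        entry = pending[0]
        rows = batches[entry[0]]
        while entry[1] < len(rows):
            collect(rows[entry[1]])
            entry[1] += 1
        pending.popleft()


def snapshot(pending):
    """Copy the progress markers, as a checkpoint would."""
    return [tuple(entry) for entry in pending]


def restore(state):
    """Rebuild the mutable progress markers from a snapshot."""
    return deque(list(entry) for entry in state)


batches = [["a", "b", "c"], ["d", "e"]]
expected = [row for batch in batches for row in batch]

emitted = []


def failing_collect(row):
    # Accept two rows, then fail on the third call.
    if len(emitted) == 2:
        raise InjectedFailure()
    emitted.append(row)


pending = deque([i, 0] for i in range(len(batches)))
try:
    run_source(batches, pending, failing_collect)
except InjectedFailure:
    pass
state = snapshot(pending)

# Second run starts from the snapshot and must emit exactly the remainder.
run_source(batches, restore(state), emitted.append)
assert emitted == expected
```

Comparing the full list catches duplicated or skipped rows that a pure count check would miss whenever one duplicate happens to offset one loss.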

##########
File path: flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/ArrowSourceFunctionTestBase.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow.sources;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.arrow.ArrowWriter;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Abstract test base for the Arrow source function processing.
+ */
+public abstract class ArrowSourceFunctionTestBase<T> {
+
+	static DataType dataType;
+	private static BufferAllocator allocator;
+
+	@BeforeClass
+	public static void init() {
+		dataType = DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.STRING()));
+		allocator = ArrowUtils.getRootAllocator().newChildAllocator("stdout", 0, Long.MAX_VALUE);
+	}
+
+	@Test
+	public void testRestore() throws Exception {
+		Tuple2<List<T>, Integer> testData = getTestData();
+		final AbstractArrowSourceFunction<T> arrowSourceFunction =
+			createTestArrowSourceFunction(testData.f0, testData.f1);
+
+		final AbstractStreamOperatorTestHarness<T> testHarness =
+			new AbstractStreamOperatorTestHarness<>(new StreamSource<>(arrowSourceFunction), 1, 1, 0);
+		testHarness.open();
+
+		final Throwable[] error = new Throwable[1];
+		final MultiShotLatch latch = new MultiShotLatch();
+		final AtomicInteger numOfEmittedElements = new AtomicInteger(0);
+
+		final DummySourceContext<T> sourceContext = new DummySourceContext<T>() {
+			@Override
+			public void collect(T element) {
+				if (numOfEmittedElements.get() == 2) {
+					latch.trigger();
+					// fail the source function at the second element
+					throw new RuntimeException("Fail the arrow source");
+				}
+				numOfEmittedElements.incrementAndGet();
+			}
+		};
+
+		// run the source asynchronously
+		Thread runner = new Thread(() -> {
+			try {
+				arrowSourceFunction.run(sourceContext);
+			} catch (Throwable t) {
+				if (!t.getMessage().equals("Fail the arrow source")) {
+					error[0] = t;

Review comment:
       Add the corresponding assert to verify that error[0] is not null?

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/arrow/sources/AbstractArrowSourceFunction.java
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow.sources;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.runtime.arrow.ArrowReader;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+/**
+ * An Arrow {@link SourceFunction} which takes the serialized arrow record batch data as input.
+ *
+ * @param <OUT> The type of the records produced by this source.
+ */
+@Internal
+public abstract class AbstractArrowSourceFunction<OUT>
+		extends RichParallelSourceFunction<OUT>
+		implements ResultTypeQueryable<OUT>, CheckpointedFunction {
+
+	private static final long serialVersionUID = 1L;
+
+	static {
+		ArrowUtils.checkArrowUsable();
+	}
+
+	/**
+	 * The type of the records produced by this source.
+	 */
+	final DataType dataType;
+
+	/**
+	 * The array of byte array of the source data. Each element is an array
+	 * representing an arrow batch.
+	 */
+	private final byte[][] arrowData;
+
+	/**
+	 * Allocator which is used for byte buffer allocation.
+	 */
+	private transient BufferAllocator allocator;
+
+	/**
+	 * Container that holds a set of vectors for the source data to emit.
+	 */
+	private transient VectorSchemaRoot root;
+
+	private transient volatile boolean running;
+
+	/**
+	 * The indexes of the collection of source data to emit. Each element is a tuple of
+	 * the index of the arrow batch and the starting index inside the arrow batch.
+	 */
+	private transient Deque<Tuple2<Integer, Integer>> indexesToEmit;
+
+	/**
+	 * The indexes of the source data which have not been emitted.
+	 */
+	private transient ListState<Tuple2<Integer, Integer>> checkpointedState;
+
+	AbstractArrowSourceFunction(DataType dataType, byte[][] arrowData) {
+		this.dataType = Preconditions.checkNotNull(dataType);
+		this.arrowData = Preconditions.checkNotNull(arrowData);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		allocator = ArrowUtils.getRootAllocator().newChildAllocator("ArrowSourceFunction", 0, Long.MAX_VALUE);
+		root = VectorSchemaRoot.create(ArrowUtils.toArrowSchema((RowType) dataType.getLogicalType()), allocator);
+		running = true;
+	}
+
+	@Override
+	public void close() throws Exception {
+		try {
+			super.close();
+		} finally {
+			if (root != null) {
+				root.close();
+				root = null;
+			}
+			if (allocator != null) {
+				allocator.close();
+				allocator = null;
+			}
+		}
+	}
+
+	@Override
+	public void initializeState(FunctionInitializationContext context) throws Exception {

Review comment:
       Maybe add some logging in this method? For example, log the restored state with LOG.info.
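As a rough illustration of what such logging could report, the sketch below mirrors the two branches of `initializeState` in Python. It is an assumption-laden stand-in, not the Java implementation: `initialize_pending` and the logger name are hypothetical, and `restored=None` plays the role of `context.isRestored()` returning false.

```python
import logging
from collections import deque

LOG = logging.getLogger("arrow_source_sketch")  # hypothetical logger name


def initialize_pending(num_batches, subtask_index, parallelism, restored=None):
    """Return the deque of (batch_index, next_row) pairs for one subtask.

    On a fresh start, batches are dealt out round-robin: subtask k takes
    batches k, k + parallelism, k + 2 * parallelism, and so on.
    On restore, the checkpointed pairs are used as they are.
    """
    if restored is not None:
        pending = deque(restored)
        LOG.info("Subtask %d restored %d pending batch(es): %s",
                 subtask_index, len(pending), list(pending))
    else:
        pending = deque(
            (i, 0) for i in range(subtask_index, num_batches, parallelism))
        LOG.info("Subtask %d starts fresh with %d batch(es): %s",
                 subtask_index, len(pending), list(pending))
    return pending


fresh = initialize_pending(num_batches=5, subtask_index=1, parallelism=2)
resumed = initialize_pending(5, 0, 2, restored=[(2, 1), (4, 0)])
```

Logging the restored pairs makes it easy to see from the task logs which batch and row offset a subtask resumed from.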

##########
File path: flink-python/src/test/java/org/apache/flink/table/runtime/arrow/sources/ArrowSourceFunctionTestBase.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.arrow.sources;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.arrow.ArrowWriter;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Abstract test base for the Arrow source function processing.
+ */
+public abstract class ArrowSourceFunctionTestBase<T> {
+
+	static DataType dataType;
+	private static BufferAllocator allocator;
+
+	@BeforeClass
+	public static void init() {
+		dataType = DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.STRING()));
+		allocator = ArrowUtils.getRootAllocator().newChildAllocator("stdout", 0, Long.MAX_VALUE);
+	}
+
+	@Test
+	public void testRestore() throws Exception {
+		Tuple2<List<T>, Integer> testData = getTestData();
+		final AbstractArrowSourceFunction<T> arrowSourceFunction =
+			createTestArrowSourceFunction(testData.f0, testData.f1);
+
+		final AbstractStreamOperatorTestHarness<T> testHarness =
+			new AbstractStreamOperatorTestHarness<>(new StreamSource<>(arrowSourceFunction), 1, 1, 0);
+		testHarness.open();
+
+		final Throwable[] error = new Throwable[1];
+		final MultiShotLatch latch = new MultiShotLatch();
+		final AtomicInteger numOfEmittedElements = new AtomicInteger(0);
+
+		final DummySourceContext<T> sourceContext = new DummySourceContext<T>() {
+			@Override
+			public void collect(T element) {
+				if (numOfEmittedElements.get() == 2) {
+					latch.trigger();
+					// fail the source function at the second element
+					throw new RuntimeException("Fail the arrow source");
+				}
+				numOfEmittedElements.incrementAndGet();
+			}
+		};
+
+		// run the source asynchronously
+		Thread runner = new Thread(() -> {
+			try {
+				arrowSourceFunction.run(sourceContext);
+			} catch (Throwable t) {
+				if (!t.getMessage().equals("Fail the arrow source")) {
+					error[0] = t;
+				}
+			}
+		});
+		runner.start();
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+
+		OperatorSubtaskState snapshot;
+		synchronized (sourceContext.getCheckpointLock()) {
+			snapshot = testHarness.snapshot(0, 0);
+		}
+
+		runner.join();
+		testHarness.close();
+
+		final AbstractArrowSourceFunction<T> arrowSourceFunction2 =
+			createTestArrowSourceFunction(testData.f0, testData.f1);
+		AbstractStreamOperatorTestHarness<T> testHarnessCopy =
+			new AbstractStreamOperatorTestHarness<>(new StreamSource<>(arrowSourceFunction2), 1, 1, 0);
+		testHarnessCopy.initializeState(snapshot);
+		testHarnessCopy.open();
+
+		// run the source asynchronously
+		Thread runner2 = new Thread(() -> {
+			try {
+				arrowSourceFunction2.run(new DummySourceContext<T>() {
+					@Override
+					public void collect(T element) {
+						if (numOfEmittedElements.incrementAndGet() == testData.f0.size()) {
+							latch.trigger();
+						}
+					}
+				});
+			} catch (Throwable t) {
+				error[0] = t;
+			}
+		});
+		runner2.start();
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+		runner2.join();
+
+		Assert.assertNull(error[0]);
+		Assert.assertEquals(testData.f0.size(), numOfEmittedElements.get());
+	}
+
+	@Test
+	public void testParallelProcessing() throws Exception {
+		Tuple2<List<T>, Integer> testData = getTestData();
+		final AbstractArrowSourceFunction<T> arrowSourceFunction =
+			createTestArrowSourceFunction(testData.f0, testData.f1);
+
+		final AbstractStreamOperatorTestHarness<T> testHarness =
+			new AbstractStreamOperatorTestHarness<>(new StreamSource<>(arrowSourceFunction), 2, 2, 0);
+		testHarness.open();
+
+		final Throwable[] error = new Throwable[2];
+		final OneShotLatch latch = new OneShotLatch();
+		final AtomicInteger numOfEmittedElements = new AtomicInteger(0);
+
+		// run the source asynchronously
+		Thread runner = new Thread(() -> {
+			try {
+				arrowSourceFunction.run(new DummySourceContext<T>() {
+					@Override
+					public void collect(T element) {
+						if (numOfEmittedElements.incrementAndGet() == testData.f0.size()) {
+							latch.trigger();
+						}
+					}
+				});
+			} catch (Throwable t) {
+				error[0] = t;
+			}
+		});
+		runner.start();
+
+		final AbstractArrowSourceFunction<T> arrowSourceFunction2 =
+			createTestArrowSourceFunction(testData.f0, testData.f1);
+		final AbstractStreamOperatorTestHarness<T> testHarness2 =
+			new AbstractStreamOperatorTestHarness<>(new StreamSource<>(arrowSourceFunction2), 2, 2, 1);
+		testHarness2.open();
+
+		// run the source asynchronously
+		Thread runner2 = new Thread(() -> {
+			try {
+				arrowSourceFunction2.run(new DummySourceContext<T>() {
+					@Override
+					public void collect(T element) {
+						if (numOfEmittedElements.incrementAndGet() == testData.f0.size()) {
+							latch.trigger();
+						}
+					}
+				});
+			} catch (Throwable t) {
+				error[1] = t;
+			}
+		});
+		runner2.start();
+
+		if (!latch.isTriggered()) {
+			latch.await();
+		}
+
+		runner.join();
+		runner2.join();
+		testHarness.close();
+		testHarness2.close();
+
+		Assert.assertNull(error[0]);
+		Assert.assertNull(error[1]);
+		Assert.assertEquals(testData.f0.size(), numOfEmittedElements.get());

Review comment:
       Also verify the content of the data?
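For the parallel case the emission order across subtasks is not deterministic, so a content check probably has to compare multisets rather than sequences. A minimal Python sketch of that idea follows; `run_subtask` is a hypothetical helper, and the two threads only imitate the two test harnesses in the Java test.

```python
import threading
from collections import Counter


def run_subtask(batches, subtask_index, parallelism, sink, lock):
    """Emit every row of the batches owned by this subtask into `sink`."""
    for i in range(subtask_index, len(batches), parallelism):
        for row in batches[i]:
            with lock:
                sink.append(row)


batches = [["a", "b"], ["c"], ["d", "e"], ["f"]]
expected = [row for batch in batches for row in batch]

collected = []
lock = threading.Lock()
threads = [
    threading.Thread(target=run_subtask, args=(batches, idx, 2, collected, lock))
    for idx in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Order depends on thread scheduling, so compare as multisets.
assert Counter(collected) == Counter(expected)
```

A multiset comparison verifies that every row was emitted exactly once across the two subtasks without depending on how the threads interleave.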







[GitHub] [flink] hequn8128 commented on a change in pull request #11832: [FLINK-17148][python] Support converting pandas DataFrame to Flink Table

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on a change in pull request #11832:
URL: https://github.com/apache/flink/pull/11832#discussion_r417135937



##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -1119,6 +1119,90 @@ def _from_elements(self, elements, schema):
         finally:
             os.unlink(temp_file.name)
 
+    def from_pandas(self, pdf,
+                    schema: Union[RowType, List[str], Tuple[str], List[DataType],
+                                  Tuple[DataType]] = None,
+                    splits_num: int = 1) -> Table:
+        """
+        Creates a table from a pandas DataFrame.
+
+        Example:
+        ::
+
+            # use the second parameter to specify custom field names

Review comment:
       Move this comment after the creation of the DataFrame.

##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -1119,6 +1119,90 @@ def _from_elements(self, elements, schema):
         finally:
             os.unlink(temp_file.name)
 
+    def from_pandas(self, pdf,
+                    schema: Union[RowType, List[str], Tuple[str], List[DataType],
+                                  Tuple[DataType]] = None,
+                    splits_num: int = 1) -> Table:
+        """
+        Creates a table from a pandas DataFrame.
+
+        Example:
+        ::
+
+            # use the second parameter to specify custom field names
+            >>> pdf = pd.DataFrame(np.random.rand(1000, 2))
+            >>> table_env.from_pandas(pdf, ["a", "b"])
+            # use the second parameter to specify custom field types
+            >>> table_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])
+            # use the second parameter to specify custom table schema
+            >>> table_env.from_pandas(pdf,
+            ...                       DataTypes.ROW([DataTypes.FIELD("a", DataTypes.DOUBLE()),
+            ...                                      DataTypes.FIELD("b", DataTypes.DOUBLE())]))
+
+        :param pdf: The pandas DataFrame.
+        :param schema: The schema of the converted table.
+        :type schema: RowType or list[str] or list[DataType]

Review comment:
       duplicate type hint.

##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -1119,6 +1119,90 @@ def _from_elements(self, elements, schema):
         finally:
             os.unlink(temp_file.name)
 
+    def from_pandas(self, pdf,
+                    schema: Union[RowType, List[str], Tuple[str], List[DataType],
+                                  Tuple[DataType]] = None,
+                    splits_num: int = 1) -> Table:
+        """
+        Creates a table from a pandas DataFrame.
+
+        Example:
+        ::
+
+            # use the second parameter to specify custom field names
+            >>> pdf = pd.DataFrame(np.random.rand(1000, 2))
+            >>> table_env.from_pandas(pdf, ["a", "b"])
+            # use the second parameter to specify custom field types
+            >>> table_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])
+            # use the second parameter to specify custom table schema
+            >>> table_env.from_pandas(pdf,
+            ...                       DataTypes.ROW([DataTypes.FIELD("a", DataTypes.DOUBLE()),
+            ...                                      DataTypes.FIELD("b", DataTypes.DOUBLE())]))
+
+        :param pdf: The pandas DataFrame.
+        :param schema: The schema of the converted table.
+        :type schema: RowType or list[str] or list[DataType]
+        :param splits_num: The number of splits the given Pandas DataFrame will be split into. It
+                           determines the number of parallel source tasks.
+                           If not specified, the default parallelism will be used.
+        :type splits_num: int

Review comment:
       duplicate type hint.

##########
File path: flink-python/pyflink/table/table_environment.py
##########
@@ -1119,6 +1119,90 @@ def _from_elements(self, elements, schema):
         finally:
             os.unlink(temp_file.name)
 
+    def from_pandas(self, pdf,
+                    schema: Union[RowType, List[str], Tuple[str], List[DataType],
+                                  Tuple[DataType]] = None,
+                    splits_num: int = 1) -> Table:
+        """
+        Creates a table from a pandas DataFrame.
+
+        Example:
+        ::
+
+            # use the second parameter to specify custom field names
+            >>> pdf = pd.DataFrame(np.random.rand(1000, 2))
+            >>> table_env.from_pandas(pdf, ["a", "b"])
+            # use the second parameter to specify custom field types
+            >>> table_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])
+            # use the second parameter to specify custom table schema
+            >>> table_env.from_pandas(pdf,
+            ...                       DataTypes.ROW([DataTypes.FIELD("a", DataTypes.DOUBLE()),
+            ...                                      DataTypes.FIELD("b", DataTypes.DOUBLE())]))
+
+        :param pdf: The pandas DataFrame.
+        :param schema: The schema of the converted table.
+        :type schema: RowType or list[str] or list[DataType]
+        :param splits_num: The number of splits the given Pandas DataFrame will be split into. It
+                           determines the number of parallel source tasks.
+                           If not specified, the default parallelism will be used.
+        :type splits_num: int
+        :return: The result table.
+        :rtype: Table

Review comment:
       duplicate type hint.
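The three "duplicate type hint" comments above appear to make the same point: once the signature carries annotations, the Sphinx type fields in the docstring restate them. A small sketch with a hypothetical function (`split_rows` is not part of PyFlink) shows the docstring shape without those fields, with the types still available to tooling through the annotations.

```python
from typing import List, get_type_hints


def split_rows(rows: List[str], splits_num: int = 1) -> List[List[str]]:
    """
    Splits the given rows into at most ``splits_num`` contiguous chunks.

    :param rows: The rows to split.
    :param splits_num: The number of chunks the rows will be split into.
    :return: The list of chunks.
    """
    step = max(1, -(-len(rows) // splits_num))  # ceiling division
    return [rows[i:i + step] for i in range(0, len(rows), step)]


# The annotations alone are enough for tools to recover the types.
hints = get_type_hints(split_rows)
chunks = split_rows(["a", "b", "c"], splits_num=2)
```

Keeping the types in one place means the docstring cannot drift out of sync with the signature.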

##########
File path: docs/dev/table/python/index.md
##########
@@ -32,5 +32,6 @@ Apache Flink has provided Python Table API support since 1.9.0.
 - [Installation]({{ site.baseurl }}/dev/table/python/installation.html): Introduction of how to set up the Python Table API execution environment.
 - [User-defined Functions]({{ site.baseurl }}/dev/table/python/python_udfs.html): Explanation of how to define Python user-defined functions.
 - [Vectorized User-defined Functions]({{ site.baseurl }}/dev/table/python/vectorized_python_udfs.html): Explanation of how to define vectorized Python user-defined functions.
+- [Conversion between PyFlink Table and Pandas DataFrame]({{ site.baseurl }}/dev/table/python/conversion_of_pandas.html): Explanation of how to convert between PyFlink Table and Pandas DataFrame.

Review comment:
       Conversions?

##########
File path: docs/dev/table/python/index.zh.md
##########
@@ -32,5 +32,6 @@ Apache Flink has provided Python Table API support since 1.9.0.
 - [环境安装]({{ site.baseurl }}/zh/dev/table/python/installation.html): Introduction of how to set up the Python Table API execution environment.
 - [自定义函数]({{ site.baseurl }}/zh/dev/table/python/python_udfs.html): Explanation of how to define Python user-defined functions.
 - [自定义向量化函数]({{ site.baseurl }}/zh/dev/table/python/vectorized_python_udfs.html): Explanation of how to define vectorized Python user-defined functions.
+- [PyFlink Table和Pandas DataFrame互转]({{ site.baseurl }}/zh/dev/table/python/conversion_of_pandas.html): Explanation of how to convert between PyFlink Table and Pandas DataFrame.

Review comment:
       According to most copywriting guidelines, it's better to leave a space between an English word and a Chinese word.

##########
File path: docs/dev/table/python/conversion_of_pandas.zh.md
##########
@@ -0,0 +1,48 @@
+---
+title: "PyFlink Table和Pandas DataFrame互转"
+nav-parent_id: python_tableapi
+nav-pos: 50
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+PyFlink supports converting between PyFlink Table and Pandas DataFrame.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Convert Pandas DataFrame to PyFlink Table
+
+PyFlink supports creating a PyFlink Table from a Pandas DataFrame. Internally, the Pandas DataFrame is serialized
+using the Arrow columnar format on the client side, and the serialized data is processed and deserialized in the Arrow source
+during execution. The Arrow source can also be used in streaming jobs; it handles checkpoints properly
+and provides exactly-once guarantees.
+
+The following example shows how to create a PyFlink Table from a Pandas DataFrame:
+
+{% highlight python %}
+import pandas as pd
+import numpy as np
+
+# Create a Pandas DataFrame
+pdf = pd.DataFrame(np.random.rand(1000, 2))
+
+# Create a PyFlink Table from a Pandas DataFrame
+table = t_env.from_pandas(pdf)

Review comment:
       Maybe add more examples here. For example, how to specify table names, which is commonly required.  
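
A rough sketch of what such an example might look like. It assumes `from_pandas` accepts a list of column names as its second argument and that `create_temporary_view(name, table)` is available on the table environment; neither is confirmed by the diff shown here. The helper name `table_from_pandas_with_names` is hypothetical and only used for illustration.

```python
def table_from_pandas_with_names(t_env, pdf, column_names, view_name=None):
    """Hypothetical helper (not part of PyFlink): build a Table with named columns."""
    # Assumption: from_pandas takes an optional list of column names
    # as its second positional argument.
    table = t_env.from_pandas(pdf, column_names)
    if view_name is not None:
        # Assumption: the environment exposes create_temporary_view(name, table),
        # so the table can later be referenced by name in SQL.
        t_env.create_temporary_view(view_name, table)
    return table
```

With a real table environment this would be called as `table_from_pandas_with_names(t_env, pdf, ['f0', 'f1'], 'my_table')`.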




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on issue #11832: [FLINK-17148][python] Support converting pandas dataframe to flink table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-616617636


   ## CI report:
   
   * b463662a390b45757a682cee7ea89ed52e1fa6f6 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161065718) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7796) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on issue #11832: [FLINK-17148][python] Support converting pandas dataframe to flink table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-616617636


   ## CI report:
   
   * b463662a390b45757a682cee7ea89ed52e1fa6f6 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161065718) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7796) 
   * c2105e0170d56fb95b849c4b830ca7a7b9d48fd3 UNKNOWN
   * 1e4aee0d61292ed92724bcabb97ce307833366da Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/161369936) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26) 
   * f427a2215c9bb6ecb5458e8693887cafffbef491 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on issue #11832: [FLINK-17148][python] Support converting pandas dataframe to flink table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-616617636


   ## CI report:
   
   * c2105e0170d56fb95b849c4b830ca7a7b9d48fd3 UNKNOWN
   * 1e4aee0d61292ed92724bcabb97ce307833366da Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/161369936) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26) 
   * f427a2215c9bb6ecb5458e8693887cafffbef491 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/161375441) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32) 
   



[GitHub] [flink] dianfu commented on pull request #11832: [FLINK-17148][python] Support converting pandas DataFrame to Flink Table

Posted by GitBox <gi...@apache.org>.
dianfu commented on pull request #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-621568192


   Not sure why the Azure CI wasn't triggered. It has succeeded in my private Azure pipeline: https://dev.azure.com/dianfu/Flink/_build/results?buildId=50&view=results





[GitHub] [flink] flinkbot edited a comment on issue #11832: [FLINK-17148][python] Support converting pandas dataframe to flink table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-616617636


   ## CI report:
   
   * c2105e0170d56fb95b849c4b830ca7a7b9d48fd3 UNKNOWN
   * f427a2215c9bb6ecb5458e8693887cafffbef491 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161375441) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #11832: [FLINK-17148][python] Support converting pandas DataFrame to Flink Table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-616617636


   ## CI report:
   
   * c2105e0170d56fb95b849c4b830ca7a7b9d48fd3 UNKNOWN
   * a30c4d84d218a3ab91a232c648f0a586df75eaa9 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/162643058) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=404) 
   * 0a37369514bce10618debc2947cf852e6d188b2f Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/162714054) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #11832: [FLINK-17148][python] Support converting pandas DataFrame to Flink Table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-616617636


   ## CI report:
   
   * c2105e0170d56fb95b849c4b830ca7a7b9d48fd3 UNKNOWN
   * a30c4d84d218a3ab91a232c648f0a586df75eaa9 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/162643058) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=404) 
   * 0a37369514bce10618debc2947cf852e6d188b2f UNKNOWN
   



[GitHub] [flink] hequn8128 commented on pull request #11832: [FLINK-17148][python] Support converting pandas DataFrame to Flink Table

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on pull request #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-621568683


   @dianfu Thanks. Merging...





[GitHub] [flink] flinkbot edited a comment on pull request #11832: [FLINK-17148][python] Support converting pandas DataFrame to Flink Table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-616617636


   ## CI report:
   
   * c2105e0170d56fb95b849c4b830ca7a7b9d48fd3 UNKNOWN
   * 0a37369514bce10618debc2947cf852e6d188b2f Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/162714054) 
   



[GitHub] [flink] dianfu commented on a change in pull request #11832: [FLINK-17148][python] Support converting pandas DataFrame to Flink Table

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11832:
URL: https://github.com/apache/flink/pull/11832#discussion_r417039454



##########
File path: flink-python/pyflink/table/tests/test_pandas_conversion.py
##########
@@ -0,0 +1,147 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import datetime
+import decimal
+
+from pyflink.table.types import DataTypes, Row
+from pyflink.testing import source_sink_utils
+from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase, \
+    PyFlinkBlinkStreamTableTestCase, PyFlinkStreamTableTestCase
+
+
+class PandasConversionTestBase(object):
+
+    @classmethod
+    def setUpClass(cls):
+        super(PandasConversionTestBase, cls).setUpClass()
+        cls.data = [(1, 1, 1, 1, True, 1.1, 1.2, 'hello', bytearray(b"aaa"),
+                     decimal.Decimal('1000000000000000000.01'), datetime.date(2014, 9, 13),
+                     datetime.time(hour=1, minute=0, second=1),
+                     datetime.datetime(1970, 1, 1, 0, 0, 0, 123000), ['hello', '中文'],
+                     Row(a=1, b='hello', c=datetime.datetime(1970, 1, 1, 0, 0, 0, 123000),
+                         d=[1, 2])),
+                    (2, 2, 2, 2, False, 2.1, 2.2, 'world', bytearray(b"bbb"),
+                     decimal.Decimal('1000000000000000000.02'), datetime.date(2014, 9, 13),
+                     datetime.time(hour=1, minute=0, second=1),
+                     datetime.datetime(1970, 1, 1, 0, 0, 0, 123000), ['hello', '中文'],
+                     Row(a=1, b='hello', c=datetime.datetime(1970, 1, 1, 0, 0, 0, 123000),
+                         d=[1, 2]))]
+        cls.data_type = DataTypes.ROW(
+            [DataTypes.FIELD("f1", DataTypes.TINYINT()),
+             DataTypes.FIELD("f2", DataTypes.SMALLINT()),
+             DataTypes.FIELD("f3", DataTypes.INT()),
+             DataTypes.FIELD("f4", DataTypes.BIGINT()),
+             DataTypes.FIELD("f5", DataTypes.BOOLEAN()),
+             DataTypes.FIELD("f6", DataTypes.FLOAT()),
+             DataTypes.FIELD("f7", DataTypes.DOUBLE()),
+             DataTypes.FIELD("f8", DataTypes.STRING()),
+             DataTypes.FIELD("f9", DataTypes.BYTES()),
+             DataTypes.FIELD("f10", DataTypes.DECIMAL(38, 18)),
+             DataTypes.FIELD("f11", DataTypes.DATE()),
+             DataTypes.FIELD("f12", DataTypes.TIME()),
+             DataTypes.FIELD("f13", DataTypes.TIMESTAMP(3)),
+             DataTypes.FIELD("f14", DataTypes.ARRAY(DataTypes.STRING())),
+             DataTypes.FIELD("f15", DataTypes.ROW(
+                 [DataTypes.FIELD("a", DataTypes.INT()),
+                  DataTypes.FIELD("b", DataTypes.STRING()),
+                  DataTypes.FIELD("c", DataTypes.TIMESTAMP(3)),
+                  DataTypes.FIELD("d", DataTypes.ARRAY(DataTypes.INT()))]))])
+        cls.pdf = cls.create_pandas_data_frame()
+
+    @classmethod
+    def create_pandas_data_frame(cls):
+        data_dict = {}
+        for j, name in enumerate(cls.data_type.names):
+            data_dict[name] = [cls.data[i][j] for i in range(len(cls.data))]
+        # need to convert to numpy types

Review comment:
       Integer values are parsed as `int64` by default, so we need to specify the types explicitly.
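
To illustrate the point with a minimal sketch (the column names `f1` and `f2` mirror the test schema, but the mapping of `int8`/`int16` to `TINYINT`/`SMALLINT` is an assumption here, and this snippet is not taken from the test itself):

```python
import numpy as np
import pandas as pd

# Plain Python ints are inferred as int64 when the DataFrame is built.
inferred = pd.DataFrame({"f1": [1, 2], "f2": [1, 2]})

# Wrapping the values in explicitly typed numpy arrays keeps the narrower
# widths (assumed to line up with TINYINT and SMALLINT columns).
explicit = pd.DataFrame({
    "f1": np.array([1, 2], dtype=np.int8),
    "f2": np.array([1, 2], dtype=np.int16),
})

print(inferred.dtypes.to_dict())
print(explicit.dtypes.to_dict())
```

Without the explicit numpy types, both columns would end up as 64-bit integers regardless of the declared Flink field types.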







[GitHub] [flink] flinkbot edited a comment on issue #11832: [FLINK-17148][python] Support converting pandas dataframe to flink table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-616617636


   ## CI report:
   
   * b463662a390b45757a682cee7ea89ed52e1fa6f6 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161065718) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7796) 
   * c2105e0170d56fb95b849c4b830ca7a7b9d48fd3 UNKNOWN
   * 1e4aee0d61292ed92724bcabb97ce307833366da UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #11832: [FLINK-17148][python] Support converting pandas DataFrame to Flink Table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-616617636


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7796",
       "triggerID" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161065718",
       "triggerID" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c2105e0170d56fb95b849c4b830ca7a7b9d48fd3",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c2105e0170d56fb95b849c4b830ca7a7b9d48fd3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1e4aee0d61292ed92724bcabb97ce307833366da",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26",
       "triggerID" : "1e4aee0d61292ed92724bcabb97ce307833366da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1e4aee0d61292ed92724bcabb97ce307833366da",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161369936",
       "triggerID" : "1e4aee0d61292ed92724bcabb97ce307833366da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f427a2215c9bb6ecb5458e8693887cafffbef491",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=32",
       "triggerID" : "f427a2215c9bb6ecb5458e8693887cafffbef491",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f427a2215c9bb6ecb5458e8693887cafffbef491",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161375441",
       "triggerID" : "f427a2215c9bb6ecb5458e8693887cafffbef491",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a30c4d84d218a3ab91a232c648f0a586df75eaa9",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=404",
       "triggerID" : "a30c4d84d218a3ab91a232c648f0a586df75eaa9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a30c4d84d218a3ab91a232c648f0a586df75eaa9",
       "status" : "SUCCESS",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/162643058",
       "triggerID" : "a30c4d84d218a3ab91a232c648f0a586df75eaa9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c2105e0170d56fb95b849c4b830ca7a7b9d48fd3 UNKNOWN
   * a30c4d84d218a3ab91a232c648f0a586df75eaa9 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/162643058) Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=404) 
   





[GitHub] [flink] hequn8128 commented on pull request #11832: [FLINK-17148][python] Support converting pandas DataFrame to Flink Table

Posted by GitBox <gi...@apache.org>.
hequn8128 commented on pull request #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-620961520


   BTW, it would be great if you could rebase onto master. The interface of BaseRow has changed: `getHeader()` has been replaced with `getRowKind()`.





[GitHub] [flink] flinkbot commented on issue #11832: [FLINK-17148][python] Support converting pandas dataframe to flink table

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-616617636


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b463662a390b45757a682cee7ea89ed52e1fa6f6 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on issue #11832: [FLINK-17148][python] Support converting pandas dataframe to flink table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-616617636


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7796",
       "triggerID" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "status" : "SUCCESS",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161065718",
       "triggerID" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c2105e0170d56fb95b849c4b830ca7a7b9d48fd3",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c2105e0170d56fb95b849c4b830ca7a7b9d48fd3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b463662a390b45757a682cee7ea89ed52e1fa6f6 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161065718) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7796) 
   * c2105e0170d56fb95b849c4b830ca7a7b9d48fd3 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on issue #11832: [FLINK-17148][python] Support converting pandas dataframe to flink table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-616617636


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7796",
       "triggerID" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "status" : "SUCCESS",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161065718",
       "triggerID" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c2105e0170d56fb95b849c4b830ca7a7b9d48fd3",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c2105e0170d56fb95b849c4b830ca7a7b9d48fd3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1e4aee0d61292ed92724bcabb97ce307833366da",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26",
       "triggerID" : "1e4aee0d61292ed92724bcabb97ce307833366da",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1e4aee0d61292ed92724bcabb97ce307833366da",
       "status" : "PENDING",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161369936",
       "triggerID" : "1e4aee0d61292ed92724bcabb97ce307833366da",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b463662a390b45757a682cee7ea89ed52e1fa6f6 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/161065718) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7796) 
   * c2105e0170d56fb95b849c4b830ca7a7b9d48fd3 UNKNOWN
   * 1e4aee0d61292ed92724bcabb97ce307833366da Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/161369936) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=26) 
   





[GitHub] [flink] dianfu commented on a change in pull request #11832: [FLINK-17148][python] Support converting pandas DataFrame to Flink Table

Posted by GitBox <gi...@apache.org>.
dianfu commented on a change in pull request #11832:
URL: https://github.com/apache/flink/pull/11832#discussion_r417037819



##########
File path: flink-python/pyflink/table/tests/test_pandas_conversion.py
##########
@@ -0,0 +1,147 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import datetime
+import decimal
+
+from pyflink.table.types import DataTypes, Row
+from pyflink.testing import source_sink_utils
+from pyflink.testing.test_case_utils import PyFlinkBlinkBatchTableTestCase, \
+    PyFlinkBlinkStreamTableTestCase, PyFlinkStreamTableTestCase
+
+
+class PandasConversionTestBase(object):
+
+    @classmethod
+    def setUpClass(cls):

Review comment:
       The name `setUpClass` comes from `unittest.TestCase`, so I guess we cannot change it.
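
For context, a minimal sketch of why the hook name is fixed: the `unittest` runner looks up `setUpClass` by that exact name and calls it once per test class, so a mixin base has to use the same spelling. The names `ConversionTestBase`, `ConversionTest`, `shared_data`, and `run_suite` are hypothetical and are not part of this pull request; only the `unittest` calls are real.

```python
import unittest


class ConversionTestBase(object):
    """Hypothetical mixin with the same shape as a shared test base class."""

    @classmethod
    def setUpClass(cls):
        # Chain to the next class in the MRO (unittest.TestCase in this sketch).
        super(ConversionTestBase, cls).setUpClass()
        # Fixture data shared by every test method of the concrete class.
        cls.shared_data = [(1, "a"), (2, "b")]


class ConversionTest(ConversionTestBase, unittest.TestCase):

    def test_shared_data_is_available(self):
        # Passes only if the runner invoked setUpClass before this test.
        self.assertEqual(len(self.shared_data), 2)


def run_suite():
    """Load and run the hypothetical test case, returning the TestResult."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ConversionTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

Renaming the method to something like `set_up_class` would silently skip the fixture, because the runner would then fall back to the no-op `setUpClass` inherited from `unittest.TestCase`.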







[GitHub] [flink] flinkbot edited a comment on issue #11832: [FLINK-17148][python] Support converting pandas dataframe to flink table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11832:
URL: https://github.com/apache/flink/pull/11832#issuecomment-616617636


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7796",
       "triggerID" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "status" : "PENDING",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/161065718",
       "triggerID" : "b463662a390b45757a682cee7ea89ed52e1fa6f6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b463662a390b45757a682cee7ea89ed52e1fa6f6 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/161065718) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7796) 
   

