Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/13 08:24:45 UTC

[GitHub] [flink-table-store] SteNicholas opened a new pull request, #213: [FLINK-28289] Introduce Spark2 Reader for table store

SteNicholas opened a new pull request, #213:
URL: https://github.com/apache/flink-table-store/pull/213

   Flink Table Store currently supports only a Spark 3 reader. This PR introduces a Spark 2 reader so that Spark 2.x can read tables stored by the FileStore.
   
   To support Spark 2.x, this PR:
   
   - Introduces `SparkTypeUtils`.
   - Introduces `SparkInternalRow`.
   - Introduces `SparkSource`.
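A row adapter such as `SparkInternalRow` mostly converts Flink's internal value representations into Spark's. Take timestamps as one hedged example. Spark's `InternalRow` stores `TimestampType` values as a `long` of microseconds since the epoch, while Flink's `TimestampData` keeps epoch milliseconds plus a nano-of-millisecond remainder. The JDK-only sketch below shows just that arithmetic. `TimestampConversion` and `toSparkMicros` are hypothetical names and are not taken from the PR.

```java
/**
 * Hypothetical helper (not from the PR): converts a Flink-style timestamp
 * (epoch millis + nanos within that millisecond) to Spark's epoch microseconds.
 */
final class TimestampConversion {

    private TimestampConversion() {}

    /** nanoOfMillisecond must be in 0..999_999; precision below 1 us is truncated. */
    static long toSparkMicros(long epochMillis, int nanoOfMillisecond) {
        if (nanoOfMillisecond < 0 || nanoOfMillisecond > 999_999) {
            throw new IllegalArgumentException("nanoOfMillisecond out of range: " + nanoOfMillisecond);
        }
        // multiplyExact/addExact throw instead of silently overflowing for extreme values.
        return Math.addExact(Math.multiplyExact(epochMillis, 1000L), nanoOfMillisecond / 1000);
    }

    public static void main(String[] args) {
        // 1,500 ms plus 2,500 ns is 1,500,002 us; the trailing 500 ns are dropped.
        System.out.println(toSparkMicros(1500L, 2500));
    }
}
```

Because the nanosecond remainder is never negative, the same formula also floors pre-epoch values correctly.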
    


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-table-store] pan3793 commented on a diff in pull request #213: [FLINK-28289] Introduce Spark2 Reader for table store

pan3793 commented on code in PR #213:
URL: https://github.com/apache/flink-table-store/pull/213#discussion_r920786613


##########
flink-table-store-spark2/pom.xml:
##########
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.2-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-spark2</artifactId>
+    <name>Flink Table Store : Spark2</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <spark2.version>2.4.8</spark2.version>
+        <jackson.version>2.13.3</jackson.version>

Review Comment:
   You are using a different Jackson version from the one built into Spark, and you are not shading it. Will that cause problems?
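Whether the mismatch causes trouble depends on which `jackson-databind` actually wins on the Spark classpath. One hedged way to check on a running driver is to ask the JVM where a class was loaded from. The sketch below uses only standard JDK reflection (`ProtectionDomain`, `CodeSource`, `Package`). `ClassOrigin` is a hypothetical helper, and `Implementation-Version` is only reported when the jar's manifest sets it.

```java
import java.security.CodeSource;

/** Hypothetical diagnostic: where did a class come from, and which version is it? */
final class ClassOrigin {

    private ClassOrigin() {}

    /** Jar or directory the class was loaded from, or "bootstrap" if no code source exists. */
    static String originOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "bootstrap"; // e.g. core JDK classes such as java.lang.String
        }
        return source.getLocation().toString();
    }

    /** Implementation-Version from the jar manifest, or "unknown" if the manifest omits it. */
    static String versionOf(Class<?> clazz) {
        Package pkg = clazz.getPackage();
        String version = pkg == null ? null : pkg.getImplementationVersion();
        return version == null ? "unknown" : version;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // On a Spark 2.4 driver, pass e.g. com.fasterxml.jackson.databind.ObjectMapper.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        Class<?> clazz = Class.forName(name);
        System.out.println(name + " from " + originOf(clazz) + " (version " + versionOf(clazz) + ")");
    }
}
```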





[GitHub] [flink-table-store] JingsongLi merged pull request #213: [FLINK-28289] Introduce Spark2 Reader for table store

JingsongLi merged PR #213:
URL: https://github.com/apache/flink-table-store/pull/213




[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #213: [FLINK-28289] Introduce Spark2 Reader for table store

JingsongLi commented on code in PR #213:
URL: https://github.com/apache/flink-table-store/pull/213#discussion_r920695866


##########
flink-table-store-spark2/pom.xml:
##########
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.2-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-spark2</artifactId>
+    <name>Flink Table Store : Spark2</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <spark.version>2.4.8</spark.version>

Review Comment:
   minor: rename this property to `spark2.version`.



##########
flink-table-store-spark2/pom.xml:
##########
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.2-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-spark2</artifactId>
+    <name>Flink Table Store : Spark2</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <spark.version>2.4.8</spark.version>
+    </properties>
+
+    <dependencies>
+        <!-- Flink All dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-shade</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>

Review Comment:
   We don't need to bundle the Hive catalog, since Spark 2 has no catalog.



##########
flink-table-store-spark2/pom.xml:
##########
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.2-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-spark2</artifactId>
+    <name>Flink Table Store : Spark2</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <spark.version>2.4.8</spark.version>
+    </properties>
+
+    <dependencies>
+        <!-- Flink All dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-shade</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-hive-catalog</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <!-- Activate these profiles with -Pspark-x.x to build and test against different Spark versions -->

Review Comment:
   Remove this; it is only needed to support multiple Spark versions.



##########
flink-table-store-spark2/pom.xml:
##########
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.2-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-spark2</artifactId>
+    <name>Flink Table Store : Spark2</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <spark.version>2.4.8</spark.version>
+    </properties>
+
+    <dependencies>
+        <!-- Flink All dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-shade</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-hive-catalog</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <!-- Activate these profiles with -Pspark-x.x to build and test against different Spark versions -->
+    <profiles>
+        <profile>
+            <id>spark-2.4</id>
+            <properties>
+                <spark.version>2.4.8</spark.version>
+            </properties>
+        </profile>
+    </profiles>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-sources</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>src/main/${spark.version}</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <includes combine.children="append">
+                                    <include>org.apache.flink:flink-table-store-shade</include>
+                                    <include>org.apache.flink:flink-table-store-hive-catalog</include>

Review Comment:
   ditto



##########
flink-table-store-spark2/pom.xml:
##########
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.2-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-spark2</artifactId>
+    <name>Flink Table Store : Spark2</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <spark.version>2.4.8</spark.version>
+    </properties>
+
+    <dependencies>
+        <!-- Flink All dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-shade</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-hive-catalog</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <!-- Activate these profiles with -Pspark-x.x to build and test against different Spark versions -->
+    <profiles>
+        <profile>
+            <id>spark-2.4</id>
+            <properties>
+                <spark.version>2.4.8</spark.version>
+            </properties>
+        </profile>
+    </profiles>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>

Review Comment:
   Remove this; it is only needed to support multiple Spark versions.



##########
flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkInputPartition.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+import org.apache.flink.table.store.utils.TypeUtils;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.UncheckedIOException;
+
+/** A Spark {@link InputPartition} for table store. */
+public class SparkInputPartition implements InputPartition<InternalRow> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileStoreTable table;
+    private final int[] projectedFields;
+
+    private transient Split split;
+
+    public SparkInputPartition(FileStoreTable table, int[] projectedFields, Split split) {
+        this.table = table;
+        this.projectedFields = projectedFields;
+        this.split = split;
+    }
+
+    @Override
+    public InputPartitionReader<InternalRow> createPartitionReader() {
+        RecordReader<RowData> recordReader;
+        try {
+            TableRead tableRead = table.newRead();

Review Comment:
   Copy the filter push-down logic here once https://github.com/apache/flink-table-store/pull/212 is merged.
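In Spark 2.4's DataSourceV2 API, filter push-down goes through `SupportsPushDownFilters#pushFilters(Filter[])`, which returns the filters Spark still has to evaluate after the scan. The JDK-only sketch below shows that accept-or-return contract with a pluggable converter. `FilterPushDown` and the string "filters" are hypothetical. The actual translation from Spark `Filter`s to table store predicates would presumably be the logic copied from PR #212.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/**
 * Hypothetical sketch of the pushFilters contract: keep what the source can translate,
 * hand the rest back to the engine. F is the engine's filter type, P the source's predicate.
 */
final class FilterPushDown<F, P> {

    private final Function<F, Optional<P>> converter; // source-specific translation
    private final List<P> pushed = new ArrayList<>();

    FilterPushDown(Function<F, Optional<P>> converter) {
        this.converter = converter;
    }

    /** Returns the filters that could not be converted; Spark evaluates these after the scan. */
    List<F> pushFilters(List<F> filters) {
        List<F> residual = new ArrayList<>();
        for (F filter : filters) {
            Optional<P> predicate = converter.apply(filter);
            if (predicate.isPresent()) {
                pushed.add(predicate.get());
            } else {
                residual.add(filter);
            }
        }
        return residual;
    }

    /** Predicates the scan can use, e.g. to skip files. */
    List<P> pushedPredicates() {
        return pushed;
    }

    public static void main(String[] args) {
        // Toy converter: the "source" only understands equality filters written as "col = value".
        FilterPushDown<String, String> pushDown = new FilterPushDown<String, String>(
                f -> f.contains(" = ") ? Optional.of("eq(" + f + ")") : Optional.empty());
        List<String> residual = pushDown.pushFilters(Arrays.asList("a = 1", "b LIKE 'x%'", "c = 2"));
        System.out.println("pushed: " + pushDown.pushedPredicates() + ", residual: " + residual);
    }
}
```

A predicate might be used only for coarse pruning, such as skipping whole files. In that case the source should also return the filter so that Spark still applies it row by row.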

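Spark 2.4's `InputPartition` extends `Serializable`, so each partition object is serialized on the driver and shipped to executors. `SparkInputPartition` pairs a `transient Split split` field with imports of `ObjectInputStream`, `ObjectOutputStream` and `DataOutputViewStreamWrapper`. Together these suggest the split is written by hand in `writeObject`/`readObject`. The diff is cut off before those methods, so the following is only a generic, JDK-only sketch of the pattern. `DemoSplit` and `PartitionSketch` are hypothetical stand-ins, not table store classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical split type that is NOT java.io.Serializable but has its own binary format. */
final class DemoSplit {
    final String file;
    final long rowCount;

    DemoSplit(String file, long rowCount) {
        this.file = file;
        this.rowCount = rowCount;
    }

    void writeTo(DataOutput out) throws IOException {
        out.writeUTF(file);
        out.writeLong(rowCount);
    }

    static DemoSplit readFrom(DataInput in) throws IOException {
        return new DemoSplit(in.readUTF(), in.readLong());
    }
}

/** Hypothetical partition: default serialization for simple fields, manual for the split. */
final class PartitionSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int[] projectedFields; // handled by defaultWriteObject/defaultReadObject
    private transient DemoSplit split;   // skipped by default serialization, written by hand

    PartitionSketch(int[] projectedFields, DemoSplit split) {
        this.projectedFields = projectedFields;
        this.split = split;
    }

    int[] projectedFields() {
        return projectedFields;
    }

    DemoSplit split() {
        return split;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject(); // non-transient fields first
        split.writeTo(out);       // ObjectOutputStream is also a DataOutput
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        split = DemoSplit.readFrom(in); // read back in exactly the order it was written
    }

    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        PartitionSketch original = new PartitionSketch(new int[] {0, 2}, new DemoSplit("data-0.orc", 42L));
        PartitionSketch copy = (PartitionSketch) deserialize(serialize(original));
        System.out.println(copy.split().file + " " + copy.split().rowCount);
    }
}
```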


##########
pom.xml:
##########
@@ -514,7 +515,7 @@ under the License.
                             <rules>
                                 <bannedDependencies>
                                     <excludes>
-                                        <exclude>com.fasterxml.jackson*:*:(,2.9.0]</exclude>
+                                        <exclude>com.fasterxml.jackson*:*:(,2.5.0]</exclude>

Review Comment:
   Can we exclude `com.fasterxml.jackson` from the `spark-sql` dependency instead?
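The enforcer rule bans any `com.fasterxml.jackson*` artifact whose version falls inside the given range. `(,2.9.0]` reads as "every version up to and including 2.9.0". The JDK-only sketch below mimics that check for plain numeric versions. It shows why narrowing the range to `(,2.5.0]` lets an older Jackson through. For illustration it assumes that Spark 2.4 brings in a Jackson 2.6.x release. `UpperBoundRange` is hypothetical, and Maven's real version comparison, which also handles qualifiers such as `-SNAPSHOT`, is more involved.

```java
/**
 * Hypothetical helper mimicking a Maven range of the form "(,X]":
 * a version is inside the range when it is less than or equal to X.
 * Only dotted numeric versions such as "2.6.7" are handled.
 */
final class UpperBoundRange {

    private UpperBoundRange() {}

    /** Compares dotted numeric versions component by component; missing parts count as 0. */
    static int compare(String a, String b) {
        String[] pa = a.split("\\.");
        String[] pb = b.split("\\.");
        for (int i = 0; i < Math.max(pa.length, pb.length); i++) {
            int x = i < pa.length ? Integer.parseInt(pa[i]) : 0;
            int y = i < pb.length ? Integer.parseInt(pb[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    /** True if version lies in "(,upperInclusive]", i.e. the enforcer rule would ban it. */
    static boolean inRange(String version, String upperInclusive) {
        return compare(version, upperInclusive) <= 0;
    }

    public static void main(String[] args) {
        String jackson = "2.6.7"; // assumed Jackson version, for illustration only
        System.out.println("banned by (,2.9.0]: " + inRange(jackson, "2.9.0"));
        System.out.println("banned by (,2.5.0]: " + inRange(jackson, "2.5.0"));
    }
}
```

Excluding Jackson from `spark-sql`, as the review suggests, would let the rule keep its original `2.9.0` bound.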



##########
flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkTypeUtils.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.UserDefinedType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Utils for Spark {@link DataType}. */
+public class SparkTypeUtils {
+
+    private SparkTypeUtils() {}
+
+    public static StructType fromFlinkRowType(RowType type) {
+        return (StructType) fromFlinkType(type);
+    }
+
+    public static DataType fromFlinkType(LogicalType type) {
+        return type.accept(FlinkToSparkTypeVisitor.INSTANCE);
+    }
+
+    public static LogicalType toFlinkType(DataType dataType) {
+        return SparkToFlinkTypeVisitor.visit(dataType);
+    }
+
+    private static class FlinkToSparkTypeVisitor extends LogicalTypeDefaultVisitor<DataType> {
+
+        private static final FlinkToSparkTypeVisitor INSTANCE = new FlinkToSparkTypeVisitor();
+
+        @Override
+        public DataType visit(CharType charType) {
+            return DataTypes.StringType;
+        }
+
+        @Override
+        public DataType visit(VarCharType varCharType) {
+            return DataTypes.StringType;
+        }
+
+        @Override
+        public DataType visit(BooleanType booleanType) {
+            return DataTypes.BooleanType;
+        }
+
+        @Override
+        public DataType visit(BinaryType binaryType) {
+            return DataTypes.BinaryType;
+        }
+
+        @Override
+        public DataType visit(VarBinaryType varBinaryType) {
+            return DataTypes.BinaryType;
+        }
+
+        @Override
+        public DataType visit(DecimalType decimalType) {
+            return DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale());
+        }
+
+        @Override
+        public DataType visit(TinyIntType tinyIntType) {
+            return DataTypes.ByteType;
+        }
+
+        @Override
+        public DataType visit(SmallIntType smallIntType) {
+            return DataTypes.ShortType;
+        }
+
+        @Override
+        public DataType visit(IntType intType) {
+            return DataTypes.IntegerType;
+        }
+
+        @Override
+        public DataType visit(BigIntType bigIntType) {
+            return DataTypes.LongType;
+        }
+
+        @Override
+        public DataType visit(FloatType floatType) {
+            return DataTypes.FloatType;
+        }
+
+        @Override
+        public DataType visit(DoubleType doubleType) {
+            return DataTypes.DoubleType;
+        }
+
+        @Override
+        public DataType visit(DateType dateType) {
+            return DataTypes.DateType;
+        }
+
+        @Override
+        public DataType visit(TimestampType timestampType) {
+            return DataTypes.TimestampType;
+        }
+
+        @Override
+        public DataType visit(LocalZonedTimestampType localZonedTimestampType) {
+            return DataTypes.TimestampType;
+        }
+
+        @Override
+        public DataType visit(ArrayType arrayType) {
+            LogicalType elementType = arrayType.getElementType();
+            return DataTypes.createArrayType(elementType.accept(this), elementType.isNullable());
+        }
+
+        @Override
+        public DataType visit(MultisetType multisetType) {
+            return DataTypes.createMapType(
+                    multisetType.getElementType().accept(this), DataTypes.IntegerType, false);
+        }
+
+        @Override
+        public DataType visit(MapType mapType) {
+            return DataTypes.createMapType(
+                    mapType.getKeyType().accept(this),
+                    mapType.getValueType().accept(this),
+                    mapType.getValueType().isNullable());
+        }
+
+        @Override
+        public DataType visit(RowType rowType) {
+            List<StructField> fields = new ArrayList<>(rowType.getFieldCount());
+            for (RowField field : rowType.getFields()) {
+                StructField structField =
+                        DataTypes.createStructField(
+                                field.getName(),
+                                field.getType().accept(this),
+                                field.getType().isNullable());
+                structField =
+                        field.getDescription().map(structField::withComment).orElse(structField);
+                fields.add(structField);
+            }
+            return DataTypes.createStructType(fields);
+        }
+
+        @Override
+        protected DataType defaultMethod(LogicalType logicalType) {
+            throw new UnsupportedOperationException("Unsupported type: " + logicalType);
+        }
+    }
+
+    private static class SparkToFlinkTypeVisitor {

Review Comment:
   Remove this; it has no usages.
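The remaining `FlinkToSparkTypeVisitor` is essentially a fixed table from Flink logical types to Spark types, recursing into element, key and value types. The dependency-free sketch below applies the same mapping and renders Spark types in their `DataType#simpleString()` form (for example `bigint` or `map<string,int>`), so the table is easy to check at a glance. The small `FlinkType` model is hypothetical, and nullability and `ROW` are left out for brevity. The real code visits Flink `LogicalType` objects and returns Spark `DataType`s.

```java
import java.util.Arrays;
import java.util.List;

/** Dependency-free sketch of a Flink-to-Spark type mapping (hypothetical type model). */
final class TypeMappingSketch {

    enum Root {
        CHAR, VARCHAR, BOOLEAN, BINARY, VARBINARY, TINYINT, SMALLINT, INTEGER, BIGINT,
        FLOAT, DOUBLE, DATE, TIMESTAMP, TIMESTAMP_LTZ, DECIMAL, ARRAY, MULTISET, MAP
    }

    /** Hypothetical stand-in for Flink's LogicalType: a root, decimal params, child types. */
    static final class FlinkType {
        final Root root;
        final int precision;
        final int scale;
        final List<FlinkType> children;

        FlinkType(Root root, int precision, int scale, FlinkType... children) {
            this.root = root;
            this.precision = precision;
            this.scale = scale;
            this.children = Arrays.asList(children);
        }

        static FlinkType of(Root root, FlinkType... children) {
            return new FlinkType(root, 0, 0, children);
        }
    }

    private TypeMappingSketch() {}

    /** Returns the Spark type in DataType#simpleString() form. */
    static String toSpark(FlinkType t) {
        switch (t.root) {
            case CHAR:
            case VARCHAR:
                return "string";
            case BOOLEAN:
                return "boolean";
            case BINARY:
            case VARBINARY:
                return "binary";
            case TINYINT:
                return "tinyint";
            case SMALLINT:
                return "smallint";
            case INTEGER:
                return "int";
            case BIGINT:
                return "bigint";
            case FLOAT:
                return "float";
            case DOUBLE:
                return "double";
            case DATE:
                return "date";
            case TIMESTAMP:
            case TIMESTAMP_LTZ:
                return "timestamp"; // Spark 2.4 has a single TimestampType
            case DECIMAL:
                return "decimal(" + t.precision + "," + t.scale + ")";
            case ARRAY:
                return "array<" + toSpark(t.children.get(0)) + ">";
            case MULTISET:
                // A multiset becomes a map from element to its count.
                return "map<" + toSpark(t.children.get(0)) + ",int>";
            case MAP:
                return "map<" + toSpark(t.children.get(0)) + "," + toSpark(t.children.get(1)) + ">";
            default:
                throw new UnsupportedOperationException("Unsupported type: " + t.root);
        }
    }

    public static void main(String[] args) {
        FlinkType type = FlinkType.of(Root.MAP, FlinkType.of(Root.VARCHAR),
                FlinkType.of(Root.ARRAY, new FlinkType(Root.DECIMAL, 10, 2)));
        System.out.println(toSpark(type)); // map<string,array<decimal(10,2)>>
    }
}
```

Both Flink timestamp flavours collapse into Spark's single `TimestampType`, matching the visitor in the diff, because Spark 2.4 has no separate timestamp-without-time-zone type.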



##########
flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SpecializedGettersReader.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.CalendarIntervalType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.NullType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.sql.types.UserDefinedType;
+
+/** Reader of Spark {@link SpecializedGetters}. */
+public final class SpecializedGettersReader {
+
+    private SpecializedGettersReader() {}
+
+    public static Object read(

Review Comment:
   Can we remove `handleNull` and `handleUserDefinedType`?
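Spark 3 ships a class of the same name whose `read` method takes `handleNull` and `handleUserDefinedType` flags. Dropping them, as the review suggests, leaves one null check followed by a plain dispatch on the data type. The JDK-only sketch below shows that shape. `Getters`, `SqlType`, `GettersReader` and `ArrayGetters` are hypothetical, trimmed-down stand-ins. Strings are plain `java.lang.String` here, whereas Spark uses `UTF8String`.

```java
/** Hypothetical, trimmed-down analogue of Spark's SpecializedGetters interface. */
interface Getters {
    boolean isNullAt(int ordinal);
    boolean getBoolean(int ordinal);
    int getInt(int ordinal);
    long getLong(int ordinal);
    double getDouble(int ordinal);
    String getString(int ordinal);
}

/** Hypothetical subset of Spark SQL types, enough to show the dispatch. */
enum SqlType { BOOLEAN, INT, DATE, LONG, TIMESTAMP, DOUBLE, STRING }

/** Reader without handleNull/handleUserDefinedType flags: one null check, one type switch. */
final class GettersReader {

    private GettersReader() {}

    static Object read(Getters row, int ordinal, SqlType type) {
        if (row.isNullAt(ordinal)) {
            return null; // always handle nulls instead of making it a caller flag
        }
        switch (type) {
            case BOOLEAN:
                return row.getBoolean(ordinal);
            case INT:
            case DATE: // Spark stores DateType as int days since the epoch
                return row.getInt(ordinal);
            case LONG:
            case TIMESTAMP: // and TimestampType as long microseconds since the epoch
                return row.getLong(ordinal);
            case DOUBLE:
                return row.getDouble(ordinal);
            case STRING:
                return row.getString(ordinal);
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }

    public static void main(String[] args) {
        Getters row = new ArrayGetters(true, 19000, null, "x");
        System.out.println(read(row, 1, SqlType.DATE) + " " + read(row, 2, SqlType.LONG));
    }
}

/** Array-backed Getters used only for the demo. */
final class ArrayGetters implements Getters {
    private final Object[] values;

    ArrayGetters(Object... values) {
        this.values = values;
    }

    @Override public boolean isNullAt(int i) { return values[i] == null; }
    @Override public boolean getBoolean(int i) { return (Boolean) values[i]; }
    @Override public int getInt(int i) { return (Integer) values[i]; }
    @Override public long getLong(int i) { return (Long) values[i]; }
    @Override public double getDouble(int i) { return (Double) values[i]; }
    @Override public String getString(int i) { return (String) values[i]; }
}
```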



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #213: [FLINK-28289] Introduce Spark2 Reader for table store

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #213:
URL: https://github.com/apache/flink-table-store/pull/213#discussion_r920773837


##########
docs/content/docs/engines/spark2.md:
##########
@@ -0,0 +1,62 @@
+---
+title: "Spark2"
+weight: 4
+type: docs
+aliases:
+- /engines/spark2.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Spark2
+
+Table Store supports reading table store tables through Spark.
+
+## Version
+
+Table Store supports Spark 2.4+. It is highly recommended to use Spark 2.4+ version with many improvements.
+
+## Install
+
+{{< stable >}}
+Download [flink-table-store-spark2-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-table-store-spark2/{{< version >}}/flink-table-store-spark2-{{< version >}}.jar).
+{{< /stable >}}
+{{< unstable >}}
+You are using an unreleased version of Table Store, you need to manually [Build Spark Bundled Jar]({{< ref "docs/engines/build" >}}) from the source code.
+{{< /unstable >}}
+
+Use `--jars` in spark-sql:
+```bash
+spark-sql ... --jars flink-table-store-spark2-{{< version >}}.jar
+```
+
+Alternatively, you can copy `flink-table-store-spark2-{{< version >}}.jar` under `spark/jars` in your Spark installation.
+
+## Create Temporary View

Review Comment:
   Should this section be titled "Query Table" instead?



##########
docs/content/docs/engines/spark2.md:
##########
@@ -0,0 +1,62 @@
+---
+title: "Spark2"
+weight: 4
+type: docs
+aliases:
+- /engines/spark2.html
+---
+
+# Spark2
+
+Table Store supports reading table store tables through Spark.
+
+## Version
+
+Table Store supports Spark 2.4+. It is highly recommended to use Spark 2.4+ version with many improvements.
+
+## Install
+
+{{< stable >}}
+Download [flink-table-store-spark2-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-table-store-spark2/{{< version >}}/flink-table-store-spark2-{{< version >}}.jar).
+{{< /stable >}}
+{{< unstable >}}
+You are using an unreleased version of Table Store, you need to manually [Build Spark Bundled Jar]({{< ref "docs/engines/build" >}}) from the source code.
+{{< /unstable >}}
+
+Use `--jars` in spark-sql:
+```bash
+spark-sql ... --jars flink-table-store-spark2-{{< version >}}.jar
+```
+
+Alternatively, you can copy `flink-table-store-spark2-{{< version >}}.jar` under `spark/jars` in your Spark installation.
+
+## Create Temporary View
+
+Use the `CREATE TEMPORARY VIEW` command to create a Spark mapping table on top of
+an existing Table Store table if you don't want to use Table Store Catalog.
+
+```sql
+CREATE TEMPORARY VIEW myTable
+USING tablestore
+OPTIONS (
+  path "file:/tmp/warehouse/default.db/myTable"
+)

Review Comment:
   Add a `SELECT * FROM myTable` example after this.



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #213: [FLINK-28289] Introduce Spark2 Reader for table store

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #213:
URL: https://github.com/apache/flink-table-store/pull/213#discussion_r920773520


##########
docs/content/docs/engines/spark2.md:
##########
@@ -0,0 +1,62 @@
+---
+title: "Spark2"
+weight: 4
+type: docs
+aliases:
+- /engines/spark2.html
+---
+
+# Spark2
+
+Table Store supports reading table store tables through Spark.
+
+## Version
+
+Table Store supports Spark 2.4+. It is highly recommended to use Spark 2.4+ version with many improvements.
+
+## Install
+
+{{< stable >}}
+Download [flink-table-store-spark2-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-table-store-spark2/{{< version >}}/flink-table-store-spark-{{< version >}}.jar).
+{{< /stable >}}
+{{< unstable >}}
+You are using an unreleased version of Table Store, you need to manually [Build Spark Bundled Jar]({{< ref "docs/engines/build" >}}) from the source code.
+{{< /unstable >}}
+
+Use `--jars` in spark-sql:
+```bash
+spark-sql ... --jars flink-table-store-spark2-{{< version >}}.jar
+```
+
+Alternatively, you can copy `flink-table-store-spark2-{{< version >}}.jar` under `spark/jars` in your Spark installation.
+
+## Create Temporary View
+
+Use the `CREATE TEMPORARY VIEW` command to create a Spark mapping table on top of
+an existing Table Store table if you don't want to use Table Store Catalog.

Review Comment:
   There is no Table Store catalog for Spark2, so this sentence should not mention one.



[GitHub] [flink-table-store] pan3793 commented on a diff in pull request #213: [FLINK-28289] Introduce Spark2 Reader for table store

Posted by GitBox <gi...@apache.org>.
pan3793 commented on code in PR #213:
URL: https://github.com/apache/flink-table-store/pull/213#discussion_r920786017


##########
flink-table-store-spark2/pom.xml:
##########
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.2-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-spark2</artifactId>
+    <name>Flink Table Store : Spark2</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <spark2.version>2.4.8</spark2.version>
+        <jackson.version>2.13.3</jackson.version>
+    </properties>
+
+    <dependencies>
+        <!-- Flink All dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-shade</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.12</artifactId>

Review Comment:
   It would be better to make the Scala binary version configurable and to test with Scala 2.11 as well, because Scala 2.11 is the only Scala version widely adopted with Spark 2.x.



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #213: [FLINK-28289] Introduce Spark2 Reader for table store

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on code in PR #213:
URL: https://github.com/apache/flink-table-store/pull/213#discussion_r920772039


##########
docs/content/docs/engines/spark2.md:
##########
@@ -0,0 +1,62 @@
+---
+title: "Spark2"
+weight: 4
+type: docs
+aliases:
+- /engines/spark2.html
+---
+
+# Spark2
+
+Table Store supports reading table store tables through Spark.
+
+## Version
+
+Table Store supports Spark 2.4+. It is highly recommended to use Spark 2.4+ version with many improvements.
+
+## Install
+
+{{< stable >}}
+Download [flink-table-store-spark2-{{< version >}}.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-table-store-spark2/{{< version >}}/flink-table-store-spark-{{< version >}}.jar).

Review Comment:
   This should be `flink-table-store-spark2`.
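   In the quoted link, the directory name uses `flink-table-store-spark2` but the file name uses `flink-table-store-spark`. Deriving both from a single artifact id avoids this kind of mismatch. The standard Maven repository layout places a jar at `<groupId with dots as slashes>/<artifactId>/<version>/<artifactId>-<version>.jar`. Below is a minimal sketch of that rule. The class name `MavenJarUrl` is hypothetical, and `0.2.0` is only an example version string.

```java
/** Hypothetical helper that derives a Maven Central jar URL from a single artifactId. */
public final class MavenJarUrl {

    private static final String REPO = "https://repo.maven.apache.org/maven2";

    private MavenJarUrl() {}

    /** Dots in groupId become path separators; artifactId is used for both the directory and the file name. */
    public static String of(String groupId, String artifactId, String version) {
        return String.join(
                "/",
                REPO,
                groupId.replace('.', '/'),
                artifactId,
                version,
                artifactId + "-" + version + ".jar");
    }

    public static void main(String[] args) {
        // Prints .../org/apache/flink/flink-table-store-spark2/0.2.0/flink-table-store-spark2-0.2.0.jar
        System.out.println(of("org.apache.flink", "flink-table-store-spark2", "0.2.0"));
    }
}
```

   In the Hugo docs, the same effect comes from writing the artifact id once per link. The helper only makes the layout rule explicit.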



[GitHub] [flink-table-store] JingsongLi commented on pull request #213: [FLINK-28289] Introduce Spark2 Reader for table store

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on PR #213:
URL: https://github.com/apache/flink-table-store/pull/213#issuecomment-1184084765

   Thanks @pan3793 for the review. If you have more questions, feel free to add more comments and ping me.

