Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/30 08:54:05 UTC

[flink-table-store] branch master updated: [FLINK-29614] Introduce Spark writer for table store

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new ab766ec4 [FLINK-29614] Introduce Spark writer for table store
ab766ec4 is described below

commit ab766ec41e6c3876b38da147a92a3844e9b2a774
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed Nov 30 16:54:01 2022 +0800

    [FLINK-29614] Introduce Spark writer for table store
    
    This closes #394
---
 .../flink/table/store/table/SupportsWrite.java     |  31 ++
 .../store/table/sink/SerializableCommittable.java  |  63 ++++
 .../flink/table/store/spark/SparkCatalog.java      |   5 +-
 .../table/store/spark/SparkFilterConverter.java    |  22 +-
 .../flink/table/store/spark/SparkRowData.java      | 372 +++++++++++++++++++++
 .../flink/table/store/spark/SparkSource.java       |   4 +-
 .../apache/flink/table/store/spark/SparkTable.java |  43 ++-
 .../apache/flink/table/store/spark/SparkWrite.java | 148 ++++++++
 .../flink/table/store/spark/SparkWriteBuilder.java |  48 +++
 .../table/store/spark/SparkInternalRowTest.java    |   3 +
 .../flink/table/store/spark/SparkWriteITCase.java  | 104 ++++++
 11 files changed, 837 insertions(+), 6 deletions(-)
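
For a quick end-to-end picture of what this adds, the sketch below is adapted from the SparkWriteITCase at the bottom of this commit; the local master and warehouse path are illustrative only:

    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
    // SparkCatalog (modified below) registers table store tables with Spark.
    spark.conf().set("spark.sql.catalog.tablestore", SparkCatalog.class.getName());
    spark.conf().set("spark.sql.catalog.tablestore.warehouse", "file:/tmp/warehouse");
    spark.sql("CREATE DATABASE tablestore.db");
    spark.sql("USE tablestore.db");
    spark.sql("CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES ('primary-key'='a', 'bucket'='4')");
    spark.sql("INSERT INTO T VALUES (1, 2, '3')");  // batch insert goes through SparkWrite (V1 write path)
    spark.sql("DELETE FROM T WHERE a = 1");         // delete goes through SparkTable#deleteWhere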

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
index 362c9fc7..dce6f134 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
@@ -18,9 +18,19 @@
 
 package org.apache.flink.table.store.table;
 
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateFilter;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.table.sink.BucketComputer;
 import org.apache.flink.table.store.table.sink.TableCommit;
 import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.types.RowKind;
+
+import java.util.List;
 
 /** An interface for {@link Table} write support. */
 public interface SupportsWrite extends Table {
@@ -30,4 +40,25 @@ public interface SupportsWrite extends Table {
     TableWrite newWrite(String commitUser);
 
     TableCommit newCommit(String commitUser);
+
+    default void deleteWhere(String commitUser, List<Predicate> filters, Lock.Factory lockFactory) {
+        List<Split> splits = newScan().withFilter(filters).plan().splits();
+        try (RecordReader<RowData> reader = newRead().withFilter(filters).createReader(splits);
+                TableWrite write = newWrite(commitUser);
+                TableCommit commit = newCommit(commitUser).withLock(lockFactory.create())) {
+            RecordReaderIterator<RowData> iterator = new RecordReaderIterator<>(reader);
+            PredicateFilter filter = new PredicateFilter(rowType(), filters);
+            while (iterator.hasNext()) {
+                RowData row = iterator.next();
+                if (filter.test(row)) {
+                    row.setRowKind(RowKind.DELETE);
+                    write.write(row);
+                }
+            }
+
+            commit.commit(0, write.prepareCommit(true, 0));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
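
For context, a minimal caller-side sketch of the new default deleteWhere; this mirrors what SparkTable#deleteWhere does later in this commit. The helper name is hypothetical, and the PredicateBuilder usage is assumed from how predicates are built elsewhere in the code base:

    import org.apache.flink.table.store.file.operation.Lock;
    import org.apache.flink.table.store.file.predicate.Predicate;
    import org.apache.flink.table.store.file.predicate.PredicateBuilder;

    import java.util.Collections;
    import java.util.List;
    import java.util.UUID;

    // Hypothetical helper: delete every row whose field 0 equals the given literal.
    static void deleteWhereFieldEquals(SupportsWrite table, Object literal) {
        PredicateBuilder builder = new PredicateBuilder(table.rowType());
        List<Predicate> filters = Collections.singletonList(builder.equal(0, literal));
        table.deleteWhere(UUID.randomUUID().toString(), filters, Lock.emptyFactory());
    }
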
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SerializableCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SerializableCommittable.java
new file mode 100644
index 00000000..116a6343
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SerializableCommittable.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import static org.apache.flink.table.store.file.utils.SerializationUtils.deserializedBytes;
+import static org.apache.flink.table.store.file.utils.SerializationUtils.serializeBytes;
+
+/** A serializable {@link FileCommittable}. */
+public class SerializableCommittable implements Serializable {
+
+    private static final ThreadLocal<FileCommittableSerializer> CACHE =
+            ThreadLocal.withInitial(FileCommittableSerializer::new);
+
+    private transient FileCommittable committable;
+
+    public FileCommittable delegate() {
+        return committable;
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+        FileCommittableSerializer serializer = CACHE.get();
+        out.writeInt(serializer.getVersion());
+        serializeBytes(new DataOutputViewStreamWrapper(out), serializer.serialize(committable));
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        int version = in.readInt();
+        byte[] bytes = deserializedBytes(new DataInputViewStreamWrapper(in));
+        committable = CACHE.get().deserialize(version, bytes);
+    }
+
+    public static SerializableCommittable wrap(FileCommittable committable) {
+        SerializableCommittable ret = new SerializableCommittable();
+        ret.committable = committable;
+        return ret;
+    }
+}
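
FileCommittable itself is presumably not java.io.Serializable, while Spark ships per-bucket write results back to the driver with Java serialization; SparkWrite (added later in this commit) therefore wraps each committable before the RDD reduce and unwraps it when committing. A minimal sketch of that round trip:

    import org.apache.flink.table.store.table.sink.FileCommittable;
    import org.apache.flink.table.store.table.sink.SerializableCommittable;

    // Wrap on the executor side, unwrap again on the driver side.
    static FileCommittable wrapAndUnwrap(FileCommittable committable) {
        SerializableCommittable wrapped = SerializableCommittable.wrap(committable);
        return wrapped.delegate();  // writeObject/readObject above handle the byte-level encoding
    }
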
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 0c9bef86..05ad2e2e 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.catalog.CatalogFactory;
+import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.file.schema.SchemaChange;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.types.logical.RowType;
@@ -201,7 +202,9 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
     @Override
     public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
         try {
-            return new SparkTable(catalog.getTable(objectPath(ident)));
+            ObjectPath path = objectPath(ident);
+            return new SparkTable(
+                    catalog.getTable(path), Lock.factory(catalog.lockFactory().orElse(null), path));
         } catch (Catalog.TableNotExistException e) {
             throw new NoSuchTableException(ident);
         }
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
index 285911bb..1c11cd25 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
@@ -38,6 +38,7 @@ import org.apache.spark.sql.sources.Or;
 import org.apache.spark.sql.sources.StringStartsWith;
 
 import java.util.Arrays;
+import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.file.predicate.PredicateBuilder.convertJavaObject;
@@ -45,6 +46,21 @@ import static org.apache.flink.table.store.file.predicate.PredicateBuilder.conve
 /** Conversion from {@link Filter} to {@link Predicate}. */
 public class SparkFilterConverter {
 
+    public static final List<String> SUPPORT_FILTERS =
+            Arrays.asList(
+                    "EqualTo",
+                    "GreaterThan",
+                    "GreaterThanOrEqual",
+                    "LessThan",
+                    "LessThanOrEqual",
+                    "In",
+                    "IsNull",
+                    "IsNotNull",
+                    "And",
+                    "Or",
+                    "Not",
+                    "StringStartsWith");
+
     private final RowType rowType;
     private final PredicateBuilder builder;
 
@@ -109,14 +125,16 @@ public class SparkFilterConverter {
         }
 
         // TODO: In, NotIn, AlwaysTrue, AlwaysFalse, EqualNullSafe
-        throw new UnsupportedOperationException();
+        throw new UnsupportedOperationException(
+                filter + " is unsupported. Supported filters: " + SUPPORT_FILTERS);
     }
 
     private int fieldIndex(String field) {
         int index = rowType.getFieldIndex(field);
         // TODO: support nested field
         if (index == -1) {
-            throw new UnsupportedOperationException();
+            throw new UnsupportedOperationException(
+                    String.format("Nested field '%s' is unsupported.", field));
         }
         return index;
     }
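
A small usage sketch for the converter; the column name and literal are illustrative. SparkTable#deleteWhere later in this commit converts pushed-down Spark filters the same way:

    import org.apache.flink.table.store.file.predicate.Predicate;
    import org.apache.flink.table.types.logical.RowType;

    import org.apache.spark.sql.sources.EqualTo;

    // Convert a Spark source filter into a table store predicate; unsupported
    // filters now fail with a message listing SUPPORT_FILTERS.
    static Predicate toPredicate(RowType rowType) {
        return new SparkFilterConverter(rowType).convert(new EqualTo("a", 1));
    }
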
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkRowData.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkRowData.java
new file mode 100644
index 00000000..e2d3a20e
--- /dev/null
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkRowData.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.store.utils.DateTimeUtils;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.spark.sql.Row;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** A {@link RowData} that wraps a Spark {@link Row}. */
+public class SparkRowData implements RowData {
+
+    private final RowType type;
+    private final Row row;
+
+    public SparkRowData(RowType type, Row row) {
+        this.type = type;
+        this.row = row;
+    }
+
+    @Override
+    public int getArity() {
+        return row.size();
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return RowKind.INSERT;
+    }
+
+    @Override
+    public void setRowKind(RowKind rowKind) {
+        if (rowKind == RowKind.INSERT) {
+            return;
+        }
+
+        throw new UnsupportedOperationException("Can not set row kind for this row except INSERT.");
+    }
+
+    @Override
+    public boolean isNullAt(int i) {
+        return row.isNullAt(i);
+    }
+
+    @Override
+    public boolean getBoolean(int i) {
+        return row.getBoolean(i);
+    }
+
+    @Override
+    public byte getByte(int i) {
+        return row.getByte(i);
+    }
+
+    @Override
+    public short getShort(int i) {
+        return row.getShort(i);
+    }
+
+    @Override
+    public int getInt(int i) {
+        if (type.getTypeAt(i) instanceof DateType) {
+            return toFlinkDate(row.get(i));
+        }
+        return row.getInt(i);
+    }
+
+    @Override
+    public long getLong(int i) {
+        return row.getLong(i);
+    }
+
+    @Override
+    public float getFloat(int i) {
+        return row.getFloat(i);
+    }
+
+    @Override
+    public double getDouble(int i) {
+        return row.getDouble(i);
+    }
+
+    @Override
+    public StringData getString(int i) {
+        return StringData.fromString(row.getString(i));
+    }
+
+    @Override
+    public DecimalData getDecimal(int i, int precision, int scale) {
+        return DecimalData.fromBigDecimal(row.getDecimal(i), precision, scale);
+    }
+
+    @Override
+    public TimestampData getTimestamp(int i, int precision) {
+        return toFlinkTimestamp(row.get(i));
+    }
+
+    @Override
+    public <T> RawValueData<T> getRawValue(int i) {
+        throw new UnsupportedOperationException(
+                "Raw value is not supported in Spark, please use SQL types.");
+    }
+
+    @Override
+    public byte[] getBinary(int i) {
+        return row.getAs(i);
+    }
+
+    @Override
+    public ArrayData getArray(int i) {
+        return new FlinkArrayData(((ArrayType) type.getTypeAt(i)).getElementType(), row.getList(i));
+    }
+
+    @Override
+    public MapData getMap(int i) {
+        return toFlinkMap((MapType) type.getTypeAt(i), row.getJavaMap(i));
+    }
+
+    @Override
+    public RowData getRow(int i, int i1) {
+        return new SparkRowData((RowType) type.getTypeAt(i), row.getStruct(i));
+    }
+
+    private static int toFlinkDate(Object object) {
+        if (object instanceof Date) {
+            return DateTimeUtils.toInternal((Date) object);
+        } else {
+            return DateTimeUtils.toInternal((LocalDate) object);
+        }
+    }
+
+    private static TimestampData toFlinkTimestamp(Object object) {
+        if (object instanceof Timestamp) {
+            return TimestampData.fromTimestamp((Timestamp) object);
+        } else {
+            return TimestampData.fromLocalDateTime((LocalDateTime) object);
+        }
+    }
+
+    private static MapData toFlinkMap(MapType mapType, Map<Object, Object> map) {
+        List<Object> keys = new ArrayList<>();
+        List<Object> values = new ArrayList<>();
+        map.forEach(
+                (k, v) -> {
+                    keys.add(k);
+                    values.add(v);
+                });
+
+        FlinkArrayData key = new FlinkArrayData(mapType.getKeyType(), keys);
+        FlinkArrayData value = new FlinkArrayData(mapType.getValueType(), values);
+        return new MapData() {
+            @Override
+            public int size() {
+                return map.size();
+            }
+
+            @Override
+            public ArrayData keyArray() {
+                return key;
+            }
+
+            @Override
+            public ArrayData valueArray() {
+                return value;
+            }
+        };
+    }
+
+    private static class FlinkArrayData implements ArrayData {
+
+        private final LogicalType elementType;
+        private final List<Object> list;
+
+        private FlinkArrayData(LogicalType elementType, List<Object> list) {
+            this.list = list;
+            this.elementType = elementType;
+        }
+
+        @Override
+        public int size() {
+            return list.size();
+        }
+
+        @Override
+        public boolean isNullAt(int i) {
+            return list.get(i) == null;
+        }
+
+        @SuppressWarnings("unchecked")
+        private <T> T getAs(int i) {
+            return (T) list.get(i);
+        }
+
+        @Override
+        public boolean getBoolean(int i) {
+            return getAs(i);
+        }
+
+        @Override
+        public byte getByte(int i) {
+            return getAs(i);
+        }
+
+        @Override
+        public short getShort(int i) {
+            return getAs(i);
+        }
+
+        @Override
+        public int getInt(int i) {
+            if (elementType instanceof DateType) {
+                return toFlinkDate(getAs(i));
+            }
+            return getAs(i);
+        }
+
+        @Override
+        public long getLong(int i) {
+            return getAs(i);
+        }
+
+        @Override
+        public float getFloat(int i) {
+            return getAs(i);
+        }
+
+        @Override
+        public double getDouble(int i) {
+            return getAs(i);
+        }
+
+        @Override
+        public StringData getString(int i) {
+            return StringData.fromString(getAs(i));
+        }
+
+        @Override
+        public DecimalData getDecimal(int i, int precision, int scale) {
+            return DecimalData.fromBigDecimal(getAs(i), precision, scale);
+        }
+
+        @Override
+        public TimestampData getTimestamp(int i, int precision) {
+            return toFlinkTimestamp(getAs(i));
+        }
+
+        @Override
+        public <T> RawValueData<T> getRawValue(int i) {
+            throw new UnsupportedOperationException(
+                    "Raw value is not supported in Spark, please use SQL types.");
+        }
+
+        @Override
+        public byte[] getBinary(int i) {
+            return getAs(i);
+        }
+
+        @Override
+        public ArrayData getArray(int i) {
+            return new FlinkArrayData(((ArrayType) elementType).getElementType(), getAs(i));
+        }
+
+        @Override
+        public MapData getMap(int i) {
+            return toFlinkMap((MapType) elementType, getAs(i));
+        }
+
+        @Override
+        public RowData getRow(int i, int i1) {
+            return new SparkRowData((RowType) elementType, getAs(i));
+        }
+
+        @Override
+        public boolean[] toBooleanArray() {
+            boolean[] res = new boolean[size()];
+            for (int i = 0; i < size(); i++) {
+                res[i] = getBoolean(i);
+            }
+            return res;
+        }
+
+        @Override
+        public byte[] toByteArray() {
+            byte[] res = new byte[size()];
+            for (int i = 0; i < size(); i++) {
+                res[i] = getByte(i);
+            }
+            return res;
+        }
+
+        @Override
+        public short[] toShortArray() {
+            short[] res = new short[size()];
+            for (int i = 0; i < size(); i++) {
+                res[i] = getShort(i);
+            }
+            return res;
+        }
+
+        @Override
+        public int[] toIntArray() {
+            int[] res = new int[size()];
+            for (int i = 0; i < size(); i++) {
+                res[i] = getInt(i);
+            }
+            return res;
+        }
+
+        @Override
+        public long[] toLongArray() {
+            long[] res = new long[size()];
+            for (int i = 0; i < size(); i++) {
+                res[i] = getLong(i);
+            }
+            return res;
+        }
+
+        @Override
+        public float[] toFloatArray() {
+            float[] res = new float[size()];
+            for (int i = 0; i < size(); i++) {
+                res[i] = getFloat(i);
+            }
+            return res;
+        }
+
+        @Override
+        public double[] toDoubleArray() {
+            double[] res = new double[size()];
+            for (int i = 0; i < size(); i++) {
+                res[i] = getDouble(i);
+            }
+            return res;
+        }
+    }
+}
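
SparkRowData is exercised by ComputeBucket in SparkWrite and by the new assertion in SparkInternalRowTest at the end of this commit. A minimal sketch, where the (INT, INT, STRING) schema and values are illustrative:

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;

    // View a Spark Row as Flink RowData; fields are read lazily from the underlying Row.
    static RowData asRowData(RowType rowType) {
        Row sparkRow = RowFactory.create(1, 2, "3");
        return new SparkRowData(rowType, sparkRow);
    }
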
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
index 8f60fde8..8be455fb 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.spark;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
 
 import org.apache.spark.sql.connector.catalog.SessionConfigSupport;
@@ -69,7 +70,8 @@ public class SparkSource implements DataSourceRegister, SessionConfigSupport {
                 Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
         FileSystem.initialize(
                 configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
-        return new SparkTable(FileStoreTableFactory.create(Configuration.fromMap(options)));
+        return new SparkTable(
+                FileStoreTableFactory.create(Configuration.fromMap(options)), Lock.emptyFactory());
     }
 
     @Override
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
index aaebaeff..f5e0dc60 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
@@ -18,28 +18,44 @@
 
 package org.apache.flink.table.store.spark;
 
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.table.SupportsPartition;
 import org.apache.flink.table.store.table.Table;
 
+import org.apache.spark.sql.connector.catalog.SupportsDelete;
 import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.SupportsWrite;
 import org.apache.spark.sql.connector.catalog.TableCapability;
 import org.apache.spark.sql.connector.expressions.FieldReference;
 import org.apache.spark.sql.connector.expressions.IdentityTransform;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 
 /** A spark {@link org.apache.spark.sql.connector.catalog.Table} for table store. */
-public class SparkTable implements org.apache.spark.sql.connector.catalog.Table, SupportsRead {
+public class SparkTable
+        implements org.apache.spark.sql.connector.catalog.Table,
+                SupportsRead,
+                SupportsWrite,
+                SupportsDelete {
 
     private final Table table;
+    private final Lock.Factory lockFactory;
 
-    public SparkTable(Table table) {
+    public SparkTable(Table table, Lock.Factory lockFactory) {
         this.table = table;
+        this.lockFactory = lockFactory;
     }
 
     @Override
@@ -62,6 +78,7 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
     public Set<TableCapability> capabilities() {
         Set<TableCapability> capabilities = new HashSet<>();
         capabilities.add(TableCapability.BATCH_READ);
+        capabilities.add(TableCapability.V1_BATCH_WRITE);
         return capabilities;
     }
 
@@ -76,4 +93,26 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
         }
         return new Transform[0];
     }
+
+    @Override
+    public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+        return new SparkWriteBuilder(table, info.queryId(), lockFactory);
+    }
+
+    @Override
+    public void deleteWhere(Filter[] filters) {
+        SparkFilterConverter converter = new SparkFilterConverter(table.rowType());
+        List<Predicate> predicates = new ArrayList<>();
+        for (Filter filter : filters) {
+            if ("AlwaysTrue()".equals(filter.toString())) {
+                continue;
+            }
+
+            predicates.add(converter.convert(filter));
+        }
+
+        String commitUser = UUID.randomUUID().toString();
+        ((org.apache.flink.table.store.table.SupportsWrite) table)
+                .deleteWhere(commitUser, predicates, lockFactory);
+    }
 }
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
new file mode 100644
index 00000000..0e85c46a
--- /dev/null
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.table.SupportsWrite;
+import org.apache.flink.table.store.table.Table;
+import org.apache.flink.table.store.table.sink.BucketComputer;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.SerializableCommittable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.connector.write.V1Write;
+import org.apache.spark.sql.sources.InsertableRelation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Spark {@link V1Write}. The V1 write path is required so that records can be grouped by bucket. */
+public class SparkWrite implements V1Write {
+
+    private final Table table;
+    private final String queryId;
+    private final Lock.Factory lockFactory;
+
+    public SparkWrite(Table table, String queryId, Lock.Factory lockFactory) {
+        if (!(table instanceof SupportsWrite)) {
+            throw new UnsupportedOperationException("Unsupported table: " + table.getClass());
+        }
+        this.table = table;
+        this.queryId = queryId;
+        this.lockFactory = lockFactory;
+    }
+
+    @Override
+    public InsertableRelation toInsertableRelation() {
+        return (data, overwrite) -> {
+            if (overwrite) {
+                throw new UnsupportedOperationException("Overwrite is unsupported.");
+            }
+
+            long identifier = 0;
+            List<SerializableCommittable> committables =
+                    data.toJavaRDD()
+                            .groupBy(new ComputeBucket(table))
+                            .mapValues(new WriteRecords(table, queryId, identifier))
+                            .values()
+                            .reduce(new ListConcat<>());
+            try (TableCommit tableCommit =
+                    ((SupportsWrite) table).newCommit(queryId).withLock(lockFactory.create())) {
+                tableCommit.commit(
+                        identifier,
+                        committables.stream()
+                                .map(SerializableCommittable::delegate)
+                                .collect(Collectors.toList()));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    private static class ComputeBucket implements Function<Row, Integer> {
+
+        private final Table table;
+        private final RowType type;
+
+        private transient BucketComputer lazyComputer;
+
+        private ComputeBucket(Table table) {
+            this.table = table;
+            this.type = table.rowType();
+        }
+
+        private BucketComputer computer() {
+            if (lazyComputer == null) {
+                lazyComputer = ((SupportsWrite) table).bucketComputer();
+            }
+            return lazyComputer;
+        }
+
+        @Override
+        public Integer call(Row row) {
+            return computer().bucket(new SparkRowData(type, row));
+        }
+    }
+
+    private static class WriteRecords
+            implements Function<Iterable<Row>, List<SerializableCommittable>> {
+
+        private final Table table;
+        private final RowType type;
+        private final String queryId;
+        private final long commitIdentifier;
+
+        private WriteRecords(Table table, String queryId, long commitIdentifier) {
+            this.table = table;
+            this.type = table.rowType();
+            this.queryId = queryId;
+            this.commitIdentifier = commitIdentifier;
+        }
+
+        @Override
+        public List<SerializableCommittable> call(Iterable<Row> iterables) throws Exception {
+            try (TableWrite write = ((SupportsWrite) table).newWrite(queryId)) {
+                for (Row row : iterables) {
+                    write.write(new SparkRowData(type, row));
+                }
+                List<FileCommittable> committables = write.prepareCommit(true, commitIdentifier);
+                return committables.stream()
+                        .map(SerializableCommittable::wrap)
+                        .collect(Collectors.toList());
+            }
+        }
+    }
+
+    private static class ListConcat<T> implements Function2<List<T>, List<T>, List<T>> {
+
+        @Override
+        public List<T> call(List<T> v1, List<T> v2) {
+            List<T> ret = new ArrayList<>();
+            ret.addAll(v1);
+            ret.addAll(v2);
+            return ret;
+        }
+    }
+}
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
new file mode 100644
index 00000000..4f761692
--- /dev/null
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.table.Table;
+
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+
+/**
+ * Spark {@link WriteBuilder}.
+ *
+ * <p>TODO: Support overwrite.
+ */
+public class SparkWriteBuilder implements WriteBuilder {
+
+    private final Table table;
+    private final String queryId;
+    private final Lock.Factory lockFactory;
+
+    public SparkWriteBuilder(Table table, String queryId, Lock.Factory lockFactory) {
+        this.table = table;
+        this.queryId = queryId;
+        this.lockFactory = lockFactory;
+    }
+
+    @Override
+    public Write build() {
+        return new SparkWrite(table, queryId, lockFactory);
+    }
+}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
index ba8b5ddb..ded563b9 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
@@ -99,5 +99,8 @@ public class SparkInternalRowTest {
                         + "\"decimal3\":62123123.5"
                         + "}";
         assertThat(sparkRow.json()).isEqualTo(expected);
+
+        SparkRowData sparkRowData = new SparkRowData(ALL_TYPES, sparkRow);
+        assertThat(flinkConverter.toExternal(sparkRowData)).isEqualTo(row);
     }
 }
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java
new file mode 100644
index 00000000..073edea8
--- /dev/null
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.core.fs.Path;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for Spark writer. */
+public class SparkWriteITCase {
+
+    private static SparkSession spark = null;
+
+    @BeforeAll
+    public static void startMetastoreAndSpark() throws Exception {
+        File warehouse = File.createTempFile("warehouse", null);
+        assertThat(warehouse.delete()).isTrue();
+        Path warehousePath = new Path("file:" + warehouse);
+        spark = SparkSession.builder().master("local[2]").getOrCreate();
+        spark.conf().set("spark.sql.catalog.tablestore", SparkCatalog.class.getName());
+        spark.conf().set("spark.sql.catalog.tablestore.warehouse", warehousePath.toString());
+        spark.sql("CREATE DATABASE tablestore.db");
+        spark.sql("USE tablestore.db");
+    }
+
+    @AfterEach
+    public void afterEach() {
+        spark.sql("DROP TABLE T");
+    }
+
+    @Test
+    public void testWrite() {
+        spark.sql(
+                "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES"
+                        + " ('primary-key'='a', 'bucket'='4', 'file.format'='avro')");
+        innerSimpleWrite();
+    }
+
+    @Test
+    public void testWritePartitionTable() {
+        spark.sql(
+                "CREATE TABLE T (a INT, b INT, c STRING) PARTITIONED BY (a) TBLPROPERTIES"
+                        + " ('primary-key'='a,b', 'bucket'='4', 'file.format'='avro')");
+        innerSimpleWrite();
+    }
+
+    private void innerSimpleWrite() {
+        spark.sql("INSERT INTO T VALUES (1, 2, '3')").collectAsList();
+        List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).isEqualTo("[[1,2,3]]");
+
+        spark.sql("INSERT INTO T VALUES (4, 5, '6')").collectAsList();
+        spark.sql("INSERT INTO T VALUES (1, 2, '7')").collectAsList();
+        spark.sql("INSERT INTO T VALUES (4, 5, '8')").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        rows.sort(Comparator.comparingInt(o -> o.getInt(0)));
+        assertThat(rows.toString()).isEqualTo("[[1,2,7], [4,5,8]]");
+
+        spark.sql("DELETE FROM T WHERE a=1").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).isEqualTo("[[4,5,8]]");
+
+        spark.sql("DELETE FROM T").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).isEqualTo("[]");
+    }
+
+    @Test
+    public void testDeleteWhereNonePk() {
+        spark.sql(
+                "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES"
+                        + " ('primary-key'='a', 'file.format'='avro')");
+        spark.sql("INSERT INTO T VALUES (1, 11, '111'), (2, 22, '222')").collectAsList();
+        spark.sql("DELETE FROM T WHERE b=11").collectAsList();
+        List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).isEqualTo("[[2,22,222]]");
+    }
+}