Posted to commits@flink.apache.org by lz...@apache.org on 2022/11/30 08:54:05 UTC
[flink-table-store] branch master updated: [FLINK-29614] Introduce Spark writer for table store
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new ab766ec4 [FLINK-29614] Introduce Spark writer for table store
ab766ec4 is described below
commit ab766ec41e6c3876b38da147a92a3844e9b2a774
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Wed Nov 30 16:54:01 2022 +0800
[FLINK-29614] Introduce Spark writer for table store
This closes #394
---
.../flink/table/store/table/SupportsWrite.java | 31 ++
.../store/table/sink/SerializableCommittable.java | 63 ++++
.../flink/table/store/spark/SparkCatalog.java | 5 +-
.../table/store/spark/SparkFilterConverter.java | 22 +-
.../flink/table/store/spark/SparkRowData.java | 372 +++++++++++++++++++++
.../flink/table/store/spark/SparkSource.java | 4 +-
.../apache/flink/table/store/spark/SparkTable.java | 43 ++-
.../apache/flink/table/store/spark/SparkWrite.java | 148 ++++++++
.../flink/table/store/spark/SparkWriteBuilder.java | 48 +++
.../table/store/spark/SparkInternalRowTest.java | 3 +
.../flink/table/store/spark/SparkWriteITCase.java | 104 ++++++
11 files changed, 837 insertions(+), 6 deletions(-)
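For context, the end-to-end flow this change enables looks as follows (a minimal sketch mirroring the new SparkWriteITCase below; the catalog name "tablestore" comes from the test, the warehouse path is illustrative):

    import org.apache.flink.table.store.spark.SparkCatalog;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
    spark.conf().set("spark.sql.catalog.tablestore", SparkCatalog.class.getName());
    spark.conf().set("spark.sql.catalog.tablestore.warehouse", "file:/tmp/warehouse");
    spark.sql("CREATE DATABASE tablestore.db");
    spark.sql("USE tablestore.db");
    spark.sql("CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES"
            + " ('primary-key'='a', 'bucket'='4')");
    spark.sql("INSERT INTO T VALUES (1, 2, '3')"); // batch write via SparkWrite (V1 path)
    spark.sql("DELETE FROM T WHERE a = 1");        // delete via SparkTable#deleteWhere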
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
index 362c9fc7..dce6f134 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/SupportsWrite.java
@@ -18,9 +18,19 @@
package org.apache.flink.table.store.table;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.predicate.PredicateFilter;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.table.sink.BucketComputer;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.types.RowKind;
+
+import java.util.List;
/** An interface for {@link Table} write support. */
public interface SupportsWrite extends Table {
@@ -30,4 +40,25 @@ public interface SupportsWrite extends Table {
TableWrite newWrite(String commitUser);
TableCommit newCommit(String commitUser);
+
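+ /**
+ * Deletes all rows matching {@code filters}: matching rows are read back, re-emitted
+ * with {@link RowKind#DELETE}, and committed in a single transaction.
+ */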
+ default void deleteWhere(String commitUser, List<Predicate> filters, Lock.Factory lockFactory) {
+ List<Split> splits = newScan().withFilter(filters).plan().splits();
+ try (RecordReader<RowData> reader = newRead().withFilter(filters).createReader(splits);
+ TableWrite write = newWrite(commitUser);
+ TableCommit commit = newCommit(commitUser).withLock(lockFactory.create())) {
+ RecordReaderIterator<RowData> iterator = new RecordReaderIterator<>(reader);
+ PredicateFilter filter = new PredicateFilter(rowType(), filters);
+ while (iterator.hasNext()) {
+ RowData row = iterator.next();
+ if (filter.test(row)) {
+ row.setRowKind(RowKind.DELETE);
+ write.write(row);
+ }
+ }
+
+ commit.commit(0, write.prepareCommit(true, 0));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
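The new deleteWhere default implements DELETE as read-then-write: it scans the splits that may contain matching rows, re-checks each row with a row-level PredicateFilter (the scan filter is only a coarse pruning step), re-emits the survivors with RowKind.DELETE, and commits everything at once. A minimal caller sketch; the "path" option key, the PredicateBuilder constructor, and builder.equal(...) are assumptions based on how this module uses those classes elsewhere:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.file.operation.Lock;
    import org.apache.flink.table.store.file.predicate.PredicateBuilder;
    import org.apache.flink.table.store.table.FileStoreTableFactory;
    import org.apache.flink.table.store.table.SupportsWrite;

    import java.util.Collections;
    import java.util.UUID;

    Configuration conf = new Configuration();
    conf.setString("path", "file:/tmp/warehouse/default.db/T"); // assumed path option
    SupportsWrite table = (SupportsWrite) FileStoreTableFactory.create(conf);
    PredicateBuilder builder = new PredicateBuilder(table.rowType()); // assumed constructor
    table.deleteWhere(
            UUID.randomUUID().toString(),                   // throwaway commit user
            Collections.singletonList(builder.equal(0, 1)), // rows whose first field equals 1
            Lock.emptyFactory());                           // commit without a catalog lock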
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SerializableCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SerializableCommittable.java
new file mode 100644
index 00000000..116a6343
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SerializableCommittable.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import static org.apache.flink.table.store.file.utils.SerializationUtils.deserializedBytes;
+import static org.apache.flink.table.store.file.utils.SerializationUtils.serializeBytes;
+
+/** A {@link Serializable} wrapper around {@link FileCommittable}. */
+public class SerializableCommittable implements Serializable {
+
+ private static final ThreadLocal<FileCommittableSerializer> CACHE =
+ ThreadLocal.withInitial(FileCommittableSerializer::new);
+
+ private transient FileCommittable committable;
+
+ public FileCommittable delegate() {
+ return committable;
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ FileCommittableSerializer serializer = CACHE.get();
+ out.writeInt(serializer.getVersion());
+ serializeBytes(new DataOutputViewStreamWrapper(out), serializer.serialize(committable));
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ int version = in.readInt();
+ byte[] bytes = deserializedBytes(new DataInputViewStreamWrapper(in));
+ committable = CACHE.get().deserialize(version, bytes);
+ }
+
+ public static SerializableCommittable wrap(FileCommittable committable) {
+ SerializableCommittable ret = new SerializableCommittable();
+ ret.committable = committable;
+ return ret;
+ }
+}
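The transient field plus the custom writeObject/readObject pair indicates that FileCommittable is not itself Java-serializable; the wrapper instead writes the FileCommittableSerializer version followed by the length-prefixed serialized bytes. A pure-JDK sketch of the round trip Spark performs when shipping committables from executors to the driver:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value); // triggers SerializableCommittable#writeObject
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject(); // triggers SerializableCommittable#readObject
            return copy;
        }
    }

A SerializableCommittable.wrap(committable) passed through roundTrip should come back with an equivalent delegate().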
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 0c9bef86..05ad2e2e 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.catalog.Catalog;
import org.apache.flink.table.store.file.catalog.CatalogFactory;
+import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.file.schema.SchemaChange;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.types.logical.RowType;
@@ -201,7 +202,9 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
@Override
public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
try {
- return new SparkTable(catalog.getTable(objectPath(ident)));
+ ObjectPath path = objectPath(ident);
+ return new SparkTable(
+ catalog.getTable(path), Lock.factory(catalog.lockFactory().orElse(null), path));
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(ident);
}
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
index 285911bb..1c11cd25 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkFilterConverter.java
@@ -38,6 +38,7 @@ import org.apache.spark.sql.sources.Or;
import org.apache.spark.sql.sources.StringStartsWith;
import java.util.Arrays;
+import java.util.List;
import java.util.stream.Collectors;
import static org.apache.flink.table.store.file.predicate.PredicateBuilder.convertJavaObject;
@@ -45,6 +46,21 @@ import static org.apache.flink.table.store.file.predicate.PredicateBuilder.conve
/** Conversion from {@link Filter} to {@link Predicate}. */
public class SparkFilterConverter {
+ public static final List<String> SUPPORT_FILTERS =
+ Arrays.asList(
+ "EqualTo",
+ "GreaterThan",
+ "GreaterThanOrEqual",
+ "LessThan",
+ "LessThanOrEqual",
+ "In",
+ "IsNull",
+ "IsNotNull",
+ "And",
+ "Or",
+ "Not",
+ "StringStartsWith");
+
private final RowType rowType;
private final PredicateBuilder builder;
@@ -109,14 +125,16 @@ public class SparkFilterConverter {
}
// TODO: In, NotIn, AlwaysTrue, AlwaysFalse, EqualNullSafe
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException(
+ filter + " is unsupported. Support Filters: " + SUPPORT_FILTERS);
}
private int fieldIndex(String field) {
int index = rowType.getFieldIndex(field);
// TODO: support nested field
if (index == -1) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException(
+ String.format("Nested field '%s' is unsupported.", field));
}
return index;
}
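In isolation, the converter turns Spark source filters into table store predicates. A short sketch (filter classes from org.apache.spark.sql.sources, types from org.apache.flink.table.types.logical; the two-element row type is illustrative):

    RowType rowType = RowType.of(
            new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
            new String[] {"a", "c"});
    SparkFilterConverter converter = new SparkFilterConverter(rowType);
    Predicate eq = converter.convert(new EqualTo("a", 1));         // a = 1
    Predicate and = converter.convert(
            new And(new GreaterThan("a", 0), new IsNotNull("c"))); // a > 0 AND c IS NOT NULL
    converter.convert(new EqualNullSafe("a", 1)); // still throws, now with the new message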
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkRowData.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkRowData.java
new file mode 100644
index 00000000..e2d3a20e
--- /dev/null
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkRowData.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.store.utils.DateTimeUtils;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.spark.sql.Row;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** A {@link RowData} that wraps a Spark {@link Row}. */
+public class SparkRowData implements RowData {
+
+ private final RowType type;
+ private final Row row;
+
+ public SparkRowData(RowType type, Row row) {
+ this.type = type;
+ this.row = row;
+ }
+
+ @Override
+ public int getArity() {
+ return row.size();
+ }
+
+ @Override
+ public RowKind getRowKind() {
+ return RowKind.INSERT;
+ }
+
+ @Override
+ public void setRowKind(RowKind rowKind) {
+ if (rowKind == RowKind.INSERT) {
+ return;
+ }
+
+ throw new UnsupportedOperationException("Can not set row kind for this row except INSERT.");
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return row.isNullAt(i);
+ }
+
+ @Override
+ public boolean getBoolean(int i) {
+ return row.getBoolean(i);
+ }
+
+ @Override
+ public byte getByte(int i) {
+ return row.getByte(i);
+ }
+
+ @Override
+ public short getShort(int i) {
+ return row.getShort(i);
+ }
+
+ @Override
+ public int getInt(int i) {
+ if (type.getTypeAt(i) instanceof DateType) {
+ return toFlinkDate(row.get(i));
+ }
+ return row.getInt(i);
+ }
+
+ @Override
+ public long getLong(int i) {
+ return row.getLong(i);
+ }
+
+ @Override
+ public float getFloat(int i) {
+ return row.getFloat(i);
+ }
+
+ @Override
+ public double getDouble(int i) {
+ return row.getDouble(i);
+ }
+
+ @Override
+ public StringData getString(int i) {
+ return StringData.fromString(row.getString(i));
+ }
+
+ @Override
+ public DecimalData getDecimal(int i, int precision, int scale) {
+ return DecimalData.fromBigDecimal(row.getDecimal(i), precision, scale);
+ }
+
+ @Override
+ public TimestampData getTimestamp(int i, int precision) {
+ return toFlinkTimestamp(row.get(i));
+ }
+
+ @Override
+ public <T> RawValueData<T> getRawValue(int i) {
+ throw new UnsupportedOperationException(
+ "Raw value is not supported in Spark, please use SQL types.");
+ }
+
+ @Override
+ public byte[] getBinary(int i) {
+ return row.getAs(i);
+ }
+
+ @Override
+ public ArrayData getArray(int i) {
+ return new FlinkArrayData(((ArrayType) type.getTypeAt(i)).getElementType(), row.getList(i));
+ }
+
+ @Override
+ public MapData getMap(int i) {
+ return toFlinkMap((MapType) type.getTypeAt(i), row.getJavaMap(i));
+ }
+
+ @Override
+ public RowData getRow(int i, int i1) {
+ return new SparkRowData((RowType) type.getTypeAt(i), row.getStruct(i));
+ }
+
+ private static int toFlinkDate(Object object) {
+ if (object instanceof Date) {
+ return DateTimeUtils.toInternal((Date) object);
+ } else {
+ return DateTimeUtils.toInternal((LocalDate) object);
+ }
+ }
+
+ private static TimestampData toFlinkTimestamp(Object object) {
+ if (object instanceof Timestamp) {
+ return TimestampData.fromTimestamp((Timestamp) object);
+ } else {
+ return TimestampData.fromLocalDateTime((LocalDateTime) object);
+ }
+ }
+
+ private static MapData toFlinkMap(MapType mapType, Map<Object, Object> map) {
+ List<Object> keys = new ArrayList<>();
+ List<Object> values = new ArrayList<>();
+ map.forEach(
+ (k, v) -> {
+ keys.add(k);
+ values.add(v);
+ });
+
+ FlinkArrayData key = new FlinkArrayData(mapType.getKeyType(), keys);
+ FlinkArrayData value = new FlinkArrayData(mapType.getValueType(), values);
+ return new MapData() {
+ @Override
+ public int size() {
+ return map.size();
+ }
+
+ @Override
+ public ArrayData keyArray() {
+ return key;
+ }
+
+ @Override
+ public ArrayData valueArray() {
+ return value;
+ }
+ };
+ }
+
+ private static class FlinkArrayData implements ArrayData {
+
+ private final LogicalType elementType;
+ private final List<Object> list;
+
+ private FlinkArrayData(LogicalType elementType, List<Object> list) {
+ this.list = list;
+ this.elementType = elementType;
+ }
+
+ @Override
+ public int size() {
+ return list.size();
+ }
+
+ @Override
+ public boolean isNullAt(int i) {
+ return list.get(i) == null;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> T getAs(int i) {
+ return (T) list.get(i);
+ }
+
+ @Override
+ public boolean getBoolean(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public byte getByte(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public short getShort(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public int getInt(int i) {
+ if (elementType instanceof DateType) {
+ return toFlinkDate(getAs(i));
+ }
+ return getAs(i);
+ }
+
+ @Override
+ public long getLong(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public float getFloat(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public double getDouble(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public StringData getString(int i) {
+ return StringData.fromString(getAs(i));
+ }
+
+ @Override
+ public DecimalData getDecimal(int i, int precision, int scale) {
+ return DecimalData.fromBigDecimal(getAs(i), precision, scale);
+ }
+
+ @Override
+ public TimestampData getTimestamp(int i, int precision) {
+ return toFlinkTimestamp(getAs(i));
+ }
+
+ @Override
+ public <T> RawValueData<T> getRawValue(int i) {
+ throw new UnsupportedOperationException(
+ "Raw value is not supported in Spark, please use SQL types.");
+ }
+
+ @Override
+ public byte[] getBinary(int i) {
+ return getAs(i);
+ }
+
+ @Override
+ public ArrayData getArray(int i) {
+ return new FlinkArrayData(((ArrayType) elementType).getElementType(), getAs(i));
+ }
+
+ @Override
+ public MapData getMap(int i) {
+ return toFlinkMap((MapType) elementType, getAs(i));
+ }
+
+ @Override
+ public RowData getRow(int i, int i1) {
+ return new SparkRowData((RowType) elementType, getAs(i));
+ }
+
+ @Override
+ public boolean[] toBooleanArray() {
+ boolean[] res = new boolean[size()];
+ for (int i = 0; i < size(); i++) {
+ res[i] = getBoolean(i);
+ }
+ return res;
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ byte[] res = new byte[size()];
+ for (int i = 0; i < size(); i++) {
+ res[i] = getByte(i);
+ }
+ return res;
+ }
+
+ @Override
+ public short[] toShortArray() {
+ short[] res = new short[size()];
+ for (int i = 0; i < size(); i++) {
+ res[i] = getShort(i);
+ }
+ return res;
+ }
+
+ @Override
+ public int[] toIntArray() {
+ int[] res = new int[size()];
+ for (int i = 0; i < size(); i++) {
+ res[i] = getInt(i);
+ }
+ return res;
+ }
+
+ @Override
+ public long[] toLongArray() {
+ long[] res = new long[size()];
+ for (int i = 0; i < size(); i++) {
+ res[i] = getLong(i);
+ }
+ return res;
+ }
+
+ @Override
+ public float[] toFloatArray() {
+ float[] res = new float[size()];
+ for (int i = 0; i < size(); i++) {
+ res[i] = getFloat(i);
+ }
+ return res;
+ }
+
+ @Override
+ public double[] toDoubleArray() {
+ double[] res = new double[size()];
+ for (int i = 0; i < size(); i++) {
+ res[i] = getDouble(i);
+ }
+ return res;
+ }
+ }
+}
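A quick sketch of the wrapping direction, using Spark's RowFactory to build a generic Row (the field names and values are illustrative):

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;

    RowType type = RowType.of(
            new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
            new String[] {"id", "name"});
    Row sparkRow = RowFactory.create(1, "table-store");
    RowData rowData = new SparkRowData(type, sparkRow);
    int id = rowData.getInt(0);                    // 1
    String name = rowData.getString(1).toString(); // "table-store"
    // rowData.setRowKind(RowKind.DELETE) would throw: this wrapper only represents INSERTs.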
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
index 8f60fde8..8be455fb 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.spark;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.spark.sql.connector.catalog.SessionConfigSupport;
@@ -69,7 +70,8 @@ public class SparkSource implements DataSourceRegister, SessionConfigSupport {
Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
FileSystem.initialize(
configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
- return new SparkTable(FileStoreTableFactory.create(Configuration.fromMap(options)));
+ return new SparkTable(
+ FileStoreTableFactory.create(Configuration.fromMap(options)), Lock.emptyFactory());
}
@Override
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
index aaebaeff..f5e0dc60 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkTable.java
@@ -18,28 +18,44 @@
package org.apache.flink.table.store.spark;
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.table.SupportsPartition;
import org.apache.flink.table.store.table.Table;
+import org.apache.spark.sql.connector.catalog.SupportsDelete;
import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.expressions.FieldReference;
import org.apache.spark.sql.connector.expressions.IdentityTransform;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
+import java.util.UUID;
/** A Spark {@link org.apache.spark.sql.connector.catalog.Table} for table store. */
-public class SparkTable implements org.apache.spark.sql.connector.catalog.Table, SupportsRead {
+public class SparkTable
+ implements org.apache.spark.sql.connector.catalog.Table,
+ SupportsRead,
+ SupportsWrite,
+ SupportsDelete {
private final Table table;
+ private final Lock.Factory lockFactory;
- public SparkTable(Table table) {
+ public SparkTable(Table table, Lock.Factory lockFactory) {
this.table = table;
+ this.lockFactory = lockFactory;
}
@Override
@@ -62,6 +78,7 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
public Set<TableCapability> capabilities() {
Set<TableCapability> capabilities = new HashSet<>();
capabilities.add(TableCapability.BATCH_READ);
+ capabilities.add(TableCapability.V1_BATCH_WRITE);
return capabilities;
}
@@ -76,4 +93,26 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
}
return new Transform[0];
}
+
+ @Override
+ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+ return new SparkWriteBuilder(table, info.queryId(), lockFactory);
+ }
+
+ @Override
+ public void deleteWhere(Filter[] filters) {
+ SparkFilterConverter converter = new SparkFilterConverter(table.rowType());
+ List<Predicate> predicates = new ArrayList<>();
+ for (Filter filter : filters) {
+ if ("AlwaysTrue()".equals(filter.toString())) {
+ continue;
+ }
+
+ predicates.add(converter.convert(filter));
+ }
+
+ String commitUser = UUID.randomUUID().toString();
+ ((org.apache.flink.table.store.table.SupportsWrite) table)
+ .deleteWhere(commitUser, predicates, lockFactory);
+ }
}
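For reference, for a statement like DELETE FROM T WHERE a = 1 OR b IS NULL, Spark's SupportsDelete contract hands the pushed-down WHERE clause to deleteWhere as source filters, roughly as follows (given a SparkTable instance sparkTable; filter classes from org.apache.spark.sql.sources):

    Filter[] filters = new Filter[] {new Or(new EqualTo("a", 1), new IsNull("b"))};
    sparkTable.deleteWhere(filters); // converted to Predicates, then SupportsWrite#deleteWhere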
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
new file mode 100644
index 00000000..0e85c46a
--- /dev/null
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWrite.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.table.SupportsWrite;
+import org.apache.flink.table.store.table.Table;
+import org.apache.flink.table.store.table.sink.BucketComputer;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.SerializableCommittable;
+import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.connector.write.V1Write;
+import org.apache.spark.sql.sources.InsertableRelation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Spark {@link V1Write}. The V1 write path is required so that records can be grouped by bucket. */
+public class SparkWrite implements V1Write {
+
+ private final Table table;
+ private final String queryId;
+ private final Lock.Factory lockFactory;
+
+ public SparkWrite(Table table, String queryId, Lock.Factory lockFactory) {
+ if (!(table instanceof SupportsWrite)) {
+ throw new UnsupportedOperationException("Unsupported table: " + table.getClass());
+ }
+ this.table = table;
+ this.queryId = queryId;
+ this.lockFactory = lockFactory;
+ }
+
+ @Override
+ public InsertableRelation toInsertableRelation() {
+ return (data, overwrite) -> {
+ if (overwrite) {
+ throw new UnsupportedOperationException("Overwrite is unsupported.");
+ }
+
+ long identifier = 0;
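+ // Group records by bucket so each bucket is written by exactly one task, write
+ // each group, then merge the per-bucket committables and commit them together.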
+ List<SerializableCommittable> committables =
+ data.toJavaRDD()
+ .groupBy(new ComputeBucket(table))
+ .mapValues(new WriteRecords(table, queryId, identifier))
+ .values()
+ .reduce(new ListConcat<>());
+ try (TableCommit tableCommit =
+ ((SupportsWrite) table).newCommit(queryId).withLock(lockFactory.create())) {
+ tableCommit.commit(
+ identifier,
+ committables.stream()
+ .map(SerializableCommittable::delegate)
+ .collect(Collectors.toList()));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+ }
+
+ private static class ComputeBucket implements Function<Row, Integer> {
+
+ private final Table table;
+ private final RowType type;
+
+ private transient BucketComputer lazyComputer;
+
+ private ComputeBucket(Table table) {
+ this.table = table;
+ this.type = table.rowType();
+ }
+
+ private BucketComputer computer() {
+ if (lazyComputer == null) {
+ lazyComputer = ((SupportsWrite) table).bucketComputer();
+ }
+ return lazyComputer;
+ }
+
+ @Override
+ public Integer call(Row row) {
+ return computer().bucket(new SparkRowData(type, row));
+ }
+ }
+
+ private static class WriteRecords
+ implements Function<Iterable<Row>, List<SerializableCommittable>> {
+
+ private final Table table;
+ private final RowType type;
+ private final String queryId;
+ private final long commitIdentifier;
+
+ private WriteRecords(Table table, String queryId, long commitIdentifier) {
+ this.table = table;
+ this.type = table.rowType();
+ this.queryId = queryId;
+ this.commitIdentifier = commitIdentifier;
+ }
+
+ @Override
+ public List<SerializableCommittable> call(Iterable<Row> iterables) throws Exception {
+ try (TableWrite write = ((SupportsWrite) table).newWrite(queryId)) {
+ for (Row row : iterables) {
+ write.write(new SparkRowData(type, row));
+ }
+ List<FileCommittable> committables = write.prepareCommit(true, commitIdentifier);
+ return committables.stream()
+ .map(SerializableCommittable::wrap)
+ .collect(Collectors.toList());
+ }
+ }
+ }
+
+ private static class ListConcat<T> implements Function2<List<T>, List<T>, List<T>> {
+
+ @Override
+ public List<T> call(List<T> v1, List<T> v2) {
+ List<T> ret = new ArrayList<>();
+ ret.addAll(v1);
+ ret.addAll(v2);
+ return ret;
+ }
+ }
+}
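The V1 path is used because InsertableRelation exposes the whole DataFrame, which lets the writer regroup records so that each bucket is written by exactly one task. A toy analogue of the groupBy/mapValues/values/reduce shape above, with integers standing in for rows and committables:

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
    List<Integer> merged =
            jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5))
                    .groupBy(x -> x % 2)     // ComputeBucket analogue: assign a bucket id
                    .mapValues(xs -> {       // WriteRecords analogue: one result per bucket
                        List<Integer> out = new ArrayList<>();
                        xs.forEach(out::add);
                        return out;
                    })
                    .values()
                    .reduce((a, b) -> {      // ListConcat analogue: merge bucket results
                        List<Integer> r = new ArrayList<>(a);
                        r.addAll(b);
                        return r;
                    });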
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
new file mode 100644
index 00000000..4f761692
--- /dev/null
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkWriteBuilder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.table.Table;
+
+import org.apache.spark.sql.connector.write.Write;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+
+/**
+ * Spark {@link WriteBuilder}.
+ *
+ * <p>TODO: Support overwrite.
+ */
+public class SparkWriteBuilder implements WriteBuilder {
+
+ private final Table table;
+ private final String queryId;
+ private final Lock.Factory lockFactory;
+
+ public SparkWriteBuilder(Table table, String queryId, Lock.Factory lockFactory) {
+ this.table = table;
+ this.queryId = queryId;
+ this.lockFactory = lockFactory;
+ }
+
+ @Override
+ public Write build() {
+ return new SparkWrite(table, queryId, lockFactory);
+ }
+}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
index ba8b5ddb..ded563b9 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkInternalRowTest.java
@@ -99,5 +99,8 @@ public class SparkInternalRowTest {
+ "\"decimal3\":62123123.5"
+ "}";
assertThat(sparkRow.json()).isEqualTo(expected);
+
+ SparkRowData sparkRowData = new SparkRowData(ALL_TYPES, sparkRow);
+ assertThat(flinkConverter.toExternal(sparkRowData)).isEqualTo(row);
}
}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java
new file mode 100644
index 00000000..073edea8
--- /dev/null
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkWriteITCase.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.core.fs.Path;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for the Spark writer. */
+public class SparkWriteITCase {
+
+ private static SparkSession spark = null;
+
+ @BeforeAll
+ public static void startMetastoreAndSpark() throws Exception {
+ File warehouse = File.createTempFile("warehouse", null);
+ assertThat(warehouse.delete()).isTrue();
+ Path warehousePath = new Path("file:" + warehouse);
+ spark = SparkSession.builder().master("local[2]").getOrCreate();
+ spark.conf().set("spark.sql.catalog.tablestore", SparkCatalog.class.getName());
+ spark.conf().set("spark.sql.catalog.tablestore.warehouse", warehousePath.toString());
+ spark.sql("CREATE DATABASE tablestore.db");
+ spark.sql("USE tablestore.db");
+ }
+
+ @AfterEach
+ public void afterEach() {
+ spark.sql("DROP TABLE T");
+ }
+
+ @Test
+ public void testWrite() {
+ spark.sql(
+ "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES"
+ + " ('primary-key'='a', 'bucket'='4', 'file.format'='avro')");
+ innerSimpleWrite();
+ }
+
+ @Test
+ public void testWritePartitionTable() {
+ spark.sql(
+ "CREATE TABLE T (a INT, b INT, c STRING) PARTITIONED BY (a) TBLPROPERTIES"
+ + " ('primary-key'='a,b', 'bucket'='4', 'file.format'='avro')");
+ innerSimpleWrite();
+ }
+
+ private void innerSimpleWrite() {
+ spark.sql("INSERT INTO T VALUES (1, 2, '3')").collectAsList();
+ List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).isEqualTo("[[1,2,3]]");
+
+ spark.sql("INSERT INTO T VALUES (4, 5, '6')").collectAsList();
+ spark.sql("INSERT INTO T VALUES (1, 2, '7')").collectAsList();
+ spark.sql("INSERT INTO T VALUES (4, 5, '8')").collectAsList();
+ rows = spark.sql("SELECT * FROM T").collectAsList();
+ rows.sort(Comparator.comparingInt(o -> o.getInt(0)));
+ assertThat(rows.toString()).isEqualTo("[[1,2,7], [4,5,8]]");
+
+ spark.sql("DELETE FROM T WHERE a=1").collectAsList();
+ rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).isEqualTo("[[4,5,8]]");
+
+ spark.sql("DELETE FROM T").collectAsList();
+ rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).isEqualTo("[]");
+ }
+
+ @Test
+ public void testDeleteWhereNonePk() {
+ spark.sql(
+ "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES"
+ + " ('primary-key'='a', 'file.format'='avro')");
+ spark.sql("INSERT INTO T VALUES (1, 11, '111'), (2, 22, '222')").collectAsList();
+ spark.sql("DELETE FROM T WHERE b=11").collectAsList();
+ List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).isEqualTo("[[2,22,222]]");
+ }
+}