You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@paimon.apache.org by lz...@apache.org on 2023/03/20 08:32:17 UTC

[incubator-paimon] branch master updated: [spark] extract SparkCatalogBase (#661)

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new aab07a9dc [spark] extract SparkCatalogBase (#661)
aab07a9dc is described below

commit aab07a9dc28eae42b62362103c1ad538c309d06a
Author: yuzelin <33...@users.noreply.github.com>
AuthorDate: Mon Mar 20 16:32:12 2023 +0800

    [spark] extract SparkCatalogBase (#661)
---
 .../java/org/apache/paimon/spark/SparkCatalog.java | 368 +--------------------
 .../{SparkCatalog.java => SparkCatalogBase.java}   |   4 +-
 2 files changed, 3 insertions(+), 369 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 9196bfef3..ab03200fe 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -18,373 +18,7 @@
 
 package org.apache.paimon.spark;
 
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.operation.Lock;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaChange;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Preconditions;
-
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
-import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
-import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.NamespaceChange;
-import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
-import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.connector.catalog.TableChange;
-import org.apache.spark.sql.connector.catalog.TableChange.AddColumn;
-import org.apache.spark.sql.connector.catalog.TableChange.DeleteColumn;
-import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty;
-import org.apache.spark.sql.connector.catalog.TableChange.RenameColumn;
-import org.apache.spark.sql.connector.catalog.TableChange.SetProperty;
-import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnComment;
-import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnNullability;
-import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnPosition;
-import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnType;
-import org.apache.spark.sql.connector.expressions.FieldReference;
-import org.apache.spark.sql.connector.expressions.NamedReference;
-import org.apache.spark.sql.connector.expressions.Transform;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
 
 /** Spark {@link TableCatalog} for paimon. */
-public class SparkCatalog implements TableCatalog, SupportsNamespaces {
-
-    private static final String PRIMARY_KEY_IDENTIFIER = "primary-key";
-
-    private String name = null;
-    private Catalog catalog = null;
-
-    @Override
-    public void initialize(String name, CaseInsensitiveStringMap options) {
-        this.name = name;
-        CatalogContext catalogContext =
-                CatalogContext.create(
-                        Options.fromMap(options),
-                        SparkSession.active().sessionState().newHadoopConf());
-        this.catalog = CatalogFactory.createCatalog(catalogContext);
-    }
-
-    @Override
-    public String name() {
-        return name;
-    }
-
-    @Override
-    public String[] defaultNamespace() {
-        return new String[] {Catalog.DEFAULT_DATABASE};
-    }
-
-    @Override
-    public void createNamespace(String[] namespace, Map<String, String> metadata)
-            throws NamespaceAlreadyExistsException {
-        Preconditions.checkArgument(
-                isValidateNamespace(namespace),
-                "Namespace %s is not valid",
-                Arrays.toString(namespace));
-        try {
-            catalog.createDatabase(namespace[0], false);
-        } catch (Catalog.DatabaseAlreadyExistException e) {
-            throw new NamespaceAlreadyExistsException(namespace);
-        }
-    }
-
-    @Override
-    public String[][] listNamespaces() {
-        List<String> databases = catalog.listDatabases();
-        String[][] namespaces = new String[databases.size()][];
-        for (int i = 0; i < databases.size(); i++) {
-            namespaces[i] = new String[] {databases.get(i)};
-        }
-        return namespaces;
-    }
-
-    @Override
-    public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
-        if (namespace.length == 0) {
-            return listNamespaces();
-        }
-        if (!isValidateNamespace(namespace)) {
-            throw new NoSuchNamespaceException(namespace);
-        }
-        if (catalog.databaseExists(namespace[0])) {
-            return new String[0][];
-        }
-        throw new NoSuchNamespaceException(namespace);
-    }
-
-    @Override
-    public Map<String, String> loadNamespaceMetadata(String[] namespace)
-            throws NoSuchNamespaceException {
-        Preconditions.checkArgument(
-                isValidateNamespace(namespace),
-                "Namespace %s is not valid",
-                Arrays.toString(namespace));
-        if (catalog.databaseExists(namespace[0])) {
-            return Collections.emptyMap();
-        }
-        throw new NoSuchNamespaceException(namespace);
-    }
-
-    /**
-     * Drop a namespace from the catalog, recursively dropping all objects within the namespace.
-     * This interface implementation only supports the Spark 3.0, 3.1 and 3.2.
-     *
-     * <p>If the catalog implementation does not support this operation, it may throw {@link
-     * UnsupportedOperationException}.
-     *
-     * @param namespace a multi-part namespace
-     * @return true if the namespace was dropped
-     * @throws UnsupportedOperationException If drop is not a supported operation
-     */
-    public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException {
-        return dropNamespace(namespace, false);
-    }
-
-    /**
-     * Drop a namespace from the catalog with cascade mode, recursively dropping all objects within
-     * the namespace if cascade is true. This interface implementation supports the Spark 3.3+.
-     *
-     * <p>If the catalog implementation does not support this operation, it may throw {@link
-     * UnsupportedOperationException}.
-     *
-     * @param namespace a multi-part namespace
-     * @param cascade When true, deletes all objects under the namespace
-     * @return true if the namespace was dropped
-     * @throws UnsupportedOperationException If drop is not a supported operation
-     */
-    public boolean dropNamespace(String[] namespace, boolean cascade)
-            throws NoSuchNamespaceException {
-        Preconditions.checkArgument(
-                isValidateNamespace(namespace),
-                "Namespace %s is not valid",
-                Arrays.toString(namespace));
-        try {
-            catalog.dropDatabase(namespace[0], false, cascade);
-            return true;
-        } catch (Catalog.DatabaseNotExistException e) {
-            throw new NoSuchNamespaceException(namespace);
-        } catch (Catalog.DatabaseNotEmptyException e) {
-            throw new UnsupportedOperationException(
-                    String.format("Namespace %s is not empty", Arrays.toString(namespace)));
-        }
-    }
-
-    @Override
-    public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
-        Preconditions.checkArgument(
-                isValidateNamespace(namespace),
-                "Missing database in namespace: %s",
-                Arrays.toString(namespace));
-        try {
-            return catalog.listTables(namespace[0]).stream()
-                    .map(table -> Identifier.of(namespace, table))
-                    .toArray(Identifier[]::new);
-        } catch (Catalog.DatabaseNotExistException e) {
-            throw new NoSuchNamespaceException(namespace);
-        }
-    }
-
-    @Override
-    public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
-        try {
-            return new SparkTable(
-                    catalog.getTable(toIdentifier(ident)),
-                    Lock.factory(catalog.lockFactory().orElse(null), toIdentifier(ident)));
-        } catch (Catalog.TableNotExistException e) {
-            throw new NoSuchTableException(ident);
-        }
-    }
-
-    @Override
-    public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
-        List<SchemaChange> schemaChanges =
-                Arrays.stream(changes).map(this::toSchemaChange).collect(Collectors.toList());
-        try {
-            catalog.alterTable(toIdentifier(ident), schemaChanges, false);
-            return loadTable(ident);
-        } catch (Catalog.TableNotExistException e) {
-            throw new NoSuchTableException(ident);
-        }
-    }
-
-    @Override
-    public SparkTable createTable(
-            Identifier ident,
-            StructType schema,
-            Transform[] partitions,
-            Map<String, String> properties)
-            throws TableAlreadyExistsException, NoSuchNamespaceException {
-        try {
-            catalog.createTable(
-                    toIdentifier(ident), toUpdateSchema(schema, partitions, properties), false);
-            return loadTable(ident);
-        } catch (Catalog.TableAlreadyExistException e) {
-            throw new TableAlreadyExistsException(ident);
-        } catch (Catalog.DatabaseNotExistException e) {
-            throw new NoSuchNamespaceException(ident.namespace());
-        } catch (NoSuchTableException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public boolean dropTable(Identifier ident) {
-        try {
-            catalog.dropTable(toIdentifier(ident), false);
-            return true;
-        } catch (Catalog.TableNotExistException | NoSuchTableException e) {
-            return false;
-        }
-    }
-
-    private SchemaChange toSchemaChange(TableChange change) {
-        if (change instanceof SetProperty) {
-            SetProperty set = (SetProperty) change;
-            validateAlterProperty(set.property());
-            return SchemaChange.setOption(set.property(), set.value());
-        } else if (change instanceof RemoveProperty) {
-            RemoveProperty remove = (RemoveProperty) change;
-            validateAlterProperty(remove.property());
-            return SchemaChange.removeOption(remove.property());
-        } else if (change instanceof AddColumn) {
-            AddColumn add = (AddColumn) change;
-            validateAlterNestedField(add.fieldNames());
-            SchemaChange.Move move = getMove(add.position(), add.fieldNames());
-            return SchemaChange.addColumn(
-                    add.fieldNames()[0],
-                    toPaimonType(add.dataType()).copy(add.isNullable()),
-                    add.comment(),
-                    move);
-        } else if (change instanceof RenameColumn) {
-            RenameColumn rename = (RenameColumn) change;
-            validateAlterNestedField(rename.fieldNames());
-            return SchemaChange.renameColumn(rename.fieldNames()[0], rename.newName());
-        } else if (change instanceof DeleteColumn) {
-            DeleteColumn delete = (DeleteColumn) change;
-            validateAlterNestedField(delete.fieldNames());
-            return SchemaChange.dropColumn(delete.fieldNames()[0]);
-        } else if (change instanceof UpdateColumnType) {
-            UpdateColumnType update = (UpdateColumnType) change;
-            validateAlterNestedField(update.fieldNames());
-            return SchemaChange.updateColumnType(
-                    update.fieldNames()[0], toPaimonType(update.newDataType()));
-        } else if (change instanceof UpdateColumnNullability) {
-            UpdateColumnNullability update = (UpdateColumnNullability) change;
-            return SchemaChange.updateColumnNullability(update.fieldNames(), update.nullable());
-        } else if (change instanceof UpdateColumnComment) {
-            UpdateColumnComment update = (UpdateColumnComment) change;
-            return SchemaChange.updateColumnComment(update.fieldNames(), update.newComment());
-        } else if (change instanceof UpdateColumnPosition) {
-            UpdateColumnPosition update = (UpdateColumnPosition) change;
-            SchemaChange.Move move = getMove(update.position(), update.fieldNames());
-            return SchemaChange.updateColumnPosition(move);
-        } else {
-            throw new UnsupportedOperationException(
-                    "Change is not supported: " + change.getClass());
-        }
-    }
-
-    private static SchemaChange.Move getMove(
-            TableChange.ColumnPosition columnPosition, String[] fieldNames) {
-        SchemaChange.Move move = null;
-        if (columnPosition instanceof TableChange.First) {
-            move = SchemaChange.Move.first(fieldNames[0]);
-        } else if (columnPosition instanceof TableChange.After) {
-            move =
-                    SchemaChange.Move.after(
-                            fieldNames[0], ((TableChange.After) columnPosition).column());
-        }
-        return move;
-    }
-
-    private Schema toUpdateSchema(
-            StructType schema, Transform[] partitions, Map<String, String> properties) {
-        Preconditions.checkArgument(
-                Arrays.stream(partitions)
-                        .allMatch(
-                                partition -> {
-                                    NamedReference[] references = partition.references();
-                                    return references.length == 1
-                                            && references[0] instanceof FieldReference;
-                                }));
-        Map<String, String> normalizedProperties = new HashMap<>(properties);
-        normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
-        String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER);
-        List<String> primaryKeys =
-                pkAsString == null
-                        ? Collections.emptyList()
-                        : Arrays.stream(pkAsString.split(","))
-                                .map(String::trim)
-                                .collect(Collectors.toList());
-        return new Schema(
-                ((RowType) toPaimonType(schema)).getFields(),
-                Arrays.stream(partitions)
-                        .map(partition -> partition.references()[0].describe())
-                        .collect(Collectors.toList()),
-                primaryKeys,
-                normalizedProperties,
-                properties.getOrDefault(TableCatalog.PROP_COMMENT, ""));
-    }
-
-    private void validateAlterNestedField(String[] fieldNames) {
-        if (fieldNames.length > 1) {
-            throw new UnsupportedOperationException(
-                    "Alter nested column is not supported: " + Arrays.toString(fieldNames));
-        }
-    }
-
-    private void validateAlterProperty(String alterKey) {
-        if (PRIMARY_KEY_IDENTIFIER.equals(alterKey)) {
-            throw new UnsupportedOperationException("Alter primary key is not supported");
-        }
-    }
-
-    private boolean isValidateNamespace(String[] namespace) {
-        return namespace.length == 1;
-    }
-
-    private org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident)
-            throws NoSuchTableException {
-        if (!isValidateNamespace(ident.namespace())) {
-            throw new NoSuchTableException(ident);
-        }
-
-        return new org.apache.paimon.catalog.Identifier(ident.namespace()[0], ident.name());
-    }
-
-    // --------------------- unsupported methods ----------------------------
-
-    @Override
-    public void alterNamespace(String[] namespace, NamespaceChange... changes) {
-        throw new UnsupportedOperationException("Alter namespace in Spark is not supported yet.");
-    }
-
-    @Override
-    public void renameTable(Identifier oldIdent, Identifier newIdent)
-            throws NoSuchTableException, TableAlreadyExistsException {
-        try {
-            catalog.renameTable(toIdentifier(oldIdent), toIdentifier(newIdent), false);
-        } catch (Catalog.TableNotExistException e) {
-            throw new NoSuchTableException(oldIdent);
-        } catch (Catalog.TableAlreadyExistException e) {
-            throw new TableAlreadyExistsException(newIdent);
-        }
-    }
-}
+public class SparkCatalog extends SparkCatalogBase {}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogBase.java
similarity index 99%
copy from paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
copy to paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogBase.java
index 9196bfef3..88d65cf1a 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogBase.java
@@ -63,8 +63,8 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
 
-/** Spark {@link TableCatalog} for paimon. */
-public class SparkCatalog implements TableCatalog, SupportsNamespaces {
+/** Base implementation of Spark {@link TableCatalog} for paimon. */
+public abstract class SparkCatalogBase implements TableCatalog, SupportsNamespaces {
 
     private static final String PRIMARY_KEY_IDENTIFIER = "primary-key";