Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/02/25 03:45:20 UTC

[GitHub] [iceberg] wang-x-xia opened a new pull request #4221: Dell: Add Dell EMC EcsCatalog.

wang-x-xia opened a new pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221


   Add 3 classes to implement the Dell EMC ECS catalog:
   
   1. `EcsCatalog` is similar to `HadoopCatalog`. It uses delimiter listing to organize metadata objects as a tree that stores namespaces and tables.
   2. `EcsTableOperations` implements the operations that update the table metadata location. It uses put-if-absent and compare-and-swap (CAS) to avoid concurrency issues.
   3. `PropertiesSerDes` is a utility class that serializes and deserializes the `Map<String, String>` properties used in Iceberg. The current implementation is simple and carries a version number; if the data structure needs to change, the version placeholder makes it easy to upgrade.
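
A minimal sketch of the versioned serialization idea behind `PropertiesSerDes`, assuming the `java.util.Properties` text format as the wire format and a version string stored next to the bytes (the diff below stores a version under the `iceberg_properties_version` user-metadata key). The class name `PropertiesSerDesSketch`, the version value `"0"`, and the format choice are illustrative assumptions, not the PR's implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

final class PropertiesSerDesSketch {
  // Hypothetical current format version; a real implementation may use another value.
  static final String CURRENT_VERSION = "0";

  private PropertiesSerDesSketch() {
  }

  /** Serializes a string map using the java.util.Properties text format. */
  static byte[] toBytes(Map<String, String> value) {
    Properties props = new Properties();
    props.putAll(value);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      // store() escapes special and non-Latin-1 characters, so load() can round-trip them.
      props.store(out, null);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }

  /** Deserializes bytes written by toBytes, rejecting versions it does not understand. */
  static Map<String, String> fromBytes(byte[] bytes, String version) {
    if (!CURRENT_VERSION.equals(version)) {
      throw new IllegalArgumentException("Unsupported properties version: " + version);
    }
    Properties props = new Properties();
    try {
      props.load(new ByteArrayInputStream(bytes));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    Map<String, String> result = new LinkedHashMap<>();
    for (String name : props.stringPropertyNames()) {
      result.put(name, props.getProperty(name));
    }
    return result;
  }
}
```

Rejecting unknown versions on read is what makes the version placeholder useful: a future format can bump the version and branch on it in `fromBytes`.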
   
   
   To test the catalog, `MockDellClientFactory` now loads a shared mock client, so the data under one `EcsS3MockRule` is shared by all client factories.
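
Item 1 relies on S3-style delimiter listing: with delimiter `/`, keys directly under a prefix come back as objects, while deeper keys are rolled up into common prefixes, so a flat key space behaves like a tree. The in-memory sketch below mimics that behavior over a sorted key set. The key layout (`wh/db.namespace`, `wh/db/t1.table`) and all names are hypothetical, and real listings are paginated with a marker, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;

final class DelimiterListingSketch {
  /** Result of one unpaginated delimiter listing. */
  static final class Listing {
    final List<String> objects = new ArrayList<>();
    final List<String> commonPrefixes = new ArrayList<>();
  }

  private DelimiterListingSketch() {
  }

  /**
   * Keys with no delimiter after the prefix are returned as objects; the rest are
   * rolled up into a common prefix (a "directory") ending with the delimiter.
   */
  static Listing list(NavigableSet<String> keys, String prefix, String delimiter) {
    Listing listing = new Listing();
    for (String key : keys.tailSet(prefix, true)) {
      if (!key.startsWith(prefix)) {
        break; // keys are sorted, so no later key can match the prefix
      }
      String rest = key.substring(prefix.length());
      int idx = rest.indexOf(delimiter);
      if (idx < 0) {
        listing.objects.add(key);
      } else {
        String common = prefix + rest.substring(0, idx + delimiter.length());
        int last = listing.commonPrefixes.size() - 1;
        // Keys sharing a prefix are contiguous in sorted order, so checking the last entry dedups.
        if (last < 0 || !listing.commonPrefixes.get(last).equals(common)) {
          listing.commonPrefixes.add(common);
        }
      }
    }
    return listing;
  }

  /** Returns the names of direct child objects ending with a suffix such as ".table". */
  static List<String> namesWithSuffix(Listing listing, String prefix, String suffix) {
    List<String> names = new ArrayList<>();
    for (String key : listing.objects) {
      if (key.endsWith(suffix)) {
        names.add(key.substring(prefix.length(), key.length() - suffix.length()));
      }
    }
    return names;
  }
}
```

Listing `wh/` then yields the namespace objects plus a `wh/db/` common prefix, and listing `wh/db/` yields that namespace's table objects without descending into their metadata directories.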


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r827609452



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+
+  /**
+   * The warehouse location has no trailing delimiter, consistent with other catalogs.
+   */
+  private EcsURI warehouseLocation;
+  private FileIO fileIO;
+  private CloseableGroup closeableGroup;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.warehouseLocation = new EcsURI(properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.fileIO = initializeFileIO(properties);
+
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(client::destroy);
+    closeableGroup.addCloseable(fileIO);
+    closeableGroup.setSuppressCloseFailure(true);
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(String.format("%s.%s", catalogName, tableIdentifier),
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    String tableName = tableIdentifier.name();
+    StringBuilder sb = new StringBuilder();
+
+    sb.append(warehouseLocation).append('/');
+    for (String level : tableIdentifier.namespace().levels()) {
+      sb.append(level).append('/');
+    }
+    sb.append(tableName);
+
+    return sb.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter("/")
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Get object prefix of namespace.
+   */
+  private EcsURI namespacePrefix(Namespace namespace) {
+    String prefix;
+    if (namespace.isEmpty()) {
+      prefix = warehouseLocation.name();
+    } else {
+      prefix = String.format("%s%s/", warehouseLocation.name(),
+          String.join("/", namespace.levels()));
+    }
+
+    return new EcsURI(warehouseLocation.bucket(), prefix);
+  }
+
+  private TableIdentifier parseTableId(Namespace namespace, EcsURI prefix, S3Object s3Object) {
+    String key = s3Object.getKey();
+    Preconditions.checkArgument(key.startsWith(prefix.name()),
+        "List result should have same prefix: key=%s, prefix=%s", key, prefix);
+
+    String tableName = key.substring(
+        prefix.name().length(),
+        key.length() - TABLE_OBJECT_SUFFIX.length());
+    return TableIdentifier.of(namespace, tableName);
+  }
+
+  /**
+   * Remove table object. If the purge flag is set, remove all data objects.
+   */
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    if (!tableExists(identifier)) {
+      throw new NoSuchTableException("Table %s does not exist", identifier);
+    }
+
+    EcsURI tableObjectURI = tableURI(identifier);
+    if (purge) {
+      // Create fresh TableOperations: reusing the same instance would make current() throw an exception.
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata current = ops.current();
+      if (current == null) {
+        return false;
+      }
+
+      CatalogUtil.dropTableData(ops.io(), current);
+    }
+
+    client.deleteObject(tableObjectURI.bucket(), tableObjectURI.name());
+    return true;
+  }
+
+  private EcsURI tableURI(TableIdentifier id) {
+    EcsURI prefix = namespacePrefix(id.namespace());
+    // The prefix has the delimiter at the tail.
+    return new EcsURI(prefix.bucket(), prefix.name() + id.name() + TABLE_OBJECT_SUFFIX);
+  }
+
+  /**
+   * Renaming a table only moves the table object; the data objects stay in place.
+   *
+   * @param from identifier of the table to rename
+   * @param to   new table name
+   */
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    if (!namespaceExists(to.namespace())) {
+      throw new NoSuchNamespaceException("Cannot rename %s to %s because namespace %s does not exist",
+              from, to, to.namespace());
+    }
+
+    if (tableExists(to)) {
+      throw new AlreadyExistsException("Cannot rename %s because destination table %s exists", from, to);
+    }
+
+    EcsURI fromURI = tableURI(from);
+    if (!objectMetadata(fromURI).isPresent()) {
+      throw new NoSuchTableException("Cannot rename table because table %s does not exist", from);
+    }
+
+    Properties properties = loadProperties(fromURI);
+    EcsURI toURI = tableURI(to);
+
+    if (!putNewProperties(toURI, properties.content())) {
+      throw new AlreadyExistsException("Cannot rename %s because destination table %s exists", from, to);
+    }
+
+    client.deleteObject(fromURI.bucket(), fromURI.name());
+    LOG.info("rename table {} to {}", from, to);

Review comment:
       > In the current approach, I'm concerned that we cannot guarantee atomicity between `putNewProperties` and `deleteObject`, right? (That would violate Iceberg's transaction semantics.)
   
   Yes. It's not atomic.
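
To illustrate the non-atomic window, here is a hypothetical in-memory sketch of the two-step rename (put-if-absent on the destination, then delete the source). Each step is atomic on its own, but a client that fails between them leaves the table object under both names. `TwoStepRenameSketch` and its crash flag are illustrative only and do not model the ECS client.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class TwoStepRenameSketch {
  // Hypothetical stand-in for the object store: key -> serialized table properties.
  private final Map<String, String> store = new ConcurrentHashMap<>();

  /** Atomic create-only write, like putNewProperties. */
  boolean putIfAbsent(String key, String content) {
    return store.putIfAbsent(key, content) == null;
  }

  boolean exists(String key) {
    return store.containsKey(key);
  }

  /**
   * Step 1 creates the destination, step 2 deletes the source. crashBetweenSteps
   * simulates the client dying after step 1, before step 2 runs.
   */
  void rename(String from, String to, boolean crashBetweenSteps) {
    String content = store.get(from);
    if (content == null) {
      throw new IllegalStateException("Source does not exist: " + from);
    }
    if (!putIfAbsent(to, content)) {
      throw new IllegalStateException("Destination already exists: " + to);
    }
    if (crashBetweenSteps) {
      throw new RuntimeException("Simulated crash after creating " + to);
    }
    store.remove(from);
  }
}
```

After a simulated crash, both `from` and `to` exist and point at the same metadata, so two catalog entries could then diverge if each is committed to independently.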






[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r818272012



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsTableOperations.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.util.Map;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+public class EcsTableOperations extends BaseMetastoreTableOperations {
+
+  public static final String ICEBERG_METADATA_LOCATION = "iceberg_metadata_location";
+
+  private final String tableName;
+  private final FileIO fileIO;
+  private final EcsCatalog catalog;
+  private final EcsURI tableObject;
+
+  /**
+   * Cached E-Tag for CAS commit
+   *
+   * @see #doRefresh() where this field is reset
+   * @see #doCommit(TableMetadata, TableMetadata) where this field is used
+   */
+  private String eTag;
+
+  public EcsTableOperations(String tableName, EcsURI tableObject, FileIO fileIO, EcsCatalog catalog) {
+    this.tableName = tableName;
+    this.tableObject = tableObject;
+    this.fileIO = fileIO;
+    this.catalog = catalog;
+  }
+
+  @Override
+  protected String tableName() {
+    return tableName;
+  }
+
+  @Override
+  public FileIO io() {
+    return fileIO;
+  }
+
+  @Override
+  protected void doRefresh() {
+    String metadataLocation;
+    if (!catalog.objectMetadata(tableObject).isPresent()) {
+      if (currentMetadataLocation() != null) {
+        throw new NoSuchTableException("Metadata object %s is absent", tableObject);
+      } else {
+        metadataLocation = null;
+      }
+    } else {
+      EcsCatalog.Properties metadata = catalog.loadProperties(tableObject);
+      this.eTag = metadata.eTag();
+      metadataLocation = metadata.content().get(ICEBERG_METADATA_LOCATION);
+      Preconditions.checkNotNull(metadataLocation,
+          "Can't find location from table metadata %s", tableObject);
+    }
+
+    refreshFromMetadataLocation(metadataLocation);
+  }
+
+  @Override
+  protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+    if (base == null) {
+      // create a new table, the metadataKey should be absent
+      if (!catalog.putNewProperties(tableObject, buildProperties(newMetadataLocation))) {
+        throw new CommitFailedException("Cannot create table %s because it already exists", tableName());
+      }
+    } else {
+      String cachedETag = eTag;
+      Preconditions.checkNotNull(cachedETag, "E-Tag must not be null when updating a table");
+      // replace with a new version; the E-Tag must be present and must match
+      boolean result = catalog.updatePropertiesObject(

Review comment:
       The update is also atomic when the E-Tag is specified in the If-Match header. This is an ECS extension of the S3 API.
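
A hypothetical in-memory model of the two conditional writes `doCommit` relies on: a create-only put for a new table, and an overwrite that succeeds only when the caller's cached E-Tag still matches (the If-Match behavior described above). The class and method names are assumptions, not the ECS client API; in the diff, a failed create surfaces as a `CommitFailedException`, and the update branch presumably does the same, though that part is cut off here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class ETagCasSketch {
  /** A stored value plus the E-Tag (version token) the store assigned to it. */
  static final class Versioned {
    final String content;
    final String eTag;

    Versioned(String content, String eTag) {
      this.content = content;
      this.eTag = eTag;
    }
  }

  private final Map<String, Versioned> objects = new HashMap<>();
  private long nextTag = 0;

  synchronized Optional<Versioned> get(String key) {
    return Optional.ofNullable(objects.get(key));
  }

  /** Create-only write: fails if the key already exists (base == null case). */
  synchronized boolean putIfAbsent(String key, String content) {
    if (objects.containsKey(key)) {
      return false;
    }
    objects.put(key, new Versioned(content, newTag()));
    return true;
  }

  /** Conditional overwrite: succeeds only if the current E-Tag equals expectedETag. */
  synchronized boolean replaceIfMatch(String key, String expectedETag, String content) {
    Versioned current = objects.get(key);
    if (current == null || !current.eTag.equals(expectedETag)) {
      return false;
    }
    objects.put(key, new Versioned(content, newTag()));
    return true;
  }

  private String newTag() {
    return "etag-" + (nextTag++);
  }
}
```

Two writers that refreshed from the same E-Tag cannot both win: the first overwrite changes the E-Tag, so the second writer's conditional write fails and it must refresh and retry.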






[GitHub] [iceberg] openinx merged pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx merged pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221


   




[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r824472092



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.dell.DellProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+  /**

Review comment:
       Fixed.






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r826786502



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+
+  /**
+   * The warehouse location has no trailing delimiter, consistent with other catalogs.
+   */
+  private EcsURI warehouseLocation;
+  private FileIO fileIO;
+  private CloseableGroup closeableGroup;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.warehouseLocation = new EcsURI(properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.fileIO = initializeFileIO(properties);
+
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(client::destroy);
+    closeableGroup.addCloseable(fileIO);
+    closeableGroup.setSuppressCloseFailure(true);
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(String.format("%s.%s", catalogName, tableIdentifier),
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    String tableName = tableIdentifier.name();
+    StringBuilder sb = new StringBuilder();
+
+    sb.append(warehouseLocation).append('/');

Review comment:
       Nit: let's define a final constant `SLASH` to replace the `/` literal.
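
A sketch of the reviewer's suggestion, assuming the warehouse location and namespace levels are passed as plain strings rather than `EcsURI` and `TableIdentifier` (a simplification to keep it self-contained). Only the `SLASH` name comes from the review; the class and method shape are illustrative.

```java
import java.util.List;

final class WarehouseLocationSketch {
  // Single definition of the path delimiter, as suggested in the review.
  private static final String SLASH = "/";

  private WarehouseLocationSketch() {
  }

  /** Joins warehouse, namespace levels and table name, e.g. "bucket/wh/db/t". */
  static String defaultWarehouseLocation(String warehouse, List<String> namespaceLevels, String tableName) {
    StringBuilder sb = new StringBuilder(warehouse).append(SLASH);
    for (String level : namespaceLevels) {
      sb.append(level).append(SLASH);
    }
    return sb.append(tableName).toString();
  }
}
```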






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r828883531



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+
+  /**
+   * The warehouse location has no trailing delimiter, consistent with other catalogs.
+   */
+  private EcsURI warehouseLocation;
+  private FileIO fileIO;
+  private CloseableGroup closeableGroup;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.warehouseLocation = cleanWarehousePath(properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.fileIO = initializeFileIO(properties);
+
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(client::destroy);
+    closeableGroup.addCloseable(fileIO);
+    closeableGroup.setSuppressCloseFailure(true);
+  }
+
+  private EcsURI cleanWarehousePath(String path) {
+    Preconditions.checkArgument(path != null && path.length() > 0,
+            "Cannot initialize EcsCatalog because warehousePath must not be null");

Review comment:
       Nit: the message should say "must not be null or empty", since the check also rejects empty strings.
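   A minimal sketch of the check this nit asks for, written in plain Java so it is self-contained. The PR itself uses Iceberg's relocated Guava `Preconditions`, where `Strings.isNullOrEmpty` would express the same condition. The class and method names below are hypothetical:

```java
public class WarehousePathCheck {

  // Hypothetical helper: rejects both null and empty warehouse paths,
  // so the error message can truthfully say "null or empty".
  static String checkWarehousePath(String path) {
    if (path == null || path.isEmpty()) {
      throw new IllegalArgumentException(
          "Cannot initialize EcsCatalog because warehousePath must not be null or empty");
    }
    return path;
  }

  public static void main(String[] args) {
    System.out.println(checkWarehousePath("ecs://bucket/prefix"));
    try {
      checkWarehousePath("");
    } catch (IllegalArgumentException e) {
      // The empty string is rejected with the clarified message.
      System.out.println(e.getMessage());
    }
  }
}
```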






[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r818278753



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.dell.DellProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";

Review comment:
       Fixed!






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r814465522



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/PropertiesSerDes.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convert Map properties to bytes.
+ */
+public interface PropertiesSerDes {
+
+  /**
+   * Version of current implementation.
+   */
+  String CURRENT_VERSION = "0";
+
+  Logger log = LoggerFactory.getLogger(PropertiesSerDes.class);

Review comment:
       Nit: we usually name the logger `LOG` in Apache Iceberg.






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r826761412



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private EcsURI warehouseLocation;
+  private FileIO fileIO;
+  private CloseableGroup closeableGroup;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.warehouseLocation = new EcsURI(properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.fileIO = initializeFileIO(properties);
+
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(client::destroy);
+    closeableGroup.addCloseable(fileIO);
+    closeableGroup.setSuppressCloseFailure(true);
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(String.format("%s.%s", catalogName, tableIdentifier),
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    String tableName = tableIdentifier.name();
+    StringBuilder sb = new StringBuilder();
+
+    sb.append(warehouseLocation).append('/');
+    for (String level : tableIdentifier.namespace().levels()) {
+      sb.append(level).append('/');
+    }
+    sb.append(tableName);
+
+    return sb.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter("/")
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Get object prefix of namespace.
+   */
+  private EcsURI namespacePrefix(Namespace namespace) {
+    String prefix;
+    if (namespace.isEmpty()) {
+      prefix = warehouseLocation.name();
+    } else {
+      prefix = String.format("%s%s/", warehouseLocation.name(),
+          String.join("/", namespace.levels()));
+    }
+
+    return new EcsURI(warehouseLocation.bucket(), prefix);
+  }
+
+  private TableIdentifier parseTableId(Namespace namespace, EcsURI prefix, S3Object s3Object) {
+    String key = s3Object.getKey();
+    Preconditions.checkArgument(key.startsWith(prefix.name()),
+        "List result should have same prefix", key, prefix);
+
+    String tableName = key.substring(
+        prefix.name().length(),
+        key.length() - TABLE_OBJECT_SUFFIX.length());
+    return TableIdentifier.of(namespace, tableName);
+  }
+
+  /**
+   * Remove table object. If the purge flag is set, remove all data objects.
+   */
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    if (!tableExists(identifier)) {
+      throw new NoSuchTableException("Table %s does not exist", identifier);
+    }

Review comment:
       Per the `dropTable` interface contract, we should return `false` when the table does not exist rather than throw a `NoSuchTableException`.
   
   ```java
   /**
    * Drop a table and delete all data and metadata files.
    *
    * @param identifier a table identifier
    * @return true if the table was dropped, false if the table did not exist
    */
   default boolean dropTable(TableIdentifier identifier) {
     return dropTable(identifier, true /* drop data and metadata files */);
   }
   ```
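   A minimal sketch of the contract described above, using a hypothetical in-memory map as a stand-in for the catalog's table objects. It only illustrates the "return false if the table did not exist" behavior, not `EcsCatalog`'s storage layout or purge logic:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a catalog; table identifiers map to metadata locations.
public class DropTableContract {

  private final Map<String, String> tableObjects = new HashMap<>();

  void createTable(String identifier, String metadataLocation) {
    tableObjects.put(identifier, metadataLocation);
  }

  // Returns true if the table was dropped, false if it did not exist.
  boolean dropTable(String identifier) {
    // Map#remove returns null when the key was absent, so a missing table
    // yields false instead of an exception.
    return tableObjects.remove(identifier) != null;
  }

  public static void main(String[] args) {
    DropTableContract catalog = new DropTableContract();
    catalog.createTable("db.t1", "ecs://bucket/warehouse/db/t1.table");
    System.out.println(catalog.dropTable("db.t1")); // true
    System.out.println(catalog.dropTable("db.t1")); // false: already dropped
  }
}
```

   Note that in the quoted `EcsCatalog` code the existence check (`tableExists`) and the delete (`client.deleteObject`) are separate requests, so a concurrent drop can still race between them; this sketch does not address that.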






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r826776915



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private EcsURI warehouseLocation;
+  private FileIO fileIO;
+  private CloseableGroup closeableGroup;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.warehouseLocation = new EcsURI(properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.fileIO = initializeFileIO(properties);
+
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(client::destroy);
+    closeableGroup.addCloseable(fileIO);
+    closeableGroup.setSuppressCloseFailure(true);
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(String.format("%s.%s", catalogName, tableIdentifier),
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    String tableName = tableIdentifier.name();
+    StringBuilder sb = new StringBuilder();
+
+    sb.append(warehouseLocation).append('/');
+    for (String level : tableIdentifier.namespace().levels()) {
+      sb.append(level).append('/');
+    }
+    sb.append(tableName);
+
+    return sb.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter("/")
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Get object prefix of namespace.
+   */
+  private EcsURI namespacePrefix(Namespace namespace) {
+    String prefix;
+    if (namespace.isEmpty()) {
+      prefix = warehouseLocation.name();
+    } else {
+      prefix = String.format("%s%s/", warehouseLocation.name(),
+          String.join("/", namespace.levels()));
+    }
+
+    return new EcsURI(warehouseLocation.bucket(), prefix);
+  }
+
+  private TableIdentifier parseTableId(Namespace namespace, EcsURI prefix, S3Object s3Object) {
+    String key = s3Object.getKey();
+    Preconditions.checkArgument(key.startsWith(prefix.name()),
+        "List result should have same prefix", key, prefix);
+
+    String tableName = key.substring(
+        prefix.name().length(),
+        key.length() - TABLE_OBJECT_SUFFIX.length());
+    return TableIdentifier.of(namespace, tableName);
+  }
+
+  /**
+   * Remove table object. If the purge flag is set, remove all data objects.
+   */
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    if (!tableExists(identifier)) {
+      throw new NoSuchTableException("Table %s does not exist", identifier);
+    }
+
+    EcsURI tableObjectURI = tableURI(identifier);
+    if (purge) {
+      // if re-use the same instance, current() will throw exception.
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata current = ops.current();
+      if (current == null) {
+        return false;
+      }
+
+      CatalogUtil.dropTableData(ops.io(), current);
+    }
+
+    client.deleteObject(tableObjectURI.bucket(), tableObjectURI.name());
+    return true;
+  }
+
+  private EcsURI tableURI(TableIdentifier id) {
+    EcsURI prefix = namespacePrefix(id.namespace());
+    // The prefix has the delimiter at the tail.
+    return new EcsURI(prefix.bucket(), prefix.name() + id.name() + TABLE_OBJECT_SUFFIX);
+  }
+
+  /**
+   * Table rename will only move table object, the data objects will still be in-place.
+   *
+   * @param from identifier of the table to rename
+   * @param to   new table name
+   */
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    if (!namespaceExists(to.namespace())) {
+      throw new NoSuchNamespaceException("Cannot rename %s to %s because namespace %s does not exist",
+              from, to, to.namespace());
+    }
+
+    if (tableExists(to)) {
+      throw new AlreadyExistsException("Cannot rename %s because destination table %s exists", from, to);
+    }
+
+    EcsURI fromURI = tableURI(from);
+    if (!objectMetadata(fromURI).isPresent()) {
+      throw new NoSuchTableException("Cannot rename table because table %s does not exist", from);
+    }
+
+    Properties properties = loadProperties(fromURI);
+    EcsURI toURI = tableURI(to);
+
+    if (!putNewProperties(toURI, properties.content())) {
+      throw new AlreadyExistsException("Cannot rename %s because destination table %s exists", from, to);
+    }
+
+    client.deleteObject(fromURI.bucket(), fromURI.name());
+    LOG.info("rename table {} to {}", from, to);
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> properties) {
+    EcsURI namespaceObject = namespaceURI(namespace);
+    if (!putNewProperties(namespaceObject, properties)) {
+      throw new AlreadyExistsException("namespace %s(%s) has already existed", namespace, namespaceObject);
+    }
+  }
+
+  private EcsURI namespaceURI(Namespace namespace) {
+    return new EcsURI(
+        warehouseLocation.bucket(),
+        String.format("%s%s%s", warehouseLocation.name(), String.join("/", namespace.levels()),

Review comment:
       Nit: same issue with the trailing slash of `warehouseLocation`.
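   The quoted `namespacePrefix` and `namespaceURI` concatenate `warehouseLocation.name()` directly with the namespace levels, which only produces a correct key if the warehouse name already ends with `/` (or is empty). A minimal sketch of one way to normalize the prefix first; `normalizePrefix` and this `namespacePrefix` variant are hypothetical helpers working on plain strings, not the PR's `EcsURI` API:

```java
public class WarehousePrefix {

  // Hypothetical helper: normalizes an object-key prefix so that it is either
  // empty (bucket root) or ends with exactly one "/".
  static String normalizePrefix(String name) {
    String trimmed = name;
    while (trimmed.endsWith("/")) {
      trimmed = trimmed.substring(0, trimmed.length() - 1);
    }
    return trimmed.isEmpty() ? "" : trimmed + "/";
  }

  // Builds a namespace prefix like the quoted namespacePrefix(), but on top of
  // a normalized warehouse prefix, so "warehouse" and "warehouse/" agree.
  static String namespacePrefix(String warehouseName, String... levels) {
    String base = normalizePrefix(warehouseName);
    if (levels.length == 0) {
      return base;
    }
    return base + String.join("/", levels) + "/";
  }

  public static void main(String[] args) {
    System.out.println(namespacePrefix("warehouse", "db"));       // warehouse/db/
    System.out.println(namespacePrefix("warehouse/", "db"));      // warehouse/db/
    System.out.println(namespacePrefix("warehouse//", "a", "b")); // warehouse/a/b/
    System.out.println(namespacePrefix("", "db"));                // db/
  }
}
```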






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r814462552



##########
File path: build.gradle
##########
@@ -594,6 +594,13 @@ project(':iceberg-dell') {
     implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
     compileOnly 'com.emc.ecs:object-client-bundle'
 
+    compileOnly("org.apache.hadoop:hadoop-common") {
+      exclude group: 'org.apache.avro', module: 'avro'
+      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+      exclude group: 'javax.servlet', module: 'servlet-api'
+      exclude group: 'com.google.code.gson', module: 'gson'
+    }
+
     testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
     testImplementation("org.apache.hadoop:hadoop-common") {

Review comment:
       Is there another, test-only `hadoop-common` dependency?






[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r814494578



##########
File path: dell/src/main/java/org/apache/iceberg/dell/DellProperties.java
##########
@@ -38,6 +40,21 @@
    */
   public static final String ECS_S3_ENDPOINT = "ecs.s3.endpoint";
 
+  /**
+   * Catalog prefix is used to store catalog data. If not set, use {@link CatalogProperties#WAREHOUSE_LOCATION}.
+   * <p>
+   * The value is an EcsURI which like ecs://bucket/prefix.
+   */
+  public static final String ECS_CATALOG_PREFIX = "ecs.catalog.prefix";

Review comment:
       If a customer needs to separate data from catalog metadata, they can set this config.
   It is optional; the default value is `CatalogProperties#WAREHOUSE_LOCATION`.
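   A minimal sketch of the fallback described above. The key `ecs.catalog.prefix` comes from `DellProperties.ECS_CATALOG_PREFIX` in the diff; `"warehouse"` is assumed to be the value of `CatalogProperties.WAREHOUSE_LOCATION`. The resolver class itself is hypothetical:

```java
import java.util.Map;

public class CatalogPrefixResolver {

  // "ecs.catalog.prefix" is from the diff; "warehouse" is assumed to match
  // CatalogProperties.WAREHOUSE_LOCATION.
  static final String ECS_CATALOG_PREFIX = "ecs.catalog.prefix";
  static final String WAREHOUSE_LOCATION = "warehouse";

  // Hypothetical helper: prefer the dedicated catalog prefix, otherwise fall
  // back to the warehouse location so data and metadata share one root.
  static String resolveCatalogPrefix(Map<String, String> properties) {
    String prefix = properties.get(ECS_CATALOG_PREFIX);
    if (prefix == null || prefix.isEmpty()) {
      prefix = properties.get(WAREHOUSE_LOCATION);
    }
    if (prefix == null || prefix.isEmpty()) {
      throw new IllegalArgumentException("Neither ecs.catalog.prefix nor warehouse is set");
    }
    return prefix;
  }

  public static void main(String[] args) {
    // Only the warehouse is set: metadata lives next to the data.
    System.out.println(resolveCatalogPrefix(Map.of("warehouse", "ecs://bucket/data")));
    // Both are set: metadata goes to the separate catalog prefix.
    System.out.println(resolveCatalogPrefix(Map.of(
        "warehouse", "ecs://bucket/data",
        "ecs.catalog.prefix", "ecs://bucket/catalog")));
  }
}
```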






[GitHub] [iceberg] openinx commented on pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#issuecomment-1073792021


   Merged now. Thanks @wang-x-xia for the contribution, and thanks all for reviewing!




[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r814465832



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/PropertiesSerDes.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convert Map properties to bytes.
+ */
+public interface PropertiesSerDes {
+
+  /**
+   * Version of current implementation.
+   */
+  String CURRENT_VERSION = "0";
+
+  Logger log = LoggerFactory.getLogger(PropertiesSerDes.class);
+
+  /**
+   * Read properties
+   *
+   * @param version is the version of {@link PropertiesSerDes}
+   */
+  Map<String, String> read(InputStream input, String version);
+
+  /**
+   * Write properties, the version is {@link #currentVersion()}
+   */
+  byte[] toBytes(Map<String, String> value);
+
+  /**
+   * Get version of current serializer implementation.
+   */
+  String currentVersion();
+
+  /**
+   * Use {@link Properties} to serialize and deserialize properties.
+   */
+  static PropertiesSerDes current() {
+    return new PropertiesSerDes() {
+      @Override
+      public Map<String, String> read(InputStream input, String version) {
+        Preconditions.checkArgument(version.equals(CURRENT_VERSION),
+            "Properties version is not match", version);
+        Properties jdkProperties = new Properties();
+        try {
+          jdkProperties.load(new InputStreamReader(input, StandardCharsets.UTF_8));
+        } catch (IOException e) {
+          log.error("fail to read properties", e);

Review comment:
       Nit: uppercase?
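The serialization approach in the quoted diff uses `java.util.Properties` over UTF-8 and checks a version string on read. It can be sketched as a self-contained round trip. This is a minimal sketch, not the merged `PropertiesSerDes`. The class name, the error message, and the use of `IllegalArgumentException` in place of Guava's `Preconditions` are assumptions. One thing to note about the quoted `Preconditions.checkArgument` call: it passes `version` without a `%s` placeholder in its template, so Guava appends the value in square brackets instead of embedding it in the message. The sketch formats the version into the message explicitly.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class PropertiesRoundTrip {
  // Mirrors CURRENT_VERSION = "0" from the diff; the version travels next to the bytes.
  static final String CURRENT_VERSION = "0";

  static byte[] toBytes(Map<String, String> value) {
    Properties jdkProperties = new Properties();
    for (Map.Entry<String, String> entry : value.entrySet()) {
      jdkProperties.setProperty(entry.getKey(), entry.getValue());
    }
    ByteArrayOutputStream output = new ByteArrayOutputStream();
    try {
      // Properties.store(Writer, ...) flushes the writer after writing the entries.
      jdkProperties.store(new OutputStreamWriter(output, StandardCharsets.UTF_8), null);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return output.toByteArray();
  }

  static Map<String, String> read(InputStream input, String version) {
    if (!CURRENT_VERSION.equals(version)) {
      throw new IllegalArgumentException(String.format(
          "Unsupported properties version: %s (expected %s)", version, CURRENT_VERSION));
    }
    Properties jdkProperties = new Properties();
    try {
      jdkProperties.load(new InputStreamReader(input, StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    Map<String, String> result = new HashMap<>();
    for (String name : jdkProperties.stringPropertyNames()) {
      result.put(name, jdkProperties.getProperty(name));
    }
    return Collections.unmodifiableMap(result);
  }

  public static void main(String[] args) {
    Map<String, String> original = new HashMap<>();
    // ':' and '=' are escaped by store() and unescaped by load().
    original.put("iceberg_metadata_location", "ecs://bucket/db/t/metadata/v1.json");
    original.put("comment", "caf\u00e9 a=b");
    byte[] bytes = toBytes(original);
    Map<String, String> decoded = read(new ByteArrayInputStream(bytes), CURRENT_VERSION);
    System.out.println(decoded.equals(original)); // true
  }
}
```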






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r817619977



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsTableOperations.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.util.Map;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+public class EcsTableOperations extends BaseMetastoreTableOperations {
+
+  public static final String ICEBERG_METADATA_LOCATION = "iceberg_metadata_location";
+
+  private final String tableName;
+  private final FileIO fileIO;
+  private final EcsCatalog catalog;
+  private final EcsURI tableObject;
+
+  /**
+   * Cached E-Tag for CAS commit
+   *
+   * @see #doRefresh() when reset this field
+   * @see #doCommit(TableMetadata, TableMetadata) when use this field
+   */
+  private String eTag;
+
+  public EcsTableOperations(String tableName, EcsURI tableObject, FileIO fileIO, EcsCatalog catalog) {
+    this.tableName = tableName;
+    this.tableObject = tableObject;
+    this.fileIO = fileIO;
+    this.catalog = catalog;
+  }
+
+  @Override
+  protected String tableName() {
+    return tableName;
+  }
+
+  @Override
+  public FileIO io() {
+    return fileIO;
+  }
+
+  @Override
+  protected void doRefresh() {
+    String metadataLocation;
+    if (!catalog.objectMetadata(tableObject).isPresent()) {
+      if (currentMetadataLocation() != null) {
+        throw new NoSuchTableException("Metadata object %s is absent", tableObject);
+      } else {
+        metadataLocation = null;
+      }
+    } else {
+      EcsCatalog.Properties metadata = catalog.loadProperties(tableObject);
+      this.eTag = metadata.eTag();
+      metadataLocation = metadata.content().get(ICEBERG_METADATA_LOCATION);
+      Preconditions.checkNotNull(metadataLocation,
+          "Can't find location from table metadata %s", tableObject);
+    }
+
+    refreshFromMetadataLocation(metadataLocation);
+  }
+
+  @Override
+  protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+    if (base == null) {
+      // create a new table, the metadataKey should be absent
+      if (!catalog.putNewProperties(tableObject, buildProperties(newMetadataLocation))) {
+        throw new CommitFailedException("Table is existed when create table %s", tableName());
+      }
+    } else {
+      String cachedETag = eTag;
+      Preconditions.checkNotNull(cachedETag, "E-Tag must be not null when update table");
+      // replace to a new version, the E-Tag should be present and matched
+      boolean result = catalog.updatePropertiesObject(

Review comment:
       Same question for this line.
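The quoted `doCommit` relies on two conditional writes. Table creation uses put-if-absent (`putNewProperties`), and updates use a compare-and-swap against the E-Tag cached in `doRefresh` (`updatePropertiesObject`). The semantics can be sketched with a hypothetical in-memory store. This does not call the ECS client. The store class, its methods, and the random-UUID stand-in for a real E-Tag are all assumptions.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for an object store with conditional writes. */
final class ConditionalObjectStore {
  /** Stored object: content plus a tag that changes on every write (stand-in for an E-Tag). */
  static final class Entry {
    final String content;
    final String eTag;

    Entry(String content, String eTag) {
      this.content = content;
      this.eTag = eTag;
    }
  }

  private final Map<String, Entry> objects = new ConcurrentHashMap<>();

  /** Put-if-absent: only succeeds when no object exists yet (table creation, base == null). */
  boolean putIfAbsent(String key, String content) {
    return objects.putIfAbsent(key, new Entry(content, UUID.randomUUID().toString())) == null;
  }

  /** CAS: only succeeds when the stored tag still equals the one read at refresh time. */
  boolean replaceIfMatch(String key, String expectedETag, String content) {
    Entry current = objects.get(key);
    if (current == null || !current.eTag.equals(expectedETag)) {
      return false;
    }
    // Entry uses identity equality, so replace() swaps only if nobody wrote in between.
    return objects.replace(key, current, new Entry(content, UUID.randomUUID().toString()));
  }

  Optional<Entry> get(String key) {
    return Optional.ofNullable(objects.get(key));
  }

  public static void main(String[] args) {
    ConditionalObjectStore store = new ConditionalObjectStore();
    String key = "warehouse/db/t.table";
    store.putIfAbsent(key, "metadata/v1.json");

    // Two writers refreshed at the same time and cached the same tag.
    String cachedETag = store.get(key).get().eTag;
    boolean writerA = store.replaceIfMatch(key, cachedETag, "metadata/v2-a.json");
    boolean writerB = store.replaceIfMatch(key, cachedETag, "metadata/v2-b.json");
    System.out.println(writerA + " " + writerB); // true false
  }
}
```

The losing writer's `false` corresponds to the point where the table operations would throw `CommitFailedException`, so Iceberg can refresh and retry.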






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r826769938



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private EcsURI warehouseLocation;
+  private FileIO fileIO;
+  private CloseableGroup closeableGroup;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.warehouseLocation = new EcsURI(properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.fileIO = initializeFileIO(properties);
+
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(client::destroy);
+    closeableGroup.addCloseable(fileIO);
+    closeableGroup.setSuppressCloseFailure(true);
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(String.format("%s.%s", catalogName, tableIdentifier),
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    String tableName = tableIdentifier.name();
+    StringBuilder sb = new StringBuilder();
+
+    sb.append(warehouseLocation).append('/');
+    for (String level : tableIdentifier.namespace().levels()) {
+      sb.append(level).append('/');
+    }
+    sb.append(tableName);
+
+    return sb.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter("/")
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Get object prefix of namespace.
+   */
+  private EcsURI namespacePrefix(Namespace namespace) {
+    String prefix;
+    if (namespace.isEmpty()) {
+      prefix = warehouseLocation.name();
+    } else {
+      prefix = String.format("%s%s/", warehouseLocation.name(),
+          String.join("/", namespace.levels()));
+    }
+
+    return new EcsURI(warehouseLocation.bucket(), prefix);
+  }
+
+  private TableIdentifier parseTableId(Namespace namespace, EcsURI prefix, S3Object s3Object) {
+    String key = s3Object.getKey();
+    Preconditions.checkArgument(key.startsWith(prefix.name()),
+        "List result should have same prefix", key, prefix);
+
+    String tableName = key.substring(
+        prefix.name().length(),
+        key.length() - TABLE_OBJECT_SUFFIX.length());
+    return TableIdentifier.of(namespace, tableName);
+  }
+
+  /**
+   * Remove table object. If the purge flag is set, remove all data objects.
+   */
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    if (!tableExists(identifier)) {
+      throw new NoSuchTableException("Table %s does not exist", identifier);
+    }
+
+    EcsURI tableObjectURI = tableURI(identifier);
+    if (purge) {
+      // if re-use the same instance, current() will throw exception.
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata current = ops.current();
+      if (current == null) {
+        return false;
+      }
+
+      CatalogUtil.dropTableData(ops.io(), current);
+    }
+
+    client.deleteObject(tableObjectURI.bucket(), tableObjectURI.name());
+    return true;
+  }
+
+  private EcsURI tableURI(TableIdentifier id) {
+    EcsURI prefix = namespacePrefix(id.namespace());
+    // The prefix has the delimiter at the tail.
+    return new EcsURI(prefix.bucket(), prefix.name() + id.name() + TABLE_OBJECT_SUFFIX);
+  }
+
+  /**
+   * Table rename will only move table object, the data objects will still be in-place.
+   *
+   * @param from identifier of the table to rename
+   * @param to   new table name
+   */
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    if (!namespaceExists(to.namespace())) {
+      throw new NoSuchNamespaceException("Cannot rename %s to %s because namespace %s does not exist",
+              from, to, to.namespace());
+    }
+
+    if (tableExists(to)) {
+      throw new AlreadyExistsException("Cannot rename %s because destination table %s exists", from, to);
+    }
+
+    EcsURI fromURI = tableURI(from);
+    if (!objectMetadata(fromURI).isPresent()) {
+      throw new NoSuchTableException("Cannot rename table because table %s does not exist", from);
+    }
+
+    Properties properties = loadProperties(fromURI);
+    EcsURI toURI = tableURI(to);
+
+    if (!putNewProperties(toURI, properties.content())) {
+      throw new AlreadyExistsException("Cannot rename %s because destination table %s exists", from, to);
+    }
+
+    client.deleteObject(fromURI.bucket(), fromURI.name());
+    LOG.info("rename table {} to {}", from, to);

Review comment:
       With the current approach, I'm concerned that we cannot guarantee atomicity between `putNewProperties` and `deleteObject`, right? (That would violate Iceberg's transaction semantics.)
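The concern can be made concrete with a small simulation of the copy-then-delete ordering in the quoted `renameTable`. If the process fails after the new object is written but before the old one is deleted, both identifiers resolve to the same metadata. Separately, it appears that a commit landing on `from` between the load and the delete would be dropped, since `to` holds the earlier metadata location. This is a hypothetical in-memory sketch with invented names, not the ECS client or the merged code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class TwoStepRename {
  private final Map<String, String> objects = new ConcurrentHashMap<>();

  void put(String key, String content) {
    objects.put(key, content);
  }

  boolean exists(String key) {
    return objects.containsKey(key);
  }

  /**
   * Copy-then-delete rename, same ordering as the quoted renameTable.
   * failAfterCopy simulates a crash or network error between the two requests.
   */
  void rename(String from, String to, boolean failAfterCopy) {
    String content = objects.get(from);
    if (content == null) {
      throw new IllegalStateException("No such table object: " + from);
    }
    if (objects.putIfAbsent(to, content) != null) {
      throw new IllegalStateException("Destination exists: " + to);
    }
    if (failAfterCopy) {
      throw new RuntimeException("Simulated failure between put and delete");
    }
    objects.remove(from);
  }

  public static void main(String[] args) {
    TwoStepRename store = new TwoStepRename();
    store.put("db/a.table", "metadata/v3.json");
    try {
      store.rename("db/a.table", "db/b.table", true);
    } catch (RuntimeException e) {
      System.out.println(e.getMessage()); // Simulated failure between put and delete
    }
    // Both identifiers now point at the same metadata: the rename was not atomic.
    System.out.println(store.exists("db/a.table") + " " + store.exists("db/b.table")); // true true
  }
}
```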






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r826776232



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private EcsURI warehouseLocation;
+  private FileIO fileIO;
+  private CloseableGroup closeableGroup;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.warehouseLocation = new EcsURI(properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.fileIO = initializeFileIO(properties);
+
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(client::destroy);
+    closeableGroup.addCloseable(fileIO);
+    closeableGroup.setSuppressCloseFailure(true);
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(String.format("%s.%s", catalogName, tableIdentifier),
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    String tableName = tableIdentifier.name();
+    StringBuilder sb = new StringBuilder();
+
+    sb.append(warehouseLocation).append('/');
+    for (String level : tableIdentifier.namespace().levels()) {
+      sb.append(level).append('/');
+    }
+    sb.append(tableName);
+
+    return sb.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter("/")
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Get object prefix of namespace.
+   */
+  private EcsURI namespacePrefix(Namespace namespace) {
+    String prefix;
+    if (namespace.isEmpty()) {
+      prefix = warehouseLocation.name();
+    } else {
+      prefix = String.format("%s%s/", warehouseLocation.name(),
+          String.join("/", namespace.levels()));
+    }
+
+    return new EcsURI(warehouseLocation.bucket(), prefix);
+  }
+
+  private TableIdentifier parseTableId(Namespace namespace, EcsURI prefix, S3Object s3Object) {
+    String key = s3Object.getKey();
+    Preconditions.checkArgument(key.startsWith(prefix.name()),
+        "List result should have same prefix", key, prefix);
+
+    String tableName = key.substring(
+        prefix.name().length(),
+        key.length() - TABLE_OBJECT_SUFFIX.length());
+    return TableIdentifier.of(namespace, tableName);
+  }
+
+  /**
+   * Remove table object. If the purge flag is set, remove all data objects.
+   */
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    if (!tableExists(identifier)) {
+      throw new NoSuchTableException("Table %s does not exist", identifier);
+    }
+
+    EcsURI tableObjectURI = tableURI(identifier);
+    if (purge) {
+      // if re-use the same instance, current() will throw exception.
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata current = ops.current();
+      if (current == null) {
+        return false;
+      }
+
+      CatalogUtil.dropTableData(ops.io(), current);
+    }
+
+    client.deleteObject(tableObjectURI.bucket(), tableObjectURI.name());
+    return true;
+  }
+
+  private EcsURI tableURI(TableIdentifier id) {
+    EcsURI prefix = namespacePrefix(id.namespace());
+    // The prefix has the delimiter at the tail.
+    return new EcsURI(prefix.bucket(), prefix.name() + id.name() + TABLE_OBJECT_SUFFIX);
+  }
+
+  /**
+   * Table rename will only move table object, the data objects will still be in-place.
+   *
+   * @param from identifier of the table to rename
+   * @param to   new table name
+   */
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    if (!namespaceExists(to.namespace())) {
+      throw new NoSuchNamespaceException("Cannot rename %s to %s because namespace %s does not exist",
+              from, to, to.namespace());
+    }
+
+    if (tableExists(to)) {
+      throw new AlreadyExistsException("Cannot rename %s because destination table %s exists", from, to);
+    }
+
+    EcsURI fromURI = tableURI(from);
+    if (!objectMetadata(fromURI).isPresent()) {
+      throw new NoSuchTableException("Cannot rename table because table %s does not exist", from);
+    }
+
+    Properties properties = loadProperties(fromURI);
+    EcsURI toURI = tableURI(to);
+
+    if (!putNewProperties(toURI, properties.content())) {
+      throw new AlreadyExistsException("Cannot rename %s because destination table %s exists", from, to);
+    }
+
+    client.deleteObject(fromURI.bucket(), fromURI.name());
+    LOG.info("rename table {} to {}", from, to);

Review comment:
       Nit: It's recommended to capitalize the first character of the message `rename table {} to {}`.






[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r827655519



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/PropertiesSerDesUtil.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convert Map properties to bytes.
+ */
+public class PropertiesSerDesUtil {
+
+  private PropertiesSerDesUtil() {
+  }
+
+  /**
+   * Version of current implementation.
+   */
+  private static final String CURRENT_VERSION = "0";
+
+  private static final Logger LOG = LoggerFactory.getLogger(PropertiesSerDesUtil.class);
+
+  /**
+   * Read properties
+   *
+   * @param version is the version of {@link PropertiesSerDesUtil}
+   */
+  public static Map<String, String> read(InputStream input, String version) {
+    Preconditions.checkArgument(version.equals(CURRENT_VERSION),
+        "Properties version is not match", version);
+    Properties jdkProperties = new Properties();
+    try {
+      jdkProperties.load(new InputStreamReader(input, StandardCharsets.UTF_8));
+    } catch (IOException e) {
+      LOG.error("fail to read properties", e);
+      throw new UncheckedIOException(e);
+    }
+
+    Set<String> propertyNames = jdkProperties.stringPropertyNames();
+    Map<String, String> properties = Maps.newHashMap();
+    for (String name : propertyNames) {
+      properties.put(name, jdkProperties.getProperty(name));
+    }
+
+    return Collections.unmodifiableMap(properties);
+  }
+
+  /**
+   * Write properties, the version is {@link #currentVersion()}
+   */
+  public static byte[] toBytes(Map<String, String> value) {
+    Properties jdkProperties = new Properties();
+    for (Map.Entry<String, String> entry : value.entrySet()) {
+      jdkProperties.setProperty(entry.getKey(), entry.getValue());
+    }
+
+    try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
+      jdkProperties.store(new OutputStreamWriter(output, StandardCharsets.UTF_8), null);

Review comment:
       Fixed.
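For reference, the serialization in this hunk can be exercised as a round trip. The sketch below is a JDK-only approximation of `PropertiesSerDesUtil`. It assumes a hypothetical class name `PropertiesRoundTrip` and the version string `"0"` from the diff, and it is not the PR's final code. It writes through an explicit UTF-8 `OutputStreamWriter` and closes the writer so buffered bytes reach the `ByteArrayOutputStream`. It also rejects any version other than the current one before parsing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical JDK-only approximation of PropertiesSerDesUtil, for illustration.
public class PropertiesRoundTrip {
  static final String CURRENT_VERSION = "0";

  static byte[] toBytes(Map<String, String> value) {
    Properties jdkProperties = new Properties();
    for (Map.Entry<String, String> entry : value.entrySet()) {
      jdkProperties.setProperty(entry.getKey(), entry.getValue());
    }
    ByteArrayOutputStream output = new ByteArrayOutputStream();
    // Closing the writer flushes any buffered characters into `output`.
    try (Writer writer = new OutputStreamWriter(output, StandardCharsets.UTF_8)) {
      jdkProperties.store(writer, null);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return output.toByteArray();
  }

  static Map<String, String> read(InputStream input, String version) {
    // Reject unknown versions before parsing anything.
    if (!CURRENT_VERSION.equals(version)) {
      throw new IllegalArgumentException("Unsupported properties version: " + version);
    }
    Properties jdkProperties = new Properties();
    try {
      // The caller owns `input` and is responsible for closing it.
      jdkProperties.load(new InputStreamReader(input, StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    Map<String, String> result = new HashMap<>();
    for (String name : jdkProperties.stringPropertyNames()) {
      result.put(name, jdkProperties.getProperty(name));
    }
    return Collections.unmodifiableMap(result);
  }

  public static void main(String[] args) {
    Map<String, String> original = new HashMap<>();
    original.put("owner", "wang");
    original.put("comment", "caf\u00e9 a=b");
    byte[] bytes = toBytes(original);
    Map<String, String> restored = read(new ByteArrayInputStream(bytes), CURRENT_VERSION);
    System.out.println(restored.equals(original)); // prints "true"
  }
}
```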

##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/PropertiesSerDesUtil.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convert Map properties to bytes.
+ */
+public class PropertiesSerDesUtil {
+
+  private PropertiesSerDesUtil() {
+  }
+
+  /**
+   * Version of current implementation.
+   */
+  private static final String CURRENT_VERSION = "0";
+
+  private static final Logger LOG = LoggerFactory.getLogger(PropertiesSerDesUtil.class);
+
+  /**
+   * Read properties
+   *
+   * @param version is the version of {@link PropertiesSerDesUtil}
+   */
+  public static Map<String, String> read(InputStream input, String version) {
+    Preconditions.checkArgument(version.equals(CURRENT_VERSION),
+        "Properties version is not match", version);
+    Properties jdkProperties = new Properties();
+    try {
+      jdkProperties.load(new InputStreamReader(input, StandardCharsets.UTF_8));

Review comment:
       Fixed.






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r826757658



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private EcsURI warehouseLocation;
+  private FileIO fileIO;
+  private CloseableGroup closeableGroup;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.warehouseLocation = new EcsURI(properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.fileIO = initializeFileIO(properties);
+
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(client::destroy);
+    closeableGroup.addCloseable(fileIO);
+    closeableGroup.setSuppressCloseFailure(true);
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(String.format("%s.%s", catalogName, tableIdentifier),
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    String tableName = tableIdentifier.name();
+    StringBuilder sb = new StringBuilder();
+
+    sb.append(warehouseLocation).append('/');
+    for (String level : tableIdentifier.namespace().levels()) {
+      sb.append(level).append('/');
+    }
+    sb.append(tableName);
+
+    return sb.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter("/")
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Get object prefix of namespace.
+   */
+  private EcsURI namespacePrefix(Namespace namespace) {
+    String prefix;
+    if (namespace.isEmpty()) {
+      prefix = warehouseLocation.name();
+    } else {
+      prefix = String.format("%s%s/", warehouseLocation.name(),

Review comment:
       Will `warehouseLocation` have a slash `/` as its last character? If so, we may need to trim the trailing slash before building the formatted string.
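One way to handle this is to normalize the warehouse location once by stripping a single trailing delimiter. Each namespace level is then appended followed by the delimiter, so the prefix always ends with exactly one delimiter. The minimal sketch below assumes a `/` delimiter and uses hypothetical helper names `trimTrailingDelimiter` and `namespacePrefix`. Another revision in this thread has a similar `cleanWarehouse` method, but this is not that code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helpers illustrating prefix construction; not the PR's code.
public class NamespacePrefixes {
  // Strip a single trailing delimiter so that later joins never produce "//".
  static String trimTrailingDelimiter(String path, String delimiter) {
    if (path == null || path.isEmpty()) {
      throw new IllegalArgumentException("Warehouse location must not be null or empty");
    }
    return path.endsWith(delimiter) ? path.substring(0, path.length() - delimiter.length()) : path;
  }

  // Listing prefix for a namespace: always ends with exactly one delimiter.
  static String namespacePrefix(String warehouse, List<String> levels, String delimiter) {
    StringBuilder sb = new StringBuilder(trimTrailingDelimiter(warehouse, delimiter)).append(delimiter);
    for (String level : levels) {
      sb.append(level).append(delimiter);
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    List<String> levels = Arrays.asList("db", "sales");
    System.out.println(namespacePrefix("warehouse/", levels, "/")); // warehouse/db/sales/
    System.out.println(namespacePrefix("warehouse", levels, "/"));  // warehouse/db/sales/
    System.out.println(namespacePrefix("warehouse", Collections.<String>emptyList(), "/")); // warehouse/
  }
}
```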






[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r824473205



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.dell.DellProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private String warehouseLocation;
+  private DellProperties dellProperties;
+  private PropertiesSerDes propertiesSerDes;
+  private FileIO fileIO;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.dellProperties = new DellProperties(properties);
+    this.warehouseLocation =
+        cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter());
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.propertiesSerDes = PropertiesSerDes.current();
+    this.fileIO = initializeFileIO(properties);

Review comment:
       Added a `CloseableGroup` to close `fileIO`.
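A `CloseableGroup` lets the catalog close its resources together. The `initialize` revision in this thread calls `addCloseable(client::destroy)`, `addCloseable(fileIO)` and `setSuppressCloseFailure(true)`. To show the idea without an Iceberg dependency, the hypothetical `SimpleCloseableGroup` below is a JDK-only approximation, not Iceberg's implementation. It attempts to close every registered resource in registration order. When suppression is on, it reports failures instead of throwing.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, JDK-only stand-in for a closeable group; not Iceberg's CloseableGroup.
public class SimpleCloseableGroup implements Closeable {
  private final Deque<AutoCloseable> closeables = new ArrayDeque<>();
  private boolean suppressCloseFailure = false;

  public void addCloseable(AutoCloseable closeable) {
    closeables.add(closeable);
  }

  public void setSuppressCloseFailure(boolean suppress) {
    this.suppressCloseFailure = suppress;
  }

  // Closes resources in registration order; every resource gets a close attempt.
  // Without suppression, the first failure is rethrown with later ones attached.
  @Override
  public void close() throws IOException {
    IOException failure = null;
    while (!closeables.isEmpty()) {
      AutoCloseable next = closeables.removeFirst();
      try {
        next.close();
      } catch (Exception e) {
        if (suppressCloseFailure) {
          System.err.println("Suppressed close failure: " + e);
        } else if (failure == null) {
          failure = e instanceof IOException ? (IOException) e : new IOException(e);
        } else {
          failure.addSuppressed(e);
        }
      }
    }
    if (failure != null) {
      throw failure;
    }
  }

  public static void main(String[] args) throws IOException {
    StringBuilder log = new StringBuilder();
    SimpleCloseableGroup group = new SimpleCloseableGroup();
    group.addCloseable(() -> log.append("client;"));
    group.addCloseable(() -> { throw new IOException("fileIO failed"); });
    group.addCloseable(() -> log.append("other;"));
    group.setSuppressCloseFailure(true);
    group.close();
    System.out.println(log); // prints "client;other;"
  }
}
```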

##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.dell.DellProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private String warehouseLocation;
+  private DellProperties dellProperties;
+  private PropertiesSerDes propertiesSerDes;
+  private FileIO fileIO;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.dellProperties = new DellProperties(properties);
+    this.warehouseLocation =
+        cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter());
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.propertiesSerDes = PropertiesSerDes.current();
+    this.fileIO = initializeFileIO(properties);
+  }
+
+  private String cleanWarehouse(String path, String delimiter) {
+    Preconditions.checkArgument(
+        path != null && path.length() > 0,
+        "Cannot initialize EcsCatalog because warehousePath must not be null");
+    int len = path.length();
+    if (path.endsWith(delimiter)) {
+      return path.substring(0, len - delimiter.length());
+    } else {
+      return path;
+    }
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(String.format("%s.%s", catalogName, tableIdentifier),
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(warehouseLocation);
+    for (String level : tableIdentifier.namespace().levels()) {
+      builder.append(dellProperties.ecsCatalogDelimiter());
+      builder.append(level);
+    }
+
+    builder.append(dellProperties.ecsCatalogDelimiter());
+    builder.append(tableIdentifier.name());
+    return builder.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter(dellProperties.ecsCatalogDelimiter())
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Get object prefix of namespace.
+   */
+  private EcsURI namespacePrefix(Namespace namespace) {
+    String prefix;
+    if (namespace.isEmpty()) {
+      prefix = dellProperties.ecsCatalogPrefix().name();
+    } else {
+      prefix = dellProperties.ecsCatalogPrefix().name() +
+          String.join(dellProperties.ecsCatalogDelimiter(), namespace.levels()) +
+          dellProperties.ecsCatalogDelimiter();
+    }
+
+    return new EcsURI(dellProperties.ecsCatalogPrefix().bucket(), prefix);
+  }
+
+  private TableIdentifier parseTableId(Namespace namespace, EcsURI prefix, S3Object s3Object) {
+    String key = s3Object.getKey();
+    Preconditions.checkArgument(key.startsWith(prefix.name()),
+        "List result should have same prefix", key, prefix);
+
+    String tableName = key.substring(
+        prefix.name().length(),
+        key.length() - TABLE_OBJECT_SUFFIX.length());
+    return TableIdentifier.of(namespace, tableName);
+  }
+
+  /**
+   * Remove table object. If the purge flag is set, remove all data objects.
+   */
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    if (!tableExists(identifier)) {
+      throw new NoSuchTableException("Table %s does not exist", identifier);
+    }
+
+    EcsURI tableObjectURI = tableURI(identifier);
+    if (purge) {
+      // if re-use the same instance, current() will throw exception.
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata current = ops.current();
+      if (current == null) {
+        return false;
+      }
+
+      CatalogUtil.dropTableData(ops.io(), current);
+    }
+
+    client.deleteObject(tableObjectURI.bucket(), tableObjectURI.name());
+    return true;
+  }
+
+  private EcsURI tableURI(TableIdentifier id) {
+    EcsURI prefix = namespacePrefix(id.namespace());
+    // The prefix has the delimiter at the tail.
+    return new EcsURI(prefix.bucket(), prefix.name() + id.name() + TABLE_OBJECT_SUFFIX);
+  }
+
+  /**
+   * Table rename will only move table object, the data objects will still be in-place.
+   *
+   * @param from identifier of the table to rename
+   * @param to   new table name
+   */
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    if (!namespaceExists(to.namespace())) {
+      throw new NoSuchNamespaceException("Cannot rename %s to %s because namespace %s does not exist",
+              from, to, to.namespace());
+    }
+
+    if (tableExists(to)) {
+      throw new AlreadyExistsException("Cannot rename %s because destination table %s exists", from, to);
+    }
+
+    EcsURI fromURI = tableURI(from);
+    if (!objectMetadata(fromURI).isPresent()) {
+      throw new NoSuchTableException("Cannot rename table because table %s does not exist", from);
+    }
+
+    Properties properties = loadProperties(fromURI);
+    EcsURI toURI = tableURI(to);
+
+    if (!putNewProperties(toURI, properties.content())) {
+      throw new AlreadyExistsException("Cannot rename %s because destination table %s exists", from, to);
+    }
+
+    client.deleteObject(fromURI.bucket(), fromURI.name());
+    LOG.info("rename table {} to {}", from, to);
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> properties) {
+    EcsURI namespaceObject = namespaceURI(namespace);
+    if (!putNewProperties(namespaceObject, properties)) {
+      throw new AlreadyExistsException("namespace %s(%s) has already existed", namespace, namespaceObject);
+    }
+  }
+
+  private EcsURI namespaceURI(Namespace namespace) {
+    return new EcsURI(
+        dellProperties.ecsCatalogPrefix().bucket(),
+        dellProperties.ecsCatalogPrefix().name() +
+            String.join(dellProperties.ecsCatalogDelimiter(), namespace.levels()) +
+            NAMESPACE_OBJECT_SUFFIX);
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<Namespace> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter(dellProperties.ecsCatalogDelimiter())
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(NAMESPACE_OBJECT_SUFFIX))
+          .map(object -> parseNamespace(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing namespace {} returned namespaces: {}", namespace, results);
+    return results;
+  }
+
+  private Namespace parseNamespace(Namespace parent, EcsURI prefix, S3Object s3Object) {
+    String key = s3Object.getKey();
+    Preconditions.checkArgument(key.startsWith(prefix.name()),
+        "List result should have same prefix", key, prefix);
+
+    String namespaceName = key.substring(
+        prefix.name().length(),
+        key.length() - NAMESPACE_OBJECT_SUFFIX.length());
+    String[] namespace = Arrays.copyOf(parent.levels(), parent.levels().length + 1);
+    namespace[namespace.length - 1] = namespaceName;
+    return Namespace.of(namespace);
+  }
+
+  /**
+   * Load namespace properties.
+   */
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    EcsURI namespaceObject = namespaceURI(namespace);
+    if (!objectMetadata(namespaceObject).isPresent()) {
+      throw new NoSuchNamespaceException("Namespace %s(%s) properties object is absent", namespace, namespaceObject);
+    }
+
+    Map<String, String> result = loadProperties(namespaceObject).content();
+
+    LOG.debug("Loaded metadata for namespace {} found {}", namespace, result);
+    return result;
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    if (!listNamespaces(namespace).isEmpty() || !listTables(namespace).isEmpty()) {
+      throw new NamespaceNotEmptyException("Namespace %s is not empty", namespace);
+    }
+
+    EcsURI namespaceObject = namespaceURI(namespace);
+    client.deleteObject(namespaceObject.bucket(), namespaceObject.name());
+    LOG.info("Dropped namespace: {}", namespace);
+    return true;
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    return updateProperties(namespace, r -> r.putAll(properties));
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    return updateProperties(namespace, r -> r.keySet().removeAll(properties));
+  }
+
+  public boolean updateProperties(Namespace namespace, Consumer<Map<String, String>> propertiesFn)
+      throws NoSuchNamespaceException {
+
+    // Load old properties
+    Properties oldProperties = loadProperties(namespaceURI(namespace));
+
+    // Put new properties
+    Map<String, String> newProperties = new LinkedHashMap<>(oldProperties.content());
+    propertiesFn.accept(newProperties);
+    LOG.debug("Successfully set properties {} for {}", newProperties.keySet(), namespace);
+    return updatePropertiesObject(namespaceURI(namespace), oldProperties.eTag(), newProperties);
+  }
+
+  @Override
+  public boolean namespaceExists(Namespace namespace) {
+    return objectMetadata(namespaceURI(namespace)).isPresent();
+  }
+
+  @Override
+  public boolean tableExists(TableIdentifier identifier) {
+    return objectMetadata(tableURI(identifier)).isPresent();
+  }
+
+  private void checkURI(EcsURI uri) {
+    Preconditions.checkArgument(uri.bucket().equals(dellProperties.ecsCatalogPrefix().bucket()),
+        "Properties object %s should be in same bucket", uri.location());
+    Preconditions.checkArgument(uri.name().startsWith(dellProperties.ecsCatalogPrefix().name()),
+        "Properties object %s should have prefix", uri.location());

Review comment:
       Fixed.
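The `updateProperties` method in the hunk above follows a read, modify, compare-and-set pattern. It loads the namespace object with its eTag, edits a copy of the map, and passes the old eTag to `updatePropertiesObject`. Presumably this means a concurrent change makes the update fail instead of being overwritten. `createNamespace` and `renameTable` rely on put-if-absent through `putNewProperties`. The sketch below models only those two primitives with an in-memory map. `VersionedStore`, its methods, and the counter used as an eTag are assumptions for illustration, and the sketch does not call any ECS API.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory store modelling put-if-absent and eTag compare-and-set.
// An incrementing counter stands in for the eTag; no ECS API is used.
public class VersionedStore {
  static final class Entry {
    final Map<String, String> content;
    final String eTag;

    Entry(Map<String, String> content, String eTag) {
      this.content = content;
      this.eTag = eTag;
    }
  }

  private final Map<String, Entry> objects = new HashMap<>();
  private long counter = 0;

  // Put-if-absent: succeeds only when nothing is stored under the key yet.
  synchronized boolean putIfAbsent(String key, Map<String, String> content) {
    if (objects.containsKey(key)) {
      return false;
    }
    objects.put(key, new Entry(new LinkedHashMap<>(content), Long.toString(++counter)));
    return true;
  }

  synchronized Entry get(String key) {
    return objects.get(key);
  }

  // Compare-and-set: replace only if the stored eTag still equals the expected one.
  synchronized boolean replaceIfMatch(String key, String expectedETag, Map<String, String> content) {
    Entry current = objects.get(key);
    if (current == null || !current.eTag.equals(expectedETag)) {
      return false;
    }
    objects.put(key, new Entry(new LinkedHashMap<>(content), Long.toString(++counter)));
    return true;
  }

  // Read, modify a copy, then CAS with the eTag that was read (same shape as updateProperties).
  boolean update(String key, Consumer<Map<String, String>> fn) {
    Entry old = get(key);
    if (old == null) {
      throw new IllegalStateException("No object stored under " + key);
    }
    Map<String, String> updated = new LinkedHashMap<>(old.content);
    fn.accept(updated);
    return replaceIfMatch(key, old.eTag, updated);
  }

  public static void main(String[] args) {
    VersionedStore store = new VersionedStore();
    store.putIfAbsent("db.namespace", new HashMap<>());
    Entry snapshot = store.get("db.namespace");
    System.out.println(store.update("db.namespace", m -> m.put("owner", "a"))); // prints "true"
    // A writer still holding the old eTag loses the race.
    System.out.println(store.replaceIfMatch("db.namespace", snapshot.eTag, new HashMap<>())); // prints "false"
  }
}
```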

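`listTables` and `listNamespaces` page through a delimiter listing using a marker. They keep only keys ending in `.table` or `.namespace` and recover each name by stripping the prefix and suffix (`parseTableId`, `parseNamespace`). The JDK-only sketch below simulates that logic over a sorted in-memory key set. The class and method names and the page size are hypothetical. The sketch also ignores the common prefixes that a real S3-style delimiter listing returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical simulation of a paged, delimiter-based listing with suffix filtering.
public class SuffixListing {
  private final TreeSet<String> keys;
  private final int pageSize;

  SuffixListing(List<String> keys, int pageSize) {
    this.keys = new TreeSet<>(keys);
    this.pageSize = pageSize;
  }

  // Adds up to pageSize keys that follow `marker`, start with `prefix`, and have no
  // delimiter after the prefix (direct children only). Returns the last key added when
  // the page is full, otherwise null; a full final page costs one extra, empty call.
  String listPage(String prefix, String delimiter, String marker, List<String> out) {
    NavigableSet<String> candidates = marker == null ? keys : keys.tailSet(marker, false);
    int count = 0;
    for (String key : candidates) {
      if (!key.startsWith(prefix) || key.indexOf(delimiter, prefix.length()) >= 0) {
        continue;
      }
      out.add(key);
      if (++count == pageSize) {
        return key;
      }
    }
    return null;
  }

  // Mirrors the do/while marker loop and the prefix/suffix stripping of parseTableId.
  List<String> listNames(String prefix, String delimiter, String suffix) {
    List<String> names = new ArrayList<>();
    String marker = null;
    do {
      List<String> page = new ArrayList<>();
      marker = listPage(prefix, delimiter, marker, page);
      for (String key : page) {
        if (key.endsWith(suffix)) {
          names.add(key.substring(prefix.length(), key.length() - suffix.length()));
        }
      }
    } while (marker != null);
    return names;
  }

  public static void main(String[] args) {
    SuffixListing listing = new SuffixListing(Arrays.asList(
        "wh/db.namespace", "wh/db/orders.table", "wh/db/users.table",
        "wh/db/sub.namespace", "wh/db/sub/events.table"), 1);
    System.out.println(listing.listNames("wh/db/", "/", ".table"));     // prints "[orders, users]"
    System.out.println(listing.listNames("wh/db/", "/", ".namespace")); // prints "[sub]"
  }
}
```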





[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r827655264



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+
+  /**
+   * Warehouse location, normalized like other catalogs to have no trailing delimiter.
+   */
+  private EcsURI warehouseLocation;
+  private FileIO fileIO;
+  private CloseableGroup closeableGroup;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.warehouseLocation = new EcsURI(properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.fileIO = initializeFileIO(properties);
+
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(client::destroy);
+    closeableGroup.addCloseable(fileIO);
+    closeableGroup.setSuppressCloseFailure(true);
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(String.format("%s.%s", catalogName, tableIdentifier),
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    String tableName = tableIdentifier.name();
+    StringBuilder sb = new StringBuilder();
+
+    sb.append(warehouseLocation).append('/');
+    for (String level : tableIdentifier.namespace().levels()) {
+      sb.append(level).append('/');
+    }
+    sb.append(tableName);
+
+    return sb.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter("/")
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Get object prefix of namespace.
+   */
+  private EcsURI namespacePrefix(Namespace namespace) {
+    String prefix;
+    if (namespace.isEmpty()) {
+      prefix = warehouseLocation.name();
+    } else {
+      prefix = String.format("%s%s/", warehouseLocation.name(),
+          String.join("/", namespace.levels()));
+    }
+
+    return new EcsURI(warehouseLocation.bucket(), prefix);
+  }
+
+  private TableIdentifier parseTableId(Namespace namespace, EcsURI prefix, S3Object s3Object) {
+    String key = s3Object.getKey();
+    Preconditions.checkArgument(key.startsWith(prefix.name()),
+        "Key %s in list result should have prefix %s", key, prefix);
+
+    String tableName = key.substring(
+        prefix.name().length(),
+        key.length() - TABLE_OBJECT_SUFFIX.length());
+    return TableIdentifier.of(namespace, tableName);
+  }
+
+  /**
+   * Remove table object. If the purge flag is set, remove all data objects.
+   */
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    if (!tableExists(identifier)) {
+      throw new NoSuchTableException("Table %s does not exist", identifier);
+    }

Review comment:
       Fixed.






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r826792612



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsTableOperations.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.util.Map;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+public class EcsTableOperations extends BaseMetastoreTableOperations {
+
+  public static final String ICEBERG_METADATA_LOCATION = "iceberg_metadata_location";
+
+  private final String tableName;
+  private final FileIO fileIO;
+  private final EcsCatalog catalog;
+  private final EcsURI tableObject;
+
+  /**
+   * Cached E-Tag for CAS commit
+   *
+   * @see #doRefresh() where this field is reset
+   * @see #doCommit(TableMetadata, TableMetadata) where this field is used
+   */
+  private String eTag;
+
+  public EcsTableOperations(String tableName, EcsURI tableObject, FileIO fileIO, EcsCatalog catalog) {
+    this.tableName = tableName;
+    this.tableObject = tableObject;
+    this.fileIO = fileIO;
+    this.catalog = catalog;
+  }
+
+  @Override
+  protected String tableName() {
+    return tableName;
+  }
+
+  @Override
+  public FileIO io() {
+    return fileIO;
+  }
+
+  @Override
+  protected void doRefresh() {
+    String metadataLocation;
+    if (!catalog.objectMetadata(tableObject).isPresent()) {
+      if (currentMetadataLocation() != null) {
+        throw new NoSuchTableException("Metadata object %s is absent", tableObject);

Review comment:
       Since some operations in `EcsCatalog` are not guaranteed to be atomic, couldn't some metadata files remain while the table object is absent? I think we need a clearer message for this case.
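A minimal sketch of what a clearer message for this branch of `doRefresh()` could say. This branch covers the case where the table object is missing but `currentMetadataLocation()` is non-null. `MissingTableObjectMessage` and its wording are hypothetical, and the paths in `main` are made up for illustration. The real code would throw `NoSuchTableException` with such a message.

```java
public class MissingTableObjectMessage {
  // Hypothetical message builder: the table was loaded before (a metadata location is
  // known), but its table object has since disappeared from the catalog.
  public static String message(String tableName, String tableObject, String lastMetadataLocation) {
    return String.format(
        "Cannot refresh table %s: table object %s is absent, but metadata %s was loaded before. "
            + "The table may have been dropped; metadata files can remain because "
            + "some EcsCatalog operations are not atomic",
        tableName, tableObject, lastMetadataLocation);
  }

  public static void main(String[] args) {
    // Illustrative values only.
    System.out.println(message(
        "ecs.db.tbl",
        "bucket/warehouse/db/tbl.table",
        "bucket/warehouse/db/tbl/metadata/00001-abc.metadata.json"));
  }
}
```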






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r817541051



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.dell.DellProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+  /**
+   * Warehouse location, normalized like other catalogs to have no trailing delimiter.
+   */
+  private String warehouseLocation;
+  private DellProperties dellProperties;
+  private PropertiesSerDes propertiesSerDes;
+  private FileIO fileIO;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.dellProperties = new DellProperties(properties);
+    this.warehouseLocation =
+        cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter());
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.propertiesSerDes = PropertiesSerDes.current();
+    this.fileIO = initializeFileIO(properties);
+  }
+
+  private String cleanWarehouse(String path, String delimiter) {
+    Preconditions.checkArgument(
+        path != null && path.length() > 0,
+        "Cannot initialize EcsCatalog because warehousePath must not be null");
+    int len = path.length();
+    if (path.endsWith(delimiter)) {
+      return path.substring(0, len - delimiter.length());
+    } else {
+      return path;
+    }
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(catalogName + "." + tableIdentifier,

Review comment:
       Nit: we usually don't use `+` to concatenate multiple strings in Iceberg; `String.format` is recommended.
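A minimal sketch of the suggested style for building the full table name. Here `tableIdentifier` is typed as `Object` only so the example runs without Iceberg on the classpath. In the catalog it is a `TableIdentifier`, whose `toString()` (e.g. `db.tbl`) is what `%s` renders.

```java
public class TableNameFormat {
  // Full table name handed to EcsTableOperations, built with String.format instead of "+".
  public static String fullTableName(String catalogName, Object tableIdentifier) {
    return String.format("%s.%s", catalogName, tableIdentifier);
  }

  public static void main(String[] args) {
    System.out.println(fullTableName("ecs", "db.tbl")); // ecs.db.tbl
  }
}
```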

##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.dell.DellProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";

Review comment:
       Please leave an empty line between separate variable definitions.

##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.dell.DellProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+  /**
+   * Warehouse location, normalized like other catalogs to have no trailing delimiter.
+   */
+  private String warehouseLocation;
+  private DellProperties dellProperties;
+  private PropertiesSerDes propertiesSerDes;
+  private FileIO fileIO;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.dellProperties = new DellProperties(properties);
+    this.warehouseLocation =
+        cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter());
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.propertiesSerDes = PropertiesSerDes.current();
+    this.fileIO = initializeFileIO(properties);
+  }
+
+  private String cleanWarehouse(String path, String delimiter) {
+    Preconditions.checkArgument(
+        path != null && path.length() > 0,
+        "Cannot initialize EcsCatalog because warehousePath must not be null");
+    int len = path.length();
+    if (path.endsWith(delimiter)) {
+      return path.substring(0, len - delimiter.length());
+    } else {
+      return path;
+    }
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(catalogName + "." + tableIdentifier,
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(warehouseLocation);
+    for (String level : tableIdentifier.namespace().levels()) {

Review comment:
       All catalogs follow the same default location style (i.e. `<warehouse>/<database>.db/<table>`). Why do we need to introduce a new naming style for the default location?
   https://github.com/apache/iceberg/blob/9d9bab1f27c2c0b22d0024766bd062011f0817a4/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java#L186-L190
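A minimal sketch of the conventional `<warehouse>/<database>.db/<table>` layout, similar in spirit to the linked `GlueCatalog` code. It assumes a single-level namespace used as the database name. `DefaultLocationSketch` is hypothetical, and the `ecs://` location is illustrative only.

```java
public class DefaultLocationSketch {
  // Hypothetical helper: "<warehouse>/<database>.db/<table>", tolerating a trailing "/"
  // on the warehouse path.
  public static String defaultWarehouseLocation(String warehouse, String database, String table) {
    String base = warehouse.endsWith("/")
        ? warehouse.substring(0, warehouse.length() - 1)
        : warehouse;
    return String.format("%s/%s.db/%s", base, database, table);
  }

  public static void main(String[] args) {
    // Prints ecs://bucket/warehouse/db.db/tbl
    System.out.println(defaultWarehouseLocation("ecs://bucket/warehouse/", "db", "tbl"));
  }
}
```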

##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.dell.DellProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+  /**
+   * Warehouse location, normalized like other catalogs to have no trailing delimiter.
+   */
+  private String warehouseLocation;
+  private DellProperties dellProperties;
+  private PropertiesSerDes propertiesSerDes;
+  private FileIO fileIO;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.dellProperties = new DellProperties(properties);
+    this.warehouseLocation =
+        cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter());
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.propertiesSerDes = PropertiesSerDes.current();
+    this.fileIO = initializeFileIO(properties);
+  }
+
+  private String cleanWarehouse(String path, String delimiter) {
+    Preconditions.checkArgument(
+        path != null && path.length() > 0,
+        "Cannot initialize EcsCatalog because warehousePath must not be null");
+    int len = path.length();
+    if (path.endsWith(delimiter)) {
+      return path.substring(0, len - delimiter.length());
+    } else {
+      return path;
+    }
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(catalogName + "." + tableIdentifier,
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(warehouseLocation);
+    for (String level : tableIdentifier.namespace().levels()) {
+      builder.append(dellProperties.ecsCatalogDelimiter());
+      builder.append(level);
+    }
+
+    builder.append(dellProperties.ecsCatalogDelimiter());
+    builder.append(tableIdentifier.name());
+    return builder.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter(dellProperties.ecsCatalogDelimiter())
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);

Review comment:
       Why not just follow `HadoopCatalog` and list tables based on the inherited path?

##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.dell.DellProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private String warehouseLocation;
+  private DellProperties dellProperties;
+  private PropertiesSerDes propertiesSerDes;
+  private FileIO fileIO;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.dellProperties = new DellProperties(properties);
+    this.warehouseLocation =
+        cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter());
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.propertiesSerDes = PropertiesSerDes.current();
+    this.fileIO = initializeFileIO(properties);
+  }
+
+  private String cleanWarehouse(String path, String delimiter) {
+    Preconditions.checkArgument(
+        path != null && path.length() > 0,
+        "Cannot initialize EcsCatalog because warehousePath must not be null");
+    int len = path.length();
+    if (path.endsWith(delimiter)) {
+      return path.substring(0, len - delimiter.length());
+    } else {
+      return path;
+    }
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(catalogName + "." + tableIdentifier,
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(warehouseLocation);
+    for (String level : tableIdentifier.namespace().levels()) {
+      builder.append(dellProperties.ecsCatalogDelimiter());
+      builder.append(level);
+    }
+
+    builder.append(dellProperties.ecsCatalogDelimiter());
+    builder.append(tableIdentifier.name());
+    return builder.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter(dellProperties.ecsCatalogDelimiter())
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Get object prefix of namespace.
+   */
+  private EcsURI namespacePrefix(Namespace namespace) {
+    String prefix;
+    if (namespace.isEmpty()) {
+      prefix = dellProperties.ecsCatalogPrefix().name();
+    } else {
+      prefix = dellProperties.ecsCatalogPrefix().name() +
+          String.join(dellProperties.ecsCatalogDelimiter(), namespace.levels()) +
+          dellProperties.ecsCatalogDelimiter();

Review comment:
       Similar issue to the comment on line 144.

##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsTableOperations.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.util.Map;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+public class EcsTableOperations extends BaseMetastoreTableOperations {
+
+  public static final String ICEBERG_METADATA_LOCATION = "iceberg_metadata_location";
+
+  private final String tableName;
+  private final FileIO fileIO;
+  private final EcsCatalog catalog;
+  private final EcsURI tableObject;
+
+  /**
+   * Cached E-Tag for CAS commit
+   *
+   * @see #doRefresh() when reset this field
+   * @see #doCommit(TableMetadata, TableMetadata) when use this field
+   */
+  private String eTag;
+
+  public EcsTableOperations(String tableName, EcsURI tableObject, FileIO fileIO, EcsCatalog catalog) {
+    this.tableName = tableName;
+    this.tableObject = tableObject;
+    this.fileIO = fileIO;
+    this.catalog = catalog;
+  }
+
+  @Override
+  protected String tableName() {
+    return tableName;
+  }
+
+  @Override
+  public FileIO io() {
+    return fileIO;
+  }
+
+  @Override
+  protected void doRefresh() {
+    String metadataLocation;
+    if (!catalog.objectMetadata(tableObject).isPresent()) {
+      if (currentMetadataLocation() != null) {
+        throw new NoSuchTableException("Metadata object %s is absent", tableObject);
+      } else {
+        metadataLocation = null;
+      }
+    } else {
+      EcsCatalog.Properties metadata = catalog.loadProperties(tableObject);
+      this.eTag = metadata.eTag();
+      metadataLocation = metadata.content().get(ICEBERG_METADATA_LOCATION);
+      Preconditions.checkNotNull(metadataLocation,
+          "Can't find location from table metadata %s", tableObject);
+    }
+
+    refreshFromMetadataLocation(metadataLocation);
+  }
+
+  @Override
+  protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+    if (base == null) {
+      // create a new table, the metadataKey should be absent
+      if (!catalog.putNewProperties(tableObject, buildProperties(newMetadataLocation))) {

Review comment:
       Is Dell EMC's `putObject` atomic? In this [putNewProperties](https://github.com/apache/iceberg/pull/4221/files#diff-d7308f132665917b84cff30f2688171a11a64e951fd9ed995e3040d0463cda51R465-R471), I think the `putObject` implementation checks whether the key already exists: if it does, it throws `PreconditionFailed`; otherwise it writes the content to the backend storage.
   
   The key point is that the two operations (checking whether the key exists and writing the content) must be atomic, as if in a single transaction. Otherwise, suppose two concurrent writers A and B both pass the existence check and then both write to the same key. The final result is unpredictable, because we cannot tell which one overwrote the other.
   
   If atomicity is not guaranteed, we cannot use `putObject` to commit the transaction; instead, it is recommended to use an external table lock service.
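The race described above can be illustrated with a small in-memory model. `ConditionalStore` below is hypothetical and is not the ECS SDK. `checkThenPut` performs the existence check and the write as two separate steps, which is the pattern the reviewer is worried about. `putIfAbsent` and `replaceIfMatch` rely on `ConcurrentMap` operations that are atomic, standing in for a server-side conditional create and an eTag-based compare-and-swap. Whether ECS provides these guarantees server-side is the open question in this thread.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory model of an object store with conditional writes (not the ECS SDK). */
final class ConditionalStore {
  /** One immutable object version: its content plus an opaque eTag. */
  static final class Obj {
    final String content;
    final String eTag;

    Obj(String content, String eTag) {
      this.content = content;
      this.eTag = eTag;
    }
  }

  private final ConcurrentMap<String, Obj> objects = new ConcurrentHashMap<>();
  private final AtomicLong eTagCounter = new AtomicLong();

  private Obj newVersion(String content) {
    return new Obj(content, "etag-" + eTagCounter.incrementAndGet());
  }

  /** UNSAFE: check and write are separate steps, so two writers can both pass the check. */
  boolean checkThenPut(String key, String content) {
    if (objects.containsKey(key)) {
      return false;
    }
    // Another writer may create the key right here; this put then silently overwrites it.
    objects.put(key, newVersion(content));
    return true;
  }

  /** Atomic create: succeeds only if the key is absent (analogous to an If-None-Match: * PUT). */
  boolean putIfAbsent(String key, String content) {
    return objects.putIfAbsent(key, newVersion(content)) == null;
  }

  /** Atomic compare-and-swap: replaces the object only if its eTag still matches. */
  boolean replaceIfMatch(String key, String expectedETag, String content) {
    Obj current = objects.get(key);
    if (current == null || !current.eTag.equals(expectedETag)) {
      return false;
    }
    // replace(key, old, new) is atomic; Obj uses identity equality, so a concurrent swap makes it fail.
    return objects.replace(key, current, newVersion(content));
  }

  Obj get(String key) {
    return objects.get(key);
  }
}
```

The design point is that the condition has to be evaluated by whatever performs the write: a client-side check followed by an unconditional write cannot stop two writers from both "succeeding".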

##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.dell.DellProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private String warehouseLocation;
+  private DellProperties dellProperties;
+  private PropertiesSerDes propertiesSerDes;
+  private FileIO fileIO;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.dellProperties = new DellProperties(properties);
+    this.warehouseLocation =
+        cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter());
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.propertiesSerDes = PropertiesSerDes.current();
+    this.fileIO = initializeFileIO(properties);
+  }
+
+  private String cleanWarehouse(String path, String delimiter) {
+    Preconditions.checkArgument(
+        path != null && path.length() > 0,
+        "Cannot initialize EcsCatalog because warehousePath must not be null");
+    int len = path.length();
+    if (path.endsWith(delimiter)) {
+      return path.substring(0, len - delimiter.length());
+    } else {
+      return path;
+    }
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(catalogName + "." + tableIdentifier,
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(warehouseLocation);
+    for (String level : tableIdentifier.namespace().levels()) {
+      builder.append(dellProperties.ecsCatalogDelimiter());
+      builder.append(level);
+    }
+
+    builder.append(dellProperties.ecsCatalogDelimiter());
+    builder.append(tableIdentifier.name());
+    return builder.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter(dellProperties.ecsCatalogDelimiter())
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Get object prefix of namespace.
+   */
+  private EcsURI namespacePrefix(Namespace namespace) {
+    String prefix;
+    if (namespace.isEmpty()) {
+      prefix = dellProperties.ecsCatalogPrefix().name();
+    } else {
+      prefix = dellProperties.ecsCatalogPrefix().name() +
+          String.join(dellProperties.ecsCatalogDelimiter(), namespace.levels()) +
+          dellProperties.ecsCatalogDelimiter();
+    }
+
+    return new EcsURI(dellProperties.ecsCatalogPrefix().bucket(), prefix);
+  }
+
+  private TableIdentifier parseTableId(Namespace namespace, EcsURI prefix, S3Object s3Object) {
+    String key = s3Object.getKey();
+    Preconditions.checkArgument(key.startsWith(prefix.name()),
+        "List result should have same prefix", key, prefix);
+
+    String tableName = key.substring(
+        prefix.name().length(),
+        key.length() - TABLE_OBJECT_SUFFIX.length());
+    return TableIdentifier.of(namespace, tableName);
+  }
+
+  /**
+   * Remove table object. If the purge flag is set, remove all data objects.
+   */
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    if (!tableExists(identifier)) {
+      throw new NoSuchTableException("Table %s does not exist", identifier);
+    }
+
+    EcsURI tableObjectURI = tableURI(identifier);
+    if (purge) {
+      // if re-use the same instance, current() will throw exception.
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata current = ops.current();
+      if (current == null) {
+        return false;
+      }
+
+      CatalogUtil.dropTableData(ops.io(), current);
+    }
+
+    client.deleteObject(tableObjectURI.bucket(), tableObjectURI.name());
+    return true;
+  }
+
+  private EcsURI tableURI(TableIdentifier id) {
+    EcsURI prefix = namespacePrefix(id.namespace());
+    return new EcsURI(prefix.bucket(), prefix.name() + id.name() + TABLE_OBJECT_SUFFIX);

Review comment:
       If the table identifier doesn't have any namespace, then I think we should add a `delimiter` between `prefix.name` and `id.name`, right?
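To make the question concrete: when the namespace is empty, `namespacePrefix` returns the catalog prefix name unchanged, so if that name does not already end with the delimiter, `prefix.name() + id.name() + TABLE_OBJECT_SUFFIX` yields a key such as `warehouset1.table`. One possible fix is sketched below with a hypothetical `EcsKeys` helper (not code from the PR): insert the delimiter only between non-empty parts, which also keeps an empty prefix free of a leading delimiter.

```java
import java.util.List;

/** Hypothetical key builder for illustration only; not the code in PR #4221. */
final class EcsKeys {
  private EcsKeys() {}

  /**
   * Builds catalogPrefix + levels + leaf, separated by the delimiter. A separator is added only
   * when something precedes the part and the key does not already end with the delimiter, so an
   * empty catalog prefix never produces a leading delimiter.
   */
  static String objectKey(String catalogPrefix, String delimiter, List<String> levels, String leaf) {
    StringBuilder key = new StringBuilder(catalogPrefix);
    for (String level : levels) {
      appendPart(key, delimiter, level);
    }
    appendPart(key, delimiter, leaf);
    return key.toString();
  }

  /** Table metadata object key, using the ".table" suffix from EcsCatalog. */
  static String tableKey(String catalogPrefix, String delimiter, List<String> levels, String table) {
    return objectKey(catalogPrefix, delimiter, levels, table + ".table");
  }

  private static void appendPart(StringBuilder key, String delimiter, String part) {
    // Only separate from a non-empty, not-yet-delimited key.
    if (key.length() > 0 && !key.toString().endsWith(delimiter)) {
      key.append(delimiter);
    }
    key.append(part);
  }
}
```

The same rule covers the non-empty-namespace case as well, where the first namespace level would otherwise be glued directly onto the catalog prefix name.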

##########
File path: dell/src/main/java/org/apache/iceberg/dell/DellProperties.java
##########
@@ -38,6 +40,21 @@
    */
   public static final String ECS_S3_ENDPOINT = "ecs.s3.endpoint";
 
+  /**
+   * Catalog prefix is used to store catalog data. If not set, use {@link CatalogProperties#WAREHOUSE_LOCATION}.
+   * <p>
+   * The value is an EcsURI which like ecs://bucket/prefix.
+   */
+  public static final String ECS_CATALOG_PREFIX = "ecs.catalog.prefix";

Review comment:
       Do you mean people will want to store their metadata in a separate URI? What's the benefit?
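For reference, the fallback described in the javadoc (use `ecs.catalog.prefix` if it is set, otherwise the warehouse location) can be sketched as follows. `CatalogPrefixResolver` is hypothetical. It hard-codes `"warehouse"` as the value of `CatalogProperties.WAREHOUSE_LOCATION` and only accepts the `ecs://bucket/prefix` form mentioned in the javadoc. The real `DellProperties`/`EcsURI` parsing may differ.

```java
import java.util.Map;

/** Hypothetical sketch of resolving ecs.catalog.prefix; not DellProperties' actual code. */
final class CatalogPrefixResolver {
  static final String ECS_CATALOG_PREFIX = "ecs.catalog.prefix";
  static final String WAREHOUSE_LOCATION = "warehouse"; // value of CatalogProperties.WAREHOUSE_LOCATION

  /** Bucket and object-name prefix parsed from an ecs://bucket/prefix string. */
  static final class Location {
    final String bucket;
    final String name;

    Location(String bucket, String name) {
      this.bucket = bucket;
      this.name = name;
    }
  }

  private CatalogPrefixResolver() {}

  /** Uses ecs.catalog.prefix when present, otherwise falls back to the warehouse location. */
  static Location resolve(Map<String, String> properties) {
    String uri = properties.getOrDefault(ECS_CATALOG_PREFIX, properties.get(WAREHOUSE_LOCATION));
    if (uri == null || !uri.startsWith("ecs://")) {
      throw new IllegalArgumentException("Expected ecs://bucket/prefix but got: " + uri);
    }
    String rest = uri.substring("ecs://".length());
    int slash = rest.indexOf('/');
    // "ecs://bucket" has an empty prefix; "ecs://bucket/a/b" has the prefix "a/b".
    String bucket = slash < 0 ? rest : rest.substring(0, slash);
    String name = slash < 0 ? "" : rest.substring(slash + 1);
    return new Location(bucket, name);
  }
}
```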






[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r818270769



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.dell.DellProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private String warehouseLocation;
+  private DellProperties dellProperties;
+  private PropertiesSerDes propertiesSerDes;
+  private FileIO fileIO;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.dellProperties = new DellProperties(properties);
+    this.warehouseLocation =
+        cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter());
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.propertiesSerDes = PropertiesSerDes.current();
+    this.fileIO = initializeFileIO(properties);
+  }
+
+  private String cleanWarehouse(String path, String delimiter) {
+    Preconditions.checkArgument(
+        path != null && path.length() > 0,
+        "Cannot initialize EcsCatalog because warehousePath must not be null");
+    int len = path.length();
+    if (path.endsWith(delimiter)) {
+      return path.substring(0, len - delimiter.length());
+    } else {
+      return path;
+    }
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(catalogName + "." + tableIdentifier,
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(warehouseLocation);
+    for (String level : tableIdentifier.namespace().levels()) {
+      builder.append(dellProperties.ecsCatalogDelimiter());
+      builder.append(level);
+    }
+
+    builder.append(dellProperties.ecsCatalogDelimiter());
+    builder.append(tableIdentifier.name());
+    return builder.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter(dellProperties.ecsCatalogDelimiter())
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Get object prefix of namespace.
+   */
+  private EcsURI namespacePrefix(Namespace namespace) {
+    String prefix;
+    if (namespace.isEmpty()) {
+      prefix = dellProperties.ecsCatalogPrefix().name();
+    } else {
+      prefix = dellProperties.ecsCatalogPrefix().name() +
+          String.join(dellProperties.ecsCatalogDelimiter(), namespace.levels()) +
+          dellProperties.ecsCatalogDelimiter();

Review comment:
       This is because delimiter listing differs from a file system, which tolerates a leading '/'. The delimiter must not be appended when the namespace is empty, otherwise the prefix would start with it.
   For example, if the bucket contains **t1.table**, **n1.namespace**, and **n1/t2.table**, listing table **t1** in the empty namespace should use an empty prefix (`""`), and listing table **t2** should use the prefix `n1/`.
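   A minimal sketch of the rule described above, assuming an empty catalog prefix name and `/` as the delimiter. `namespacePrefix` is a simplified standalone version of the method in the diff (it takes namespace levels as strings instead of an Iceberg `Namespace`), and `listObjects` is a hypothetical in-memory emulation of the objects returned by a delimiter listing, not the ECS client API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NamespacePrefixSketch {

  // Simplified stand-in for EcsCatalog#namespacePrefix. catalogPrefix is the
  // object-name part of the catalog prefix; levels are the namespace levels.
  static String namespacePrefix(String catalogPrefix, String delimiter, String... levels) {
    if (levels.length == 0) {
      // Empty namespace: no delimiter is appended, so an empty catalog prefix stays empty.
      return catalogPrefix;
    }
    // Non-empty namespace: join the levels and end with the delimiter, e.g. "n1/".
    return catalogPrefix + String.join(delimiter, levels) + delimiter;
  }

  // Hypothetical emulation of the "objects" part of a delimiter listing: keys that
  // start with the prefix and contain no further delimiter (deeper keys would be
  // rolled up into common prefixes instead).
  static List<String> listObjects(List<String> keys, String prefix, String delimiter) {
    List<String> result = new ArrayList<>();
    for (String key : keys) {
      if (key.startsWith(prefix) && !key.substring(prefix.length()).contains(delimiter)) {
        result.add(key);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> bucket = Arrays.asList("t1.table", "n1.namespace", "n1/t2.table");
    // prints [t1.table, n1.namespace]
    System.out.println(listObjects(bucket, namespacePrefix("", "/"), "/"));
    // prints [n1/t2.table]
    System.out.println(listObjects(bucket, namespacePrefix("", "/", "n1"), "/"));
    // prints [] because no key starts with a leading '/'
    System.out.println(listObjects(bucket, "/", "/"));
  }
}
```

   The last call shows the failure mode the comment describes: if the empty namespace produced the prefix `/`, **t1** would never be listed.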
   






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r823525601



##########
File path: dell/src/main/java/org/apache/iceberg/dell/DellProperties.java
##########
@@ -38,6 +40,21 @@
    */
   public static final String ECS_S3_ENDPOINT = "ecs.s3.endpoint";
 
+  /**
+   * Catalog prefix is used to store catalog data. If not set, use {@link CatalogProperties#WAREHOUSE_LOCATION}.
+   * <p>
+   * The value is an EcsURI which like ecs://bucket/prefix.
+   */
+  public static final String ECS_CATALOG_PREFIX = "ecs.catalog.prefix";
+
+  /**
+   * Catalog delimiter is separator of namespace levels. Default value is '/'.
+   * <p>
+   * For example, the properties object of namespace [a, b] is ecs://bucket/prefix/a/b.namespace when delimiter is '/',
+   * and is ecs://bucket/prefix-a-b when delimiter is '-'.
+   */
+  public static final String ECS_CATALOG_DELIMITER = "ecs.catalog.delimiter";

Review comment:
       The name has the same issue as described above. Why does the `ecs` catalog need a `delimiter`? I suggest naming it `ecs.catalog.metadata.path-delimiter`, or another, clearer name.
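   A sketch of how the delimiter turns namespace levels into an object key, mirroring the string concatenation in `namespaceURI` from the `EcsCatalog` diff. It assumes the catalog prefix name already ends with the delimiter, as that concatenation implies; under this assumption the `-` example in the Javadoc would yield `prefix-a-b.namespace` rather than `prefix-a-b`. `NamespaceKeySketch` and `namespaceKey` are hypothetical names.

```java
public class NamespaceKeySketch {
  static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";

  // Mirrors EcsCatalog#namespaceURI: catalog prefix name, then the namespace
  // levels joined by the delimiter, then the ".namespace" suffix.
  static String namespaceKey(String catalogPrefixName, String delimiter, String... levels) {
    return catalogPrefixName + String.join(delimiter, levels) + NAMESPACE_OBJECT_SUFFIX;
  }

  public static void main(String[] args) {
    // prints prefix/a/b.namespace
    System.out.println(namespaceKey("prefix/", "/", "a", "b"));
    // prints prefix-a-b.namespace
    System.out.println(namespaceKey("prefix-", "-", "a", "b"));
    // With '-' as the delimiter, the single-level namespace [a-b] gets the same key.
    System.out.println(namespaceKey("prefix-", "-", "a-b"));
  }
}
```

   The last line illustrates why the delimiter needs careful documentation: if it can appear inside a level name, two different namespaces map to the same object.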

##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.dell.DellProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private String warehouseLocation;
+  private DellProperties dellProperties;
+  private PropertiesSerDes propertiesSerDes;
+  private FileIO fileIO;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.dellProperties = new DellProperties(properties);
+    this.warehouseLocation =
+        cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter());
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.propertiesSerDes = PropertiesSerDes.current();
+    this.fileIO = initializeFileIO(properties);

Review comment:
       It seems we forgot to close the `fileIO`, right?
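   A minimal sketch of a `close()` that releases both resources, assuming the catalog owns `fileIO` and `client`. Both are modeled as plain `Closeable` stand-ins; whether the real Iceberg `FileIO` and ECS `S3Client` expose closing exactly this way is an assumption, and `CatalogCloseSketch` is a hypothetical class.

```java
import java.io.Closeable;
import java.io.IOException;

public class CatalogCloseSketch implements Closeable {
  // Stand-ins for EcsCatalog's fileIO and client fields (hypothetical types).
  private final Closeable fileIO;
  private final Closeable client;

  public CatalogCloseSketch(Closeable fileIO, Closeable client) {
    this.fileIO = fileIO;
    this.client = client;
  }

  @Override
  public void close() throws IOException {
    try {
      if (fileIO != null) {
        fileIO.close();
      }
    } finally {
      // Runs even if closing fileIO threw, so the client is not leaked.
      if (client != null) {
        client.close();
      }
    }
  }
}
```

   The `try`/`finally` keeps a failure in one resource from skipping the other; the first exception still propagates to the caller.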

##########
File path: dell/src/main/java/org/apache/iceberg/dell/DellProperties.java
##########
@@ -38,6 +40,21 @@
    */
   public static final String ECS_S3_ENDPOINT = "ecs.s3.endpoint";
 
+  /**
+   * Catalog prefix is used to store catalog data. If not set, use {@link CatalogProperties#WAREHOUSE_LOCATION}.
+   * <p>
+   * The value is an EcsURI which like ecs://bucket/prefix.
+   */
+  public static final String ECS_CATALOG_PREFIX = "ecs.catalog.prefix";

Review comment:
       I still don't understand the benefit of storing the metadata in a separate, customized path.

##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,520 @@
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private String warehouseLocation;
+  private DellProperties dellProperties;
+  private PropertiesSerDes propertiesSerDes;
+  private FileIO fileIO;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.dellProperties = new DellProperties(properties);
+    this.warehouseLocation =
+        cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter());
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.propertiesSerDes = PropertiesSerDes.current();
+    this.fileIO = initializeFileIO(properties);
+  }
+
+  private String cleanWarehouse(String path, String delimiter) {
+    Preconditions.checkArgument(
+        path != null && path.length() > 0,
+        "Cannot initialize EcsCatalog because warehousePath must not be null");
+    int len = path.length();
+    if (path.endsWith(delimiter)) {
+      return path.substring(0, len - delimiter.length());
+    } else {
+      return path;
+    }
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(String.format("%s.%s", catalogName, tableIdentifier),
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(warehouseLocation);
+    for (String level : tableIdentifier.namespace().levels()) {
+      builder.append(dellProperties.ecsCatalogDelimiter());
+      builder.append(level);
+    }
+
+    builder.append(dellProperties.ecsCatalogDelimiter());
+    builder.append(tableIdentifier.name());
+    return builder.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter(dellProperties.ecsCatalogDelimiter())
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Get object prefix of namespace.
+   */
+  private EcsURI namespacePrefix(Namespace namespace) {
+    String prefix;
+    if (namespace.isEmpty()) {
+      prefix = dellProperties.ecsCatalogPrefix().name();
+    } else {
+      prefix = dellProperties.ecsCatalogPrefix().name() +
+          String.join(dellProperties.ecsCatalogDelimiter(), namespace.levels()) +
+          dellProperties.ecsCatalogDelimiter();
+    }
+
+    return new EcsURI(dellProperties.ecsCatalogPrefix().bucket(), prefix);
+  }
+
+  private TableIdentifier parseTableId(Namespace namespace, EcsURI prefix, S3Object s3Object) {
+    String key = s3Object.getKey();
+    Preconditions.checkArgument(key.startsWith(prefix.name()),
+        "List result should have same prefix", key, prefix);
+
+    String tableName = key.substring(
+        prefix.name().length(),
+        key.length() - TABLE_OBJECT_SUFFIX.length());
+    return TableIdentifier.of(namespace, tableName);
+  }
+
+  /**
+   * Remove table object. If the purge flag is set, remove all data objects.
+   */
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    if (!tableExists(identifier)) {
+      throw new NoSuchTableException("Table %s does not exist", identifier);
+    }
+
+    EcsURI tableObjectURI = tableURI(identifier);
+    if (purge) {
+      // if re-use the same instance, current() will throw exception.
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata current = ops.current();
+      if (current == null) {
+        return false;
+      }
+
+      CatalogUtil.dropTableData(ops.io(), current);
+    }
+
+    client.deleteObject(tableObjectURI.bucket(), tableObjectURI.name());
+    return true;
+  }
+
+  private EcsURI tableURI(TableIdentifier id) {
+    EcsURI prefix = namespacePrefix(id.namespace());
+    // The prefix has the delimiter at the tail.
+    return new EcsURI(prefix.bucket(), prefix.name() + id.name() + TABLE_OBJECT_SUFFIX);
+  }
+
+  /**
+   * Table rename will only move table object, the data objects will still be in-place.
+   *
+   * @param from identifier of the table to rename
+   * @param to   new table name
+   */
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    if (!namespaceExists(to.namespace())) {
+      throw new NoSuchNamespaceException("Cannot rename %s to %s because namespace %s does not exist",
+              from, to, to.namespace());
+    }
+
+    if (tableExists(to)) {
+      throw new AlreadyExistsException("Cannot rename %s because destination table %s exists", from, to);
+    }
+
+    EcsURI fromURI = tableURI(from);
+    if (!objectMetadata(fromURI).isPresent()) {
+      throw new NoSuchTableException("Cannot rename table because table %s does not exist", from);
+    }
+
+    Properties properties = loadProperties(fromURI);
+    EcsURI toURI = tableURI(to);
+
+    if (!putNewProperties(toURI, properties.content())) {
+      throw new AlreadyExistsException("Cannot rename %s because destination table %s exists", from, to);
+    }
+
+    client.deleteObject(fromURI.bucket(), fromURI.name());
+    LOG.info("rename table {} to {}", from, to);
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> properties) {
+    EcsURI namespaceObject = namespaceURI(namespace);
+    if (!putNewProperties(namespaceObject, properties)) {
+      throw new AlreadyExistsException("namespace %s(%s) has already existed", namespace, namespaceObject);
+    }
+  }
+
+  private EcsURI namespaceURI(Namespace namespace) {
+    return new EcsURI(
+        dellProperties.ecsCatalogPrefix().bucket(),
+        dellProperties.ecsCatalogPrefix().name() +
+            String.join(dellProperties.ecsCatalogDelimiter(), namespace.levels()) +
+            NAMESPACE_OBJECT_SUFFIX);
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<Namespace> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter(dellProperties.ecsCatalogDelimiter())
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(NAMESPACE_OBJECT_SUFFIX))
+          .map(object -> parseNamespace(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing namespace {} returned namespaces: {}", namespace, results);
+    return results;
+  }
+
+  private Namespace parseNamespace(Namespace parent, EcsURI prefix, S3Object s3Object) {
+    String key = s3Object.getKey();
+    Preconditions.checkArgument(key.startsWith(prefix.name()),
+        "List result should have same prefix", key, prefix);
+
+    String namespaceName = key.substring(
+        prefix.name().length(),
+        key.length() - NAMESPACE_OBJECT_SUFFIX.length());
+    String[] namespace = Arrays.copyOf(parent.levels(), parent.levels().length + 1);
+    namespace[namespace.length - 1] = namespaceName;
+    return Namespace.of(namespace);
+  }
+
+  /**
+   * Load namespace properties.
+   */
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    EcsURI namespaceObject = namespaceURI(namespace);
+    if (!objectMetadata(namespaceObject).isPresent()) {
+      throw new NoSuchNamespaceException("Namespace %s(%s) properties object is absent", namespace, namespaceObject);
+    }
+
+    Map<String, String> result = loadProperties(namespaceObject).content();
+
+    LOG.debug("Loaded metadata for namespace {} found {}", namespace, result);
+    return result;
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    if (!listNamespaces(namespace).isEmpty() || !listTables(namespace).isEmpty()) {
+      throw new NamespaceNotEmptyException("Namespace %s is not empty", namespace);
+    }
+
+    EcsURI namespaceObject = namespaceURI(namespace);
+    client.deleteObject(namespaceObject.bucket(), namespaceObject.name());
+    LOG.info("Dropped namespace: {}", namespace);
+    return true;
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    return updateProperties(namespace, r -> r.putAll(properties));
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    return updateProperties(namespace, r -> r.keySet().removeAll(properties));
+  }
+
+  public boolean updateProperties(Namespace namespace, Consumer<Map<String, String>> propertiesFn)
+      throws NoSuchNamespaceException {
+
+    // Load old properties
+    Properties oldProperties = loadProperties(namespaceURI(namespace));
+
+    // Put new properties
+    Map<String, String> newProperties = new LinkedHashMap<>(oldProperties.content());
+    propertiesFn.accept(newProperties);
+    LOG.debug("Successfully set properties {} for {}", newProperties.keySet(), namespace);
+    return updatePropertiesObject(namespaceURI(namespace), oldProperties.eTag(), newProperties);
+  }
+
+  @Override
+  public boolean namespaceExists(Namespace namespace) {
+    return objectMetadata(namespaceURI(namespace)).isPresent();
+  }
+
+  @Override
+  public boolean tableExists(TableIdentifier identifier) {
+    return objectMetadata(tableURI(identifier)).isPresent();
+  }
+
+  private void checkURI(EcsURI uri) {
+    Preconditions.checkArgument(uri.bucket().equals(dellProperties.ecsCatalogPrefix().bucket()),
+        "Properties object %s should be in same bucket", uri.location());
+    Preconditions.checkArgument(uri.name().startsWith(dellProperties.ecsCatalogPrefix().name()),
+        "Properties object %s should have prefix", uri.location());

Review comment:
       Suggest `Should have the expected prefix %s`: including the ECS catalog prefix in the message is more user-friendly.
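   A sketch of the suggested message, assuming `checkURI` keeps its current shape. `checkArgument` here is a minimal stand-in for Guava's `Preconditions.checkArgument(boolean, String, Object...)`, which also fills `%s` placeholders from the trailing arguments; the plain `bucket`/`name` strings stand in for `EcsURI`, and the `ecs://bucket/name` location format is assumed.

```java
public class CheckUriSketch {

  // Minimal stand-in for Guava's Preconditions.checkArgument with a "%s" template.
  static void checkArgument(boolean expression, String template, Object... args) {
    if (!expression) {
      throw new IllegalArgumentException(String.format(template, args));
    }
  }

  // Hypothetical simplified checkURI: bucket and name play the role of EcsURI#bucket()
  // and EcsURI#name(); catalogBucket/catalogPrefix come from the catalog prefix setting.
  static void checkURI(String bucket, String name, String catalogBucket, String catalogPrefix) {
    String location = "ecs://" + bucket + "/" + name;
    checkArgument(bucket.equals(catalogBucket),
        "Properties object %s should be in bucket %s", location, catalogBucket);
    checkArgument(name.startsWith(catalogPrefix),
        "Properties object %s should have the expected prefix %s", location, catalogPrefix);
  }

  public static void main(String[] args) {
    try {
      checkURI("bucket", "other/t1.table", "bucket", "catalog/");
    } catch (IllegalArgumentException e) {
      // prints: Properties object ecs://bucket/other/t1.table should have the expected prefix catalog/
      System.out.println(e.getMessage());
    }
  }
}
```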

##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,520 @@
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+  /**

Review comment:
       Nit: please leave an empty line here.

##########
File path: dell/src/main/java/org/apache/iceberg/dell/DellProperties.java
##########
@@ -38,6 +40,21 @@
    */
   public static final String ECS_S3_ENDPOINT = "ecs.s3.endpoint";
 
+  /**
+   * Catalog prefix is used to store catalog data. If not set, use {@link CatalogProperties#WAREHOUSE_LOCATION}.
+   * <p>
+   * The value is an EcsURI which like ecs://bucket/prefix.
+   */
+  public static final String ECS_CATALOG_PREFIX = "ecs.catalog.prefix";

Review comment:
       If you really want a separate catalog path for the ECS metadata, I suggest naming it `ecs.catalog.metadata.path`. The name `ecs.catalog.prefix` is quite confusing to me: why does the catalog need a prefix?

##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/PropertiesSerDes.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convert Map properties to bytes.
+ */
+public interface PropertiesSerDes {

Review comment:
       Do we plan to introduce other `PropertiesSerDes` implementations? If not, why do we need to make this utility an interface?
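   For comparison, a sketch of the alternative the question implies: a final class with static methods instead of an interface. It assumes the format is `java.util.Properties` text encoded as UTF-8 (the imports in the diff suggest something similar, but that is an inference) and that the version string is stored next to the object, for example under the `iceberg_properties_version` user-metadata key defined in `EcsCatalog`. `PropertiesSerDesUtil` and `CURRENT_VERSION` are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public final class PropertiesSerDesUtil {
  // Hypothetical version tag written alongside the serialized bytes.
  public static final String CURRENT_VERSION = "0";

  private PropertiesSerDesUtil() {
  }

  // Serializes the map as java.util.Properties text in UTF-8 (null values are not allowed).
  public static byte[] toBytes(Map<String, String> value) {
    Properties jdkProperties = new Properties();
    jdkProperties.putAll(value);
    ByteArrayOutputStream output = new ByteArrayOutputStream();
    try (Writer writer = new OutputStreamWriter(output, StandardCharsets.UTF_8)) {
      jdkProperties.store(writer, null);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return output.toByteArray();
  }

  // Reads bytes written by toBytes; other versions would get their own branch here.
  public static Map<String, String> read(InputStream input, String version) {
    if (!CURRENT_VERSION.equals(version)) {
      throw new IllegalArgumentException("Unsupported properties version: " + version);
    }
    Properties jdkProperties = new Properties();
    try (InputStreamReader reader = new InputStreamReader(input, StandardCharsets.UTF_8)) {
      jdkProperties.load(reader);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    Map<String, String> result = new HashMap<>();
    for (String key : jdkProperties.stringPropertyNames()) {
      result.put(key, jdkProperties.getProperty(key));
    }
    return result;
  }
}
```

   With the version passed explicitly, a future format change could be handled by branching on `version` inside `read`, without needing an interface.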

##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,520 @@
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private String warehouseLocation;
+  private DellProperties dellProperties;
+  private PropertiesSerDes propertiesSerDes;
+  private FileIO fileIO;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.dellProperties = new DellProperties(properties);
+    this.warehouseLocation =
+        cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter());
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.propertiesSerDes = PropertiesSerDes.current();
+    this.fileIO = initializeFileIO(properties);
+  }
+
+  private String cleanWarehouse(String path, String delimiter) {
+    Preconditions.checkArgument(
+        path != null && path.length() > 0,
+        "Cannot initialize EcsCatalog because warehousePath must not be null");
+    int len = path.length();
+    if (path.endsWith(delimiter)) {
+      return path.substring(0, len - delimiter.length());
+    } else {
+      return path;
+    }
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(String.format("%s.%s", catalogName, tableIdentifier),
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(warehouseLocation);
+    for (String level : tableIdentifier.namespace().levels()) {
+      builder.append(dellProperties.ecsCatalogDelimiter());
+      builder.append(level);
+    }
+
+    builder.append(dellProperties.ecsCatalogDelimiter());
+    builder.append(tableIdentifier.name());
+    return builder.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter(dellProperties.ecsCatalogDelimiter())
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Get object prefix of namespace.
+   */
+  private EcsURI namespacePrefix(Namespace namespace) {
+    String prefix;
+    if (namespace.isEmpty()) {
+      prefix = dellProperties.ecsCatalogPrefix().name();
+    } else {
+      prefix = dellProperties.ecsCatalogPrefix().name() +
+          String.join(dellProperties.ecsCatalogDelimiter(), namespace.levels()) +
+          dellProperties.ecsCatalogDelimiter();
+    }
+
+    return new EcsURI(dellProperties.ecsCatalogPrefix().bucket(), prefix);
+  }
+
+  private TableIdentifier parseTableId(Namespace namespace, EcsURI prefix, S3Object s3Object) {
+    String key = s3Object.getKey();
+    Preconditions.checkArgument(key.startsWith(prefix.name()),
+        "List result should have same prefix", key, prefix);
+
+    String tableName = key.substring(
+        prefix.name().length(),
+        key.length() - TABLE_OBJECT_SUFFIX.length());
+    return TableIdentifier.of(namespace, tableName);
+  }
+
+  /**
+   * Remove table object. If the purge flag is set, remove all data objects.
+   */
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    if (!tableExists(identifier)) {
+      throw new NoSuchTableException("Table %s does not exist", identifier);
+    }
+
+    EcsURI tableObjectURI = tableURI(identifier);
+    if (purge) {
+      // if re-use the same instance, current() will throw exception.
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata current = ops.current();
+      if (current == null) {
+        return false;
+      }
+
+      CatalogUtil.dropTableData(ops.io(), current);
+    }
+
+    client.deleteObject(tableObjectURI.bucket(), tableObjectURI.name());
+    return true;
+  }
+
+  private EcsURI tableURI(TableIdentifier id) {
+    EcsURI prefix = namespacePrefix(id.namespace());
+    // The prefix has the delimiter at the tail.
+    return new EcsURI(prefix.bucket(), prefix.name() + id.name() + TABLE_OBJECT_SUFFIX);
+  }
+
+  /**
+   * Table rename will only move table object, the data objects will still be in-place.
+   *
+   * @param from identifier of the table to rename
+   * @param to   new table name
+   */
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    if (!namespaceExists(to.namespace())) {
+      throw new NoSuchNamespaceException("Cannot rename %s to %s because namespace %s does not exist",
+              from, to, to.namespace());
+    }
+
+    if (tableExists(to)) {
+      throw new AlreadyExistsException("Cannot rename %s because destination table %s exists", from, to);
+    }
+
+    EcsURI fromURI = tableURI(from);
+    if (!objectMetadata(fromURI).isPresent()) {
+      throw new NoSuchTableException("Cannot rename table because table %s does not exist", from);
+    }
+
+    Properties properties = loadProperties(fromURI);
+    EcsURI toURI = tableURI(to);
+
+    if (!putNewProperties(toURI, properties.content())) {
+      throw new AlreadyExistsException("Cannot rename %s because destination table %s exists", from, to);
+    }
+
+    client.deleteObject(fromURI.bucket(), fromURI.name());
+    LOG.info("rename table {} to {}", from, to);
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> properties) {
+    EcsURI namespaceObject = namespaceURI(namespace);
+    if (!putNewProperties(namespaceObject, properties)) {
+      throw new AlreadyExistsException("namespace %s(%s) has already existed", namespace, namespaceObject);
+    }
+  }
+
+  private EcsURI namespaceURI(Namespace namespace) {
+    return new EcsURI(
+        dellProperties.ecsCatalogPrefix().bucket(),
+        dellProperties.ecsCatalogPrefix().name() +
+            String.join(dellProperties.ecsCatalogDelimiter(), namespace.levels()) +
+            NAMESPACE_OBJECT_SUFFIX);

Review comment:
       Nit: Please use `String.format` to concatenate the strings; that makes the pattern easier to read.
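A minimal sketch of the suggested style. It assumes the catalog prefix name already ends with the delimiter, as the code above relies on when it appends levels directly to it. `NamespaceKeys` and `namespaceObjectKey` are hypothetical names, and the bucket half of `EcsURI` is left out so the snippet stands alone:

```java
import java.util.List;

final class NamespaceKeys {
  // Same value as NAMESPACE_OBJECT_SUFFIX in the diff above.
  static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";

  private NamespaceKeys() {
  }

  /**
   * Builds the key of a namespace properties object, e.g. "prefix/a/b.namespace".
   * The "%s%s%s" pattern shows the layout (prefix, joined levels, suffix) at a glance.
   */
  static String namespaceObjectKey(String prefix, String delimiter, List<String> levels) {
    return String.format("%s%s%s", prefix, String.join(delimiter, levels), NAMESPACE_OBJECT_SUFFIX);
  }
}
```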

##########
File path: dell/src/main/java/org/apache/iceberg/dell/DellProperties.java
##########
@@ -38,6 +40,21 @@
    */
   public static final String ECS_S3_ENDPOINT = "ecs.s3.endpoint";
 
+  /**
+   * Catalog prefix is used to store catalog data. If not set, use {@link CatalogProperties#WAREHOUSE_LOCATION}.
+   * <p>
+   * The value is an EcsURI which like ecs://bucket/prefix.
+   */
+  public static final String ECS_CATALOG_PREFIX = "ecs.catalog.prefix";
+
+  /**
+   * Catalog delimiter is separator of namespace levels. Default value is '/'.
+   * <p>
+   * For example, the properties object of namespace [a, b] is ecs://bucket/prefix/a/b.namespace when delimiter is '/',
+   * and is ecs://bucket/prefix-a-b when delimiter is '-'.
+   */
+  public static final String ECS_CATALOG_DELIMITER = "ecs.catalog.delimiter";

Review comment:
       Besides, I don't understand why we need to expose this config key to end users. Why would anyone configure a custom delimiter rather than use the default slash?
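A small sketch of one trade-off behind this question. It assumes levels are joined exactly as in `namespaceURI` in the diff (bucket omitted), and `DelimiterCollision` is a hypothetical helper. Any delimiter character that can also appear inside a level name lets two different namespaces share one object key. With `-`, [a, b] and [a-b] collide; with `/` they stay apart, although a level named `a/b` would collide in the same way. So whichever delimiter is used has to be rejected inside level names, and making it configurable only changes which character that is.

```java
import java.util.List;

final class DelimiterCollision {
  private DelimiterCollision() {
  }

  // Mirrors the key layout in the diff: catalog prefix + levels joined by the delimiter + suffix.
  static String namespaceKey(String prefix, String delimiter, List<String> levels) {
    return prefix + String.join(delimiter, levels) + ".namespace";
  }

  // True when two distinct namespaces would be stored under the same object key.
  static boolean collides(String prefix, String delimiter, List<String> ns1, List<String> ns2) {
    return !ns1.equals(ns2)
        && namespaceKey(prefix, delimiter, ns1).equals(namespaceKey(prefix, delimiter, ns2));
  }
}
```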




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r818267986



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.dell.DellProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private String warehouseLocation;
+  private DellProperties dellProperties;
+  private PropertiesSerDes propertiesSerDes;
+  private FileIO fileIO;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.dellProperties = new DellProperties(properties);
+    this.warehouseLocation =
+        cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter());
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.propertiesSerDes = PropertiesSerDes.current();
+    this.fileIO = initializeFileIO(properties);
+  }
+
+  private String cleanWarehouse(String path, String delimiter) {
+    Preconditions.checkArgument(
+        path != null && path.length() > 0,
+        "Cannot initialize EcsCatalog because warehousePath must not be null");
+    int len = path.length();
+    if (path.endsWith(delimiter)) {
+      return path.substring(0, len - delimiter.length());
+    } else {
+      return path;
+    }
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(catalogName + "." + tableIdentifier,
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(warehouseLocation);
+    for (String level : tableIdentifier.namespace().levels()) {
+      builder.append(dellProperties.ecsCatalogDelimiter());
+      builder.append(level);
+    }
+
+    builder.append(dellProperties.ecsCatalogDelimiter());
+    builder.append(tableIdentifier.name());
+    return builder.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter(dellProperties.ecsCatalogDelimiter())
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);

Review comment:
       Each table needs an object to store its latest snapshot location, so I reuse that object to list tables.
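To illustrate how reusing the `.table` objects works with delimiter listing, here is an in-memory sketch. It assumes S3-style `ListObjects` semantics: keys under the prefix that contain another delimiter are rolled up into common prefixes and are not returned as objects. `TableListingSketch` and `listTableNames` are hypothetical names, not the ECS client API, and the sample keys in the test are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;

final class TableListingSketch {
  static final String TABLE_OBJECT_SUFFIX = ".table";

  private TableListingSketch() {
  }

  // Returns table names directly under `prefix`, mimicking a delimiter listing:
  // keys with another delimiter after the prefix belong to child namespaces and are skipped.
  static List<String> listTableNames(SortedSet<String> keys, String prefix, String delimiter) {
    List<String> tables = new ArrayList<>();
    for (String key : keys.tailSet(prefix)) {
      if (!key.startsWith(prefix)) {
        break; // keys are sorted, so no later key can share the prefix
      }
      String rest = key.substring(prefix.length());
      if (rest.contains(delimiter)) {
        continue; // would be reported as a common prefix, not as an object
      }
      if (rest.endsWith(TABLE_OBJECT_SUFFIX)) {
        tables.add(rest.substring(0, rest.length() - TABLE_OBJECT_SUFFIX.length()));
      }
    }
    return tables;
  }
}
```

Because only the `.table` marker objects are matched, `.namespace` objects and anything stored deeper under a table or child namespace never show up as tables.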






[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r824472868



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/PropertiesSerDes.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convert Map properties to bytes.
+ */
+public interface PropertiesSerDes {

Review comment:
       There will be no change in the short term, so I changed it to a util class.
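For reference, a minimal sketch of the util-class shape with a version check on read. It assumes, as in the diff, that the format version travels separately (for example in the object's user metadata) and that version "0" means `java.util.Properties` text in UTF-8. `PropertiesCodec` is a hypothetical name.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class PropertiesCodec {
  static final String CURRENT_VERSION = "0";

  // Util class: static methods only, never instantiated.
  private PropertiesCodec() {
  }

  // Parses properties written in format `version`; the caller owns and closes `input`.
  static Map<String, String> read(InputStream input, String version) {
    // Calling equals on the constant also rejects a null version without an NPE.
    if (!CURRENT_VERSION.equals(version)) {
      throw new IllegalArgumentException("Unsupported properties version: " + version);
    }
    Properties jdkProperties = new Properties();
    try {
      jdkProperties.load(new InputStreamReader(input, StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    Map<String, String> properties = new HashMap<>();
    for (String name : jdkProperties.stringPropertyNames()) {
      properties.put(name, jdkProperties.getProperty(name));
    }
    return Collections.unmodifiableMap(properties);
  }
}
```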






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r826799227



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/PropertiesSerDesUtil.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convert Map properties to bytes.
+ */
+public class PropertiesSerDesUtil {
+
+  private PropertiesSerDesUtil() {
+  }
+
+  /**
+   * Version of current implementation.
+   */
+  private static final String CURRENT_VERSION = "0";
+
+  private static final Logger LOG = LoggerFactory.getLogger(PropertiesSerDesUtil.class);
+
+  /**
+   * Read properties
+   *
+   * @param version is the version of {@link PropertiesSerDesUtil}
+   */
+  public static Map<String, String> read(InputStream input, String version) {
+    Preconditions.checkArgument(version.equals(CURRENT_VERSION),
+        "Properties version is not match", version);
+    Properties jdkProperties = new Properties();
+    try {
+      jdkProperties.load(new InputStreamReader(input, StandardCharsets.UTF_8));
+    } catch (IOException e) {
+      LOG.error("fail to read properties", e);
+      throw new UncheckedIOException(e);
+    }
+
+    Set<String> propertyNames = jdkProperties.stringPropertyNames();
+    Map<String, String> properties = Maps.newHashMap();
+    for (String name : propertyNames) {
+      properties.put(name, jdkProperties.getProperty(name));
+    }
+
+    return Collections.unmodifiableMap(properties);
+  }
+
+  /**
+   * Write properties, the version is {@link #currentVersion()}
+   */
+  public static byte[] toBytes(Map<String, String> value) {
+    Properties jdkProperties = new Properties();
+    for (Map.Entry<String, String> entry : value.entrySet()) {
+      jdkProperties.setProperty(entry.getKey(), entry.getValue());
+    }
+
+    try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
+      jdkProperties.store(new OutputStreamWriter(output, StandardCharsets.UTF_8), null);
+      return output.toByteArray();
+    } catch (IOException e) {
+      LOG.error("fail to store properties {} to file", value, e);

Review comment:
       Ditto






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r826800148



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/PropertiesSerDesUtil.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convert Map properties to bytes.
+ */
+public class PropertiesSerDesUtil {
+
+  private PropertiesSerDesUtil() {
+  }
+
+  /**
+   * Version of current implementation.
+   */
+  private static final String CURRENT_VERSION = "0";
+
+  private static final Logger LOG = LoggerFactory.getLogger(PropertiesSerDesUtil.class);
+
+  /**
+   * Read properties
+   *
+   * @param version is the version of {@link PropertiesSerDesUtil}
+   */
+  public static Map<String, String> read(InputStream input, String version) {
+    Preconditions.checkArgument(version.equals(CURRENT_VERSION),
+        "Properties version is not match", version);
+    Properties jdkProperties = new Properties();
+    try {
+      jdkProperties.load(new InputStreamReader(input, StandardCharsets.UTF_8));
+    } catch (IOException e) {
+      LOG.error("fail to read properties", e);
+      throw new UncheckedIOException(e);
+    }
+
+    Set<String> propertyNames = jdkProperties.stringPropertyNames();
+    Map<String, String> properties = Maps.newHashMap();
+    for (String name : propertyNames) {
+      properties.put(name, jdkProperties.getProperty(name));
+    }
+
+    return Collections.unmodifiableMap(properties);
+  }
+
+  /**
+   * Write properties, the version is {@link #currentVersion()}
+   */
+  public static byte[] toBytes(Map<String, String> value) {
+    Properties jdkProperties = new Properties();
+    for (Map.Entry<String, String> entry : value.entrySet()) {
+      jdkProperties.setProperty(entry.getKey(), entry.getValue());
+    }
+
+    try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
+      jdkProperties.store(new OutputStreamWriter(output, StandardCharsets.UTF_8), null);

Review comment:
       Should we also close the `OutputStreamWriter`?
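`Properties.store(Writer, String)` flushes the writer but leaves it open, so the current code does produce complete bytes; closing the writer is still the cleaner pattern. A sketch that scopes the writer in try-with-resources and reads the buffer only after it is closed (`PropertiesBytes` is a hypothetical name):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;

final class PropertiesBytes {
  private PropertiesBytes() {
  }

  static byte[] toBytes(Map<String, String> value) {
    Properties jdkProperties = new Properties();
    for (Map.Entry<String, String> entry : value.entrySet()) {
      jdkProperties.setProperty(entry.getKey(), entry.getValue());
    }

    ByteArrayOutputStream output = new ByteArrayOutputStream();
    // Closing the writer flushes it and closes `output`, which is a no-op for ByteArrayOutputStream.
    try (Writer writer = new OutputStreamWriter(output, StandardCharsets.UTF_8)) {
      jdkProperties.store(writer, null);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    // Take the bytes only after the writer is closed, so nothing can remain buffered.
    return output.toByteArray();
  }
}
```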






[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r830937617



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private EcsURI warehouseLocation;
+  private FileIO fileIO;
+  private CloseableGroup closeableGroup;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.warehouseLocation = cleanWarehousePath(properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.fileIO = initializeFileIO(properties);
+
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(client::destroy);
+    closeableGroup.addCloseable(fileIO);
+    closeableGroup.setSuppressCloseFailure(true);
+  }
+
+  private EcsURI cleanWarehousePath(String path) {
+    Preconditions.checkArgument(path != null && path.length() > 0,
+            "Cannot initialize EcsCatalog because warehousePath must not be null");

Review comment:
       Fixed!






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r814463434



##########
File path: dell/src/main/java/org/apache/iceberg/dell/DellProperties.java
##########
@@ -38,6 +40,21 @@
    */
   public static final String ECS_S3_ENDPOINT = "ecs.s3.endpoint";
 
+  /**
+   * Catalog prefix is used to store catalog data. If not set, use {@link CatalogProperties#WAREHOUSE_LOCATION}.
+   * <p>
+   * The value is an EcsURI which like ecs://bucket/prefix.
+   */
+  public static final String ECS_CATALOG_PREFIX = "ecs.catalog.prefix";

Review comment:
       Since we usually specify a warehouse location in FS-based catalogs, why do we still need an extra `ecs.catalog.prefix` config that has a similar meaning to `CatalogProperties#WAREHOUSE_LOCATION`?
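As a rough illustration of why one setting may be enough: an `ecs://bucket/prefix` warehouse value already carries both the bucket and the key prefix that catalog objects need. The sketch below assumes a hypothetical `WarehouseLocation` helper (not the PR's `EcsURI`) and `/` as the delimiter.

```java
// Hypothetical helper, not the PR's EcsURI: splits "ecs://bucket/prefix" into the
// bucket and the object-key prefix. Trailing "/" characters are dropped so keys are
// built the same way whether or not the user typed one.
final class WarehouseLocation {
  private static final String SCHEME = "ecs://";

  private final String bucket;
  private final String prefix;

  private WarehouseLocation(String bucket, String prefix) {
    this.bucket = bucket;
    this.prefix = prefix;
  }

  static WarehouseLocation parse(String location) {
    if (location == null || !location.startsWith(SCHEME)) {
      throw new IllegalArgumentException("Expected ecs://bucket/prefix but got: " + location);
    }
    String rest = location.substring(SCHEME.length());
    int slash = rest.indexOf('/');
    String bucket = slash < 0 ? rest : rest.substring(0, slash);
    String prefix = slash < 0 ? "" : rest.substring(slash + 1);
    // Normalize: "wh/" and "wh" should map to the same key prefix.
    while (prefix.endsWith("/")) {
      prefix = prefix.substring(0, prefix.length() - 1);
    }
    if (bucket.isEmpty()) {
      throw new IllegalArgumentException("Missing bucket in: " + location);
    }
    return new WarehouseLocation(bucket, prefix);
  }

  String bucket() {
    return bucket;
  }

  String prefix() {
    return prefix;
  }
}
```

With this shape, `WAREHOUSE_LOCATION` alone yields the bucket for listing requests and the prefix under which namespace and table objects are stored.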






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r814462383



##########
File path: build.gradle
##########
@@ -594,6 +594,13 @@ project(':iceberg-dell') {
     implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
     compileOnly 'com.emc.ecs:object-client-bundle'
 
+    compileOnly("org.apache.hadoop:hadoop-common") {

Review comment:
       Why do we need hadoop-common on the compile path for the `iceberg-dell` FileIO and Catalog? As I recall, we had several PRs to remove all Hadoop dependencies from third-party `FileIO` and `Catalog` implementations.
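The catalog in this diff already keeps the Hadoop configuration as `Object hadoopConf` and implements `Configurable<Object>`, which suggests the compile-time dependency may be avoidable. A minimal sketch of that pattern, assuming a local stand-in interface `ConfigurableLike<C>` and a hypothetical `OpaqueConfCatalog` (neither is Iceberg API):

```java
import java.util.Map;
import java.util.function.BiFunction;

// Stand-in for a "Configurable<C>" style interface (assumed to mirror the
// Configurable<Object> the PR implements). Because C is a type parameter, a caller
// can hand over a Hadoop Configuration without this code naming any Hadoop class.
interface ConfigurableLike<C> {
  void setConf(C conf);
}

// Hypothetical catalog skeleton: the conf is kept as an opaque Object and only
// forwarded, so nothing here needs hadoop-common on the compile classpath.
final class OpaqueConfCatalog implements ConfigurableLike<Object> {
  private Object hadoopConf;

  @Override
  public void setConf(Object conf) {
    this.hadoopConf = conf;
  }

  // Passes the properties and the untouched conf to a loader, e.g. a FileIO factory.
  <T> T load(BiFunction<Map<String, String>, Object, T> loader, Map<String, String> properties) {
    return loader.apply(properties, hadoopConf);
  }
}
```

In the diff, the equivalent forwarding step is `CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf)`, which also receives the conf as `Object`.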






[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r818265156



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.dell.DellProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private String warehouseLocation;
+  private DellProperties dellProperties;
+  private PropertiesSerDes propertiesSerDes;
+  private FileIO fileIO;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.dellProperties = new DellProperties(properties);
+    this.warehouseLocation =
+        cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter());
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.propertiesSerDes = PropertiesSerDes.current();
+    this.fileIO = initializeFileIO(properties);
+  }
+
+  private String cleanWarehouse(String path, String delimiter) {
+    Preconditions.checkArgument(
+        path != null && path.length() > 0,
+        "Cannot initialize EcsCatalog because warehousePath must not be null");
+    int len = path.length();
+    if (path.endsWith(delimiter)) {
+      return path.substring(0, len - delimiter.length());
+    } else {
+      return path;
+    }
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(catalogName + "." + tableIdentifier,
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(warehouseLocation);
+    for (String level : tableIdentifier.namespace().levels()) {
+      builder.append(dellProperties.ecsCatalogDelimiter());
+      builder.append(level);
+    }
+
+    builder.append(dellProperties.ecsCatalogDelimiter());
+    builder.append(tableIdentifier.name());
+    return builder.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter(dellProperties.ecsCatalogDelimiter())
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Get object prefix of namespace.
+   */
+  private EcsURI namespacePrefix(Namespace namespace) {
+    String prefix;
+    if (namespace.isEmpty()) {
+      prefix = dellProperties.ecsCatalogPrefix().name();
+    } else {
+      prefix = dellProperties.ecsCatalogPrefix().name() +
+          String.join(dellProperties.ecsCatalogDelimiter(), namespace.levels()) +
+          dellProperties.ecsCatalogDelimiter();
+    }
+
+    return new EcsURI(dellProperties.ecsCatalogPrefix().bucket(), prefix);
+  }
+
+  private TableIdentifier parseTableId(Namespace namespace, EcsURI prefix, S3Object s3Object) {
+    String key = s3Object.getKey();
+    Preconditions.checkArgument(key.startsWith(prefix.name()),
+        "List result should have same prefix", key, prefix);
+
+    String tableName = key.substring(
+        prefix.name().length(),
+        key.length() - TABLE_OBJECT_SUFFIX.length());
+    return TableIdentifier.of(namespace, tableName);
+  }
+
+  /**
+   * Remove table object. If the purge flag is set, remove all data objects.
+   */
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    if (!tableExists(identifier)) {
+      throw new NoSuchTableException("Table %s does not exist", identifier);
+    }
+
+    EcsURI tableObjectURI = tableURI(identifier);
+    if (purge) {
+      // if re-use the same instance, current() will throw exception.
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata current = ops.current();
+      if (current == null) {
+        return false;
+      }
+
+      CatalogUtil.dropTableData(ops.io(), current);
+    }
+
+    client.deleteObject(tableObjectURI.bucket(), tableObjectURI.name());
+    return true;
+  }
+
+  private EcsURI tableURI(TableIdentifier id) {
+    EcsURI prefix = namespacePrefix(id.namespace());
+    return new EcsURI(prefix.bucket(), prefix.name() + id.name() + TABLE_OBJECT_SUFFIX);

Review comment:
       The `namespacePrefix` method already appends the delimiter, so `tableURI` can append the table name directly. I'll add a comment to make this clear.
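To make the convention concrete: because the namespace prefix always ends with the delimiter, a table key is just `prefix + name + ".table"`, and listing results can be mapped back to table names by stripping both ends. A sketch assuming a fixed `/` delimiter and a hypothetical `CatalogKeys` helper (the PR's own methods return `EcsURI` instead of plain strings):

```java
import java.util.List;

// Hypothetical helper illustrating the key layout; assumes "/" as the delimiter and
// a warehouse key prefix without a trailing delimiter.
final class CatalogKeys {
  static final String DELIMITER = "/";
  static final String TABLE_OBJECT_SUFFIX = ".table";

  // Ends with DELIMITER whenever it is non-empty, e.g. "wh/a/b/".
  static String namespacePrefix(String warehousePrefix, List<String> levels) {
    StringBuilder sb = new StringBuilder();
    if (!warehousePrefix.isEmpty()) {
      sb.append(warehousePrefix).append(DELIMITER);
    }
    for (String level : levels) {
      sb.append(level).append(DELIMITER);
    }
    return sb.toString();
  }

  // No extra delimiter is needed here because namespacePrefix() already ends with one.
  static String tableKey(String warehousePrefix, List<String> levels, String tableName) {
    return namespacePrefix(warehousePrefix, levels) + tableName + TABLE_OBJECT_SUFFIX;
  }

  // Reverse of tableKey(): strip the namespace prefix and the ".table" suffix.
  static String parseTableName(String namespacePrefix, String key) {
    if (!key.startsWith(namespacePrefix) || !key.endsWith(TABLE_OBJECT_SUFFIX)
        || key.length() < namespacePrefix.length() + TABLE_OBJECT_SUFFIX.length()) {
      throw new IllegalArgumentException("Key " + key + " is not a table object under " + namespacePrefix);
    }
    return key.substring(namespacePrefix.length(), key.length() - TABLE_OBJECT_SUFFIX.length());
  }
}
```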






[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r818266839



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.dell.DellProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private String warehouseLocation;
+  private DellProperties dellProperties;
+  private PropertiesSerDes propertiesSerDes;
+  private FileIO fileIO;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.dellProperties = new DellProperties(properties);
+    this.warehouseLocation =
+        cleanWarehouse(properties.get(CatalogProperties.WAREHOUSE_LOCATION), dellProperties.ecsCatalogDelimiter());
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.propertiesSerDes = PropertiesSerDes.current();
+    this.fileIO = initializeFileIO(properties);
+  }
+
+  private String cleanWarehouse(String path, String delimiter) {
+    Preconditions.checkArgument(
+        path != null && path.length() > 0,
+        "Cannot initialize EcsCatalog because warehousePath must not be null");
+    int len = path.length();
+    if (path.endsWith(delimiter)) {
+      return path.substring(0, len - delimiter.length());
+    } else {
+      return path;
+    }
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(catalogName + "." + tableIdentifier,
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(warehouseLocation);
+    for (String level : tableIdentifier.namespace().levels()) {

Review comment:
       This catalog is similar to `HadoopCatalog`, so we follow a similar implementation here.
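The `defaultWarehouseLocation` logic in the diff produces `<warehouse>/<level1>/.../<table>`, which, as far as I know, matches the layout `HadoopCatalog` uses. A standalone sketch of that computation, assuming `/` as the delimiter and a warehouse location with any trailing `/` already removed (`DefaultLocations` is a hypothetical name):

```java
import java.util.List;

// Hypothetical standalone version of the defaultWarehouseLocation() logic in the diff:
// warehouse location, then each namespace level, then the table name, joined by "/".
final class DefaultLocations {
  static String defaultWarehouseLocation(String warehouseLocation, List<String> namespaceLevels, String tableName) {
    StringBuilder builder = new StringBuilder(warehouseLocation);
    for (String level : namespaceLevels) {
      builder.append('/').append(level);
    }
    return builder.append('/').append(tableName).toString();
  }
}
```

For example, warehouse `ecs://bucket/wh`, namespace `[a, b]`, and table `t` give `ecs://bucket/wh/a/b/t`.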






[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r818271731



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsTableOperations.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.util.Map;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+public class EcsTableOperations extends BaseMetastoreTableOperations {
+
+  public static final String ICEBERG_METADATA_LOCATION = "iceberg_metadata_location";
+
+  private final String tableName;
+  private final FileIO fileIO;
+  private final EcsCatalog catalog;
+  private final EcsURI tableObject;
+
+  /**
+   * Cached E-Tag for CAS commit
+   *
+   * @see #doRefresh() when reset this field
+   * @see #doCommit(TableMetadata, TableMetadata) when use this field
+   */
+  private String eTag;
+
+  public EcsTableOperations(String tableName, EcsURI tableObject, FileIO fileIO, EcsCatalog catalog) {
+    this.tableName = tableName;
+    this.tableObject = tableObject;
+    this.fileIO = fileIO;
+    this.catalog = catalog;
+  }
+
+  @Override
+  protected String tableName() {
+    return tableName;
+  }
+
+  @Override
+  public FileIO io() {
+    return fileIO;
+  }
+
+  @Override
+  protected void doRefresh() {
+    String metadataLocation;
+    if (!catalog.objectMetadata(tableObject).isPresent()) {
+      if (currentMetadataLocation() != null) {
+        throw new NoSuchTableException("Metadata object %s is absent", tableObject);
+      } else {
+        metadataLocation = null;
+      }
+    } else {
+      EcsCatalog.Properties metadata = catalog.loadProperties(tableObject);
+      this.eTag = metadata.eTag();
+      metadataLocation = metadata.content().get(ICEBERG_METADATA_LOCATION);
+      Preconditions.checkNotNull(metadataLocation,
+          "Can't find location from table metadata %s", tableObject);
+    }
+
+    refreshFromMetadataLocation(metadataLocation);
+  }
+
+  @Override
+  protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+    if (base == null) {
+      // create a new table, the metadataKey should be absent
+      if (!catalog.putNewProperties(tableObject, buildProperties(newMetadataLocation))) {

Review comment:
       Yes. The write is atomic when `If-None-Match` is specified; this is an extension to the standard S3 API.
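The commit protocol relies on two conditional writes: create-if-absent for a new table, and compare-and-swap on the E-Tag cached by `doRefresh()` for later commits. The in-memory model below is a sketch of those semantics only; `ConditionalStore` and its methods are hypothetical and are not the ECS client API, and `synchronized` stands in for the server-side atomicity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory model of conditional PUTs keyed by object name.
final class ConditionalStore {
  private static final class Entry {
    final String content;
    final String eTag;

    Entry(String content, String eTag) {
      this.content = content;
      this.eTag = eTag;
    }
  }

  private final Map<String, Entry> objects = new HashMap<>();

  // Like PUT with "If-None-Match: *": only the first creator of a key wins.
  synchronized boolean putIfAbsent(String key, String content) {
    if (objects.containsKey(key)) {
      return false;
    }
    objects.put(key, new Entry(content, newETag()));
    return true;
  }

  // Like PUT with "If-Match: <eTag>": fails if another writer committed after our read.
  synchronized boolean compareAndSet(String key, String expectedETag, String content) {
    Entry current = objects.get(key);
    if (current == null || !current.eTag.equals(expectedETag)) {
      return false;
    }
    objects.put(key, new Entry(content, newETag()));
    return true;
  }

  synchronized String eTag(String key) {
    Entry entry = objects.get(key);
    return entry == null ? null : entry.eTag;
  }

  synchronized String content(String key) {
    Entry entry = objects.get(key);
    return entry == null ? null : entry.content;
  }

  // Random token for simplicity; real ETags are usually derived from the object content.
  private static String newETag() {
    return UUID.randomUUID().toString();
  }
}
```

A losing writer sees `false` from either method, which is where the table operations would throw `CommitFailedException`.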







[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r824471641



##########
File path: dell/src/main/java/org/apache/iceberg/dell/DellProperties.java
##########
@@ -38,6 +40,21 @@
    */
   public static final String ECS_S3_ENDPOINT = "ecs.s3.endpoint";
 
+  /**
+   * Catalog prefix is used to store catalog data. If not set, use {@link CatalogProperties#WAREHOUSE_LOCATION}.
+   * <p>
+   * The value is an EcsURI which like ecs://bucket/prefix.
+   */
+  public static final String ECS_CATALOG_PREFIX = "ecs.catalog.prefix";
+
+  /**
+   * Catalog delimiter is separator of namespace levels. Default value is '/'.
+   * <p>
+   * For example, the properties object of namespace [a, b] is ecs://bucket/prefix/a/b.namespace when delimiter is '/',
+   * and is ecs://bucket/prefix-a-b when delimiter is '-'.
+   */
+  public static final String ECS_CATALOG_DELIMITER = "ecs.catalog.delimiter";

Review comment:
       Both `ecs.catalog.prefix` and `ecs.catalog.delimiter` have been removed.






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r826796375



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsTableOperations.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.util.Map;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+public class EcsTableOperations extends BaseMetastoreTableOperations {
+
+  public static final String ICEBERG_METADATA_LOCATION = "iceberg_metadata_location";
+
+  private final String tableName;
+  private final FileIO fileIO;
+  private final EcsCatalog catalog;
+  private final EcsURI tableObject;
+
+  /**
+   * Cached E-Tag for CAS commit
+   *
+   * @see #doRefresh() when reset this field
+   * @see #doCommit(TableMetadata, TableMetadata) when use this field
+   */
+  private String eTag;
+
+  public EcsTableOperations(String tableName, EcsURI tableObject, FileIO fileIO, EcsCatalog catalog) {
+    this.tableName = tableName;
+    this.tableObject = tableObject;
+    this.fileIO = fileIO;
+    this.catalog = catalog;
+  }
+
+  @Override
+  protected String tableName() {
+    return tableName;
+  }
+
+  @Override
+  public FileIO io() {
+    return fileIO;
+  }
+
+  @Override
+  protected void doRefresh() {
+    String metadataLocation;
+    if (!catalog.objectMetadata(tableObject).isPresent()) {
+      if (currentMetadataLocation() != null) {
+        throw new NoSuchTableException("Metadata object %s is absent", tableObject);
+      } else {
+        metadataLocation = null;
+      }
+    } else {
+      EcsCatalog.Properties metadata = catalog.loadProperties(tableObject);
+      this.eTag = metadata.eTag();
+      metadataLocation = metadata.content().get(ICEBERG_METADATA_LOCATION);
+      Preconditions.checkNotNull(metadataLocation,
+          "Can't find location from table metadata %s", tableObject);
+    }
+
+    refreshFromMetadataLocation(metadataLocation);
+  }
+
+  @Override
+  protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+    if (base == null) {
+      // create a new table, the metadataKey should be absent
+      if (!catalog.putNewProperties(tableObject, buildProperties(newMetadataLocation))) {
+        throw new CommitFailedException("Table is existed when create table %s", tableName());

Review comment:
       I'd prefer to see the metadata location in the error message.
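A minimal sketch of what such a message could look like. It relies only on what the quoted hunk shows, namely that `CommitFailedException` takes a printf-style format string plus arguments. The helper class `CommitMessages` and its method name are hypothetical, and `String.format` stands in for the exception's own formatting so the snippet runs without Iceberg on the classpath.

```java
/** Hypothetical helper that builds the message for a create that lost the put-if-absent race. */
final class CommitMessages {

  private CommitMessages() {
    // static helper; no instances
  }

  /**
   * @param tableName           fully qualified table name, e.g. the value returned by tableName()
   * @param newMetadataLocation metadata file written by this commit attempt but never published
   */
  static String tableAlreadyExists(String tableName, String newMetadataLocation) {
    // Same printf-style pattern that would be handed to CommitFailedException(String, Object...)
    return String.format(
        "Cannot create table %s: the table object already exists, metadata %s was not committed",
        tableName, newMetadataLocation);
  }
}
```

Naming the metadata file points whoever reads the log at the exact file the failed attempt wrote.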






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r826798695



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/PropertiesSerDesUtil.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convert Map properties to bytes.
+ */
+public class PropertiesSerDesUtil {
+
+  private PropertiesSerDesUtil() {
+  }
+
+  /**
+   * Version of current implementation.
+   */
+  private static final String CURRENT_VERSION = "0";
+
+  private static final Logger LOG = LoggerFactory.getLogger(PropertiesSerDesUtil.class);
+
+  /**
+   * Read properties
+   *
+   * @param version is the version of {@link PropertiesSerDesUtil}
+   */
+  public static Map<String, String> read(InputStream input, String version) {
+    Preconditions.checkArgument(version.equals(CURRENT_VERSION),
+        "Properties version is not match", version);
+    Properties jdkProperties = new Properties();
+    try {
+      jdkProperties.load(new InputStreamReader(input, StandardCharsets.UTF_8));
+    } catch (IOException e) {
+      LOG.error("fail to read properties", e);

Review comment:
       Nit: Uppercase the first character pls






[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r814501273



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/PropertiesSerDes.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convert Map properties to bytes.
+ */
+public interface PropertiesSerDes {
+
+  /**
+   * Version of current implementation.
+   */
+  String CURRENT_VERSION = "0";
+
+  Logger log = LoggerFactory.getLogger(PropertiesSerDes.class);

Review comment:
       Fixed!
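For context on this fix: fields declared in a Java interface are implicitly `public static final`, so the `log` field in the quoted `PropertiesSerDes` interface is exported to every caller. The later revision quoted elsewhere in this thread (`PropertiesSerDesUtil`) uses a class with a private constructor and a `private static final` logger instead. The sketch below shows that shape with a write path, assuming a `java.util.Properties` text encoding in UTF-8. The class name `PropertiesSerDesSketch` is hypothetical, and `java.util.logging` replaces SLF4J only so the snippet has no dependencies.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;
import java.util.logging.Logger;

/** Hypothetical stand-in for a static properties serializer (sketch, not the PR's code). */
final class PropertiesSerDesSketch {

  /** Format version, stored next to the bytes (e.g. as object user metadata) so readers can reject unknown layouts. */
  static final String CURRENT_VERSION = "0";

  // private static final: invisible to callers, unlike a field declared in an interface
  private static final Logger LOG = Logger.getLogger(PropertiesSerDesSketch.class.getName());

  private PropertiesSerDesSketch() {
    // static utility; no instances
  }

  /** Serializes the map as UTF-8 properties text. Null keys or values are rejected by Properties (Hashtable). */
  static byte[] toBytes(Map<String, String> value) {
    Properties props = new Properties();
    props.putAll(value);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
      props.store(writer, null);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to write properties", e);
    }
    LOG.fine(() -> "Serialized " + value.size() + " properties");
    return out.toByteArray();
  }
}
```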






[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r814501427



##########
File path: build.gradle
##########
@@ -594,6 +594,13 @@ project(':iceberg-dell') {
     implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
     compileOnly 'com.emc.ecs:object-client-bundle'
 
+    compileOnly("org.apache.hadoop:hadoop-common") {

Review comment:
       The Hadoop dependencies have been removed.






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r826794988



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsTableOperations.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.util.Map;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+public class EcsTableOperations extends BaseMetastoreTableOperations {
+
+  public static final String ICEBERG_METADATA_LOCATION = "iceberg_metadata_location";
+
+  private final String tableName;
+  private final FileIO fileIO;
+  private final EcsCatalog catalog;
+  private final EcsURI tableObject;
+
+  /**
+   * Cached E-Tag for CAS commit
+   *
+   * @see #doRefresh() when reset this field
+   * @see #doCommit(TableMetadata, TableMetadata) when use this field
+   */
+  private String eTag;
+
+  public EcsTableOperations(String tableName, EcsURI tableObject, FileIO fileIO, EcsCatalog catalog) {
+    this.tableName = tableName;
+    this.tableObject = tableObject;
+    this.fileIO = fileIO;
+    this.catalog = catalog;
+  }
+
+  @Override
+  protected String tableName() {
+    return tableName;
+  }
+
+  @Override
+  public FileIO io() {
+    return fileIO;
+  }
+
+  @Override
+  protected void doRefresh() {
+    String metadataLocation;
+    if (!catalog.objectMetadata(tableObject).isPresent()) {
+      if (currentMetadataLocation() != null) {
+        throw new NoSuchTableException("Metadata object %s is absent", tableObject);
+      } else {
+        metadataLocation = null;
+      }
+    } else {
+      EcsCatalog.Properties metadata = catalog.loadProperties(tableObject);
+      this.eTag = metadata.eTag();
+      metadataLocation = metadata.content().get(ICEBERG_METADATA_LOCATION);
+      Preconditions.checkNotNull(metadataLocation,
+          "Can't find location from table metadata %s", tableObject);
+    }
+
+    refreshFromMetadataLocation(metadataLocation);
+  }
+
+  @Override
+  protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+    if (base == null) {
+      // create a new table, the metadataKey should be absent
+      if (!catalog.putNewProperties(tableObject, buildProperties(newMetadataLocation))) {
+        throw new CommitFailedException("Table is existed when create table %s", tableName());

Review comment:
       Nit: `existed` -> `existing`.
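For readers following the commit logic in this hunk: creation relies on a put-if-absent of the table object, and, per the PR description and the `eTag` field comment, updates use a compare-and-swap against the E-Tag cached by `doRefresh()`. The sketch below models both paths against an in-memory map. `InMemoryObjectStore`, `CasCommitSketch`, and the `key=value` body format are hypothetical stand-ins, not the ECS client API, and `IllegalStateException` stands in for Iceberg's `CommitFailedException`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory object store that assigns a fresh E-Tag on every write. */
final class InMemoryObjectStore {

  /** Object body plus the E-Tag assigned when it was written. */
  static final class StoredObject {
    final String body;
    final String eTag;

    StoredObject(String body, String eTag) {
      this.body = body;
      this.eTag = eTag;
    }
  }

  private final ConcurrentMap<String, StoredObject> objects = new ConcurrentHashMap<>();
  private final AtomicLong tagCounter = new AtomicLong();

  StoredObject get(String key) {
    return objects.get(key);
  }

  /** Put-if-absent: succeeds only if no object exists under the key. */
  boolean putIfAbsent(String key, String body) {
    return objects.putIfAbsent(key, new StoredObject(body, nextTag())) == null;
  }

  /** Compare-and-swap: replaces the object only if its E-Tag still equals expectedETag. */
  boolean replaceIfMatch(String key, String expectedETag, String body) {
    StoredObject current = objects.get(key);
    if (current == null || !current.eTag.equals(expectedETag)) {
      return false;
    }
    // StoredObject uses identity equality, so replace() fails if another writer swapped it meanwhile
    return objects.replace(key, current, new StoredObject(body, nextTag()));
  }

  private String nextTag() {
    return "etag-" + tagCounter.incrementAndGet();
  }
}

/** Sketch of the two commit paths: create (no base E-Tag) and update (E-Tag read at refresh time). */
final class CasCommitSketch {

  private CasCommitSketch() {
  }

  static void commit(InMemoryObjectStore store, String tableKey, String baseETag, String newMetadataLocation) {
    String body = "iceberg_metadata_location=" + newMetadataLocation;
    boolean committed = baseETag == null
        ? store.putIfAbsent(tableKey, body)               // create: the table object must be absent
        : store.replaceIfMatch(tableKey, baseETag, body); // update: nobody else committed since refresh
    if (!committed) {
      throw new IllegalStateException(String.format(
          "Cannot commit %s to %s: table object already exists or was changed concurrently",
          newMetadataLocation, tableKey));
    }
  }
}
```

A writer holding a stale E-Tag loses the swap and has to refresh before retrying, which is what keeps two concurrent commits from both succeeding.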






[GitHub] [iceberg] openinx commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r826801005



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/PropertiesSerDesUtil.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convert Map properties to bytes.
+ */
+public class PropertiesSerDesUtil {
+
+  private PropertiesSerDesUtil() {
+  }
+
+  /**
+   * Version of current implementation.
+   */
+  private static final String CURRENT_VERSION = "0";
+
+  private static final Logger LOG = LoggerFactory.getLogger(PropertiesSerDesUtil.class);
+
+  /**
+   * Read properties
+   *
+   * @param version is the version of {@link PropertiesSerDesUtil}
+   */
+  public static Map<String, String> read(InputStream input, String version) {
+    Preconditions.checkArgument(version.equals(CURRENT_VERSION),
+        "Properties version is not match", version);
+    Properties jdkProperties = new Properties();
+    try {
+      jdkProperties.load(new InputStreamReader(input, StandardCharsets.UTF_8));

Review comment:
       Should we also close the `InputStreamReader`?
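Two details are relevant here: `Properties.load` leaves the reader open, and closing an `InputStreamReader` also closes the `InputStream` it wraps. Wrapping the reader in try-with-resources therefore means `read` takes ownership of the caller's stream. A minimal sketch under that assumption follows; the class name `PropertiesReaderSketch` is hypothetical, and the version check and messages are illustrative (the rejected version is passed through an explicit `%s`).

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical read path for UTF-8 properties text (sketch, not the PR's code). */
final class PropertiesReaderSketch {

  static final String CURRENT_VERSION = "0";

  private PropertiesReaderSketch() {
  }

  /** Reads properties; closing the reader also closes {@code input}, so this method owns the stream. */
  static Map<String, String> read(InputStream input, String version) {
    if (!CURRENT_VERSION.equals(version)) {
      throw new IllegalArgumentException(String.format("Unsupported properties version: %s", version));
    }
    Properties props = new Properties();
    try (Reader reader = new InputStreamReader(input, StandardCharsets.UTF_8)) {
      props.load(reader); // load() itself does not close the reader
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to read properties", e);
    }
    Map<String, String> result = new HashMap<>();
    for (String key : props.stringPropertyNames()) {
      result.put(key, props.getProperty(key));
    }
    return Collections.unmodifiableMap(result);
  }
}
```

If callers are instead expected to close the stream themselves, leaving the reader unclosed is the alternative; `InputStreamReader` holds no resource of its own beyond the wrapped stream.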






[GitHub] [iceberg] wang-x-xia commented on a change in pull request #4221: Dell: Add Dell EMC EcsCatalog.

Posted by GitBox <gi...@apache.org>.
wang-x-xia commented on a change in pull request #4221:
URL: https://github.com/apache/iceberg/pull/4221#discussion_r827655359



##########
File path: dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java
##########
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell.ecs;
+
+import com.emc.object.s3.S3Client;
+import com.emc.object.s3.S3Exception;
+import com.emc.object.s3.S3ObjectMetadata;
+import com.emc.object.s3.bean.GetObjectResult;
+import com.emc.object.s3.bean.ListObjectsResult;
+import com.emc.object.s3.bean.S3Object;
+import com.emc.object.s3.request.ListObjectsRequest;
+import com.emc.object.s3.request.PutObjectRequest;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.dell.DellClientFactories;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EcsCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  /**
+   * Suffix of table metadata object
+   */
+  private static final String TABLE_OBJECT_SUFFIX = ".table";
+
+  /**
+   * Suffix of namespace metadata object
+   */
+  private static final String NAMESPACE_OBJECT_SUFFIX = ".namespace";
+
+  /**
+   * Key of properties version in ECS object user metadata.
+   */
+  private static final String PROPERTIES_VERSION_USER_METADATA_KEY = "iceberg_properties_version";
+
+  private static final Logger LOG = LoggerFactory.getLogger(EcsCatalog.class);
+
+  private S3Client client;
+  private Object hadoopConf;
+  private String catalogName;
+
+  /**
+   * Warehouse is unified with other catalog that without delimiter.
+   */
+  private EcsURI warehouseLocation;
+  private FileIO fileIO;
+  private CloseableGroup closeableGroup;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * All fields are initialized by calling {@link EcsCatalog#initialize(String, Map)} later.
+   */
+  public EcsCatalog() {
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    this.catalogName = name;
+    this.warehouseLocation = new EcsURI(properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+    this.client = DellClientFactories.from(properties).ecsS3();
+    this.fileIO = initializeFileIO(properties);
+
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(client::destroy);
+    closeableGroup.addCloseable(fileIO);
+    closeableGroup.setSuppressCloseFailure(true);
+  }
+
+  private FileIO initializeFileIO(Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    if (fileIOImpl == null) {
+      FileIO io = new EcsFileIO();
+      io.initialize(properties);
+      return io;
+    } else {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new EcsTableOperations(String.format("%s.%s", catalogName, tableIdentifier),
+        tableURI(tableIdentifier), fileIO, this);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    String tableName = tableIdentifier.name();
+    StringBuilder sb = new StringBuilder();
+
+    sb.append(warehouseLocation).append('/');
+    for (String level : tableIdentifier.namespace().levels()) {
+      sb.append(level).append('/');
+    }
+    sb.append(tableName);
+
+    return sb.toString();
+  }
+
+  /**
+   * Iterate all table objects with the namespace prefix.
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespace.isEmpty() && !namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Namespace %s does not exist", namespace);
+    }
+
+    String marker = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    EcsURI prefix = namespacePrefix(namespace);
+    do {
+      ListObjectsResult listObjectsResult = client.listObjects(
+          new ListObjectsRequest(prefix.bucket())
+              .withDelimiter("/")
+              .withPrefix(prefix.name())
+              .withMarker(marker));
+      marker = listObjectsResult.getNextMarker();
+      results.addAll(listObjectsResult.getObjects().stream()
+          .filter(s3Object -> s3Object.getKey().endsWith(TABLE_OBJECT_SUFFIX))
+          .map(object -> parseTableId(namespace, prefix, object))
+          .collect(Collectors.toList()));
+    } while (marker != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Get object prefix of namespace.
+   */
+  private EcsURI namespacePrefix(Namespace namespace) {
+    String prefix;
+    if (namespace.isEmpty()) {
+      prefix = warehouseLocation.name();
+    } else {
+      prefix = String.format("%s%s/", warehouseLocation.name(),
+          String.join("/", namespace.levels()));
+    }
+
+    return new EcsURI(warehouseLocation.bucket(), prefix);
+  }
+
+  private TableIdentifier parseTableId(Namespace namespace, EcsURI prefix, S3Object s3Object) {
+    String key = s3Object.getKey();
+    Preconditions.checkArgument(key.startsWith(prefix.name()),
+        "List result should have same prefix", key, prefix);
+
+    String tableName = key.substring(
+        prefix.name().length(),
+        key.length() - TABLE_OBJECT_SUFFIX.length());
+    return TableIdentifier.of(namespace, tableName);
+  }
+
+  /**
+   * Remove table object. If the purge flag is set, remove all data objects.
+   */
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    if (!tableExists(identifier)) {
+      throw new NoSuchTableException("Table %s does not exist", identifier);
+    }
+
+    EcsURI tableObjectURI = tableURI(identifier);
+    if (purge) {
+      // if re-use the same instance, current() will throw exception.
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata current = ops.current();
+      if (current == null) {
+        return false;
+      }
+
+      CatalogUtil.dropTableData(ops.io(), current);
+    }
+
+    client.deleteObject(tableObjectURI.bucket(), tableObjectURI.name());
+    return true;
+  }
+
+  private EcsURI tableURI(TableIdentifier id) {
+    EcsURI prefix = namespacePrefix(id.namespace());
+    // The prefix has the delimiter at the tail.
+    return new EcsURI(prefix.bucket(), prefix.name() + id.name() + TABLE_OBJECT_SUFFIX);
+  }
+
+  /**
+   * Table rename will only move table object, the data objects will still be in-place.
+   *
+   * @param from identifier of the table to rename
+   * @param to   new table name
+   */
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    if (!namespaceExists(to.namespace())) {
+      throw new NoSuchNamespaceException("Cannot rename %s to %s because namespace %s does not exist",
+              from, to, to.namespace());
+    }
+
+    if (tableExists(to)) {
+      throw new AlreadyExistsException("Cannot rename %s because destination table %s exists", from, to);
+    }
+
+    EcsURI fromURI = tableURI(from);
+    if (!objectMetadata(fromURI).isPresent()) {
+      throw new NoSuchTableException("Cannot rename table because table %s does not exist", from);
+    }
+
+    Properties properties = loadProperties(fromURI);
+    EcsURI toURI = tableURI(to);
+
+    if (!putNewProperties(toURI, properties.content())) {
+      throw new AlreadyExistsException("Cannot rename %s because destination table %s exists", from, to);
+    }
+
+    client.deleteObject(fromURI.bucket(), fromURI.name());
+    LOG.info("rename table {} to {}", from, to);

Review comment:
       > Nit: It's recommended to uppercase the first character from the message `rename table {} to {}`.
   
   Fixed.
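The `renameTable` hunk above moves only the table object: it loads the source properties, writes them to the destination key with put-if-absent, deletes the source key, and then logs the rename. A minimal in-memory sketch of that sequence follows. `RenameSketch` and the key layout are hypothetical, `IllegalArgumentException` and `IllegalStateException` stand in for Iceberg's `NoSuchTableException` and `AlreadyExistsException`, and `java.util.logging` replaces SLF4J so the snippet has no dependencies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Logger;

/** Hypothetical in-memory catalog keyed by table-object name, e.g. "db/t.table". */
final class RenameSketch {

  private static final Logger LOG = Logger.getLogger(RenameSketch.class.getName());

  private final ConcurrentMap<String, Map<String, String>> tableObjects = new ConcurrentHashMap<>();

  void create(String key, Map<String, String> properties) {
    if (tableObjects.putIfAbsent(key, properties) != null) {
      throw new IllegalStateException(String.format("Table %s already exists", key));
    }
  }

  Map<String, String> get(String key) {
    return tableObjects.get(key);
  }

  /** Copy-then-delete: data and metadata files referenced by the properties are not moved. */
  void rename(String fromKey, String toKey) {
    Map<String, String> properties = tableObjects.get(fromKey);
    if (properties == null) {
      throw new IllegalArgumentException(String.format("Cannot rename %s: table does not exist", fromKey));
    }
    // Put-if-absent guards against a concurrent create or rename targeting the same destination.
    if (tableObjects.putIfAbsent(toKey, properties) != null) {
      throw new IllegalStateException(String.format("Cannot rename %s: destination %s exists", fromKey, toKey));
    }
    // Separate step: until it runs, both keys point at the same table metadata.
    tableObjects.remove(fromKey);
    LOG.info(() -> String.format("Renamed table %s to %s", fromKey, toKey));
  }
}
```

Because the put and the delete are two separate requests, a failure between them leaves the table reachable under both names; whether that window matters depends on how the catalog is used.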



