Posted to commits@gobblin.apache.org by wl...@apache.org on 2022/09/22 17:03:55 UTC

[gobblin] branch master updated: [GOBBLIN-1709] Create Iceberg Datasets Finder, Iceberg Dataset and FileSet to generate Copy Entities to support Distcp for Iceberg (#3560)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 18ec55bcd [GOBBLIN-1709] Create Iceberg Datasets Finder, Iceberg Dataset and FileSet to generate Copy Entities to support Distcp for Iceberg (#3560)
18ec55bcd is described below

commit 18ec55bcd7f78ba0c977c7236856b3d39f5ed7a6
Author: meethngala <me...@gmail.com>
AuthorDate: Thu Sep 22 12:03:49 2022 -0500

    [GOBBLIN-1709] Create Iceberg Datasets Finder, Iceberg Dataset and FileSet to generate Copy Entities to support Distcp for Iceberg (#3560)
    
    * initial commit for iceberg distcp.
    
    * adding copy entity helper and iceberg distcp template and test case.
    
    * Adding unit tests and refactoring method definitions for an Iceberg dataset.
    
    * resolve conflicts after cleaning history
    
    * update iceberg dataset and finder to include javadoc
    
    * addressed comments on PR and aligned code check style
    
    * renamed vars, added logging and updated javadoc
    
    * update dataset descriptor with ternary operation and rename fs to sourceFs
    
    * added source and target fs and update iceberg dataset finder constructor
    
    * Update source and dest dataset methods as protected and add req args constructor
    
    * change the order of attributes for iceberg dataset finder ctor
    
    * update iceberg dataset methods with correct source and target fs
    
    Co-authored-by: Meeth Gala <mg...@linkedin.com>
---
 .../apache/gobblin/dataset/DatasetConstants.java   |   1 +
 .../management/copy/iceberg/IcebergDataset.java    | 212 +++++++++++++++++++++
 .../copy/iceberg/IcebergDatasetFinder.java         |  97 ++++++++++
 .../copy/iceberg/IcebergTableFileSet.java          |  50 +++++
 .../copy/iceberg/IcebergDatasetTest.java           | 192 +++++++++++++++++++
 .../embedded/EmbeddedGobblinDistcpTest.java        |   2 +-
 6 files changed, 553 insertions(+), 1 deletion(-)
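
A minimal, illustrative sketch of how the classes added by this commit compose. The property keys,
constructors, and builder calls come from the diffs below; the wrapper class name, database/table names,
metastore URI, staging path, and the single-cluster FileSystem setup are placeholders, not part of this change.

    import java.util.Iterator;
    import java.util.Properties;

    import org.apache.hadoop.fs.FileSystem;

    import org.apache.gobblin.data.management.copy.CopyConfiguration;
    import org.apache.gobblin.data.management.copy.CopyContext;
    import org.apache.gobblin.data.management.copy.CopyEntity;
    import org.apache.gobblin.data.management.copy.PreserveAttributes;
    import org.apache.gobblin.data.management.copy.iceberg.IcebergDataset;
    import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder;
    import org.apache.gobblin.data.management.partition.FileSet;
    import org.apache.gobblin.util.HadoopUtils;

    public class IcebergDistcpSketch {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // required by IcebergDatasetFinder; the values are placeholders
        props.setProperty(IcebergDatasetFinder.ICEBERG_DB_NAME, "my_db");
        props.setProperty(IcebergDatasetFinder.ICEBERG_TABLE_NAME, "my_table");
        props.setProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY, "thrift://source-metastore:9083");
        // publish dir consumed by CopyConfiguration, mirroring the unit test below
        props.setProperty("data.publisher.final.dir", "/tmp/distcp-staging");

        FileSystem sourceFs = FileSystem.get(HadoopUtils.getConfFromProperties(props));
        FileSystem targetFs = sourceFs;  // single-cluster illustration; normally the destination cluster's FileSystem

        CopyConfiguration copyConfig = CopyConfiguration.builder(targetFs, props)
            .preserve(PreserveAttributes.fromMnemonicString(""))
            .copyContext(new CopyContext())
            .build();

        // the finder yields one IcebergDataset per configured (database, table) pair
        for (IcebergDataset dataset : new IcebergDatasetFinder(sourceFs, props).findDatasets()) {
          Iterator<FileSet<CopyEntity>> fileSets = dataset.getFileSetIterator(targetFs, copyConfig);
          while (fileSets.hasNext()) {
            // each file set lazily expands into CopyableFiles covering every path in the table's current snapshot
            System.out.println("planned file set: " + fileSets.next());
          }
        }
      }
    }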

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
index d7045258a..dde22c0fa 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
@@ -25,6 +25,7 @@ public class DatasetConstants {
   public static final String PLATFORM_HIVE = "hive";
   public static final String PLATFORM_SALESFORCE = "salesforce";
   public static final String PLATFORM_MYSQL = "mysql";
+  public static final String PLATFORM_ICEBERG = "iceberg";
 
   /** Common metadata */
   public static final String BRANCH = "branch";
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
new file mode 100644
index 000000000..8d905c8e6
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+  private final String dbName;
+  private final String inputTableName;
+  private final IcebergTable icebergTable;
+  protected final Properties properties;
+  protected final FileSystem sourceFs;
+
+  private final Optional<String> sourceMetastoreURI;
+  private final Optional<String> targetMetastoreURI;
+
+  /** Target metastore URI */
+  public static final String TARGET_METASTORE_URI_KEY =
+      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.metastore.uri";
+  /** Target database name */
+  public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+  public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem sourceFs) {
+    this.dbName = db;
+    this.inputTableName = table;
+    this.icebergTable = icebergTbl;
+    this.properties = properties;
+    this.sourceFs = sourceFs;
+    this.sourceMetastoreURI =
+        Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY));
+    this.targetMetastoreURI =
+        Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+  }
+
+  /**
+   * Represents a source {@link FileStatus} and a {@link Path} destination.
+   */
+  @Data
+  private static class SourceAndDestination {
+    private final FileStatus source;
+    private final Path destination;
+  }
+
+  @Override
+  public String datasetURN() {
+    // TODO: verify!
+    return this.dbName + "." + this.inputTableName;
+  }
+
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration) {
+    return getCopyEntities(targetFs, configuration);
+  }
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration,
+      Comparator<FileSet<CopyEntity>> prioritizer, PushDownRequestor<FileSet<CopyEntity>> requestor) {
+    // TODO: Implement PushDownRequestor and priority based copy entity iteration
+    return getCopyEntities(targetFs, configuration);
+  }
+
+  /**
+   * Finds all files read by the table and generates {@link CopyEntity}s for duplicating the table.
+   */
+  Iterator<FileSet<CopyEntity>> getCopyEntities(FileSystem targetFs, CopyConfiguration configuration) {
+    FileSet<CopyEntity> fileSet = new IcebergTableFileSet(this.getInputTableName(), this, targetFs, configuration);
+    return Iterators.singletonIterator(fileSet);
+  }
+
+  /**
+   * Finds all files read by the table file set and generates {@link CopyEntity}s for duplicating the table.
+   */
+  @VisibleForTesting
+  Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration configuration) throws IOException {
+    String fileSet = this.getInputTableName();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    Map<Path, FileStatus> pathToFileStatus = getFilePathsToFileStatus();
+    log.info("{}.{} - found {} candidate source paths", dbName, inputTableName, pathToFileStatus.size());
+
+    for (CopyableFile.Builder builder : getCopyableFilesFromPaths(pathToFileStatus, configuration, targetFs)) {
+      CopyableFile fileEntity =
+          builder.fileSet(fileSet).datasetOutputPath(targetFs.getUri().getPath()).build();
+      fileEntity.setSourceData(getSourceDataset(this.sourceFs));
+      fileEntity.setDestinationData(getDestinationDataset(targetFs));
+      copyEntities.add(fileEntity);
+    }
+    log.info("{}.{} - generated {} copy entities", dbName, inputTableName, copyEntities.size());
+    return copyEntities;
+  }
+
+  /**
+   * Get builders for a {@link CopyableFile} for each file path
+   */
+  protected List<CopyableFile.Builder> getCopyableFilesFromPaths(Map<Path, FileStatus> pathToFileStatus, CopyConfiguration configuration, FileSystem targetFs) throws IOException {
+
+    List<CopyableFile.Builder> builders = Lists.newArrayList();
+    List<SourceAndDestination> dataFiles = Lists.newArrayList();
+    Configuration defaultHadoopConfiguration = new Configuration();
+    FileSystem actualSourceFs;
+
+    for (Map.Entry<Path, FileStatus> entry : pathToFileStatus.entrySet()) {
+      dataFiles.add(new SourceAndDestination(entry.getValue(), targetFs.makeQualified(entry.getKey())));
+    }
+
+    for (SourceAndDestination sourceAndDestination : dataFiles) {
+      actualSourceFs = sourceAndDestination.getSource().getPath().getFileSystem(defaultHadoopConfiguration);
+
+      // TODO: Add ancestor owner and permissions in future releases
+      builders.add(CopyableFile.fromOriginAndDestination(actualSourceFs, sourceAndDestination.getSource(),
+          sourceAndDestination.getDestination(), configuration));
+    }
+    return builders;
+  }
+  /**
+   * Finds all files in the Iceberg table's current snapshot.
+   * Returns a map from each file path that needs to be copied to its {@link FileStatus}.
+   */
+  protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
+    Map<Path, FileStatus> result = Maps.newHashMap();
+    IcebergTable icebergTable = this.getIcebergTable();
+    IcebergSnapshotInfo icebergSnapshotInfo = icebergTable.getCurrentSnapshotInfo();
+
+    log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
+        icebergSnapshotInfo.getSnapshotId(), icebergSnapshotInfo.getMetadataPath());
+    List<String> pathsToCopy = icebergSnapshotInfo.getAllPaths();
+
+    for (String pathString : pathsToCopy) {
+      Path path = new Path(pathString);
+      result.put(path, this.sourceFs.getFileStatus(path));
+    }
+    return result;
+  }
+
+  protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
+    return getDatasetDescriptor(sourceMetastoreURI, sourceFs);
+  }
+
+  protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
+    return getDatasetDescriptor(targetMetastoreURI, targetFs);
+  }
+
+  @NotNull
+  private DatasetDescriptor getDatasetDescriptor(Optional<String> stringMetastoreURI, FileSystem fs) {
+    String currentTable = this.getDbName() + "." + this.getInputTableName();
+
+    URI hiveMetastoreURI = stringMetastoreURI.isPresent() ? URI.create(stringMetastoreURI.get()) : null;
+
+    DatasetDescriptor currentDataset =
+        new DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, hiveMetastoreURI, currentTable);
+    currentDataset.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
+    return currentDataset;
+  }
+}
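
The copy.target.* keys declared above feed the destination-side DatasetDescriptor. A hedged fragment,
continuing the sketch shown after the diffstat, with placeholder values; note that the target-database
constant is declared in this commit but not yet read anywhere else in it.

    Properties props = new Properties();
    // placeholder URI: when present, the destination DatasetDescriptor's metastore URI points here;
    // when absent, getDatasetDescriptor falls back to a null metastore URI
    props.setProperty(IcebergDataset.TARGET_METASTORE_URI_KEY, "thrift://target-metastore:9083");
    // placeholder name for the destination database
    props.setProperty(IcebergDataset.TARGET_DATABASE_KEY, "target_db");
    // in both cases the descriptor carries platform "iceberg" and records the target FileSystem URI
    // under DatasetConstants.FS_URI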
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
new file mode 100644
index 000000000..4eb77980b
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.util.HadoopUtils;
+
+/**
+ * Finds {@link IcebergDataset}s. Looks for tables in a database using an {@link IcebergCatalog}
+ * and creates an {@link IcebergDataset} for each one.
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
+
+  public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset";
+  public static final String ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY = ICEBERG_DATASET_PREFIX + ".hive.metastore.uri";
+  public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
+  public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";
+
+  protected final FileSystem sourceFs;
+  private final Properties properties;
+
+  /**
+   * Finds all {@link IcebergDataset}s in the file system using the Iceberg Catalog.
+   * Both the Iceberg database name and table name are mandatory in the current implementation.
+   * Later we may explore supporting datasets similarly to Hive.
+   * @return List of {@link IcebergDataset}s in the file system.
+   * @throws IOException
+   */
+  @Override
+  public List<IcebergDataset> findDatasets() throws IOException {
+    List<IcebergDataset> matchingDatasets = new ArrayList<>();
+    if (StringUtils.isBlank(properties.getProperty(ICEBERG_DB_NAME)) || StringUtils.isBlank(properties.getProperty(ICEBERG_TABLE_NAME))) {
+      throw new IllegalArgumentException(String.format("Iceberg database name: {%s} or Iceberg table name: {%s} is missing",
+          ICEBERG_DB_NAME, ICEBERG_TABLE_NAME));
+    }
+    String dbName = properties.getProperty(ICEBERG_DB_NAME);
+    String tblName = properties.getProperty(ICEBERG_TABLE_NAME);
+
+    Configuration configuration = HadoopUtils.getConfFromProperties(properties);
+
+    IcebergCatalog icebergCatalog = IcebergCatalogFactory.create(configuration);
+    /* Each Iceberg dataset maps to an Iceberg table
+     * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table
+     */
+    matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, sourceFs));
+    log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(), matchingDatasets, dbName, tblName);
+
+    return matchingDatasets;
+  }
+
+  @Override
+  public Path commonDatasetRoot() {
+    return new Path("/");
+  }
+
+  @Override
+  public Iterator<IcebergDataset> getDatasetsIterator() throws IOException {
+    return findDatasets().iterator();
+  }
+
+  protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
+    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
+    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  }
+}
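
createIcebergDataset is the protected hook for adjusting how a table is opened, and a natural home for the
pre-check called out in the TODO above. A hedged sketch of a subclass; the subclass name is illustrative and
the override simply delegates to the default behavior.

    import java.util.Properties;

    import org.apache.hadoop.fs.FileSystem;

    import org.apache.gobblin.data.management.copy.iceberg.IcebergCatalog;
    import org.apache.gobblin.data.management.copy.iceberg.IcebergDataset;
    import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder;

    public class ValidatingIcebergDatasetFinder extends IcebergDatasetFinder {

      public ValidatingIcebergDatasetFinder(FileSystem sourceFs, Properties properties) {
        super(sourceFs, properties);
      }

      @Override
      protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog icebergCatalog,
          Properties properties, FileSystem fs) {
        // verify here that (dbName, tblName) resolves to a valid Iceberg table before building the dataset;
        // this sketch simply defers to the default construction
        return super.createIcebergDataset(dbName, tblName, icebergCatalog, properties, fs);
      }
    }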
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableFileSet.java
new file mode 100644
index 000000000..1e5abfbee
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableFileSet.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.partition.FileSet;
+
+
+/**
+ * A {@link FileSet} for Iceberg datasets that carries the information associated with an Iceberg table and generates the corresponding {@link CopyEntity}s.
+ */
+public class IcebergTableFileSet extends FileSet<CopyEntity> {
+
+  private final CopyConfiguration copyConfiguration;
+  private final FileSystem targetFs;
+  private final IcebergDataset icebergDataset;
+
+  public IcebergTableFileSet(String name, IcebergDataset icebergDataset, FileSystem targetFs, CopyConfiguration configuration) {
+    super(name, icebergDataset);
+    this.copyConfiguration = configuration;
+    this.targetFs = targetFs;
+    this.icebergDataset = icebergDataset;
+  }
+
+  @Override
+  protected Collection<CopyEntity> generateCopyEntities() throws IOException {
+    return this.icebergDataset.generateCopyEntities(this.targetFs, this.copyConfiguration);
+  }
+}
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
new file mode 100644
index 000000000..f409ee67a
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.api.client.util.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyContext;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
+
+
+public class IcebergDatasetTest {
+
+  static final String METADATA_PATH = "/root/iceberg/test/metadata";
+  static final String MANIFEST_PATH = "/root/iceberg/test/metadata/test_manifest";
+  static final String MANIFEST_LIST_PATH = "/root/iceberg/test/metadata/test_manifest/data";
+  static final String MANIFEST_FILE_PATH1 = "/root/iceberg/test/metadata/test_manifest/data/a";
+  static final String MANIFEST_FILE_PATH2 = "/root/iceberg/test/metadata/test_manifest/data/b";
+
+  @Test
+  public void testGetFilePaths() throws IOException {
+
+    List<String> pathsToCopy = new ArrayList<>();
+    pathsToCopy.add(MANIFEST_FILE_PATH1);
+    pathsToCopy.add(MANIFEST_FILE_PATH2);
+    Map<Path, FileStatus> expected = Maps.newHashMap();
+    expected.put(new Path(MANIFEST_FILE_PATH1), null);
+    expected.put(new Path(MANIFEST_FILE_PATH2), null);
+
+    IcebergTable icebergTable = Mockito.mock(IcebergTable.class);
+    FileSystem fs = Mockito.mock(FileSystem.class);
+    IcebergSnapshotInfo icebergSnapshotInfo = Mockito.mock(IcebergSnapshotInfo.class);
+
+    Mockito.when(icebergTable.getCurrentSnapshotInfo()).thenReturn(icebergSnapshotInfo);
+    Mockito.when(icebergSnapshotInfo.getAllPaths()).thenReturn(pathsToCopy);
+    IcebergDataset icebergDataset = new IcebergDataset("test_db_name", "test_tbl_name", icebergTable, new Properties(), fs);
+
+    Map<Path, FileStatus> actual = icebergDataset.getFilePathsToFileStatus();
+    Assert.assertEquals(actual, expected);
+  }
+
+  /**
+   * Test case to generate copy entities for all the file paths for a mocked iceberg table.
+   * The assumption here is that we create copy entities for all the matching file paths,
+   * without calculating any difference between the source and destination
+   */
+  @Test
+  public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException, URISyntaxException {
+
+    FileSystem fs = Mockito.mock(FileSystem.class);
+    String test_db_name = "test_db_name";
+    String test_table_name = "test_tbl_name";
+    String test_qualified_path = "/root/iceberg/test/destination/sub_path_destination";
+    String test_uri_path = "/root/iceberg/test/output";
+    Properties properties = new Properties();
+    properties.setProperty("data.publisher.final.dir", "/test");
+    List<String> expected = new ArrayList<>(Arrays.asList(METADATA_PATH, MANIFEST_PATH, MANIFEST_LIST_PATH, MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2));
+
+    CopyConfiguration copyConfiguration = CopyConfiguration.builder(null, properties)
+        .preserve(PreserveAttributes.fromMnemonicString(""))
+        .copyContext(new CopyContext())
+        .build();
+
+    List<String> listedManifestFilePaths = Arrays.asList(MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2);
+    IcebergSnapshotInfo.ManifestFileInfo manifestFileInfo = new IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_LIST_PATH, listedManifestFilePaths);
+    List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles = Arrays.asList(manifestFileInfo);
+    IcebergTable icebergTable = new MockedIcebergTable(METADATA_PATH, MANIFEST_PATH, manifestFiles);
+    IcebergDataset icebergDataset = new IcebergDataset(test_db_name, test_table_name, icebergTable, new Properties(), fs);
+    DestinationFileSystem destinationFileSystem = new DestinationFileSystem();
+    destinationFileSystem.addPath(METADATA_PATH);
+    destinationFileSystem.addPath(MANIFEST_PATH);
+    destinationFileSystem.addPath(MANIFEST_LIST_PATH);
+    destinationFileSystem.addPath(MANIFEST_FILE_PATH1);
+    destinationFileSystem.addPath(MANIFEST_FILE_PATH2);
+
+    mockFileSystemMethodCalls(fs, destinationFileSystem.pathToFileStatus, test_qualified_path, test_uri_path);
+
+    Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(fs, copyConfiguration);
+    verifyCopyEntities(copyEntities, expected);
+
+  }
+
+  private void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expected) {
+    List<String> actual = new ArrayList<>();
+    for (CopyEntity copyEntity : copyEntities) {
+      String json = copyEntity.toString();
+      JsonObject jsonObject = new Gson().fromJson(json, JsonObject.class);
+      JsonObject objectData =
+          jsonObject.getAsJsonObject("object-data").getAsJsonObject("origin").getAsJsonObject("object-data");
+      JsonObject pathObject = objectData.getAsJsonObject("path").getAsJsonObject("object-data").getAsJsonObject("uri");
+      String filepath = pathObject.getAsJsonPrimitive("object-data").getAsString();
+      actual.add(filepath);
+    }
+    Assert.assertEquals(actual.size(), expected.size());
+    Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray());
+  }
+
+  private void mockFileSystemMethodCalls(FileSystem fs, Map<Path, FileStatus> pathToFileStatus, String qualifiedPath, String uriPath)
+      throws URISyntaxException, IOException {
+
+    Mockito.when(fs.getUri()).thenReturn(new URI(null, null, uriPath, null));
+    for (Map.Entry<Path, FileStatus> entry : pathToFileStatus.entrySet()) {
+      Path path = entry.getKey();
+      FileStatus fileStatus = entry.getValue();
+      Mockito.when(fs.getFileStatus(path)).thenReturn(fileStatus);
+      Mockito.when(fs.makeQualified(path)).thenReturn(new Path(qualifiedPath));
+    }
+  }
+
+  private static class MockedIcebergTable extends IcebergTable {
+
+    String metadataPath;
+    String manifestListPath;
+    List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles;
+
+    public MockedIcebergTable(String metadataPath, String manifestListPath, List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles) {
+      super(null);
+      this.metadataPath = metadataPath;
+      this.manifestListPath = manifestListPath;
+      this.manifestFiles = manifestFiles;
+    }
+
+    @Override
+    public IcebergSnapshotInfo getCurrentSnapshotInfo() {
+      Long snapshotId = 0L;
+      Instant timestamp  = Instant.ofEpochMilli(0L);
+      return new IcebergSnapshotInfo(snapshotId, timestamp, metadataPath, manifestListPath, manifestFiles);
+    }
+  }
+
+  private static class DestinationFileSystem {
+    Map<Path, FileStatus> pathToFileStatus;
+
+    public DestinationFileSystem() {
+      this.pathToFileStatus = Maps.newHashMap();
+    }
+
+    public void addPath(String pathString) {
+      if (StringUtils.isBlank(pathString)) {
+        throw new IllegalArgumentException("Missing path value for the file system");
+      }
+      Path path  = new Path(pathString);
+      FileStatus fileStatus = new FileStatus();
+      fileStatus.setPath(path);
+      this.pathToFileStatus.put(path, fileStatus);
+    }
+
+    public void addPath(String pathString, FileStatus fileStatus) {
+      Path path = new Path(pathString);
+      this.pathToFileStatus.put(path, fileStatus);
+    }
+  }
+}
+
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
index c042ac1b2..2fefded1f 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java
@@ -386,4 +386,4 @@ public class EmbeddedGobblinDistcpTest {
     }
   }
 
-}
+}
\ No newline at end of file