You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@atlas.apache.org by si...@apache.org on 2021/10/08 17:35:04 UTC

[atlas] branch branch-2.0 updated: ATLAS-4425: Added support for Migration import to run on a directory with multiple zip files

This is an automated email from the ASF dual-hosted git repository.

sidmishra pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 25fe3b2  ATLAS-4425: Added support for Migration import to run on a directory with multiple zip files
25fe3b2 is described below

commit 25fe3b2966f848ca13e4ab006434bf0923edf21e
Author: Sidharth Mishra <si...@gmail.com>
AuthorDate: Thu Oct 7 16:56:22 2021 -0700

    ATLAS-4425: Added support for Migration import to run on a directory with multiple zip files
    
    Signed-off-by: sidmishra <si...@apache.org>
    (cherry picked from commit 11627e33d3b262d60ae0a3c6338f7963ffedaeab)
---
 .../atlas/repository/impexp/ZipSourceDirect.java   |   2 +-
 .../migration/ZipFileMigrationImporter.java        | 155 ++++++++++++++++++---
 2 files changed, 136 insertions(+), 21 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
index 04342fa..5cf1b74 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
@@ -257,9 +257,9 @@ public class ZipSourceDirect implements EntityImportStream {
         if (zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toEntryFileName())) {
             String json = getJsonPayloadFromZipEntryStream(this.zipInputStream);
             this.typesDef = AtlasType.fromJson(json, AtlasTypesDef.class);
+            zipEntryNext = zipInputStream.getNextEntry();
         }
 
-        zipEntryNext = zipInputStream.getNextEntry();
         if (zipEntryNext.getName().equals(ZIP_ENTRY_ENTITIES)) {
             this.entitiesArrayParser = new EntitiesArrayParser(zipInputStream);
         } else {
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
index d56261f..bfb1148 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -27,6 +27,9 @@ import org.apache.atlas.model.migration.MigrationImportStatus;
 import org.apache.atlas.repository.graph.AtlasGraphProvider;
 import org.apache.atlas.repository.impexp.ImportService;
 import org.apache.atlas.type.AtlasType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +38,11 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.FileFilter;
 import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.zip.ZipFile;
 
@@ -48,35 +55,143 @@ public class ZipFileMigrationImporter implements Runnable {
     private static final String DEFAULT_BATCH_SIZE = "100";
     private static final String ZIP_FILE_COMMENT_ENTITIES_COUNT = "entitiesCount";
     private static final String ZIP_FILE_COMMENT_TOTAL_COUNT = "total";
-
+    private static final String FILE_EXTENSION_ZIP = ".zip";
     private final static String ENV_USER_NAME = "user.name";
+    private final static String ARCHIVE_DIR = "archive";
 
-    private final ImportService importService;
-    private final String fileToImport;
-    private DataMigrationStatusService dataMigrationStatusService;
-    private MigrationImportStatus migrationImportStatus;
+    private final ImportService         importService;
+    private List<String>                filesToImport;
+    private DataMigrationStatusService  dataMigrationStatusService;
+    private MigrationImportStatus       migrationImportStatus;
+    private File                        archiveDir;
 
+    /**
+     * Input:
+     * fileName : can support wildcards. If it contains wildcards then all matching files will be imported
+     */
     public ZipFileMigrationImporter(ImportService importService, String fileName) {
         this.importService = importService;
-        this.fileToImport = fileName;
         this.dataMigrationStatusService = new DataMigrationStatusService(AtlasGraphProvider.getGraphInstance());
+
+        initialize(fileName);
+    }
+
+    private void initialize(String fileName) {
+        this.filesToImport = getAllFilesToImport(fileName);
+
+        if (CollectionUtils.isNotEmpty(this.filesToImport)) {
+            createArchiveDirectory(fileName);
+        }
     }
 
     @Override
     public void run() {
-        try {
-            detectFileToImport();
+        for (String fileToImport : filesToImport) {
+            try {
+                detectFileToImport(fileToImport);
 
-            int streamSize = getStreamSizeFromComment(fileToImport);
-            migrationImportStatus = getCreateMigrationStatus(fileToImport, streamSize);
-            performImport(fileToImport, streamSize, Long.toString(migrationImportStatus.getCurrentIndex()));
-            dataMigrationStatusService.setStatus("DONE");
-        } catch (IOException e) {
-            LOG.error("Migration Import: IO Error!", e);
-            dataMigrationStatusService.setStatus("FAIL");
-        } catch (AtlasBaseException e) {
-            LOG.error("Migration Import: Error!", e);
-            dataMigrationStatusService.setStatus("FAIL");
+                int streamSize = getStreamSizeFromComment(fileToImport);
+                migrationImportStatus = getCreateMigrationStatus(fileToImport, streamSize);
+                performImport(fileToImport, streamSize, Long.toString(migrationImportStatus.getCurrentIndex()));
+                dataMigrationStatusService.setStatus("DONE");
+
+                moveZipFileToArchiveDir(fileToImport);
+            } catch (IOException e) {
+                LOG.error("Migration Import: IO Error!", e);
+                dataMigrationStatusService.setStatus("FAIL");
+            } catch (AtlasBaseException e) {
+                LOG.error("Migration Import: Error!", e);
+                dataMigrationStatusService.setStatus("FAIL");
+            }
+        }
+    }
+
+    /**
+     * Input:
+     * fileName : If it contains wildcards then all matching files will be discovered
+     */
+    private List<String> getAllFilesToImport(String fileName) {
+        List<String> ret = new ArrayList<>();
+        File fileToImport = new File(fileName);
+
+        if (fileToImport.exists() && fileToImport.isFile()) {
+            //Input file present so no need to expand
+            LOG.info("Migration Import: zip file for import: " + fileToImport);
+
+            ret.add(fileToImport.getAbsolutePath());
+        } else {
+            //The fileName might have wildcard
+            String dirPath = new File(fileToImport.getParent()).getAbsolutePath();
+            File importDataDir = new File(dirPath);
+
+            if (importDataDir.exists() && importDataDir.isDirectory()) {
+                String fileNameWithWildcard = fileToImport.getName();
+                FileFilter fileFilter = new WildcardFileFilter(fileNameWithWildcard);
+
+                File[] importFiles = importDataDir.listFiles(fileFilter);
+
+                if (ArrayUtils.isNotEmpty(importFiles)) {
+                    Arrays.sort(importFiles);
+
+                    LOG.info("Migration Import: zip files for import: ");
+
+                    for (File importFile : importFiles) {
+                        if (isValidImportFile(importFile)) {
+                            LOG.info(importFile.getName() + " with absolute path - " + importFile.getAbsolutePath());
+                            ret.add(importFile.getAbsolutePath());
+                        } else {
+                            LOG.warn("Ignoring " + importFile.getAbsolutePath() + " as it is not a file or does not end with extension " + FILE_EXTENSION_ZIP);
+                        }
+                    }
+                } else {
+                    LOG.warn("Migration Import: No files to import");
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    private boolean isValidImportFile(File importFile) {
+        return importFile.isFile() && StringUtils.endsWithIgnoreCase(importFile.getName(), FILE_EXTENSION_ZIP);
+    }
+
+    private void createArchiveDirectory(String fileName) {
+        File fileToImport = new File(fileName);
+        String parentPath = new File(fileToImport.getParent()).getAbsolutePath();
+
+        this.archiveDir = new File(parentPath + File.separator + ARCHIVE_DIR);
+
+        if (this.archiveDir.exists() && !this.archiveDir.canWrite()) {
+            LOG.warn("Migration Import: No write permission to archive directory {}", this.archiveDir.getAbsolutePath());
+            this.archiveDir = null;
+        } else if (!this.archiveDir.exists() && !this.archiveDir.getParentFile().canWrite()) {
+            LOG.warn("Migration Import: No permission to create archive directory {}", this.archiveDir.getAbsolutePath());
+            this.archiveDir = null;
+        } else {
+            this.archiveDir.mkdirs();
+            LOG.info("Migration Import: archive directory for zip files: {}", this.archiveDir.getAbsolutePath());
+        }
+    }
+
+    private void moveZipFileToArchiveDir(String srcFilePath) {
+        if (this.archiveDir == null) {
+            return;
+        }
+
+        File sourceFile = new File(srcFilePath);
+        String newFile = archiveDir.getAbsolutePath() + File.separator + sourceFile.getName();
+
+        if (!sourceFile.canWrite()) {
+            LOG.warn("Migration Import: No permission to archive the zip file {}", sourceFile.getAbsolutePath());
+            this.archiveDir = null;
+        } else {
+            if (sourceFile.renameTo(new File(newFile))) {
+                sourceFile.delete();
+                LOG.info("Migration Import: Successfully archived the zip file: " + srcFilePath + " to " + this.archiveDir.getAbsolutePath());
+            } else {
+                LOG.warn("Migration Import: Failed to archive the zip file: " + srcFilePath);
+            }
         }
     }
 
@@ -91,7 +206,7 @@ public class ZipFileMigrationImporter implements Runnable {
         return statusRetrieved;
     }
 
-    private void detectFileToImport() throws IOException {
+    private void detectFileToImport(String fileToImport) throws IOException {
         FileWatcher fileWatcher = new FileWatcher(fileToImport);
         fileWatcher.start();
     }
@@ -126,7 +241,7 @@ public class ZipFileMigrationImporter implements Runnable {
     private void performImport(String fileToImport, int streamSize, String startPosition) throws AtlasBaseException {
         try {
             LOG.info("Migration Import: {}: Starting at: {}...", fileToImport, startPosition);
-            InputStream fs = new FileInputStream(new File(fileToImport));
+            InputStream fs = new FileInputStream(fileToImport);
             RequestContext.get().setUser(getUserNameFromEnvironment(), null);
 
             importService.run(fs, getImportRequest(fileToImport, streamSize, startPosition),