You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by si...@apache.org on 2021/10/08 17:35:04 UTC
[atlas] branch branch-2.0 updated: ATLAS-4425: Added support for
Migration import to run on a directory with multiple zip files
This is an automated email from the ASF dual-hosted git repository.
sidmishra pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 25fe3b2 ATLAS-4425: Added support for Migration import to run on a directory with multiple zip files
25fe3b2 is described below
commit 25fe3b2966f848ca13e4ab006434bf0923edf21e
Author: Sidharth Mishra <si...@gmail.com>
AuthorDate: Thu Oct 7 16:56:22 2021 -0700
ATLAS-4425: Added support for Migration import to run on a directory with multiple zip files
Signed-off-by: sidmishra <si...@apache.org>
(cherry picked from commit 11627e33d3b262d60ae0a3c6338f7963ffedaeab)
---
.../atlas/repository/impexp/ZipSourceDirect.java | 2 +-
.../migration/ZipFileMigrationImporter.java | 155 ++++++++++++++++++---
2 files changed, 136 insertions(+), 21 deletions(-)
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
index 04342fa..5cf1b74 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
@@ -257,9 +257,9 @@ public class ZipSourceDirect implements EntityImportStream {
if (zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toEntryFileName())) {
String json = getJsonPayloadFromZipEntryStream(this.zipInputStream);
this.typesDef = AtlasType.fromJson(json, AtlasTypesDef.class);
+ zipEntryNext = zipInputStream.getNextEntry();
}
- zipEntryNext = zipInputStream.getNextEntry();
if (zipEntryNext.getName().equals(ZIP_ENTRY_ENTITIES)) {
this.entitiesArrayParser = new EntitiesArrayParser(zipInputStream);
} else {
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
index d56261f..bfb1148 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -27,6 +27,9 @@ import org.apache.atlas.model.migration.MigrationImportStatus;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.type.AtlasType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +38,11 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.FileFilter;
import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.zip.ZipFile;
@@ -48,35 +55,143 @@ public class ZipFileMigrationImporter implements Runnable {
private static final String DEFAULT_BATCH_SIZE = "100";
private static final String ZIP_FILE_COMMENT_ENTITIES_COUNT = "entitiesCount";
private static final String ZIP_FILE_COMMENT_TOTAL_COUNT = "total";
-
+ private static final String FILE_EXTENSION_ZIP = ".zip";
private final static String ENV_USER_NAME = "user.name";
+ private final static String ARCHIVE_DIR = "archive";
- private final ImportService importService;
- private final String fileToImport;
- private DataMigrationStatusService dataMigrationStatusService;
- private MigrationImportStatus migrationImportStatus;
+ private final ImportService importService;
+ private List<String> filesToImport;
+ private DataMigrationStatusService dataMigrationStatusService;
+ private MigrationImportStatus migrationImportStatus;
+ private File archiveDir;
+    /**
+     * @param importService import service used to run the import
+     * @param fileName      path of the zip file to import; the file name may contain wildcards,
+     *                      in which case all matching files are imported
+     */
public ZipFileMigrationImporter(ImportService importService, String fileName) {
this.importService = importService;
- this.fileToImport = fileName;
this.dataMigrationStatusService = new DataMigrationStatusService(AtlasGraphProvider.getGraphInstance());
+
+ initialize(fileName);
+ }
+
+    // Discovers the files to import and, only when at least one was found,
+    // prepares the archive directory next to them.
+    private void initialize(String fileName) {
+        List<String> discovered = getAllFilesToImport(fileName);
+
+        this.filesToImport = discovered;
+
+        if (CollectionUtils.isEmpty(discovered)) {
+            return;
+        }
+
+        createArchiveDirectory(fileName);
     }
@Override
public void run() {
- try {
- detectFileToImport();
+ for (String fileToImport : filesToImport) {
+ try {
+ detectFileToImport(fileToImport);
- int streamSize = getStreamSizeFromComment(fileToImport);
- migrationImportStatus = getCreateMigrationStatus(fileToImport, streamSize);
- performImport(fileToImport, streamSize, Long.toString(migrationImportStatus.getCurrentIndex()));
- dataMigrationStatusService.setStatus("DONE");
- } catch (IOException e) {
- LOG.error("Migration Import: IO Error!", e);
- dataMigrationStatusService.setStatus("FAIL");
- } catch (AtlasBaseException e) {
- LOG.error("Migration Import: Error!", e);
- dataMigrationStatusService.setStatus("FAIL");
+ int streamSize = getStreamSizeFromComment(fileToImport);
+ migrationImportStatus = getCreateMigrationStatus(fileToImport, streamSize);
+ performImport(fileToImport, streamSize, Long.toString(migrationImportStatus.getCurrentIndex()));
+ dataMigrationStatusService.setStatus("DONE");
+
+ moveZipFileToArchiveDir(fileToImport);
+ } catch (IOException e) {
+ LOG.error("Migration Import: IO Error!", e);
+ dataMigrationStatusService.setStatus("FAIL");
+ } catch (AtlasBaseException e) {
+ LOG.error("Migration Import: Error!", e);
+ dataMigrationStatusService.setStatus("FAIL");
+ }
+ }
+ }
+
+    /**
+     * Resolves the import location into the list of zip files to import.
+     *
+     * If fileName names an existing regular file, only that file is returned. Otherwise the
+     * last path element is treated as a wildcard pattern and matched against the entries of
+     * its parent directory; matches are sorted and only entries accepted by
+     * isValidImportFile() are kept.
+     *
+     * @param fileName path of a zip file, or a path whose file-name part contains wildcards
+     * @return absolute paths of the files to import; empty when nothing matches
+     */
+    private List<String> getAllFilesToImport(String fileName) {
+        List<String> ret          = new ArrayList<>();
+        File         fileToImport = new File(fileName);
+
+        if (fileToImport.exists() && fileToImport.isFile()) {
+            //Input file present so no need to expand
+            LOG.info("Migration Import: zip file for import: " + fileToImport);
+
+            ret.add(fileToImport.getAbsolutePath());
+
+            return ret;
+        }
+
+        // The fileName might have a wildcard. Resolve the directory from the absolute form:
+        // File.getParent() returns null for a bare name such as "*.zip", and handing that
+        // null to new File(String) throws NullPointerException.
+        File importDataDir = fileToImport.getAbsoluteFile().getParentFile();
+
+        if (importDataDir == null || !importDataDir.isDirectory()) {
+            LOG.warn("Migration Import: No files to import");
+
+            return ret;
+        }
+
+        String     fileNameWithWildcard = fileToImport.getName();
+        // Typed as FileFilter so the listFiles(FileFilter) overload is selected.
+        FileFilter fileFilter           = new WildcardFileFilter(fileNameWithWildcard);
+        File[]     importFiles          = importDataDir.listFiles(fileFilter);
+
+        // listFiles() may return null; ArrayUtils.isNotEmpty() covers null and empty alike.
+        if (ArrayUtils.isNotEmpty(importFiles)) {
+            // Sort so the files are imported in a deterministic (path name) order.
+            Arrays.sort(importFiles);
+
+            LOG.info("Migration Import: zip files for import: ");
+
+            for (File importFile : importFiles) {
+                if (isValidImportFile(importFile)) {
+                    LOG.info(importFile.getName() + " with absolute path - " + importFile.getAbsolutePath());
+                    ret.add(importFile.getAbsolutePath());
+                } else {
+                    LOG.warn("Ignoring " + importFile.getAbsolutePath() + " as it is not a file or does not end with extension " + FILE_EXTENSION_ZIP);
+                }
+            }
+        } else {
+            LOG.warn("Migration Import: No files to import");
+        }
+
+        return ret;
+    }
+
+    // An import candidate must be a regular file whose name ends with the zip
+    // extension, compared case-insensitively.
+    private boolean isValidImportFile(File importFile) {
+        if (!importFile.isFile()) {
+            return false;
+        }
+
+        String candidateName = importFile.getName();
+
+        return StringUtils.endsWithIgnoreCase(candidateName, FILE_EXTENSION_ZIP);
+    }
+
+    /**
+     * Prepares the archive directory, a sibling of the import file(s), into which zip
+     * files are moved once they have been imported.
+     *
+     * Leaves archiveDir set to null, which disables archiving, when the directory is not
+     * writable or cannot be created.
+     *
+     * @param fileName import path; its file-name part may contain wildcards
+     */
+    private void createArchiveDirectory(String fileName) {
+        // Resolve the parent from the absolute form: File.getParent() returns null for a
+        // bare name such as "*.zip", and new File((String) null) throws NullPointerException.
+        File parentDir = new File(fileName).getAbsoluteFile().getParentFile();
+
+        if (parentDir == null) {
+            LOG.warn("Migration Import: Unable to determine parent directory of {}", fileName);
+            this.archiveDir = null;
+
+            return;
+        }
+
+        File dir = new File(parentDir, ARCHIVE_DIR);
+
+        if (dir.exists() && !dir.canWrite()) {
+            LOG.warn("Migration Import: No write permission to archive directory {}", dir.getAbsolutePath());
+            this.archiveDir = null;
+        } else if (!dir.exists() && !parentDir.canWrite()) {
+            LOG.warn("Migration Import: No permission to create archive directory {}", dir.getAbsolutePath());
+            this.archiveDir = null;
+        } else if (!dir.mkdirs() && !dir.isDirectory()) {
+            // mkdirs() returns false both on failure and when the directory already exists,
+            // so only treat it as an error when no directory is present afterwards
+            // (e.g. "archive" exists as a regular file).
+            LOG.warn("Migration Import: Failed to create archive directory {}", dir.getAbsolutePath());
+            this.archiveDir = null;
+        } else {
+            this.archiveDir = dir;
+            LOG.info("Migration Import: archive directory for zip files: {}", dir.getAbsolutePath());
+        }
+    }
+
+    /**
+     * Moves an imported zip file into the archive directory. Does nothing when archiving
+     * is disabled (archiveDir is null). Failures are logged and never propagated.
+     *
+     * @param srcFilePath path of the zip file to archive
+     */
+    private void moveZipFileToArchiveDir(String srcFilePath) {
+        if (this.archiveDir == null) {
+            return;
+        }
+
+        File sourceFile = new File(srcFilePath);
+
+        if (!sourceFile.canWrite()) {
+            LOG.warn("Migration Import: No permission to archive the zip file {}", sourceFile.getAbsolutePath());
+            // NOTE(review): this disables archiving for every remaining file as well,
+            // not just this one - confirm that is intended.
+            this.archiveDir = null;
+        } else {
+            File archivedFile = new File(this.archiveDir.getAbsolutePath(), sourceFile.getName());
+
+            // A successful rename already removes the source path, so no delete() is needed.
+            if (sourceFile.renameTo(archivedFile)) {
+                LOG.info("Migration Import: Successfully archived the zip file: " + srcFilePath + " to " + this.archiveDir.getAbsolutePath());
+            } else {
+                LOG.warn("Migration Import: Failed to archive the zip file: " + srcFilePath);
+            }
         }
     }
@@ -91,7 +206,7 @@ public class ZipFileMigrationImporter implements Runnable {
return statusRetrieved;
}
- private void detectFileToImport() throws IOException {
+ private void detectFileToImport(String fileToImport) throws IOException {
FileWatcher fileWatcher = new FileWatcher(fileToImport);
fileWatcher.start();
}
@@ -126,7 +241,7 @@ public class ZipFileMigrationImporter implements Runnable {
private void performImport(String fileToImport, int streamSize, String startPosition) throws AtlasBaseException {
try {
LOG.info("Migration Import: {}: Starting at: {}...", fileToImport, startPosition);
- InputStream fs = new FileInputStream(new File(fileToImport));
+ InputStream fs = new FileInputStream(fileToImport);
RequestContext.get().setUser(getUserNameFromEnvironment(), null);
importService.run(fs, getImportRequest(fileToImport, streamSize, startPosition),