You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/03/16 17:09:47 UTC
[atlas] branch master updated: ATLAS-3663: ZipFileMigrator:
Automatic Resume During Migration.
This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 3d0c987 ATLAS-3663: ZipFileMigrator: Automatic Resume During Migration.
3d0c987 is described below
commit 3d0c98779f16c7f0c6faa2b441877458ea152675
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Mon Mar 16 09:58:27 2020 -0700
ATLAS-3663: ZipFileMigrator: Automatic Resume During Migration.
---
.../atlas/model/impexp/AtlasImportRequest.java | 3 +-
.../model/migration/MigrationImportStatus.java | 62 ++++++
.../java/org/apache/atlas/pc/StatusReporter.java | 41 +++-
.../org/apache/atlas/pc/StatusReporterTest.java | 18 ++
.../atlas/repository/impexp/ImportService.java | 10 +-
.../impexp/MigrationProgressService.java | 42 +++-
.../migration/DataMigrationStatusService.java | 245 +++++++++++++++++++++
.../migration/ZipFileMigrationImporter.java | 43 +++-
.../store/graph/v2/bulkimport/MigrationImport.java | 21 +-
.../graph/v2/bulkimport/pc/EntityConsumer.java | 5 +-
.../v2/bulkimport/pc/EntityCreationManager.java | 19 +-
.../impexp/DataMigrationStatusServiceTest.java | 66 ++++++
12 files changed, 548 insertions(+), 27 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index 09dafdf..2c18704 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -44,12 +44,13 @@ public class AtlasImportRequest implements Serializable {
public static final String TRANSFORMS_KEY = "transforms";
public static final String TRANSFORMERS_KEY = "transformers";
public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
+ public static final String OPTION_KEY_MIGRATION_FILE_NAME = "migrationFileName";
public static final String OPTION_KEY_MIGRATION = "migration";
public static final String OPTION_KEY_NUM_WORKERS = "numWorkers";
public static final String OPTION_KEY_BATCH_SIZE = "batchSize";
public static final String OPTION_KEY_FORMAT = "format";
public static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect";
- private static final String START_POSITION_KEY = "startPosition";
+ public static final String START_POSITION_KEY = "startPosition";
private static final String START_GUID_KEY = "startGuid";
private static final String FILE_NAME_KEY = "fileName";
private static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition";
diff --git a/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
new file mode 100644
index 0000000..3430fda
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/migration/MigrationImportStatus.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.model.migration;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.AtlasBaseModelObject;
+import org.apache.atlas.model.impexp.MigrationStatus;
+import org.apache.commons.lang.StringUtils;
+
+import java.io.Serializable;
+import java.util.Date;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * A {@link MigrationStatus} that additionally carries a name identifying the
+ * import it describes.
+ */
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MigrationImportStatus extends MigrationStatus {
+    private String name;
+
+    /** Creates a status without a name. */
+    public MigrationImportStatus() {
+    }
+
+    /**
+     * Creates a status for the given name.
+     *
+     * @param name name identifying the import this status belongs to
+     */
+    public MigrationImportStatus(String name) {
+        this.name = name;
+    }
+
+    /** @return the name identifying the import; may be {@code null} */
+    public String getName() {
+        return name;
+    }
+
+    /** @param name name identifying the import */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the name fragment followed by the parent's string form; the
+     *         fragment deliberately starts with a separator
+     */
+    @Override
+    public String toString() {
+        return ", name=" + name + super.toString();
+    }
+}
diff --git a/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java b/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java
index f84e8d0..7baf973 100644
--- a/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java
+++ b/intg/src/main/java/org/apache/atlas/pc/StatusReporter.java
@@ -18,6 +18,9 @@
package org.apache.atlas.pc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -25,8 +28,20 @@ import java.util.Map;
import java.util.Set;
public class StatusReporter<T, U> {
+ private static final Logger LOG = LoggerFactory.getLogger(StatusReporter.class);
+
private Map<T,U> producedItems = new LinkedHashMap<>();
private Set<T> processedSet = new HashSet<>();
+ private long timeoutDuration;
+ private long lastAck;
+
+ /**
+  * Creates a reporter with the acknowledgement timeout disabled: the duration
+  * is set to -1, which hasTimeoutDurationReached() never treats as reached.
+  */
+ public StatusReporter() {
+ this.timeoutDuration = -1;
+ }
+
+ /**
+  * Creates a reporter with an acknowledgement timeout.
+  *
+  * @param timeoutDurationInMs timeout compared against differences of
+  *                            System.currentTimeMillis() readings
+  */
+ public StatusReporter(long timeoutDurationInMs) {
+ this.timeoutDuration = timeoutDurationInMs;
+ }
public void produced(T item, U index) {
this.producedItems.put(item, index);
@@ -44,7 +59,8 @@ public class StatusReporter<T, U> {
U ack = null;
U ret;
do {
- ret = completionIndex(getFirstElement(this.producedItems));
+ Map.Entry<T, U> firstElement = getFirstElement(this.producedItems);
+ ret = completionIndex(firstElement);
if (ret != null) {
ack = ret;
}
@@ -63,13 +79,32 @@ public class StatusReporter<T, U> {
private U completionIndex(Map.Entry<T, U> lookFor) {
U ack = null;
- if (lookFor == null || !processedSet.contains(lookFor.getKey())) {
+ if (lookFor == null) {
return ack;
}
- ack = lookFor.getValue();
+ if (hasTimeoutDurationReached(System.currentTimeMillis())) {
+ LOG.warn("Ack: Timeout: {} - {}", lookFor.getKey(), lookFor.getValue());
+ return acknowledged(lookFor);
+ }
+
+ if (!processedSet.contains(lookFor.getKey())) {
+ return ack;
+ }
+
+ return acknowledged(lookFor);
+ }
+
+ /**
+  * Marks the given produced entry as acknowledged and forgets it.
+  *
+  * @param lookFor entry taken from producedItems; must not be null
+  * @return the index (value) of the acknowledged entry
+  */
+ private U acknowledged(Map.Entry<T, U> lookFor) {
+     U ack = lookFor.getValue();
      producedItems.remove(lookFor.getKey());
-     processedSet.remove(lookFor);
+     // processedSet holds keys (T), not map entries: removing the entry itself
+     // never matched anything and let the set grow without bound.
+     processedSet.remove(lookFor.getKey());
      return ack;
  }
+
+ /**
+  * Tells whether the configured timeout has elapsed since the previous call.
+  *
+  * The reference timestamp is refreshed on every call, whether or not the
+  * timeout was reached, so the interval measured is the time between two
+  * consecutive checks.
+  *
+  * @param now current time in milliseconds, compared against the previous check
+  * @return true only when a timeout is configured, a previous check was
+  *         recorded, and at least timeoutDuration has passed since then
+  */
+ private boolean hasTimeoutDurationReached(long now) {
+     boolean timeoutEnabled = this.timeoutDuration > -1;
+     boolean checkedBefore = this.lastAck != 0;
+     boolean expired = timeoutEnabled && checkedBefore && ((now - this.lastAck) >= timeoutDuration);
+
+     lastAck = System.currentTimeMillis();
+     return expired;
+ }
}
diff --git a/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java b/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java
index 3e50562..45bdbb0 100644
--- a/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java
+++ b/intg/src/test/java/org/apache/atlas/pc/StatusReporterTest.java
@@ -23,6 +23,8 @@ import org.testng.annotations.Test;
import java.util.concurrent.BlockingQueue;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
public class StatusReporterTest {
private static class IntegerConsumer extends WorkItemConsumer<Integer> {
@@ -91,4 +93,20 @@ public class StatusReporterTest {
statusReporter.processed((Integer) result);
}
}
+
+ // Verifies the timeout path: item 1 is produced but never marked processed,
+ // so the reporter is only expected to move past it once the timeout elapses.
+ @Test
+ public void reportWithTimeout() throws InterruptedException {
+ StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(2000);
+ statusReporter.produced(1, 100);
+ statusReporter.produced(2, 200);
+
+ statusReporter.processed(2);
+ // Nothing can be acknowledged yet: the first produced item is still unprocessed.
+ Integer ack = statusReporter.ack();
+ assertNull(ack);
+
+ // Sleep longer than the 2000 ms timeout configured above.
+ Thread.sleep(3000);
+ ack = statusReporter.ack();
+ assertNotNull(ack);
+ // The acknowledged index is that of item 2, the one that was processed.
+ assertEquals(ack, Integer.valueOf(200));
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index c18c4ab..1d29bf8 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -235,6 +235,10 @@ public class ImportService {
result.incrementMeticsCounter("duration", getDuration(this.endTimestamp, this.startTimestamp));
result.setOperationStatus(AtlasImportResult.OperationStatus.SUCCESS);
+ if (isMigrationMode(result.getRequest())) {
+ return;
+ }
+
auditsWriter.write(userName, result, startTimestamp, endTimestamp, importSource.getCreationOrder());
}
@@ -250,7 +254,7 @@ public class ImportService {
private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
try {
- if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION) || (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
+ if (isMigrationMode(request) || (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT))) {
LOG.info("ZipSource Format: ZipDirect: Size: {}", request.getOptions().get("size"));
return getZipDirectEntityImportStream(request, inputStream);
@@ -288,4 +292,8 @@ public class ImportService {
exportRequest.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL) &&
exportRequest.getSkipLineageOptionValue();
}
+
+ /**
+  * Tells whether the request was issued in migration mode.
+  *
+  * Only the presence of the OPTION_KEY_MIGRATION option is checked; its value
+  * is not inspected.
+  *
+  * @param request import request whose options are examined
+  * @return true when the migration option key is present
+  */
+ private boolean isMigrationMode(AtlasImportRequest request) {
+ return request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION);
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java
index 54ae32a..6bb5f1e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/MigrationProgressService.java
@@ -19,35 +19,61 @@
package org.apache.atlas.repository.impexp;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
import org.apache.atlas.annotation.AtlasService;
import org.apache.atlas.model.impexp.MigrationStatus;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.GraphDBMigrator;
+import org.apache.atlas.repository.migration.DataMigrationStatusService;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
+import static org.apache.atlas.AtlasConstants.ATLAS_MIGRATION_MODE_FILENAME;
+
@AtlasService
@Singleton
public class MigrationProgressService {
private static final Logger LOG = LoggerFactory.getLogger(MigrationProgressService.class);
+ private static final String FILE_EXTENSION_ZIP = ".zip";
public static final String MIGRATION_QUERY_CACHE_TTL = "atlas.migration.query.cache.ttlInSecs";
@VisibleForTesting
- static long DEFAULT_CACHE_TTL_IN_SECS = 30 * 1000; // 30 secs
+ static long DEFAULT_CACHE_TTL_IN_SECS = 120 * 1000; // 120 * 1000; presumably milliseconds (i.e. 120 secs) despite the _IN_SECS name - TODO confirm
private final long cacheValidity;
private final GraphDBMigrator migrator;
private MigrationStatus cachedStatus;
private long cacheExpirationTime = 0;
+ private DataMigrationStatusService dataMigrationStatusService;
+ private boolean zipFileBasedMigrationImport;
@Inject
public MigrationProgressService(Configuration configuration, GraphDBMigrator migrator) {
this.migrator = migrator;
this.cacheValidity = (configuration != null) ? configuration.getLong(MIGRATION_QUERY_CACHE_TTL, DEFAULT_CACHE_TTL_IN_SECS) : DEFAULT_CACHE_TTL_IN_SECS;
+
+ this.zipFileBasedMigrationImport = isZipFileBasedMigrationEnabled();
+ initConditionallyZipFileBasedMigrator();
+ }
+
+ /**
+  * Sets up the status service used for zip-file based migration imports.
+  * Does nothing when zip-file based migration is not enabled.
+  */
+ private void initConditionallyZipFileBasedMigrator() {
+     if (zipFileBasedMigrationImport) {
+         dataMigrationStatusService = new DataMigrationStatusService(AtlasGraphProvider.getGraphInstance());
+         dataMigrationStatusService.init(getFileNameFromMigrationProperty());
+     }
+ }
+
+ /**
+  * @return true when the configured migration file name ends with ".zip",
+  *         compared case-insensitively
+  */
+ private boolean isZipFileBasedMigrationEnabled() {
+ return StringUtils.endsWithIgnoreCase(getFileNameFromMigrationProperty(), FILE_EXTENSION_ZIP);
+ }
public MigrationStatus getStatus() {
@@ -58,7 +84,11 @@ public class MigrationProgressService {
long currentTime = System.currentTimeMillis();
if(resetCache(currentTime)) {
- cachedStatus = migrator.getMigrationStatus();
+ if (this.zipFileBasedMigrationImport) {
+ cachedStatus = dataMigrationStatusService.getStatus();
+ } else {
+ cachedStatus = migrator.getMigrationStatus();
+ }
}
return cachedStatus;
@@ -73,4 +103,12 @@ public class MigrationProgressService {
return ret;
}
+
+ /**
+  * Reads the migration file name from the application properties.
+  *
+  * @return the value of the ATLAS_MIGRATION_MODE_FILENAME property, or an
+  *         empty string when the property is not set or the application
+  *         properties cannot be loaded
+  */
+ public String getFileNameFromMigrationProperty() {
+     try {
+         return ApplicationProperties.get().getString(ATLAS_MIGRATION_MODE_FILENAME, StringUtils.EMPTY);
+     } catch (AtlasException e) {
+         // Keep the best-effort fallback, but leave a trace of why it was taken
+         // instead of silently swallowing the failure.
+         LOG.warn("Unable to read property: {}. Assuming no migration file is configured.", ATLAS_MIGRATION_MODE_FILENAME, e);
+         return StringUtils.EMPTY;
+     }
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
new file mode 100644
index 0000000..8c7a3a8
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/DataMigrationStatusService.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.migration;
+
+import org.apache.atlas.model.migration.MigrationImportStatus;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasIndexQuery;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.Iterator;
+
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getEncodedProperty;
+import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
+import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.encodePropertyKey;
+import static org.apache.atlas.type.Constants.INTERNAL_PROPERTY_KEY_PREFIX;
+
+/**
+ * Persists and retrieves the progress of a migration import.
+ *
+ * The status is kept on a graph vertex that is looked up by the status name;
+ * the name is written to the vertex under {@code Constants.GUID_PROPERTY_KEY}.
+ * The service also keeps the most recently read or written
+ * {@link MigrationImportStatus} in memory.
+ */
+public class DataMigrationStatusService {
+    private static final Logger LOG = LoggerFactory.getLogger(DataMigrationStatusService.class);
+
+    private final MigrationStatusVertexManagement migrationStatusVertexManagement;
+
+    private MigrationImportStatus status;
+
+    /** Creates a service backed by the graph returned by {@code AtlasGraphProvider.getGraphInstance()}. */
+    public DataMigrationStatusService() {
+        this(AtlasGraphProvider.getGraphInstance());
+    }
+
+    /**
+     * Creates a service backed by the given graph.
+     *
+     * @param atlasGraph graph on which the status vertex is stored
+     */
+    public DataMigrationStatusService(AtlasGraph atlasGraph) {
+        this.migrationStatusVertexManagement = new MigrationStatusVertexManagement(atlasGraph);
+    }
+
+    /**
+     * Starts tracking the given file. The in-memory status is initialized with
+     * the file name; when a status vertex already exists for that name, the
+     * in-memory status is replaced by the persisted values. No vertex is
+     * created by this method.
+     *
+     * @param fileToImport name under which the status is stored
+     */
+    public void init(String fileToImport) {
+        this.status = new MigrationImportStatus(fileToImport);
+        if (!this.migrationStatusVertexManagement.exists(fileToImport)) {
+            return;
+        }
+
+        getCreate(fileToImport);
+    }
+
+    /**
+     * Same as {@link #getCreate(MigrationImportStatus)} for a fresh status
+     * carrying only the given name.
+     *
+     * @param fileName name under which the status is stored
+     * @return the status held by this service after the call
+     */
+    public MigrationImportStatus getCreate(String fileName) {
+        return getCreate(new MigrationImportStatus(fileName));
+    }
+
+    /**
+     * Reads the persisted status with the same name as {@code status}, creating
+     * the vertex from {@code status} when none exists yet. An existing vertex
+     * is not modified.
+     *
+     * @param status status whose name identifies the vertex and whose values
+     *               seed a newly created vertex
+     * @return the status held by this service after the call; on failure the
+     *         error is logged and the previously held status is returned
+     */
+    public MigrationImportStatus getCreate(MigrationImportStatus status) {
+        try {
+            this.status = this.migrationStatusVertexManagement.createOrUpdate(status);
+        } catch (Exception ex) {
+            LOG.error("DataMigrationStatusService: Setting status: {}: Resulted in error!", status.getName(), ex);
+        }
+
+        return this.status;
+    }
+
+    /**
+     * Returns the current status. When the in-memory status has no operation
+     * status yet and a vertex exists for its name, the status is re-read from
+     * the vertex first.
+     *
+     * @return the current status; {@code null} when none is being tracked
+     */
+    public MigrationImportStatus getStatus() {
+        if (this.status != null &&
+                StringUtils.isEmpty(this.status.getOperationStatus()) &&
+                this.migrationStatusVertexManagement.exists(this.status.getName())) {
+            return getCreate(this.status);
+        } else {
+            return this.status;
+        }
+    }
+
+    /**
+     * Looks up the persisted status.
+     *
+     * @param name name under which the status is stored
+     * @return the persisted status, or {@code null} when no vertex is found
+     */
+    public MigrationImportStatus getByName(String name) {
+        return this.migrationStatusVertexManagement.findByName(name);
+    }
+
+    /**
+     * Removes the persisted status, if any, and stops tracking it. Does nothing
+     * when no status is being tracked.
+     */
+    public void delete() {
+        if (this.status == null) {
+            return;
+        }
+
+        // The lookup may come back empty (nothing persisted yet); that must not
+        // fail with a NullPointerException.
+        MigrationImportStatus persisted = getByName(this.status.getName());
+        if (persisted != null) {
+            this.migrationStatusVertexManagement.delete(persisted.getName());
+        }
+
+        this.status = null;
+    }
+
+    /**
+     * Records the current position of the import.
+     *
+     * @param position position, parsed with {@link Long#valueOf(String)}
+     * @throws NumberFormatException when {@code position} is not a valid long
+     */
+    public void savePosition(String position) {
+        if (this.status == null) {
+            LOG.warn("DataMigrationStatusService: savePosition: {}: No status is being tracked. Ignored.", position);
+            return;
+        }
+
+        this.status.setCurrentIndex(Long.valueOf(position));
+        this.migrationStatusVertexManagement.updateVertexPartialPosition(this.status);
+    }
+
+    /**
+     * Records the operation status of the import.
+     *
+     * @param status new operation status
+     */
+    public void setStatus(String status) {
+        if (this.status == null) {
+            LOG.warn("DataMigrationStatusService: setStatus: {}: No status is being tracked. Ignored.", status);
+            return;
+        }
+
+        this.status.setOperationStatus(status);
+        // Persist the status property itself; the earlier shared partial update
+        // only wrote the position, so the operation status never reached the vertex.
+        this.migrationStatusVertexManagement.updateVertexPartialStatus(this.status);
+    }
+
+    /**
+     * Reads and writes the vertex that stores a {@link MigrationImportStatus}.
+     * The vertex found or created last is cached and is the target of the
+     * partial updates.
+     */
+    private static class MigrationStatusVertexManagement {
+        public static final String PROPERTY_KEY_START_TIME = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "migration.startTime");
+        public static final String PROPERTY_KEY_SIZE = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "migration.size");
+        public static final String PROPERTY_KEY_POSITION = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "migration.position");
+        public static final String PROPERTY_KEY_STATUS = encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "migration.status");
+
+        private final AtlasGraph atlasGraph;
+        private AtlasVertex vertex;
+
+        public MigrationStatusVertexManagement(AtlasGraph atlasGraph) {
+            this.atlasGraph = atlasGraph;
+        }
+
+        /**
+         * Finds the vertex named like {@code status}, creating and populating
+         * it from {@code status} when it does not exist. An existing vertex is
+         * left untouched.
+         *
+         * @return the status read back from the vertex
+         */
+        public MigrationImportStatus createOrUpdate(MigrationImportStatus status) {
+            this.vertex = findByNameInternal(status.getName());
+
+            if (this.vertex == null) {
+                this.vertex = atlasGraph.addVertex();
+                LOG.info("MigrationStatusVertexManagement: Vertex created!");
+                updateVertex(this.vertex, status);
+            }
+
+            return to(this.vertex);
+        }
+
+        /** @return true when a vertex is found for {@code name} */
+        public boolean exists(String name) {
+            return findByNameInternal(name) != null;
+        }
+
+        /**
+         * Returns the status stored on the cached vertex when one is cached
+         * (without comparing its name to {@code name}); otherwise looks the
+         * vertex up by name and caches it.
+         *
+         * @return the stored status, or {@code null} when no vertex is found
+         */
+        public MigrationImportStatus findByName(String name) {
+            if (this.vertex != null) {
+                return to(this.vertex);
+            }
+
+            AtlasVertex v = findByNameInternal(name);
+            if (v == null) {
+                return null;
+            }
+
+            this.vertex = v;
+            LOG.info("MigrationImportStatus: Vertex found!");
+            return to(v);
+        }
+
+        /** Removes the vertex stored under {@code name}, if there is one, and clears the cache. */
+        public void delete(String name) {
+            try {
+                AtlasVertex found = findByNameInternal(name);
+                // The lookup returns null when nothing matches or the query failed.
+                if (found != null) {
+                    atlasGraph.removeVertex(found);
+                }
+
+                this.vertex = null;
+            } finally {
+                atlasGraph.commit();
+            }
+        }
+
+        /**
+         * Runs an index query matching {@code Constants.GUID_PROPERTY_KEY}
+         * against {@code name} and returns the first hit.
+         *
+         * @return the first matching vertex, or {@code null} when there is no
+         *         match or the query fails (the failure is logged)
+         */
+        private AtlasVertex findByNameInternal(String name) {
+            try {
+                String idxQueryString = String.format("%s\"%s\":\"%s\"", AtlasGraphUtilsV2.getIndexSearchPrefix(), Constants.GUID_PROPERTY_KEY, name);
+                AtlasIndexQuery idxQuery = atlasGraph.indexQuery(Constants.VERTEX_INDEX, idxQueryString);
+                Iterator<AtlasIndexQuery.Result<Object, Object>> results = idxQuery.vertices();
+
+                AtlasIndexQuery.Result<?, ?> qryResult = results.hasNext() ? results.next() : null;
+                if (qryResult != null) {
+                    return qryResult.getVertex();
+                } else {
+                    return null;
+                }
+            } catch (Exception e) {
+                LOG.error("MigrationStatusVertexManagement.findByNameInternal: Failed!", e);
+            } finally {
+                atlasGraph.commit();
+            }
+
+            return null;
+        }
+
+        /** Writes only the position of {@code status} to the cached vertex; failures are logged, not thrown. */
+        public void updateVertexPartialPosition(MigrationImportStatus status) {
+            try {
+                if (vertex == null) {
+                    LOG.warn("Error updating position: status vertex is not available. Please rely on log messages.");
+                    return;
+                }
+
+                setEncodedProperty(vertex, PROPERTY_KEY_POSITION, status.getCurrentIndex());
+            } catch (Exception e) {
+                LOG.warn("Error updating status. Please rely on log messages.", e);
+            } finally {
+                atlasGraph.commit();
+            }
+        }
+
+        /** Writes only the operation status of {@code status} to the cached vertex; failures are logged, not thrown. */
+        public void updateVertexPartialStatus(MigrationImportStatus status) {
+            try {
+                if (vertex == null) {
+                    LOG.warn("Error updating operation status: status vertex is not available. Please rely on log messages.");
+                    return;
+                }
+
+                setEncodedProperty(vertex, PROPERTY_KEY_STATUS, status.getOperationStatus());
+            } catch (Exception e) {
+                LOG.warn("Error updating status. Please rely on log messages.", e);
+            } finally {
+                atlasGraph.commit();
+            }
+        }
+
+        /** Writes every property of {@code status} to {@code vertex}; a missing start time defaults to the current time. */
+        private void updateVertex(AtlasVertex vertex, MigrationImportStatus status) {
+            try {
+                setEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, status.getName());
+
+                setEncodedProperty(vertex, PROPERTY_KEY_START_TIME,
+                        (status.getStartTime() != null)
+                                ? status.getStartTime().getTime()
+                                : System.currentTimeMillis());
+
+                setEncodedProperty(vertex, PROPERTY_KEY_SIZE, status.getTotalCount());
+                setEncodedProperty(vertex, PROPERTY_KEY_POSITION, status.getCurrentIndex());
+                setEncodedProperty(vertex, PROPERTY_KEY_STATUS, status.getOperationStatus());
+            } catch (Exception ex) {
+                LOG.error("Error updating MigrationImportStatus vertex. Status may not be persisted correctly.", ex);
+            } finally {
+                atlasGraph.commit();
+            }
+        }
+
+        /**
+         * Builds a status from the properties of {@code vertex}. Properties that
+         * are absent keep the defaults of a new {@link MigrationImportStatus};
+         * a read failure is logged and the partially filled status is returned.
+         */
+        private static MigrationImportStatus to(AtlasVertex vertex) {
+            MigrationImportStatus ret = new MigrationImportStatus();
+
+            try {
+                ret.setName(getEncodedProperty(vertex, Constants.GUID_PROPERTY_KEY, String.class));
+
+                Long dateValue = getEncodedProperty(vertex, PROPERTY_KEY_START_TIME, Long.class);
+                if (dateValue != null) {
+                    ret.setStartTime(new Date(dateValue));
+                }
+
+                Long size = getEncodedProperty(vertex, PROPERTY_KEY_SIZE, Long.class);
+                if (size != null) {
+                    ret.setTotalCount(size);
+                }
+
+                Long position = getEncodedProperty(vertex, PROPERTY_KEY_POSITION, Long.class);
+                if (position != null) {
+                    ret.setCurrentIndex(position);
+                }
+
+                ret.setOperationStatus(getEncodedProperty(vertex, PROPERTY_KEY_STATUS, String.class));
+            } catch (Exception ex) {
+                LOG.error("Error converting to MigrationImportStatus. Will proceed with default values.", ex);
+            }
+
+            return ret;
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
index 35a76ea..f44f2a8 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -23,6 +23,8 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.migration.MigrationImportStatus;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.impexp.ImportService;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.lang.StringUtils;
@@ -41,7 +43,7 @@ public class ZipFileMigrationImporter implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class);
private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers";
- private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size";
+ private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size";
private static final String DEFAULT_NUMBER_OF_WORKERS = "4";
private static final String DEFAULT_BATCH_SIZE = "100";
private static final String ZIP_FILE_COMMENT_ENTITIES_COUNT = "entitiesCount";
@@ -51,20 +53,24 @@ public class ZipFileMigrationImporter implements Runnable {
private final ImportService importService;
private final String fileToImport;
+ private DataMigrationStatusService dataMigrationStatusService;
+ private MigrationImportStatus migrationImportStatus;
public ZipFileMigrationImporter(ImportService importService, String fileName) {
this.importService = importService;
this.fileToImport = fileName;
+ this.dataMigrationStatusService = new DataMigrationStatusService(AtlasGraphProvider.getGraphInstance());
}
@Override
public void run() {
try {
- FileWatcher fileWatcher = new FileWatcher(fileToImport);
- fileWatcher.start();
+ detectFileToImport();
int streamSize = getStreamSizeFromComment(fileToImport);
- performImport(new FileInputStream(new File(fileToImport)), streamSize);
+ migrationImportStatus = getCreateMigrationStatus(fileToImport, streamSize);
+ performImport(fileToImport, streamSize, Long.toString(migrationImportStatus.getCurrentIndex()));
+ dataMigrationStatusService.setStatus("DONE");
} catch (IOException e) {
LOG.error("Migration Import: IO Error!", e);
} catch (AtlasBaseException e) {
@@ -72,6 +78,22 @@ public class ZipFileMigrationImporter implements Runnable {
}
}
+ /**
+  * Fetches the stored status for the file, creating it when absent, and
+  * then marks the import as "STARTED".
+  *
+  * @param fileName   name under which the status is stored
+  * @param streamSize total count recorded on a newly created status
+  * @return the status returned by the status service, read before the
+  *         "STARTED" marker is applied
+  */
+ private MigrationImportStatus getCreateMigrationStatus(String fileName, int streamSize) {
+     MigrationImportStatus requested = new MigrationImportStatus(fileName);
+     requested.setTotalCount(streamSize);
+
+     MigrationImportStatus persisted = dataMigrationStatusService.getCreate(requested);
+     LOG.info("DataMigrationStatusService: Position: {}", persisted.getCurrentIndex());
+
+     dataMigrationStatusService.setStatus("STARTED");
+     return persisted;
+ }
+
+ /**
+  * Creates a FileWatcher for the file to import and starts it.
+  * NOTE(review): presumably start() waits until the file is available - confirm.
+  *
+  * @throws IOException propagated from the watcher
+  */
+ private void detectFileToImport() throws IOException {
+     new FileWatcher(fileToImport).start();
+ }
+
private int getStreamSizeFromComment(String fileToImport) {
int ret = 1;
try {
@@ -99,13 +121,13 @@ public class ZipFileMigrationImporter implements Runnable {
return entitiesCount;
}
- private void performImport(InputStream fs, int streamSize) throws AtlasBaseException {
+ private void performImport(String fileToImport, int streamSize, String startPosition) throws AtlasBaseException {
try {
- LOG.info("Migration Import: {}: Starting...", fileToImport);
-
+ LOG.info("Migration Import: {}: Starting at: {}...", fileToImport, startPosition);
+ InputStream fs = new FileInputStream(new File(fileToImport));
RequestContext.get().setUser(getUserNameFromEnvironment(), null);
- importService.run(fs, getImportRequest(streamSize),
+ importService.run(fs, getImportRequest(fileToImport, streamSize, startPosition),
getUserNameFromEnvironment(),
InetAddress.getLocalHost().getHostName(),
InetAddress.getLocalHost().getHostAddress());
@@ -122,16 +144,19 @@ public class ZipFileMigrationImporter implements Runnable {
return System.getProperty(ENV_USER_NAME);
}
- private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException {
+ private AtlasImportRequest getImportRequest(String fileToImport, int streamSize, String position) throws AtlasException {
AtlasImportRequest request = new AtlasImportRequest();
+ request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION_FILE_NAME, fileToImport);
request.setSizeOption(streamSize);
request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");
request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKERS));
request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE));
+ request.setOption(AtlasImportRequest.START_POSITION_KEY, (StringUtils.isEmpty(position) ? "0" : position));
return request;
}
+
private String getPropertyValue(String property, String defaultValue) throws AtlasException {
return ApplicationProperties.get().getString(property, defaultValue);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
index 4c912fd..ff55e40 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
@@ -20,12 +20,14 @@ package org.apache.atlas.repository.store.graph.v2.bulkimport;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.converters.AtlasFormatConverters;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.graph.AtlasGraphProvider;
import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.migration.DataMigrationStatusService;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
@@ -65,10 +67,12 @@ public class MigrationImport extends ImportStrategy {
throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "importResult should contain request");
}
- int index = 0;
+ DataMigrationStatusService dataMigrationStatusService = createMigrationStatusService(importResult);
+
+ long index = 0;
int streamSize = entityStream.size();
EntityMutationResponse ret = new EntityMutationResponse();
- EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult);
+ EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult, dataMigrationStatusService);
try {
LOG.info("Migration Import: Size: {}: Starting...", streamSize);
@@ -85,14 +89,23 @@ public class MigrationImport extends ImportStrategy {
return ret;
}
- private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult) {
+ /**
+  * Builds a status service initialized for the migration file named in the
+  * request options of the given import result.
+  *
+  * @param importResult result whose request carries OPTION_KEY_MIGRATION_FILE_NAME
+  * @return the initialized status service
+  */
+ private DataMigrationStatusService createMigrationStatusService(AtlasImportResult importResult) {
+     DataMigrationStatusService statusService = new DataMigrationStatusService();
+     String migrationFileName = importResult.getRequest().getOptions().get(AtlasImportRequest.OPTION_KEY_MIGRATION_FILE_NAME);
+
+     statusService.init(migrationFileName);
+     return statusService;
+ }
+
+ private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph,
+ AtlasImportResult importResult,
+ DataMigrationStatusService dataMigrationStatusService) {
+ atlasGraph = threadedAtlasGraph;
int batchSize = importResult.getRequest().getOptionKeyBatchSize();
int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
EntityConsumerBuilder consumerBuilder =
new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize);
- return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult);
+ return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, dataMigrationStatusService);
}
private static int getNumWorkers(int numWorkersFromOptions) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
index e8f4b02..d0fac10 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
@@ -155,7 +155,10 @@ public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWith
LOG.error("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
pause(retryCount);
- LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex);
+ String exceptionClass = ex.getClass().getSimpleName();
+ if (!exceptionClass.equals("JanusGraphException") && !exceptionClass.equals("PermanentLockingException")) {
+ LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex);
+ }
retryProcessEntity(retryCount);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
index 16bb49e..177b563 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
@@ -23,6 +23,7 @@ import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.StatusReporter;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.migration.DataMigrationStatusService;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.commons.lang.StringUtils;
@@ -32,24 +33,28 @@ import org.slf4j.LoggerFactory;
public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager {
private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class);
private static final String WORKER_PREFIX = "migration-import";
+ private static final long STATUS_REPORT_TIMEOUT_DURATION = 1 * 60 * 1000; // 1 min
private final StatusReporter<String, String> statusReporter;
private final AtlasImportResult importResult;
+ private final DataMigrationStatusService dataMigrationStatusService;
private String currentTypeName;
private float currentPercent;
private EntityImportStream entityImportStream;
- public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult) {
+ /**
+ * Creates the manager, registering workers under {@code WORKER_PREFIX}.
+ *
+ * @param builder work item builder passed to the superclass
+ * @param batchSize batch size passed to the superclass
+ * @param numWorkers number of workers passed to the superclass
+ * @param importResult import result kept for later metric updates
+ * @param dataMigrationStatusService status service kept for reading and saving the import position
+ */
+ public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult, DataMigrationStatusService dataMigrationStatusService) {
super(builder, WORKER_PREFIX, batchSize, numWorkers, true);
this.importResult = importResult;
+ this.dataMigrationStatusService = dataMigrationStatusService;
- this.statusReporter = new StatusReporter<>();
+ // The reporter is now constructed with STATUS_REPORT_TIMEOUT_DURATION instead of its no-arg constructor.
+ this.statusReporter = new StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION);
}
- public int read(EntityImportStream entityStream) {
- int currentIndex = 0;
+ public long read(EntityImportStream entityStream) {
+ long currentIndex = this.dataMigrationStatusService.getStatus().getCurrentIndex();
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
this.entityImportStream = entityStream;
+ this.dataMigrationStatusService.setStatus("IN_PROGRESS");
while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) {
AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
if (entity == null) {
@@ -66,7 +71,7 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag
return currentIndex;
}
- private void produce(int currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+ private void produce(long currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
String previousTypeName = getCurrentTypeName();
if (StringUtils.isNotEmpty(typeName)
@@ -104,7 +109,9 @@ public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManag
}
importResult.incrementMeticsCounter(split[0]);
- this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(split[1]), this.entityImportStream.size(), getCurrentPercent());
+ String currentPosition = split[1];
+ dataMigrationStatusService.savePosition(currentPosition);
+ this.currentPercent = updateImportMetrics(split[0], Integer.parseInt(currentPosition), this.entityImportStream.size(), getCurrentPercent());
}
private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) {
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
new file mode 100644
index 0000000..bf1d9a0
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/DataMigrationStatusServiceTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import com.google.inject.Inject;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.migration.MigrationImportStatus;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.migration.DataMigrationStatusService;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.util.Date;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+
+/**
+ * Tests the create / update-position / delete life cycle of {@link DataMigrationStatusService}
+ * against the graph injected by {@code TestModules.TestOnlyModule}.
+ */
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class DataMigrationStatusServiceTest {
+ @Inject
+ AtlasGraph atlasGraph;
+
+ /**
+ * Creates a status entry, verifies it round-trips, saves a new position and verifies it is
+ * reported back, then deletes the entry and verifies it can no longer be found.
+ */
+ @Test
+ public void createUpdateDelete() throws AtlasBaseException {
+ DataMigrationStatusService dataMigrationStatusService = new DataMigrationStatusService(atlasGraph);
+
+ MigrationImportStatus expected = new MigrationImportStatus("/tmp/defg.zip");
+ expected.setTotalCount(3333);
+ expected.setCurrentIndex(20);
+ expected.setStartTime(new Date());
+
+ MigrationImportStatus ret = dataMigrationStatusService.getCreate(expected);
+
+ assertNotNull(ret);
+ assertEquals(ret.getName(), expected.getName());
+ assertEquals(ret.getStartTime(), expected.getStartTime());
+ assertEquals(ret.getTotalCount(), expected.getTotalCount());
+ assertEquals(ret.getCurrentIndex(), expected.getCurrentIndex());
+
+ dataMigrationStatusService.savePosition("100");
+ assertNotNull(dataMigrationStatusService.getStatus());
+ // Previously assertNotNull(index, "100"): "100" was only the failure message and the numeric
+ // index is never null, so the check could not fail. Compare against the saved position instead.
+ assertEquals(dataMigrationStatusService.getStatus().getCurrentIndex(), 100L);
+ assertEquals(dataMigrationStatusService.getCreate(expected).getCurrentIndex(), 100L);
+
+ dataMigrationStatusService.delete();
+ assertNull(dataMigrationStatusService.getStatus());
+ assertNull(dataMigrationStatusService.getByName(ret.getName()));
+ }
+}