You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2020/02/21 05:27:48 UTC
[atlas] 01/02: ATLAS-3320: Import Service. Support concurrent
ingest.
This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
commit a2ccfb9f3577e911103041d8d4b91c169697f6a1
Author: Ashutosh Mestry <am...@cloudera.com>
AuthorDate: Thu Feb 20 11:46:37 2020 -0800
ATLAS-3320: Import Service. Support concurrent ingest.
---
.../repository/graphdb/janus/AtlasJanusGraph.java | 2 +-
.../java/org/apache/atlas/AtlasConfiguration.java | 1 +
.../atlas/model/impexp/AtlasImportRequest.java | 43 +++-
.../java/org/apache/atlas/pc/WorkItemConsumer.java | 11 +-
.../java/org/apache/atlas/pc/WorkItemManager.java | 9 +-
.../apache/atlas/GraphTransactionInterceptor.java | 4 +
.../atlas/repository/impexp/AuditsWriter.java | 3 +-
.../atlas/repository/impexp/ImportService.java | 22 +-
.../repository/impexp/ZipExportFileNames.java | 4 +
.../atlas/repository/impexp/ZipSourceDirect.java | 269 +++++++++++++++++++++
.../migration/ZipFileMigrationImporter.java | 58 ++++-
.../repository/patches/UniqueAttributePatch.java | 4 +-
.../repository/store/graph/AtlasEntityStore.java | 8 +
.../store/graph/v2/AtlasEntityStoreV2.java | 11 +-
.../store/graph/v2/AtlasRelationshipStoreV2.java | 4 +
.../store/graph/v2/BulkImporterImpl.java | 228 ++++-------------
.../store/graph/v2/EntityGraphMapper.java | 41 +++-
.../graph/v2/bulkimport/ImportStrategy.java} | 22 +-
.../store/graph/v2/bulkimport/MigrationImport.java | 122 ++++++++++
.../RegularImport.java} | 76 ++----
.../graph/v2/bulkimport/pc/EntityConsumer.java | 213 ++++++++++++++++
.../v2/bulkimport/pc/EntityConsumerBuilder.java | 50 ++++
.../v2/bulkimport/pc/EntityCreationManager.java | 130 ++++++++++
.../graph/v2/bulkimport/pc/StatusReporter.java | 131 ++++++++++
.../atlas/repository/impexp/ImportServiceTest.java | 16 ++
.../repository/impexp/MigrationImportTest.java | 77 ++++++
.../repository/impexp/StatusReporterTest.java | 99 ++++++++
.../atlas/repository/impexp/ZipDirectTest.java | 61 +++++
.../impexp/ZipFileResourceTestUtils.java | 7 +-
repository/src/test/resources/zip-direct-1.zip | Bin 0 -> 22 bytes
repository/src/test/resources/zip-direct-2.zip | Bin 0 -> 1720553 bytes
31 files changed, 1432 insertions(+), 294 deletions(-)
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
index 4acb371..0176ba7 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
@@ -116,7 +116,7 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE
}
}
- janusGraph = (StandardJanusGraph) AtlasJanusGraphDatabase.getGraphInstance();
+ janusGraph = (StandardJanusGraph) graphInstance;
}
@Override
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 1a0d0cc..f8d7f8c 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -64,6 +64,7 @@ public enum AtlasConfiguration {
CUSTOM_ATTRIBUTE_VALUE_MAX_LENGTH("atlas.custom.attribute.value.max.length", 500),
LABEL_MAX_LENGTH("atlas.entity.label.max.length", 50),
IMPORT_TEMP_DIRECTORY("atlas.import.temp.directory", ""),
+ MIGRATION_IMPORT_START_POSITION("atlas.migration.import.start.position", 0),
LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false);
private static final Configuration APPLICATION_PROPERTIES;
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
index 0b3ede9..0ad3673 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportRequest.java
@@ -44,10 +44,16 @@ public class AtlasImportRequest implements Serializable {
public static final String TRANSFORMS_KEY = "transforms";
public static final String TRANSFORMERS_KEY = "transformers";
public static final String OPTION_KEY_REPLICATED_FROM = "replicatedFrom";
- private static final String START_POSITION_KEY = "startPosition";
+ public static final String OPTION_KEY_MIGRATION = "migration";
+ public static final String OPTION_KEY_NUM_WORKERS = "numWorkers";
+ public static final String OPTION_KEY_BATCH_SIZE = "batchSize";
+ public static final String OPTION_KEY_FORMAT = "format";
+ public static final String OPTION_KEY_FORMAT_ZIP_DIRECT = "zipDirect";
+ public static final String START_POSITION_KEY = "startPosition";
private static final String START_GUID_KEY = "startGuid";
private static final String FILE_NAME_KEY = "fileName";
private static final String UPDATE_TYPE_DEFINITION_KEY = "updateTypeDefinition";
+ private static final String OPTION_KEY_STREAM_SIZE = "size";
private Map<String, String> options;
@@ -108,7 +114,7 @@ public class AtlasImportRequest implements Serializable {
return null;
}
- return (String) this.options.get(key);
+ return this.options.get(key);
}
@JsonIgnore
@@ -121,10 +127,41 @@ public class AtlasImportRequest implements Serializable {
return isReplicationOptionSet() ? options.get(OPTION_KEY_REPLICATED_FROM) : StringUtils.EMPTY;
}
+ /**
+ * Returns the "numWorkers" option parsed as an int; falls back to 1 when the option is missing or empty.
+ */
+ @JsonIgnore
+ public int getOptionKeyNumWorkers() {
+ return getOptionsValue(OPTION_KEY_NUM_WORKERS, 1);
+ }
+
+ /**
+ * Returns the "batchSize" option parsed as an int; falls back to 1 when the option is missing or empty.
+ */
+ @JsonIgnore
+ public int getOptionKeyBatchSize() {
+ return getOptionsValue(OPTION_KEY_BATCH_SIZE, 1);
+ }
+
+    /**
+     * Looks up an option by key and parses it as an int.
+     *
+     * @param optionKeyBatchSize key of the option to read (used for any integer option, not only batch size)
+     * @param defaultValue       value returned when the option is absent or empty
+     * @return the parsed option value, or {@code defaultValue}
+     * @throws NumberFormatException if the option is present but not a valid integer
+     */
+    private int getOptionsValue(String optionKeyBatchSize, int defaultValue) {
+        final String rawValue = getOptionForKey(optionKeyBatchSize);
+
+        if (StringUtils.isEmpty(rawValue)) {
+            return defaultValue;
+        }
+
+        return Integer.parseInt(rawValue);
+    }
+
@JsonAnySetter
public void setOption(String key, String value) {
if (null == options) {
options = new HashMap<>();
}
options.put(key, value);
- }}
+ }
+
+    /**
+     * Stores the given size under the "size" option as its decimal string form.
+     *
+     * @param size value to record
+     */
+    public void setSizeOption(int size) {
+        final String sizeAsText = String.valueOf(size);
+
+        setOption(OPTION_KEY_STREAM_SIZE, sizeAsText);
+    }
+
+    /**
+     * Returns the "size" option parsed as an int.
+     *
+     * @return the stored size, or 1 when no options have been set or the "size" option is absent
+     * @throws NumberFormatException if the stored value is not a valid integer
+     */
+    public int getSizeOption() {
+        // options is created lazily by setOption(), so it can still be null here.
+        if (options == null || !options.containsKey(OPTION_KEY_STREAM_SIZE)) {
+            return 1;
+        }
+
+        return Integer.parseInt(options.get(OPTION_KEY_STREAM_SIZE));
+    }
+}
diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
index 9ba4bf4..dd76697 100644
--- a/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
+++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemConsumer.java
@@ -21,6 +21,7 @@ package org.apache.atlas.pc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -37,7 +38,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
private final AtomicBoolean isDirty = new AtomicBoolean(false);
private final AtomicLong maxCommitTimeInMs = new AtomicLong(DEFAULT_COMMIT_TIME_IN_MS);
private CountDownLatch countdownLatch;
- private BlockingQueue<Object> results;
+ private Queue<Object> results;
public WorkItemConsumer(BlockingQueue<T> queue) {
this.queue = queue;
@@ -101,11 +102,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
protected abstract void processItem(T item);
protected void addResult(Object value) {
- try {
- results.put(value);
- } catch (InterruptedException e) {
- LOG.error("Interrupted while adding result: {}", value);
- }
+ results.add(value);
}
protected void updateCommitTime(long commitTime) {
@@ -118,7 +115,7 @@ public abstract class WorkItemConsumer<T> implements Runnable {
this.countdownLatch = countdownLatch;
}
- public <V> void setResults(BlockingQueue<Object> queue) {
+ public <V> void setResults(Queue<Object> queue) {
this.results = queue;
}
}
diff --git a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
index a7ba67c..351421e 100644
--- a/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
+++ b/intg/src/main/java/org/apache/atlas/pc/WorkItemManager.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.*;
public class WorkItemManager<T, U extends WorkItemConsumer> {
@@ -33,7 +34,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
private final ExecutorService service;
private final List<U> consumers = new ArrayList<>();
private CountDownLatch countdownLatch;
- private BlockingQueue<Object> resultsQueue;
+ private Queue<Object> resultsQueue;
public WorkItemManager(WorkItemBuilder builder, String namePrefix, int batchSize, int numWorkers, boolean collectResults) {
this.numWorkers = numWorkers;
@@ -49,13 +50,13 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
this(builder, "workItemConsumer", batchSize, numWorkers, false);
}
- public void setResultsCollection(BlockingQueue<Object> resultsQueue) {
+ public void setResultsCollection(Queue<Object> resultsQueue) {
this.resultsQueue = resultsQueue;
}
private void createConsumers(WorkItemBuilder builder, int numWorkers, boolean collectResults) {
if (collectResults) {
- setResultsCollection(new LinkedBlockingQueue<>());
+ setResultsCollection(new ConcurrentLinkedQueue<>());
}
for (int i = 0; i < numWorkers; i++) {
@@ -124,7 +125,7 @@ public class WorkItemManager<T, U extends WorkItemConsumer> {
LOG.info("WorkItemManager: Shutdown done!");
}
- public BlockingQueue getResults() {
+ public Queue getResults() {
return this.resultsQueue;
}
diff --git a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
index bbe0dc5..57e454a 100644
--- a/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
+++ b/repository/src/main/java/org/apache/atlas/GraphTransactionInterceptor.java
@@ -199,6 +199,10 @@ public class GraphTransactionInterceptor implements MethodInterceptor {
return cache.get(guid);
}
+ /**
+ * Removes every entry from the cache object returned by guidVertexCache.get().
+ * NOTE(review): guidVertexCache looks like a per-thread holder, so this would only affect the calling thread - confirm against its declaration.
+ */
+ public static void clearCache() {
+ guidVertexCache.get().clear();
+ }
+
boolean logException(Throwable t) {
if (t instanceof AtlasBaseException) {
Response.Status httpCode = ((AtlasBaseException) t).getAtlasErrorCode().getHttpCode();
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index 55990f7..373921d 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -247,7 +247,8 @@ public class AuditsWriter {
}
updateReplicationAttribute(replicationOptionState, sourceServerName, sourceServerFullName, entityGuids,
- Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker());
+ Constants.ATTR_NAME_REPLICATED_FROM,
+ (result.getExportResult() != null) ? result.getExportResult().getChangeMarker() : 0);
}
public void add(String userName, String sourceCluster, long startTime,
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 27001e3..cd1deab 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -92,7 +92,7 @@ public class ImportService {
request = new AtlasImportRequest();
}
- EntityImportStream source = createZipSource(inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
+ EntityImportStream source = createZipSource(request, inputStream, AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
return run(source, request, userName, hostName, requestingIP);
}
@@ -248,8 +248,18 @@ public class ImportService {
return (int) (endTime - startTime);
}
- private EntityImportStream createZipSource(InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
+ private EntityImportStream createZipSource(AtlasImportRequest request, InputStream inputStream, String configuredTemporaryDirectory) throws AtlasBaseException {
try {
+ if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION)) {
+ LOG.info("Migration mode: Detected...", request.getOptions().get("size"));
+ return getZipDirectEntityImportStream(request, inputStream);
+ }
+
+ if (request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_FORMAT) &&
+ request.getOptions().get(AtlasImportRequest.OPTION_KEY_FORMAT).equals(AtlasImportRequest.OPTION_KEY_FORMAT_ZIP_DIRECT) ) {
+ return getZipDirectEntityImportStream(request, inputStream);
+ }
+
if (StringUtils.isEmpty(configuredTemporaryDirectory)) {
return new ZipSource(inputStream);
}
@@ -260,9 +270,15 @@ public class ImportService {
}
}
+    /**
+     * Wraps the incoming stream in a {@link ZipSourceDirect}, passing along the size option carried by the request.
+     *
+     * @param request     import request whose size option is handed to the stream
+     * @param inputStream raw ZIP content
+     * @return the direct ZIP-backed import stream
+     */
+    private EntityImportStream getZipDirectEntityImportStream(AtlasImportRequest request, InputStream inputStream) throws IOException, AtlasBaseException {
+        final int             declaredSize = request.getSizeOption();
+        final ZipSourceDirect directSource = new ZipSourceDirect(inputStream, declaredSize);
+
+        LOG.info("Using ZipSourceDirect: Size: {} entities", directSource.size());
+
+        return directSource;
+    }
+
@VisibleForTesting
boolean checkHiveTableIncrementalSkipLineage(AtlasImportRequest importRequest, AtlasExportRequest exportRequest) {
- if (CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
+ if (exportRequest == null || CollectionUtils.isEmpty(exportRequest.getItemsToExport())) {
return false;
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
index 351b475..8347b91 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
@@ -31,4 +31,8 @@ public enum ZipExportFileNames {
public String toString() {
return this.name;
}
+
+ /**
+ * Returns this constant's name with the ".json" suffix appended.
+ */
+ public String toEntryFileName() {
+ return this.name + ".json";
+ }
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
new file mode 100644
index 0000000..260c4af
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSourceDirect.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.entitytransform.BaseEntityHandler;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import static org.apache.atlas.AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP;
+
+/**
+ * {@link EntityImportStream} that reads entries sequentially from a {@link ZipInputStream}
+ * instead of unpacking the archive first.
+ *
+ * The first entry is read during construction; when it is the typesdef entry its payload is
+ * parsed into {@link AtlasTypesDef}. Subsequent entries are consumed one at a time through
+ * {@link #getNextEntityWithExtInfo()}.
+ */
+public class ZipSourceDirect implements EntityImportStream {
+    private static final Logger LOG = LoggerFactory.getLogger(ZipSourceDirect.class);
+
+    // Entry payloads are JSON; decode them as UTF-8 rather than with the platform default charset.
+    private static final String PAYLOAD_CHARSET = "UTF-8";
+    private static final int    BUFFER_LENGTH   = 4096;
+
+    private final ZipInputStream zipInputStream;
+    private final int            streamSize;
+
+    private int                     currentPosition;
+    private ImportTransforms        importTransform;
+    private List<BaseEntityHandler> entityHandlers;
+    private AtlasTypesDef           typesDef;
+    private ZipEntry                zipEntryNext;
+
+    /**
+     * @param inputStream raw ZIP content; wrapped in a {@link ZipInputStream}
+     * @param streamSize  value reported by {@link #size()}; it is supplied by the caller, not computed from the stream
+     * @throws AtlasBaseException if the ZIP has no entries
+     * @throws IOException        if the first entry cannot be read
+     */
+    public ZipSourceDirect(InputStream inputStream, int streamSize) throws IOException, AtlasBaseException {
+        this.zipInputStream = new ZipInputStream(inputStream);
+        this.streamSize     = streamSize;
+
+        prepareStreamForFetch();
+    }
+
+    @Override
+    public ImportTransforms getImportTransform() {
+        return this.importTransform;
+    }
+
+    @Override
+    public void setImportTransform(ImportTransforms importTransform) {
+        this.importTransform = importTransform;
+    }
+
+    @Override
+    public List<BaseEntityHandler> getEntityHandlers() {
+        return entityHandlers;
+    }
+
+    @Override
+    public void setEntityHandlers(List<BaseEntityHandler> entityHandlers) {
+        this.entityHandlers = entityHandlers;
+    }
+
+    /** Returns the types definition parsed at construction, or null when the first entry was not the typesdef entry. */
+    @Override
+    public AtlasTypesDef getTypesDef() throws AtlasBaseException {
+        return this.typesDef;
+    }
+
+    /** Always returns a new, empty export result; nothing is read from the stream for it. */
+    @Override
+    public AtlasExportResult getExportResult() throws AtlasBaseException {
+        return new AtlasExportResult();
+    }
+
+    /** Always returns a new, empty list; no creation order is read from the stream. */
+    @Override
+    public List<String> getCreationOrder() {
+        return new ArrayList<>();
+    }
+
+    /** Returns the number of entries advanced over so far (including the entry read at construction). */
+    @Override
+    public int getPosition() {
+        return currentPosition;
+    }
+
+    /**
+     * Parses the given JSON into an entity and applies the configured import transform and entity handlers.
+     *
+     * @param json serialized entity with ext-info
+     * @return the parsed (and transformed) entity, or null when {@code json} is empty
+     * @throws AtlasBaseException if the JSON cannot be converted
+     */
+    @Override
+    public AtlasEntity.AtlasEntityWithExtInfo getEntityWithExtInfo(String json) throws AtlasBaseException {
+        if (StringUtils.isEmpty(json)) {
+            return null;
+        }
+
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntity.AtlasEntityWithExtInfo.class, json);
+
+        if (importTransform != null) {
+            entityWithExtInfo = importTransform.apply(entityWithExtInfo);
+        }
+
+        if (entityHandlers != null) {
+            applyTransformers(entityWithExtInfo);
+        }
+
+        return entityWithExtInfo;
+    }
+
+    /**
+     * Returns true while the most recently read entry exists and is neither the export-order
+     * nor the export-info entry. Note that this inspects the entry last positioned on; it does
+     * not look ahead in the stream.
+     */
+    @Override
+    public boolean hasNext() {
+        return (this.zipEntryNext != null
+                && !zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_EXPORT_ORDER_NAME.toEntryFileName())
+                && !zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_EXPORT_INFO_NAME.toEntryFileName()));
+    }
+
+    @Override
+    public AtlasEntity next() {
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getNextEntityWithExtInfo();
+
+        return entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
+    }
+
+    /**
+     * Advances to the next ZIP entry and parses its payload.
+     *
+     * @return the next entity, or null when the stream is exhausted or the entry could not be read or parsed
+     */
+    @Override
+    public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+        try {
+            if (hasNext()) {
+                String json = moveNext();
+                return getEntityWithExtInfo(json);
+            }
+        } catch (AtlasBaseException e) {
+            LOG.error("getNextEntityWithExtInfo", e);
+        }
+        return null;
+    }
+
+    /** Resets the position counter only; the underlying stream is not rewound. */
+    @Override
+    public void reset() {
+        currentPosition = 0;
+    }
+
+    @Override
+    public AtlasEntity getByGuid(String guid) {
+        try {
+            return getEntity(guid);
+        } catch (AtlasBaseException e) {
+            LOG.error("getByGuid: {} failed!", guid, e);
+            return null;
+        }
+    }
+
+    @Override
+    public void onImportComplete(String guid) {
+    }
+
+    /**
+     * Skips forward by {@code index} entries from the current location in the stream.
+     *
+     * @param index number of entries to advance over
+     */
+    @Override
+    public void setPosition(int index) {
+        try {
+            for (int i = 0; i < index; i++) {
+                moveNextEntry();
+            }
+        } catch (IOException e) {
+            // Keep the cause in the log so a corrupt archive can be told apart from an out-of-range position.
+            LOG.error("Error setting position: {}. Position may be beyond the stream size.", index, e);
+        }
+    }
+
+    @Override
+    public void setPositionUsingEntityGuid(String guid) {
+    }
+
+    /** Closes the wrapped ZIP stream (and with it the stream passed to the constructor). */
+    @Override
+    public void close() {
+        try {
+            zipInputStream.close();
+        } catch (IOException e) {
+            LOG.warn("Error closing ZIP stream!", e);
+        }
+    }
+
+    // Runs every configured handler over the entity and over each of its referred entities.
+    private void applyTransformers(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+        if (entityWithExtInfo == null) {
+            return;
+        }
+
+        transform(entityWithExtInfo.getEntity());
+
+        if (MapUtils.isNotEmpty(entityWithExtInfo.getReferredEntities())) {
+            for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
+                transform(e);
+            }
+        }
+    }
+
+    private void transform(AtlasEntity e) {
+        for (BaseEntityHandler handler : entityHandlers) {
+            handler.transform(e);
+        }
+    }
+
+    private <T> T convertFromJson(Class<T> clazz, String jsonData) throws AtlasBaseException {
+        try {
+            return AtlasType.fromJson(jsonData, clazz);
+
+        } catch (Exception e) {
+            throw new AtlasBaseException("Error converting file to JSON.", e);
+        }
+    }
+
+    // NOTE(review): the guid is handed to getEntityWithExtInfo(), which parses its argument as JSON;
+    // a guid lookup does not appear to be supported by this stream - confirm intended behavior.
+    private AtlasEntity getEntity(String guid) throws AtlasBaseException {
+        AtlasEntity.AtlasEntityWithExtInfo extInfo = getEntityWithExtInfo(guid);
+        return (extInfo != null) ? extInfo.getEntity() : null;
+    }
+
+    /** Returns the stream size supplied to the constructor. */
+    public int size() {
+        return this.streamSize;
+    }
+
+    // Advances to the next entry and returns its payload; returns null if advancing fails.
+    private String moveNext() {
+        try {
+            moveNextEntry();
+            return getJsonPayloadFromZipEntryStream(this.zipInputStream);
+        } catch (IOException e) {
+            LOG.error("moveNext failed!", e);
+        }
+
+        return null;
+    }
+
+    private void moveNextEntry() throws IOException {
+        this.zipEntryNext = this.zipInputStream.getNextEntry();
+        this.currentPosition++;
+    }
+
+    // Reads the first entry; rejects an empty ZIP and captures the types definition when it comes first.
+    private void prepareStreamForFetch() throws AtlasBaseException, IOException {
+        moveNextEntry();
+        if (this.zipEntryNext == null) {
+            throw new AtlasBaseException(IMPORT_ATTEMPTING_EMPTY_ZIP, "Attempting to import empty ZIP.");
+        }
+
+        if (this.zipEntryNext.getName().equals(ZipExportFileNames.ATLAS_TYPESDEF_NAME.toEntryFileName())) {
+            String json = getJsonPayloadFromZipEntryStream(this.zipInputStream);
+            this.typesDef = AtlasType.fromJson(json, AtlasTypesDef.class);
+        }
+    }
+
+    // Drains the current entry into a string; returns null when reading fails.
+    private String getJsonPayloadFromZipEntryStream(ZipInputStream zipInputStream) {
+        try {
+            byte[]                buf = new byte[BUFFER_LENGTH];
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+            int n;
+            while ((n = zipInputStream.read(buf, 0, BUFFER_LENGTH)) > -1) {
+                bos.write(buf, 0, n);
+            }
+
+            // UnsupportedEncodingException is an IOException, so it is covered by the catch below.
+            return bos.toString(PAYLOAD_CHARSET);
+        } catch (IOException ex) {
+            LOG.error("Error fetching string from entry!", ex);
+        }
+
+        return null;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
index ca0bc41..69d78cd 100644
--- a/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/migration/ZipFileMigrationImporter.java
@@ -24,6 +24,7 @@ import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.repository.impexp.ImportService;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,11 +33,20 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
+import java.util.zip.ZipFile;
+
+import static org.apache.atlas.AtlasConfiguration.MIGRATION_IMPORT_START_POSITION;
public class ZipFileMigrationImporter implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ZipFileMigrationImporter.class);
- private static String ENV_USER_NAME = "user.name";
+ private static final String APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS = "atlas.migration.mode.workers";
+ private static final String APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE = "atlas.migration.mode.batch.size";
+ private static final String DEFAULT_NUMBER_OF_WORKDERS = "4";
+ private static final String DEFAULT_BATCH_SIZE = "100";
+ private static final String ZIP_FILE_COMMENT = "streamSize";
+
+ private final static String ENV_USER_NAME = "user.name";
private final ImportService importService;
private final String fileToImport;
@@ -52,7 +62,8 @@ public class ZipFileMigrationImporter implements Runnable {
FileWatcher fileWatcher = new FileWatcher(fileToImport);
fileWatcher.start();
- performImport(new FileInputStream(new File(fileToImport)));
+ int streamSize = getStreamSizeFromComment(fileToImport);
+ performImport(new FileInputStream(new File(fileToImport)), streamSize);
} catch (IOException e) {
LOG.error("Migration Import: IO Error!", e);
} catch (AtlasBaseException e) {
@@ -60,19 +71,44 @@ public class ZipFileMigrationImporter implements Runnable {
}
}
- private void performImport(InputStream fs) throws AtlasBaseException {
+    /**
+     * Reads the stream size recorded in the ZIP file's archive comment.
+     *
+     * @param fileToImport path of the ZIP file
+     * @return the size parsed from the comment, or 1 when the file cannot be opened
+     */
+    private int getStreamSizeFromComment(String fileToImport) {
+        // try-with-resources releases the file handle even if parsing the comment throws.
+        try (ZipFile zipFile = new ZipFile(fileToImport)) {
+            return processZipFileStreamSizeComment(zipFile.getComment());
+        } catch (IOException e) {
+            LOG.error("Error opening ZIP file: {}", fileToImport, e);
+        }
+
+        return 1;
+    }
+
+    /**
+     * Extracts the stream size from a ZIP comment of the form {@code streamSize:<number>}.
+     *
+     * @param streamSizeComment archive comment, may be null
+     * @return the parsed size, or 1 when the comment is empty, does not start with the expected
+     *         prefix, or does not carry a valid integer after the ':'
+     */
+    private int processZipFileStreamSizeComment(String streamSizeComment) {
+        if (StringUtils.isEmpty(streamSizeComment) || !StringUtils.startsWith(streamSizeComment, ZIP_FILE_COMMENT)) {
+            return 1;
+        }
+
+        String s = StringUtils.substringAfter(streamSizeComment, ":");
+        LOG.debug("ZipFileMigrationImporter: streamSize: {}", streamSizeComment);
+
+        try {
+            return Integer.parseInt(s.trim());
+        } catch (NumberFormatException e) {
+            // A malformed comment must not abort the migration thread; fall back to the default size.
+            LOG.warn("ZipFileMigrationImporter: invalid streamSize comment: {}. Using default: 1", streamSizeComment);
+            return 1;
+        }
+    }
+
+ private void performImport(InputStream fs, int streamSize) throws AtlasBaseException {
try {
LOG.info("Migration Import: {}: Starting...", fileToImport);
RequestContext.get().setUser(getUserNameFromEnvironment(), null);
- importService.run(fs, getImportRequest(),
+ importService.run(fs, getImportRequest(streamSize),
getUserNameFromEnvironment(),
InetAddress.getLocalHost().getHostName(),
InetAddress.getLocalHost().getHostAddress());
} catch (Exception ex) {
- LOG.error("Error loading zip for migration", ex);
+ LOG.error("Migration Import: Error loading zip for migration!", ex);
throw new AtlasBaseException(ex);
} finally {
LOG.info("Migration Import: {}: Done!", fileToImport);
@@ -83,8 +119,16 @@ public class ZipFileMigrationImporter implements Runnable {
return System.getProperty(ENV_USER_NAME);
}
- private AtlasImportRequest getImportRequest() throws AtlasException {
- return new AtlasImportRequest();
+ private AtlasImportRequest getImportRequest(int streamSize) throws AtlasException {
+ AtlasImportRequest request = new AtlasImportRequest();
+
+ request.setSizeOption(streamSize);
+ request.setOption(AtlasImportRequest.OPTION_KEY_MIGRATION, "true");
+ request.setOption(AtlasImportRequest.OPTION_KEY_NUM_WORKERS, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_NUMER_OF_WORKERS, DEFAULT_NUMBER_OF_WORKDERS));
+ request.setOption(AtlasImportRequest.OPTION_KEY_BATCH_SIZE, getPropertyValue(APPLICATION_PROPERTY_MIGRATION_BATCH_SIZE, DEFAULT_BATCH_SIZE));
+ request.setOption(AtlasImportRequest.START_POSITION_KEY, Integer.toString(MIGRATION_IMPORT_START_POSITION.getInt()));
+
+ return request;
}
private String getPropertyValue(String property, String defaultValue) throws AtlasException {
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
index 2b58119..bee6378 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -81,9 +81,9 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
AtlasGraph graph = getGraph();
for (AtlasEntityType entityType : typeRegistry.getAllEntityTypes()) {
- LOG.info("finding entities of type {}", entityType.getTypeName());
-
+ LOG.info("finding entities of type: {}", entityType.getTypeName());
Iterable<Object> iterable = graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, entityType.getTypeName()).vertexIds();
+ LOG.info("found entities of type: {}", entityType.getTypeName());
int count = 0;
for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); ) {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index 39ea3f8..805531c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -150,6 +150,14 @@ public interface AtlasEntityStore {
EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException;
/**
+ * Create or update entities with parameters necessary for import process without commit. Caller will have to take care of commit.
+ * @param entityStream AtlasEntityStream
+ * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed
+ * @throws AtlasBaseException
+ */
+ EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException;
+
+ /**
* Update a single entity
* @param objectId ID of the entity
* @param updatedEntityInfo updated entity information
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index 30f5e5a..6f6ee17 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -332,6 +332,11 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
}
@Override
+ public EntityMutationResponse createOrUpdateForImportNoCommit(EntityStream entityStream) throws AtlasBaseException {
+ // Not annotated with @GraphTransaction here; per the interface contract the caller is responsible for the commit.
+ return createOrUpdate(entityStream, false, true, true);
+ }
+
+ @Override
@GraphTransaction
public EntityMutationResponse updateEntity(AtlasObjectId objectId, AtlasEntityWithExtInfo updatedEntityInfo, boolean isPartialUpdate) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
@@ -1210,8 +1215,10 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
ret.setGuidAssignments(context.getGuidAssignments());
- // Notify the change listeners
- entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress());
+ if (!RequestContext.get().isImportInProgress()) {
+ // Notify the change listeners
+ entityChangeNotifier.onEntitiesMutated(ret, RequestContext.get().isImportInProgress());
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("<== createOrUpdate()");
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
index fdf117a..857b709 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java
@@ -929,6 +929,10 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore {
}
private void sendNotifications(AtlasRelationship ret, OperationType relationshipUpdate) throws AtlasBaseException {
+ if (entityChangeNotifier == null) {
+ return;
+ }
+
entityChangeNotifier.notifyPropagatedEntities();
if (notificationsEnabled){
entityChangeNotifier.notifyRelationshipMutation(ret, relationshipUpdate);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
index 54c32c5..4526002 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
@@ -18,33 +18,30 @@
package org.apache.atlas.repository.store.graph.v2;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.atlas.AtlasConfiguration;
-import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.RequestContext;
-import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
-import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.ImportStrategy;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.MigrationImport;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.RegularImport;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.type.Constants;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -55,131 +52,24 @@ public class BulkImporterImpl implements BulkImporter {
private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class);
private final AtlasEntityStore entityStore;
- private final EntityGraphRetriever entityGraphRetriever;
private AtlasTypeRegistry typeRegistry;
- private final int MAX_ATTEMPTS = 2;
- private boolean directoryBasedImportConfigured;
@Inject
public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
this.entityStore = entityStore;
- this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
this.typeRegistry = typeRegistry;
- this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
}
@Override
public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> bulkImport()");
- }
-
- if (entityStream == null || !entityStream.hasNext()) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
- }
-
- EntityMutationResponse ret = new EntityMutationResponse();
- ret.setGuidAssignments(new HashMap<>());
-
- Set<String> processedGuids = new HashSet<>();
- float currentPercent = 0f;
- List<String> residualList = new ArrayList<>();
-
- EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);
-
- while (entityImportStreamWithResidualList.hasNext()) {
- AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
- AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
-
- if (entity == null) {
- continue;
- }
-
- for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
- try {
- AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
- EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream);
-
- if (resp.getGuidAssignments() != null) {
- ret.getGuidAssignments().putAll(resp.getGuidAssignments());
- }
-
- currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids,
- entityStream.getPosition(),
- entityImportStreamWithResidualList.getStreamSize(),
- currentPercent);
-
- entityStream.onImportComplete(entity.getGuid());
- break;
- } catch (AtlasBaseException e) {
- if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
- throw e;
- }
- break;
- } catch (AtlasSchemaViolationException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Entity: {}", entity.getGuid(), e);
- }
-
- if (attempt == 0) {
- updateVertexGuid(entity);
- } else {
- LOG.error("Guid update failed: {}", entityWithExtInfo.getEntity().getGuid());
- throw e;
- }
- } catch (Throwable e) {
- AtlasBaseException abe = new AtlasBaseException(e);
- if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) {
- throw abe;
- }
-
- LOG.warn("Exception: {}", entity.getGuid(), e);
- break;
- } finally {
- RequestContext.get().clearCache();
- }
- }
- }
-
- importResult.getProcessedEntities().addAll(processedGuids);
- LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());
-
- return ret;
- }
-
- @GraphTransaction
- public void updateVertexGuid(AtlasEntity entity) {
- String entityGuid = entity.getGuid();
- AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
-
- AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
- String vertexGuid = null;
- try {
- vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
- } catch (AtlasBaseException e) {
- LOG.warn("Entity: {}: Does not exist!", objectId);
- return;
- }
-
- if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
- return;
- }
-
- AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
- if (v == null) {
- return;
- }
-
- addHistoricalGuid(v, vertexGuid);
- AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid);
-
- LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid());
- }
-
- private void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
- String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class);
-
- AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid));
+ ImportStrategy importStrategy =
+ (importResult.getRequest().getOptions() != null &&
+ importResult.getRequest().getOptions().containsKey(AtlasImportRequest.OPTION_KEY_MIGRATION))
+ ? new MigrationImport(new AtlasGraphProvider(), this.typeRegistry)
+ : new RegularImport(this.entityStore, this.typeRegistry);
+
+ LOG.info("BulkImportImpl: {}", importStrategy.getClass().getSimpleName());
+ return importStrategy.run(entityStream, importResult);
}
@VisibleForTesting
@@ -193,38 +83,16 @@ public class BulkImporterImpl implements BulkImporter {
return json;
}
- private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
- if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
- return false;
- }
-
- lineageList.add(guid);
-
- return true;
- }
-
- private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity,
- EntityMutationResponse resp,
- AtlasImportResult importResult,
- Set<String> processedGuids,
- int currentIndex, int streamSize, float currentPercent) {
- if (!directoryBasedImportConfigured) {
- updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
- updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
- updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
- }
-
- String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
-
- return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
- }
-
@VisibleForTesting
- static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
+ public static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
final double tolerance = 0.000001;
final int MAX_PERCENT = 100;
int maxSize = (currentIndex <= streamSize) ? streamSize : currentIndex;
+ if (maxSize <= 0) {
+ return currentPercent;
+ }
+
float percent = (float) ((currentIndex * MAX_PERCENT) / maxSize);
boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
float updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent);
@@ -236,7 +104,7 @@ public class BulkImporterImpl implements BulkImporter {
return updatedPercent;
}
- private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
+ public static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
if (list == null) {
return;
}
@@ -251,41 +119,37 @@ public class BulkImporterImpl implements BulkImporter {
}
}
- private static class EntityImportStreamWithResidualList {
- private final EntityImportStream stream;
- private final List<String> residualList;
- private boolean navigateResidualList;
- private int currentResidualListIndex;
-
+ public static void updateVertexGuid(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityGraphRetriever, AtlasEntity entity) {
+ String entityGuid = entity.getGuid();
+ AtlasObjectId objectId = entityGraphRetriever.toAtlasObjectIdWithoutGuid(entity);
- public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
- this.stream = stream;
- this.residualList = residualList;
- this.navigateResidualList = false;
- this.currentResidualListIndex = 0;
+ AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
+ String vertexGuid = null;
+ try {
+ vertexGuid = AtlasGraphUtilsV2.getGuidByUniqueAttributes(entityType, objectId.getUniqueAttributes());
+ } catch (AtlasBaseException e) {
+ LOG.warn("Entity: {}: Does not exist!", objectId);
+ return;
}
- public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
- if (navigateResidualList == false) {
- return stream.getNextEntityWithExtInfo();
- } else {
- stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
- return stream.getNextEntityWithExtInfo();
- }
+ if (StringUtils.isEmpty(vertexGuid) || vertexGuid.equals(entityGuid)) {
+ return;
}
- public boolean hasNext() {
- if (!navigateResidualList) {
- boolean streamHasNext = stream.hasNext();
- navigateResidualList = (streamHasNext == false);
- return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size());
- } else {
- return (currentResidualListIndex < residualList.size());
- }
+ AtlasVertex v = AtlasGraphUtilsV2.findByGuid(vertexGuid);
+ if (v == null) {
+ return;
}
- public int getStreamSize() {
- return stream.size() + residualList.size();
- }
+ addHistoricalGuid(v, vertexGuid);
+ AtlasGraphUtilsV2.setProperty(v, Constants.GUID_PROPERTY_KEY, entityGuid);
+
+ LOG.warn("GUID Updated: Entity: {}: from: {}: to: {}", objectId, vertexGuid, entity.getGuid());
+ }
+
+ public static void addHistoricalGuid(AtlasVertex v, String vertexGuid) {
+ String existingJson = AtlasGraphUtilsV2.getProperty(v, HISTORICAL_GUID_PROPERTY_KEY, String.class);
+
+ AtlasGraphUtilsV2.setProperty(v, HISTORICAL_GUID_PROPERTY_KEY, getJsonArray(existingJson, vertexGuid));
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 2f3aad0..e76b341 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -361,7 +361,9 @@ public class EntityGraphMapper {
updateLabels(vertex, labels);
- entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), addedLabels, removedLabels);
+ if (entityChangeNotifier != null) {
+ entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), addedLabels, removedLabels);
+ }
}
public void addLabels(AtlasVertex vertex, Set<String> labels) throws AtlasBaseException {
@@ -378,7 +380,10 @@ public class EntityGraphMapper {
if (!updatedLabels.equals(existingLabels)) {
updateLabels(vertex, updatedLabels);
updatedLabels.removeAll(existingLabels);
- entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), updatedLabels, null);
+
+ if (entityChangeNotifier != null) {
+ entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), updatedLabels, null);
+ }
}
}
}
@@ -395,7 +400,10 @@ public class EntityGraphMapper {
if (!updatedLabels.equals(existingLabels)) {
updateLabels(vertex, updatedLabels);
existingLabels.removeAll(updatedLabels);
- entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), null, existingLabels);
+
+ if (entityChangeNotifier != null) {
+ entityChangeNotifier.onLabelsUpdatedFromEntity(GraphHelper.getGuid(vertex), null, existingLabels);
+ }
}
}
}
@@ -1948,7 +1956,9 @@ public class EntityGraphMapper {
Set<AtlasVertex> vertices = addedClassifications.get(classification);
List<AtlasEntity> propagatedEntities = updateClassificationText(classification, vertices);
- entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification));
+ if (entityChangeNotifier != null) {
+ entityChangeNotifier.onClassificationsAddedToEntities(propagatedEntities, Collections.singletonList(classification));
+ }
}
RequestContext.get().endMetricRecord(metric);
@@ -2056,7 +2066,10 @@ public class EntityGraphMapper {
AtlasEntity entity = updateClassificationText(entry.getKey());
List<AtlasClassification> deletedClassificationNames = entry.getValue();
- entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames);
+
+ if (entityChangeNotifier != null) {
+ entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassificationNames);
+ }
}
}
@@ -2283,17 +2296,19 @@ public class EntityGraphMapper {
notificationVertices.addAll(entitiesToPropagateTo);
}
- for (AtlasVertex vertex : notificationVertices) {
- String entityGuid = GraphHelper.getGuid(vertex);
- AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
+ if (entityChangeNotifier != null) {
+ for (AtlasVertex vertex : notificationVertices) {
+ String entityGuid = GraphHelper.getGuid(vertex);
+ AtlasEntity entity = instanceConverter.getAndCacheEntity(entityGuid, ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
- if (isActive(entity)) {
- vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
- entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications);
+ if (isActive(entity)) {
+ vertex.setProperty(CLASSIFICATION_TEXT_KEY, fullTextMapperV2.getClassificationTextForEntity(entity));
+ entityChangeNotifier.onClassificationUpdatedToEntity(entity, updatedClassifications);
+ }
}
}
- if (MapUtils.isNotEmpty(removedPropagations)) {
+ if (entityChangeNotifier != null && MapUtils.isNotEmpty(removedPropagations)) {
for (AtlasClassification classification : removedPropagations.keySet()) {
List<AtlasVertex> propagatedVertices = removedPropagations.get(classification);
List<AtlasEntity> propagatedEntities = updateClassificationText(classification, propagatedVertices);
@@ -2526,7 +2541,7 @@ public class EntityGraphMapper {
private List<AtlasEntity> updateClassificationText(AtlasClassification classification, Collection<AtlasVertex> propagatedVertices) throws AtlasBaseException {
List<AtlasEntity> propagatedEntities = new ArrayList<>();
- if(CollectionUtils.isNotEmpty(propagatedVertices)) {
+ if (fullTextMapperV2 != null && CollectionUtils.isNotEmpty(propagatedVertices)) {
for(AtlasVertex vertex : propagatedVertices) {
AtlasEntity entity = instanceConverter.getAndCacheEntity(GraphHelper.getGuid(vertex), ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES);
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java
similarity index 58%
copy from repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
copy to repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java
index 351b475..6b70eab 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipExportFileNames.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/ImportStrategy.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,20 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.atlas.repository.impexp;
-public enum ZipExportFileNames {
- ATLAS_EXPORT_INFO_NAME("atlas-export-info"),
- ATLAS_EXPORT_ORDER_NAME("atlas-export-order"),
- ATLAS_TYPESDEF_NAME("atlas-typesdef");
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
- public final String name;
- ZipExportFileNames(String name) {
- this.name = name;
- }
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
- @Override
- public String toString() {
- return this.name;
- }
+public abstract class ImportStrategy {
+ public abstract EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException;
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
new file mode 100644
index 0000000..8c66656
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/MigrationImport.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.converters.AtlasFormatConverters;
+import org.apache.atlas.repository.converters.AtlasInstanceConverter;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
+import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStoreV2;
+import org.apache.atlas.repository.store.graph.v2.AtlasRelationshipStoreV2;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityConsumerBuilder;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.EntityCreationManager;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Import strategy that pushes the entities of an import stream through an
+ * {@link EntityCreationManager}, writing to the graph returned by
+ * {@link AtlasGraphProvider#getBulkLoading()}.
+ *
+ * <p>The entity store used for the import is assembled locally (see {@code setupEntityStore})
+ * instead of being injected, and several of its collaborators are passed as {@code null}.
+ */
+public class MigrationImport extends ImportStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(MigrationImport.class);
+
+    private final AtlasTypeRegistry typeRegistry;
+    private AtlasGraph atlasGraph;
+    private EntityGraphRetriever entityGraphRetriever;
+    private EntityGraphMapper entityGraphMapper;
+    private AtlasEntityStore entityStore;
+
+    /**
+     * Creates the strategy and builds the private entity store it imports through.
+     *
+     * @param atlasGraphProvider provider from which the bulk-loading graph is obtained
+     * @param typeRegistry       type registry shared by every component created here
+     */
+    public MigrationImport(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
+        this.typeRegistry = typeRegistry;
+        setupEntityStore(atlasGraphProvider, typeRegistry);
+        LOG.info("MigrationImport: Using bulkLoading...");
+    }
+
+    /**
+     * Reads every entity from {@code entityStream} into the creation manager, waits for the
+     * queued work to drain and then extracts the results.
+     *
+     * <p>Failures raised while reading, draining or extracting are logged and not propagated;
+     * the creation manager is shut down in all cases.
+     *
+     * @param entityStream entities to import; must be non-null and non-empty
+     * @param importResult import result whose request supplies the batch size and worker count
+     * @return a new {@link EntityMutationResponse}; this method itself never populates it
+     * @throws AtlasBaseException if the stream is null or empty, or if {@code importResult}
+     *                            carries no request
+     */
+    @Override
+    public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
+        if (entityStream == null || !entityStream.hasNext()) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
+        }
+
+        if (importResult.getRequest() == null) {
+            throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "importResult should contain request");
+        }
+
+        int index = 0;
+        int streamSize = entityStream.size();
+        EntityMutationResponse ret = new EntityMutationResponse();
+        EntityCreationManager creationManager = createEntityCreationManager(atlasGraph, importResult, streamSize);
+
+        try {
+            LOG.info("Migration Import: Size: {}: Starting...", streamSize);
+            index = creationManager.read(entityStream);
+            creationManager.drain();
+            creationManager.extractResults();
+        } catch (Exception ex) {
+            // NOTE(review): the failure is only logged, so the caller still receives a normal
+            // return. 'index' is assigned only after read() returns, so it is still 0 here
+            // whenever read() itself threw.
+            LOG.error("Migration Import: Error: Current position: {}", index, ex);
+        } finally {
+            shutdownEntityCreationManager(creationManager);
+        }
+
+        LOG.info("Migration Import: Size: {}: Done!", streamSize);
+        return ret;
+    }
+
+    /**
+     * Builds the creation manager from the batch size and worker count carried by the request.
+     *
+     * @param threadedAtlasGraph graph handed to the entity consumers
+     * @param importResult       result whose request supplies batch size and number of workers
+     * @param streamSize         number of entities in the stream, passed through to the manager
+     * @return a manager wired with a consumer builder for this strategy's entity store
+     */
+    private EntityCreationManager createEntityCreationManager(AtlasGraph threadedAtlasGraph, AtlasImportResult importResult, int streamSize) {
+        int batchSize = importResult.getRequest().getOptionKeyBatchSize();
+        int numWorkers = getNumWorkers(importResult.getRequest().getOptionKeyNumWorkers());
+
+        EntityConsumerBuilder consumerBuilder =
+                new EntityConsumerBuilder(threadedAtlasGraph, entityStore, entityGraphRetriever, typeRegistry, batchSize);
+
+        return new EntityCreationManager(consumerBuilder, batchSize, numWorkers, importResult, streamSize);
+    }
+
+    /**
+     * Returns the requested worker count, falling back to 1 when the value is zero or negative.
+     */
+    private static int getNumWorkers(int numWorkersFromOptions) {
+        int ret = (numWorkersFromOptions > 0) ? numWorkersFromOptions : 1;
+        LOG.info("Migration Import: Setting numWorkers: {}", ret);
+        return ret;
+    }
+
+    /**
+     * Assembles the retriever, bulk-loading graph, graph mapper and entity store used by this
+     * strategy.
+     */
+    private void setupEntityStore(AtlasGraphProvider atlasGraphProvider, AtlasTypeRegistry typeRegistry) {
+        this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
+        this.atlasGraph = atlasGraphProvider.getBulkLoading();
+        DeleteHandlerDelegate deleteDelegate = new DeleteHandlerDelegate(typeRegistry);
+
+        // NOTE(review): the null arguments below are presumably the change notifier (and, for
+        // the mapper, the full-text mapper), i.e. notifications are skipped on this path -
+        // confirm against the constructor signatures.
+        AtlasRelationshipStore relationshipStore = new AtlasRelationshipStoreV2(typeRegistry, deleteDelegate, null);
+        AtlasFormatConverters formatConverters = new AtlasFormatConverters(typeRegistry);
+        AtlasInstanceConverter instanceConverter = new AtlasInstanceConverter(typeRegistry, formatConverters);
+        this.entityGraphMapper = new EntityGraphMapper(deleteDelegate, typeRegistry, atlasGraph, relationshipStore, null, instanceConverter, null);
+        this.entityStore = new AtlasEntityStoreV2(deleteDelegate, typeRegistry, null, entityGraphMapper);
+    }
+
+    /**
+     * Shuts the creation manager down. An interruption is logged and the thread's interrupt
+     * flag is restored so that callers further up the stack can still observe it.
+     */
+    private void shutdownEntityCreationManager(EntityCreationManager creationManager) {
+        try {
+            creationManager.shutdown();
+        } catch (InterruptedException e) {
+            LOG.error("Migration Import: Shutdown: Interrupted!", e);
+            // Catching InterruptedException clears the flag; set it again instead of swallowing it.
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java
similarity index 80%
copy from repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
copy to repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java
index 54c32c5..4cc8ed4 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/BulkImporterImpl.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/RegularImport.java
@@ -6,16 +6,18 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.atlas.repository.store.graph.v2;
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport;
+
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasConfiguration;
@@ -33,15 +35,17 @@ import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
-import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-import javax.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -49,27 +53,25 @@ import java.util.List;
import java.util.Set;
import static org.apache.atlas.repository.Constants.HISTORICAL_GUID_PROPERTY_KEY;
+import static org.apache.atlas.repository.store.graph.v2.BulkImporterImpl.updateImportProgress;
-@Component
-public class BulkImporterImpl implements BulkImporter {
- private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV2.class);
-
+public class RegularImport extends ImportStrategy {
+ private static final Logger LOG = LoggerFactory.getLogger(RegularImport.class);
+ private static final int MAX_ATTEMPTS = 3;
private final AtlasEntityStore entityStore;
+ private final AtlasTypeRegistry typeRegistry;
private final EntityGraphRetriever entityGraphRetriever;
- private AtlasTypeRegistry typeRegistry;
- private final int MAX_ATTEMPTS = 2;
private boolean directoryBasedImportConfigured;
- @Inject
- public BulkImporterImpl(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
+ public RegularImport(AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
this.entityStore = entityStore;
- this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
this.typeRegistry = typeRegistry;
+ this.entityGraphRetriever = new EntityGraphRetriever(typeRegistry);
this.directoryBasedImportConfigured = StringUtils.isNotEmpty(AtlasConfiguration.IMPORT_TEMP_DIRECTORY.getString());
}
@Override
- public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
+ public EntityMutationResponse run(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> bulkImport()");
}
@@ -81,7 +83,7 @@ public class BulkImporterImpl implements BulkImporter {
EntityMutationResponse ret = new EntityMutationResponse();
ret.setGuidAssignments(new HashMap<>());
- Set<String> processedGuids = new HashSet<>();
+ Set<String> processedGuids = new HashSet<>();
float currentPercent = 0f;
List<String> residualList = new ArrayList<>();
@@ -209,9 +211,9 @@ public class BulkImporterImpl implements BulkImporter {
Set<String> processedGuids,
int currentIndex, int streamSize, float currentPercent) {
if (!directoryBasedImportConfigured) {
- updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
- updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
- updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+ BulkImporterImpl.updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
+ BulkImporterImpl.updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
+ BulkImporterImpl.updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
}
String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
@@ -219,38 +221,6 @@ public class BulkImporterImpl implements BulkImporter {
return updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
}
- @VisibleForTesting
- static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent, String additionalInfo) {
- final double tolerance = 0.000001;
- final int MAX_PERCENT = 100;
-
- int maxSize = (currentIndex <= streamSize) ? streamSize : currentIndex;
- float percent = (float) ((currentIndex * MAX_PERCENT) / maxSize);
- boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
- float updatedPercent = (MAX_PERCENT < maxSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent);
-
- if (updateLog) {
- log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), maxSize, additionalInfo);
- }
-
- return updatedPercent;
- }
-
- private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
- if (list == null) {
- return;
- }
-
- for (AtlasEntityHeader h : list) {
- if (processedGuids.contains(h.getGuid())) {
- continue;
- }
-
- processedGuids.add(h.getGuid());
- importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName()));
- }
- }
-
private static class EntityImportStreamWithResidualList {
private final EntityImportStream stream;
private final List<String> residualList;
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
new file mode 100644
index 0000000..bb74205
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumer.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
+
+import org.apache.atlas.GraphTransactionInterceptor;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.pc.WorkItemConsumer;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v2.AtlasEntityStreamForImport;
+import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Work-item consumer that writes imported entities to the graph in batches.
+ *
+ * Each queued item is passed to {@code AtlasEntityStore.createOrUpdateForImportNoCommit} and kept in a
+ * local buffer. Once the running batch count reaches {@code batchSize} the graph transaction is
+ * committed. A failed commit is rolled back, the consumer pauses, and the buffered entities are replayed;
+ * at most {@link #MAX_COMMIT_RETRY_COUNT} commit attempts are made per batch. GUIDs taken from the
+ * mutation responses of a committed batch are published through {@code addResult}.
+ */
+public class EntityConsumer extends WorkItemConsumer<AtlasEntity.AtlasEntityWithExtInfo> {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityConsumer.class);
+    private static final int MAX_COMMIT_RETRY_COUNT = 3;
+
+    private final int batchSize;
+    private final AtomicLong counter = new AtomicLong(1);
+    private final AtomicLong currentBatch = new AtomicLong(1);
+
+    private final AtlasGraph atlasGraph;
+    private final AtlasEntityStore entityStoreV2;
+    private final AtlasTypeRegistry typeRegistry;
+    private final EntityGraphRetriever entityGraphRetriever;
+
+    // Entities written since the last successful commit; replayed after a rollback.
+    private final List<AtlasEntity.AtlasEntityWithExtInfo> entityBuffer = new ArrayList<>();
+    // Mutation responses of the current (uncommitted) batch; dispatched only after commit succeeds.
+    private final List<EntityMutationResponse> localResults = new ArrayList<>();
+
+    /**
+     * @param atlasGraph           graph whose transaction is committed / rolled back per batch
+     * @param entityStore          store used to create or update each entity without committing
+     * @param entityGraphRetriever passed to {@code BulkImporterImpl.updateVertexGuid} on schema violations
+     * @param typeRegistry         passed to {@code BulkImporterImpl.updateVertexGuid} on schema violations
+     * @param queue                work queue handed to the base consumer
+     * @param batchSize            batch count at which a commit is attempted
+     */
+    public EntityConsumer(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
+                          EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry,
+                          BlockingQueue queue, int batchSize) {
+        super(queue);
+
+        this.atlasGraph = atlasGraph;
+        this.entityStoreV2 = entityStore;
+        this.entityGraphRetriever = entityGraphRetriever;
+        this.typeRegistry = typeRegistry;
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Buffers and writes one entity, then commits if the batch threshold has been reached.
+     * Any exception is logged and not propagated.
+     */
+    @Override
+    protected void processItem(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+        // One for the entity itself plus one for every referred entity it carries.
+        int delta = MapUtils.isEmpty(entityWithExtInfo.getReferredEntities())
+                ? 1
+                : entityWithExtInfo.getReferredEntities().size() + 1;
+
+        long currentCount = counter.addAndGet(delta);
+        currentBatch.addAndGet(delta);
+        entityBuffer.add(entityWithExtInfo);
+
+        try {
+            processEntity(entityWithExtInfo, currentCount);
+            attemptCommit();
+        } catch (Exception e) {
+            LOG.info("Data loss: Please re-submit!", e);
+        }
+    }
+
+    /**
+     * Writes a single entity into the open graph transaction (no commit) and records the response.
+     * On {@link AtlasBaseException} the entity's GUID is still reported via {@code addResult}; on
+     * {@link AtlasSchemaViolationException} the vertex GUID is updated through {@code BulkImporterImpl}.
+     */
+    private void processEntity(AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo, long currentCount) {
+        try {
+            RequestContext.get().setImportInProgress(true);
+            AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, null);
+
+            LOG.debug("Processing: {}", currentCount);
+            EntityMutationResponse result = entityStoreV2.createOrUpdateForImportNoCommit(oneEntityStream);
+            localResults.add(result);
+        } catch (AtlasBaseException e) {
+            addResult(entityWithExtInfo.getEntity().getGuid());
+            LOG.warn("Exception: {}", entityWithExtInfo.getEntity().getGuid(), e);
+        } catch (AtlasSchemaViolationException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Entity: {}", entityWithExtInfo.getEntity().getGuid(), e);
+            }
+
+            BulkImporterImpl.updateVertexGuid(typeRegistry, entityGraphRetriever, entityWithExtInfo.getEntity());
+        }
+    }
+
+    /** Commits only when the running batch count has reached {@code batchSize}. */
+    private void attemptCommit() {
+        if (currentBatch.get() < batchSize) {
+            return;
+        }
+
+        doCommit();
+    }
+
+    /**
+     * Tries to commit the current batch, retrying up to {@link #MAX_COMMIT_RETRY_COUNT} times.
+     * When every attempt fails the buffered batch is dropped and an error is logged.
+     */
+    @Override
+    protected void doCommit() {
+        for (int retryCount = 1; retryCount <= MAX_COMMIT_RETRY_COUNT; retryCount++) {
+            if (commitWithRetry(retryCount)) {
+                return;
+            }
+        }
+
+        LOG.error("Retries exceeded! Potential data loss! Please correct data and re-attempt. Buffer: {}: Counter: {}", entityBuffer.size(), counter.get());
+        clear();
+    }
+
+    /** Delegates to the base class, logs the running total and resets the counter. */
+    @Override
+    protected void commitDirty() {
+        super.commitDirty();
+        LOG.info("Total: Commit: {}", counter.get());
+        counter.set(0);
+    }
+
+    /**
+     * Performs one commit attempt.
+     *
+     * @return {@code true} when the commit succeeded and results were dispatched; {@code false} when the
+     *         commit failed and the batch was rolled back and replayed
+     */
+    private boolean commitWithRetry(int retryCount) {
+        try {
+            atlasGraph.commit();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Commit: Done!: Buffer: {}: Batch: {}: Counter: {}", entityBuffer.size(), currentBatch.get(), counter.get());
+            }
+
+            dispatchResults();
+            return true;
+        } catch (Exception ex) {
+            rollbackPauseRetry(retryCount, ex);
+            return false;
+        }
+    }
+
+    /** Rolls the transaction back, pauses, then replays the buffered entities into a fresh transaction. */
+    private void rollbackPauseRetry(int retryCount, Exception ex) {
+        atlasGraph.rollback();
+        clearCache();
+
+        LOG.error("Rollback: Done! Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
+        pause(retryCount);
+        if (retryCount >= MAX_COMMIT_RETRY_COUNT) {
+            // Last attempt: always surface the cause, whatever the exception type, so it is not lost.
+            LOG.warn("Commit error! Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount, ex);
+        } else {
+            LOG.info("Will pause and retry: Buffer: {}: Counter: {}: Retry count: {}", entityBuffer.size(), counter.get(), retryCount);
+        }
+        retryProcessEntity(retryCount);
+    }
+
+    /** Re-applies every buffered entity after a rollback. */
+    private void retryProcessEntity(int retryCount) {
+        LOG.info("Replaying: Starting!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
+
+        // Responses collected so far belong to the rolled-back attempt; the replay records them afresh.
+        localResults.clear();
+        for (AtlasEntity.AtlasEntityWithExtInfo e : entityBuffer) {
+            processEntity(e, counter.get());
+        }
+        LOG.info("Replaying: Done!: Buffer: {}: Retry count: {}", entityBuffer.size(), retryCount);
+    }
+
+    /** Publishes the GUIDs of all created, updated and deleted entities of the committed batch. */
+    private void dispatchResults() {
+        for (EntityMutationResponse response : localResults) {
+            addResultsFromResponse(response.getCreatedEntities());
+            addResultsFromResponse(response.getUpdatedEntities());
+            addResultsFromResponse(response.getDeletedEntities());
+        }
+
+        clear();
+    }
+
+    /** Sleeps {@code retryCount} seconds; restores the interrupt flag if interrupted. */
+    private void pause(int retryCount) {
+        try {
+            Thread.sleep(1000 * retryCount);
+        } catch (InterruptedException e) {
+            LOG.error("pause: Interrupted!", e);
+            // Keep the interruption visible to the code that owns this thread.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private void addResultsFromResponse(List<AtlasEntityHeader> entities) {
+        if (CollectionUtils.isEmpty(entities)) {
+            return;
+        }
+
+        for (AtlasEntityHeader eh : entities) {
+            addResult(eh.getGuid());
+        }
+    }
+
+    /** Drops all per-batch state and resets the batch count. */
+    private void clear() {
+        localResults.clear();
+        entityBuffer.clear();
+        clearCache();
+        currentBatch.set(0);
+    }
+
+    private void clearCache() {
+        GraphTransactionInterceptor.clearCache();
+        RequestContext.get().clearCache();
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
new file mode 100644
index 0000000..69d33b2
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityConsumerBuilder.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
+import org.apache.atlas.type.AtlasTypeRegistry;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Builder that creates an {@link EntityConsumer} for a given work queue, supplying the graph, entity
+ * store, retriever, type registry and batch size captured at construction time.
+ */
+public class EntityConsumerBuilder implements WorkItemBuilder<EntityConsumer, AtlasEntity.AtlasEntityWithExtInfo> {
+    // All collaborators are assigned once in the constructor and never changed.
+    private final AtlasGraph atlasGraph;
+    private final AtlasEntityStore entityStore;
+    private final EntityGraphRetriever entityGraphRetriever;
+    private final AtlasTypeRegistry typeRegistry;
+    private final int batchSize;
+
+    /**
+     * @param atlasGraph           graph handed to every consumer
+     * @param entityStore          entity store handed to every consumer
+     * @param entityGraphRetriever retriever handed to every consumer
+     * @param typeRegistry         type registry handed to every consumer
+     * @param batchSize            batch size handed to every consumer
+     */
+    public EntityConsumerBuilder(AtlasGraph atlasGraph, AtlasEntityStore entityStore,
+                                 EntityGraphRetriever entityGraphRetriever, AtlasTypeRegistry typeRegistry, int batchSize) {
+        this.atlasGraph = atlasGraph;
+        this.entityStore = entityStore;
+        this.entityGraphRetriever = entityGraphRetriever;
+        this.typeRegistry = typeRegistry;
+        this.batchSize = batchSize;
+    }
+
+    /** Creates a new consumer bound to {@code queue}. */
+    @Override
+    public EntityConsumer build(BlockingQueue<AtlasEntity.AtlasEntityWithExtInfo> queue) {
+        return new EntityConsumer(atlasGraph, entityStore, entityGraphRetriever, typeRegistry, queue, this.batchSize);
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
new file mode 100644
index 0000000..0051941
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/EntityCreationManager.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
+
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
+import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Producer side of the concurrent import: reads entities from an {@link EntityImportStream}, hands them
+ * to the worker pool of the base {@link WorkItemManager}, and tracks progress with a {@link StatusReporter}.
+ *
+ * NOTE(review): the class-level type parameter is named {@code AtlasEntityWithExtInfo} and is not used in
+ * the body; it is kept unchanged here so existing callers continue to compile.
+ */
+public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class);
+    private static final String WORKER_PREFIX = "migration-import";
+    private static final long STATUS_REPORT_TIMEOUT_DURATION = 5 * 60 * 1000; // 5 min
+
+    private final StatusReporter<String, String> statusReporter;
+    private final AtlasImportResult importResult;
+    private final int streamSize;
+    private String currentTypeName;
+    private float currentPercent;
+
+    /**
+     * @param builder      builder used by the base manager to create consumers
+     * @param batchSize    batch size forwarded to the base manager
+     * @param numWorkers   number of workers forwarded to the base manager
+     * @param importResult result object whose metric counters are incremented as items are acknowledged
+     * @param streamSize   total number of entities expected, used for progress reporting
+     */
+    public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult, int streamSize) {
+        super(builder, WORKER_PREFIX, batchSize, numWorkers, true);
+        this.importResult = importResult;
+        this.streamSize = streamSize;
+
+        this.statusReporter = new StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION);
+    }
+
+    /**
+     * Drains {@code entityStream}, producing one work item per entity. Items without an entity are
+     * skipped. Reading stops at the first failure raised while producing an item.
+     *
+     * @return the number of items for which production was attempted
+     */
+    public int read(EntityImportStream entityStream) {
+        int currentIndex = 0;
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
+        while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) {
+            AtlasEntity entity = entityWithExtInfo.getEntity();
+            if (entity == null) {
+                continue;
+            }
+
+            try {
+                produce(currentIndex++, entity.getTypeName(), entityWithExtInfo);
+            } catch (Throwable e) {
+                LOG.warn("Exception: {}", entity.getGuid(), e);
+                break;
+            }
+        }
+        return currentIndex;
+    }
+
+    /**
+     * Queues one entity. When the entity type differs from the previous one, {@code drain()} is called
+     * on the base manager before the new type is queued.
+     */
+    private void produce(int currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
+        String previousTypeName = getCurrentTypeName();
+
+        if (StringUtils.isNotEmpty(typeName)
+                && StringUtils.isNotEmpty(previousTypeName)
+                && !StringUtils.equals(previousTypeName, typeName)) {
+            LOG.info("Waiting: '{}' to complete...", previousTypeName);
+            super.drain();
+            LOG.info("Switching entity type processing: From: '{}' To: '{}'...", previousTypeName, typeName);
+        }
+
+        setCurrentTypeName(typeName);
+        // Status value format is "<typeName>:<index>"; logStatus() parses it back.
+        statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), String.format("%s:%s", entityWithExtInfo.getEntity().getTypeName(), currentIndex));
+        super.checkProduce(entityWithExtInfo);
+        extractResults();
+    }
+
+    /** Moves every GUID currently available in the results queue to the status reporter, then logs progress. */
+    public void extractResults() {
+        Object result;
+        while (((result = getResults().poll())) != null) {
+            statusReporter.processed((String) result);
+        }
+
+        logStatus();
+    }
+
+    /** Updates the metric counter and progress percentage from the latest acknowledged status, if any. */
+    private void logStatus() {
+        String ack = statusReporter.ack();
+        if (StringUtils.isEmpty(ack)) {
+            return;
+        }
+
+        // Split on the LAST ':' so that only the trailing index is parsed as a number, even if the
+        // type-name part itself contains a ':'.
+        int separatorIdx = ack.lastIndexOf(':');
+        if (separatorIdx < 0 || separatorIdx == ack.length() - 1) {
+            return;
+        }
+
+        String ackedTypeName = ack.substring(0, separatorIdx);
+        int ackedIndex = Integer.parseInt(ack.substring(separatorIdx + 1));
+
+        importResult.incrementMeticsCounter(ackedTypeName);
+        this.currentPercent = updateImportMetrics(ackedTypeName, ackedIndex, getStreamSize(), getCurrentPercent());
+    }
+
+    private static float updateImportMetrics(String typeNameGuid, int currentIndex, int streamSize, float currentPercent) {
+        String lastEntityImported = String.format("entity:last-imported:%s:(%s)", typeNameGuid, currentIndex);
+        return BulkImporterImpl.updateImportProgress(LOG, currentIndex, streamSize, currentPercent, lastEntityImported);
+    }
+
+    private String getCurrentTypeName() {
+        return this.currentTypeName;
+    }
+
+    private void setCurrentTypeName(String typeName) {
+        this.currentTypeName = typeName;
+    }
+
+    private float getCurrentPercent() {
+        return this.currentPercent;
+    }
+
+    private int getStreamSize() {
+        return this.streamSize;
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java
new file mode 100644
index 0000000..1cd9860
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/bulkimport/pc/StatusReporter.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;
+
+import org.apache.atlas.v1.typesystem.types.utils.TypesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Tracks items in the order they were produced and reports how far the contiguous run of processed
+ * items extends from the front.
+ *
+ * {@link #ack()} returns the value registered for the last item of that run. If the item at the front
+ * stays unprocessed for at least {@code timeOut} milliseconds (measured from the first {@code ack()} call
+ * that found it blocking), it is given up on: its value is returned and it is dropped so that later
+ * calls can advance past it.
+ *
+ * The class uses unsynchronized collections.
+ *
+ * @param <T> item key type
+ * @param <U> value reported when an item is acknowledged
+ */
+public class StatusReporter<T, U> {
+    private static final Logger LOG = LoggerFactory.getLogger(StatusReporter.class);
+
+    // Insertion-ordered so the "first element" is always the oldest outstanding item.
+    private final Map<T,U> producedItems = new LinkedHashMap<>();
+    private final Set<T> processedSet = new HashSet<>();
+    // Front item currently being waited on, paired with the time the wait started.
+    private TypesUtil.Pair<T, Long> watchedItem;
+    private final long timeOut;
+
+    /**
+     * @param timeOut milliseconds to wait for a blocking front item before giving up on it
+     */
+    public StatusReporter(long timeOut) {
+        this.timeOut = timeOut;
+    }
+
+    /** Registers {@code item} as produced, with {@code index} as the value to report once acknowledged. */
+    public void produced(T item, U index) {
+        this.producedItems.put(item, index);
+    }
+
+    /** Marks a single item as processed. */
+    public void processed(T item) {
+        this.processedSet.add(item);
+    }
+
+    /** Marks every item in {@code index} as processed. */
+    public void processed(T[] index) {
+        this.processedSet.addAll(Arrays.asList(index));
+    }
+
+    /**
+     * Consumes the leading run of processed items.
+     *
+     * @return the value of the last item consumed; the value of a timed-out front item; or {@code null}
+     *         when nothing can be acknowledged yet
+     */
+    public U ack() {
+        U ack = null;
+        U ret;
+        Map.Entry<T, U> firstElement;
+        do {
+            firstElement = getFirstElement(this.producedItems);
+            ret = completionIndex(firstElement);
+            if (ret != null) {
+                ack = ret;
+            }
+        } while(ret != null);
+
+        return addToWatchIfNeeded(ack, firstElement);
+    }
+
+    /** Starts or checks the watch on a blocking front item; clears the watch whenever progress was made. */
+    private U addToWatchIfNeeded(U ack, Map.Entry<T, U> firstElement) {
+        if (ack == null && firstElement != null) {
+            ack = addToWatch(firstElement.getKey());
+        } else {
+            resetWatchItem();
+        }
+        return ack;
+    }
+
+    private void resetWatchItem() {
+        this.watchedItem = null;
+    }
+
+    /**
+     * Watches {@code key}. Returns {@code null} while the watch has not timed out; once it has, the
+     * watched item is removed from the produced items and its value is returned.
+     */
+    private U addToWatch(T key) {
+        createNewWatchItem(key);
+        if (!hasTimedOut(this.watchedItem)) {
+            return null;
+        }
+
+        T producedItemKey = this.watchedItem.left;
+        resetWatchItem();
+        LOG.warn("Item: {}: Was produced but not successfully processed!", producedItemKey);
+        // Remove (not just read) the timed-out item; otherwise it stays at the front and blocks
+        // every later acknowledgement.
+        return this.producedItems.remove(producedItemKey);
+    }
+
+    /** Creates the watch only if none is active, so the original start time is preserved. */
+    private void createNewWatchItem(T key) {
+        if (this.watchedItem != null) {
+            return;
+        }
+
+        this.watchedItem = new TypesUtil.Pair<T, Long>(key, System.currentTimeMillis());
+    }
+
+    private boolean hasTimedOut(TypesUtil.Pair<T, Long> watchedItem) {
+        if (watchedItem == null) {
+            return false;
+        }
+
+        return (System.currentTimeMillis() - watchedItem.right) >= timeOut;
+    }
+
+    private Map.Entry<T, U> getFirstElement(Map<T, U> map) {
+        if (map.isEmpty()) {
+            return null;
+        }
+
+        return map.entrySet().iterator().next();
+    }
+
+    /**
+     * If {@code lookFor} has been processed, removes it from both collections and returns its value;
+     * otherwise returns {@code null}.
+     */
+    private U completionIndex(Map.Entry<T, U> lookFor) {
+        if (lookFor == null || !processedSet.contains(lookFor.getKey())) {
+            return null;
+        }
+
+        // Read key and value before removing the entry from the map.
+        T key = lookFor.getKey();
+        U ack = lookFor.getValue();
+        producedItems.remove(key);
+        // The set holds keys, so the key (not the map entry) must be removed.
+        processedSet.remove(key);
+        return ack;
+    }
+}
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index c14850f..759be64 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -136,6 +136,11 @@ public class ImportServiceTest extends ExportImportTestBase {
return getZipSource("dup_col_deleted.zip");
}
+ // Data provider "zipDirect1": supplies the contents of dup_col_deleted.zip via getZipSource.
+ @DataProvider(name = "zipDirect1")
+ public static Object[][] getZipDirect(ITestContext context) throws IOException, AtlasBaseException {
+ return getZipSource("dup_col_deleted.zip");
+ }
+
@Test(dataProvider = "sales")
public void importDB1(InputStream inputStream) throws AtlasBaseException, IOException {
loadBaseModel();
@@ -530,6 +535,17 @@ public class ImportServiceTest extends ExportImportTestBase {
}
}
+ // Loads the base, fs and hive models, then runs an import of the "zipDirect1" stream with no
+ // explicit parameters. NOTE(review): this test has no assertion of its own; presumably
+ // runImportWithNoParameters checks the operation status - verify.
+ @Test(dataProvider = "zipDirect1")
+ public void zipSourceDirect(InputStream inputStream) throws IOException, AtlasBaseException {
+ loadBaseModel();
+ loadFsModel();
+ loadHiveModel();
+
+ runImportWithNoParameters(importService, inputStream);
+
+ }
+
+
private AtlasImportRequest getImportRequest(String replicatedFrom){
AtlasImportRequest importRequest = getDefaultImportRequest();
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java
new file mode 100644
index 0000000..2a22d88
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/MigrationImportTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+
+import com.google.inject.Inject;
+import org.apache.atlas.TestModules;
+import org.apache.atlas.discovery.EntityDiscoveryService;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.v2.bulkimport.MigrationImport;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.testng.Assert.assertNotNull;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class MigrationImportTest extends ExportImportTestBase {
+
+ private final ImportService importService;
+
+ @Inject
+ AtlasTypeRegistry typeRegistry;
+
+ @Inject
+ private AtlasTypeDefStore typeDefStore;
+
+ @Inject
+ private EntityDiscoveryService discoveryService;
+
+ @Inject
+ AtlasEntityStore entityStore;
+
+ @Inject
+ AtlasGraph atlasGraph;
+
+ @Inject
+ public MigrationImportTest(ImportService importService) {
+ this.importService = importService;
+ }
+
+ @Test
+ public void simpleImport() throws IOException, AtlasBaseException {
+ InputStream inputStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip");
+
+ AtlasImportRequest importRequest = new AtlasImportRequest();
+ importRequest.setOption("migration", "true");
+
+ AtlasImportResult result = importService.run(inputStream, importRequest, null, null, null);
+ assertNotNull(result);
+ }
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/StatusReporterTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/StatusReporterTest.java
new file mode 100644
index 0000000..5e15023
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/StatusReporterTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.repository.store.graph.v2.bulkimport.pc.StatusReporter;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+/**
+ * Unit tests for {@link StatusReporter#ack()}. Unless stated otherwise the reporter under test has
+ * items 1..6 produced with values 100..600 and a timeout of 1000 ms.
+ */
+public class StatusReporterTest {
+    /** Nothing produced: nothing to acknowledge. */
+    @Test
+    public void noneProducedNoneReported() {
+        StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(100);
+        assertNull(statusReporter.ack());
+    }
+
+    /** Items produced but none processed: nothing to acknowledge. */
+    @Test
+    public void producedButNotAcknowledged() {
+        StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems();
+        assertNull(statusReporter.ack());
+    }
+
+    /** First item processed: its value is acknowledged. */
+    @Test
+    public void producedAcknowledged() {
+        StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems();
+        statusReporter.processed(1);
+
+        assertEquals(statusReporter.ack(), Integer.valueOf(100));
+    }
+
+    /** Processed items 1, 3, 5: only item 1 is contiguous from the front. */
+    @Test
+    public void producedAcknowledgeMaxAvailableInSequence() {
+        StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems();
+
+        statusReporter.processed(new Integer[]{1, 3, 5});
+
+        assertEquals(statusReporter.ack(), Integer.valueOf(100));
+    }
+
+    /** Processed items 1, 2, 3, 6, 5: the contiguous run from the front ends at item 3. */
+    @Test
+    public void producedAcknowledgeMaxAvailableInSequence2() {
+        StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems();
+        statusReporter.processed(new Integer[]{1, 2, 3, 6, 5});
+
+        assertEquals(statusReporter.ack(), Integer.valueOf(300));
+    }
+
+    /** A processed key that was never produced (1) is ignored. */
+    @Test
+    public void producedSetDisjointWithAckSet() {
+        StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(100);
+        statusReporter.produced(11, 1000);
+        statusReporter.produced(12, 2000);
+        statusReporter.produced(13, 3000);
+
+        statusReporter.processed(new Integer[]{1, 11, 12, 13});
+
+        assertEquals(statusReporter.ack(), Integer.valueOf(3000));
+    }
+
+    /** Item 1 is never processed: after the 1000 ms timeout its value is reported anyway. */
+    @Test
+    public void missingAck() throws InterruptedException {
+        StatusReporter<Integer, Integer> statusReporter = createStatusReportWithItems(2, 3, 4);
+
+        assertNull(statusReporter.ack());
+        Thread.sleep(1002);
+        assertEquals(statusReporter.ack(), Integer.valueOf(100));
+    }
+
+    /** Builds a reporter (timeout 1000 ms) with items 1..6 produced and the given items processed. */
+    private StatusReporter<Integer, Integer> createStatusReportWithItems(Integer... processed) {
+        StatusReporter<Integer, Integer> statusReporter = new StatusReporter<>(1000);
+        statusReporter.produced(1, 100);
+        statusReporter.produced(2, 200);
+        statusReporter.produced(3, 300);
+        statusReporter.produced(4, 400);
+        statusReporter.produced(5, 500);
+        statusReporter.produced(6, 600);
+
+        statusReporter.processed(processed);
+
+        return statusReporter;
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java
new file mode 100644
index 0000000..d191d8c
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipDirectTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Tests for constructing {@code ZipSourceDirect} from test zip resources and streaming its entities.
+ */
+public class ZipDirectTest {
+    /** Constructing the source from zip-direct-1.zip is expected to raise AtlasBaseException. */
+    @Test(expectedExceptions = AtlasBaseException.class)
+    public void loadFileEmpty() throws IOException, AtlasBaseException {
+        InputStream emptyZip = ZipFileResourceTestUtils.getFileInputStream("zip-direct-1.zip");
+        new ZipSourceDirect(emptyZip, 1);
+    }
+
+    /** zip-direct-2.zip exposes type definitions, an export result and the expected number of entities. */
+    @Test
+    public void loadFile() throws IOException, AtlasBaseException {
+        final int EXPECTED_ENTITY_COUNT = 3434;
+
+        InputStream zipStream = ZipFileResourceTestUtils.getFileInputStream("zip-direct-2.zip");
+        ZipSourceDirect source = new ZipSourceDirect(zipStream, EXPECTED_ENTITY_COUNT);
+
+        assertNotNull(source);
+        assertNotNull(source.getTypesDef());
+        assertTrue(source.getTypesDef().getEntityDefs().size() > 0);
+        assertNotNull(source.getExportResult());
+
+        // Pull entities until the source is exhausted, counting each one.
+        int entitiesRead = 0;
+        for (AtlasEntity.AtlasEntityWithExtInfo next = source.getNextEntityWithExtInfo();
+             next != null;
+             next = source.getNextEntityWithExtInfo()) {
+            assertNotNull(next);
+            entitiesRead++;
+        }
+
+        assertEquals(entitiesRead, EXPECTED_ENTITY_COUNT);
+    }
+}
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
index 0ffc3d5..27a6668 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipFileResourceTestUtils.java
@@ -317,7 +317,9 @@ public class ZipFileResourceTestUtils {
}
public static AtlasImportRequest getDefaultImportRequest() {
- return new AtlasImportRequest();
+ // The default request now always carries option migration=true.
+ // NOTE(review): this applies to every caller of the default request - confirm that
+ // imports without the migration option are still exercised by some test.
+ AtlasImportRequest atlasImportRequest = new AtlasImportRequest();
+ atlasImportRequest.setOption("migration", "true");
+ return atlasImportRequest;
}
@@ -336,7 +338,8 @@ public class ZipFileResourceTestUtils {
final String hostName = "localhost";
final String userName = "admin";
- AtlasImportResult result = importService.run(inputStream, userName, hostName, requestingIP);
+ AtlasImportRequest request = getDefaultImportRequest();
+ AtlasImportResult result = runImportWithParameters(importService, request, inputStream);
assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
return result;
}
diff --git a/repository/src/test/resources/zip-direct-1.zip b/repository/src/test/resources/zip-direct-1.zip
new file mode 100644
index 0000000..15cb0ec
Binary files /dev/null and b/repository/src/test/resources/zip-direct-1.zip differ
diff --git a/repository/src/test/resources/zip-direct-2.zip b/repository/src/test/resources/zip-direct-2.zip
new file mode 100644
index 0000000..e7b8617
Binary files /dev/null and b/repository/src/test/resources/zip-direct-2.zip differ