You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2017/09/15 15:40:47 UTC
[4/4] atlas git commit: ATLAS-2129: import fix to handle shutdown while in the middle of import
ATLAS-2129: import fix to handle shutdown while in the middle of import
Signed-off-by: Madhan Neethiraj <ma...@apache.org>
(cherry picked from commit 81e5444f4ce9635465632b90ac9d97eec3a16a6b)
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/c623835f
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/c623835f
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/c623835f
Branch: refs/heads/master
Commit: c623835f5376bdb04797ba9cb886da70fbef0f8c
Parents: f1c4646
Author: Ashutosh Mestry <am...@apache.org>
Authored: Fri Sep 15 07:43:55 2017 -0700
Committer: Madhan Neethiraj <ma...@apache.org>
Committed: Fri Sep 15 08:17:40 2017 -0700
----------------------------------------------------------------------
.../atlas/repository/impexp/ImportService.java | 10 +-
.../store/graph/AtlasEntityStore.java | 6 +-
.../repository/store/graph/BulkImporter.java | 34 ++++
.../store/graph/v1/AtlasEntityStoreV1.java | 156 +-------------
.../store/graph/v1/BulkImporterImpl.java | 202 +++++++++++++++++++
.../test/java/org/apache/atlas/TestModules.java | 1 +
.../repository/impexp/ImportServiceTest.java | 27 ++-
7 files changed, 258 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/c623835f/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index bbe9ed0..650741e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -22,7 +22,7 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.typedef.AtlasTypesDef;
-import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.BulkImporter;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.MapUtils;
@@ -42,17 +42,17 @@ public class ImportService {
private static final Logger LOG = LoggerFactory.getLogger(ImportService.class);
private final AtlasTypeDefStore typeDefStore;
- private final AtlasEntityStore entityStore;
private final AtlasTypeRegistry typeRegistry;
+ private final BulkImporter bulkImporter;
private long startTimestamp;
private long endTimestamp;
@Inject
- public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) {
+ public ImportService(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, BulkImporter bulkImporter) {
this.typeDefStore = typeDefStore;
- this.entityStore = entityStore;
this.typeRegistry = typeRegistry;
+ this.bulkImporter = bulkImporter;
}
public AtlasImportResult run(ZipSource source, String userName,
@@ -154,7 +154,7 @@ public class ImportService {
}
private void processEntities(ZipSource importSource, AtlasImportResult result) throws AtlasBaseException {
- this.entityStore.bulkImport(importSource, result);
+ this.bulkImporter.bulkImport(importSource, result);
endTimestamp = System.currentTimeMillis();
result.incrementMeticsCounter("duration", (int) (this.endTimestamp - this.startTimestamp));
http://git-wip-us.apache.org/repos/asf/atlas/blob/c623835f/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index a4163f2..5a0b74e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -18,12 +18,10 @@
package org.apache.atlas.repository.store.graph;
import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.EntityMutationResponse;
-import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
import org.apache.atlas.repository.store.graph.v1.EntityStream;
import org.apache.atlas.type.AtlasEntityType;
@@ -69,12 +67,12 @@ public interface AtlasEntityStore {
EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate) throws AtlasBaseException;
/**
- * Create or update entities in the stream using repeated commits of connected entities
+ * Create or update entities with parameters necessary for import process
* @param entityStream AtlasEntityStream
* @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed
* @throws AtlasBaseException
*/
- EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException;
+ EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException;
/**
* Update a single entity
http://git-wip-us.apache.org/repos/asf/atlas/blob/c623835f/repository/src/main/java/org/apache/atlas/repository/store/graph/BulkImporter.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/BulkImporter.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/BulkImporter.java
new file mode 100644
index 0000000..7139eac
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/BulkImporter.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.store.graph.v1.EntityImportStream;
+
+public interface BulkImporter {
+
+ /**
+ * Create or update entities in the stream using repeated commits of connected entities
+ * @param entityStream AtlasEntityStream
+ * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed
+ * @throws AtlasBaseException
+ */
+ EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException;
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/c623835f/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index 3c19d4d..a5db81b 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -23,12 +23,10 @@ import org.apache.atlas.GraphTransactionInterceptor;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasClassification;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
-import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.repository.graphdb.AtlasVertex;
@@ -52,11 +50,8 @@ import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
@@ -146,115 +141,6 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
return ret;
}
- @Override
- public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("==> bulkImport()");
- }
-
- if (entityStream == null || !entityStream.hasNext()) {
- throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
- }
-
- EntityMutationResponse ret = new EntityMutationResponse();
- ret.setGuidAssignments(new HashMap<String, String>());
-
- Set<String> processedGuids = new HashSet<>();
- float currentPercent = 0f;
-
- List<String> residualList = new ArrayList<>();
- EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);
- while (entityImportStreamWithResidualList.hasNext()) {
- AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
- AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
-
- if (entity == null || processedGuids.contains(entity.getGuid())) {
- continue;
- }
-
- AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);
- try {
- EntityMutationResponse resp = createOrUpdateForImport(oneEntityStream);
-
- if (resp.getGuidAssignments() != null) {
- ret.getGuidAssignments().putAll(resp.getGuidAssignments());
- }
-
- currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, entityStream.getPosition(),
- entityImportStreamWithResidualList.getStreamSize(), currentPercent);
-
- entityStream.onImportComplete(entity.getGuid());
- } catch (AtlasBaseException e) {
- if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
- throw e;
- }
- }
- }
-
- importResult.getProcessedEntities().addAll(processedGuids);
- LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());
-
- return ret;
- }
-
- private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
- if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
- return false;
- }
-
- lineageList.add(guid);
- return true;
- }
-
- private float updateImportMetrics(AtlasEntityWithExtInfo currentEntity,
- EntityMutationResponse resp,
- AtlasImportResult importResult,
- Set<String> processedGuids,
- int currentIndex, int streamSize, float currentPercent) {
- updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
- updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
- updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
-
- String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)",
- currentEntity.getEntity().getTypeName(),
- currentIndex,
- currentEntity.getEntity().getGuid());
-
- return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported);
- }
-
- private static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent,
- String additionalInfo) {
- final double tolerance = 0.000001;
- final int MAX_PERCENT = 100;
-
- float percent = (float) ((currentIndex * MAX_PERCENT) / streamSize);
- boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
- float updatedPercent = (MAX_PERCENT < streamSize) ? percent :
- ((updateLog) ? ++currentPercent : currentPercent);
-
- if (updateLog) {
- log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), streamSize, additionalInfo);
- }
-
- return updatedPercent;
- }
-
- private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
- if (list == null) {
- return;
- }
-
- for (AtlasEntityHeader h : list) {
- if (processedGuids.contains(h.getGuid())) {
- continue;
- }
-
- processedGuids.add(h.getGuid());
- importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName()));
- }
- }
-
private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications) throws AtlasBaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("==> createOrUpdate()");
@@ -287,8 +173,9 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
return createOrUpdate(entityStream, isPartialUpdate, false);
}
+ @Override
@GraphTransaction
- private EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException {
+ public EntityMutationResponse createOrUpdateForImport(EntityStream entityStream) throws AtlasBaseException {
return createOrUpdate(entityStream, false, true);
}
@@ -763,43 +650,4 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
return ret;
}
-
- private static class EntityImportStreamWithResidualList {
- private final EntityImportStream stream;
- private final List<String> residualList;
- private boolean navigateResidualList;
- private int currentResidualListIndex;
-
-
- public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
- this.stream = stream;
- this.residualList = residualList;
- this.navigateResidualList = false;
- this.currentResidualListIndex = 0;
- }
-
- public AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
- if (navigateResidualList == false) {
- return stream.getNextEntityWithExtInfo();
- } else {
- stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
- return stream.getNextEntityWithExtInfo();
- }
- }
-
- public boolean hasNext() {
- if (!navigateResidualList) {
- boolean streamHasNext = stream.hasNext();
- navigateResidualList = (streamHasNext == false);
- return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size());
- } else {
- return (currentResidualListIndex < residualList.size());
- }
- }
-
- public int getStreamSize() {
- return stream.size() + residualList.size();
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/c623835f/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
new file mode 100644
index 0000000..e929d7f
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/BulkImporterImpl.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v1;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
+import org.apache.atlas.repository.store.graph.BulkImporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Component
+public class BulkImporterImpl implements BulkImporter {
+ private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityStoreV1.class);
+
+ private final AtlasEntityStore entityStore;
+
+ @Inject
+ public BulkImporterImpl(AtlasEntityStore entityStore) {
+ this.entityStore = entityStore;
+ }
+
+ @Override
+ public EntityMutationResponse bulkImport(EntityImportStream entityStream, AtlasImportResult importResult) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> bulkImport()");
+ }
+
+ if (entityStream == null || !entityStream.hasNext()) {
+ throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "no entities to create/update.");
+ }
+
+ EntityMutationResponse ret = new EntityMutationResponse();
+ ret.setGuidAssignments(new HashMap<String, String>());
+
+ Set<String> processedGuids = new HashSet<>();
+ float currentPercent = 0f;
+ List<String> residualList = new ArrayList<>();
+
+ EntityImportStreamWithResidualList entityImportStreamWithResidualList = new EntityImportStreamWithResidualList(entityStream, residualList);
+
+ while (entityImportStreamWithResidualList.hasNext()) {
+ AtlasEntityWithExtInfo entityWithExtInfo = entityImportStreamWithResidualList.getNextEntityWithExtInfo();
+ AtlasEntity entity = entityWithExtInfo != null ? entityWithExtInfo.getEntity() : null;
+
+ if (entity == null || processedGuids.contains(entity.getGuid())) {
+ continue;
+ }
+
+ AtlasEntityStreamForImport oneEntityStream = new AtlasEntityStreamForImport(entityWithExtInfo, entityStream);
+
+ try {
+ EntityMutationResponse resp = entityStore.createOrUpdateForImport(oneEntityStream);
+
+ if (resp.getGuidAssignments() != null) {
+ ret.getGuidAssignments().putAll(resp.getGuidAssignments());
+ }
+
+ currentPercent = updateImportMetrics(entityWithExtInfo, resp, importResult, processedGuids, entityStream.getPosition(), entityImportStreamWithResidualList.getStreamSize(), currentPercent);
+
+ entityStream.onImportComplete(entity.getGuid());
+ } catch (AtlasBaseException e) {
+ if (!updateResidualList(e, residualList, entityWithExtInfo.getEntity().getGuid())) {
+ throw e;
+ }
+ } catch (Throwable e) {
+ AtlasBaseException abe = new AtlasBaseException(e);
+
+ if (!updateResidualList(abe, residualList, entityWithExtInfo.getEntity().getGuid())) {
+ throw abe;
+ }
+ }
+ }
+
+ importResult.getProcessedEntities().addAll(processedGuids);
+ LOG.info("bulkImport(): done. Total number of entities (including referred entities) imported: {}", processedGuids.size());
+
+ return ret;
+ }
+
+
+ private boolean updateResidualList(AtlasBaseException e, List<String> lineageList, String guid) {
+ if (!e.getAtlasErrorCode().getErrorCode().equals(AtlasErrorCode.INVALID_OBJECT_ID.getErrorCode())) {
+ return false;
+ }
+
+ lineageList.add(guid);
+
+ return true;
+ }
+
+ private float updateImportMetrics(AtlasEntity.AtlasEntityWithExtInfo currentEntity,
+ EntityMutationResponse resp,
+ AtlasImportResult importResult,
+ Set<String> processedGuids,
+ int currentIndex, int streamSize, float currentPercent) {
+ updateImportMetrics("entity:%s:created", resp.getCreatedEntities(), processedGuids, importResult);
+ updateImportMetrics("entity:%s:updated", resp.getUpdatedEntities(), processedGuids, importResult);
+ updateImportMetrics("entity:%s:deleted", resp.getDeletedEntities(), processedGuids, importResult);
+
+ String lastEntityImported = String.format("entity:last-imported:%s:[%s]:(%s)", currentEntity.getEntity().getTypeName(), currentIndex, currentEntity.getEntity().getGuid());
+
+ return updateImportProgress(LOG, currentIndex + 1, streamSize, currentPercent, lastEntityImported);
+ }
+
+ private static float updateImportProgress(Logger log, int currentIndex, int streamSize, float currentPercent,
+ String additionalInfo) {
+ final double tolerance = 0.000001;
+ final int MAX_PERCENT = 100;
+
+ float percent = (float) ((currentIndex * MAX_PERCENT) / streamSize);
+ boolean updateLog = Double.compare(percent, currentPercent) > tolerance;
+ float updatedPercent = (MAX_PERCENT < streamSize) ? percent : ((updateLog) ? ++currentPercent : currentPercent);
+
+ if (updateLog) {
+ log.info("bulkImport(): progress: {}% (of {}) - {}", (int) Math.ceil(percent), streamSize, additionalInfo);
+ }
+
+ return updatedPercent;
+ }
+
+ private static void updateImportMetrics(String prefix, List<AtlasEntityHeader> list, Set<String> processedGuids, AtlasImportResult importResult) {
+ if (list == null) {
+ return;
+ }
+
+ for (AtlasEntityHeader h : list) {
+ if (processedGuids.contains(h.getGuid())) {
+ continue;
+ }
+
+ processedGuids.add(h.getGuid());
+ importResult.incrementMeticsCounter(String.format(prefix, h.getTypeName()));
+ }
+ }
+
+ private static class EntityImportStreamWithResidualList {
+ private final EntityImportStream stream;
+ private final List<String> residualList;
+ private boolean navigateResidualList;
+ private int currentResidualListIndex;
+
+
+ public EntityImportStreamWithResidualList(EntityImportStream stream, List<String> residualList) {
+ this.stream = stream;
+ this.residualList = residualList;
+ this.navigateResidualList = false;
+ this.currentResidualListIndex = 0;
+ }
+
+ public AtlasEntity.AtlasEntityWithExtInfo getNextEntityWithExtInfo() {
+ if (navigateResidualList == false) {
+ return stream.getNextEntityWithExtInfo();
+ } else {
+ stream.setPositionUsingEntityGuid(residualList.get(currentResidualListIndex++));
+ return stream.getNextEntityWithExtInfo();
+ }
+ }
+
+ public boolean hasNext() {
+ if (!navigateResidualList) {
+ boolean streamHasNext = stream.hasNext();
+ navigateResidualList = (streamHasNext == false);
+ return streamHasNext ? streamHasNext : (currentResidualListIndex < residualList.size());
+ } else {
+ return (currentResidualListIndex < residualList.size());
+ }
+ }
+
+ public int getStreamSize() {
+ return stream.size() + residualList.size();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/c623835f/repository/src/test/java/org/apache/atlas/TestModules.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/TestModules.java b/repository/src/test/java/org/apache/atlas/TestModules.java
index a442a01..e107556 100644
--- a/repository/src/test/java/org/apache/atlas/TestModules.java
+++ b/repository/src/test/java/org/apache/atlas/TestModules.java
@@ -151,6 +151,7 @@ public class TestModules {
bind(LineageService.class).to(DataSetLineageService.class).asEagerSingleton();
bind(AtlasLineageService.class).to(EntityLineageService.class).asEagerSingleton();
+ bind(BulkImporter.class).to(BulkImporterImpl.class).asEagerSingleton();
bindTypeCache();
http://git-wip-us.apache.org/repos/asf/atlas/blob/c623835f/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
index 3359850..8ec37e3 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportServiceTest.java
@@ -24,7 +24,6 @@ import org.apache.atlas.TestModules;
import org.apache.atlas.TestUtilsV2;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.impexp.AtlasImportRequest;
-import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.store.AtlasTypeDefStore;
import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasTypeRegistry;
@@ -43,11 +42,11 @@ import java.util.Map;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ImportServiceTest {
private static final Logger LOG = LoggerFactory.getLogger(ImportServiceTest.class);
+ private final ImportService importService;
@Inject
AtlasTypeRegistry typeRegistry;
@@ -56,7 +55,9 @@ public class ImportServiceTest {
private AtlasTypeDefStore typeDefStore;
@Inject
- private AtlasEntityStore entityStore;
+ public ImportServiceTest(ImportService importService) {
+ this.importService = importService;
+ }
@BeforeTest
public void setupTest() {
@@ -72,7 +73,7 @@ public class ImportServiceTest {
@Test(dataProvider = "sales")
public void importDB1(ZipSource zipSource) throws AtlasBaseException, IOException {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
- runAndVerifyQuickStart_v1_Import(new ImportService(typeDefStore, entityStore, typeRegistry), zipSource);
+ runAndVerifyQuickStart_v1_Import(importService, zipSource);
}
@DataProvider(name = "reporting")
@@ -83,7 +84,7 @@ public class ImportServiceTest {
@Test(dataProvider = "reporting")
public void importDB2(ZipSource zipSource) throws AtlasBaseException, IOException {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
- runAndVerifyQuickStart_v1_Import(new ImportService(typeDefStore, entityStore, typeRegistry), zipSource);
+ runAndVerifyQuickStart_v1_Import(importService, zipSource);
}
@DataProvider(name = "logging")
@@ -94,7 +95,7 @@ public class ImportServiceTest {
@Test(dataProvider = "logging")
public void importDB3(ZipSource zipSource) throws AtlasBaseException, IOException {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
- runAndVerifyQuickStart_v1_Import(new ImportService(typeDefStore, entityStore, typeRegistry), zipSource);
+ runAndVerifyQuickStart_v1_Import(importService, zipSource);
}
@DataProvider(name = "salesNewTypeAttrs")
@@ -105,7 +106,7 @@ public class ImportServiceTest {
@Test(dataProvider = "salesNewTypeAttrs", dependsOnMethods = "importDB1")
public void importDB4(ZipSource zipSource) throws AtlasBaseException, IOException {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
- runImportWithParameters(new ImportService(typeDefStore, entityStore, typeRegistry), getDefaultImportRequest(), zipSource);
+ runImportWithParameters(importService, getDefaultImportRequest(), zipSource);
}
@DataProvider(name = "salesNewTypeAttrs-next")
@@ -125,7 +126,7 @@ public class ImportServiceTest {
options.put("updateTypeDefinition", "false");
request.setOptions(options);
- runImportWithParameters(new ImportService(typeDefStore, entityStore, typeRegistry), request, zipSource);
+ runImportWithParameters(importService, request, zipSource);
assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 4);
}
@@ -141,7 +142,7 @@ public class ImportServiceTest {
options.put("updateTypeDefinition", "true");
request.setOptions(options);
- runImportWithParameters(new ImportService(typeDefStore, entityStore, typeRegistry), request, zipSource);
+ runImportWithParameters(importService, request, zipSource);
assertNotNull(typeDefStore.getEnumDefByName(newEnumDefName));
assertEquals(typeDefStore.getEnumDefByName(newEnumDefName).getElementDefs().size(), 8);
}
@@ -156,7 +157,7 @@ public class ImportServiceTest {
loadModelFromJson("0000-Area0/0010-base_model.json", typeDefStore, typeRegistry);
loadModelFromJson("1000-Hadoop/1030-hive_model.json", typeDefStore, typeRegistry);
- runImportWithNoParameters(getImportService(), zipSource);
+ runImportWithNoParameters(importService, zipSource);
}
@DataProvider(name = "hdfs_path1")
@@ -172,7 +173,7 @@ public class ImportServiceTest {
loadModelFromResourcesJson("tag1.json", typeDefStore, typeRegistry);
try {
- runImportWithNoParameters(getImportService(), zipSource);
+ runImportWithNoParameters(importService, zipSource);
} catch (AtlasBaseException e) {
assertEquals(e.getAtlasErrorCode(), AtlasErrorCode.INVALID_IMPORT_ATTRIBUTE_TYPE_CHANGED);
AtlasClassificationType tag1 = typeRegistry.getClassificationTypeByName("tag1");
@@ -181,8 +182,4 @@ public class ImportServiceTest {
throw e;
}
}
-
- private ImportService getImportService() {
- return new ImportService(typeDefStore, entityStore, typeRegistry);
- }
}