You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2018/09/13 22:20:36 UTC
atlas git commit: ATLAS-2864: Improved incremental export queries.
Repository: atlas
Updated Branches:
refs/heads/branch-0.8 ebf0aa22d -> 1dc0622a7
ATLAS-2864: Improved incremental export queries.
Signed-off-by: Ashutosh Mestry <am...@hortonworks.com>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/1dc0622a
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/1dc0622a
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/1dc0622a
Branch: refs/heads/branch-0.8
Commit: 1dc0622a77a57b01f67e190b287743a4244fc888
Parents: ebf0aa2
Author: Ashutosh Mestry <am...@hortonworks.com>
Authored: Wed Sep 12 21:51:09 2018 -0700
Committer: Ashutosh Mestry <am...@hortonworks.com>
Committed: Thu Sep 13 14:25:37 2018 -0700
----------------------------------------------------------------------
.../atlas/model/impexp/AtlasExportRequest.java | 33 ++--
.../atlas/repository/impexp/ExportService.java | 191 +++++++++++--------
.../impexp/IncrementalExportEntityProvider.java | 112 +++++++++++
.../atlas/repository/util/UniqueList.java | 6 +
.../IncrementalExportEntityProviderTest.java | 94 +++++++++
.../stocksDB-Entities/export-incremental.json | 1 +
6 files changed, 350 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/1dc0622a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
index 106a4a0..298645b 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
@@ -80,18 +80,6 @@ public class AtlasExportRequest implements Serializable {
this.options = options;
}
- public String getMatchTypeOptionValue() {
- String matchType = null;
-
- if (MapUtils.isNotEmpty(getOptions())) {
- if (getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) {
- matchType = getOptions().get(OPTION_ATTR_MATCH_TYPE).toString();
- }
- }
-
- return matchType;
- }
-
public String getFetchTypeOptionValue() {
if(getOptions() == null || !getOptions().containsKey(OPTION_FETCH_TYPE)) {
return FETCH_TYPE_FULL;
@@ -122,6 +110,27 @@ public class AtlasExportRequest implements Serializable {
return false;
}
+ public String getMatchTypeOptionValue() {
+ String matchType = null;
+
+ if (MapUtils.isNotEmpty(getOptions())) {
+ if (getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) {
+ matchType = getOptions().get(OPTION_ATTR_MATCH_TYPE).toString();
+ }
+ }
+
+ return matchType;
+ }
+
+ public long getChangeTokenFromOptions() {
+ if(getFetchTypeOptionValue().equalsIgnoreCase(FETCH_TYPE_INCREMENTAL) &&
+ getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER)) {
+ return Long.parseLong(getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER).toString());
+ }
+
+ return 0L;
+ }
+
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/atlas/blob/1dc0622a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 612549d..d1f4c8d 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -19,8 +19,6 @@ package org.apache.atlas.repository.impexp;
import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasErrorCode;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.exception.AtlasBaseException;
@@ -35,7 +33,9 @@ import org.apache.atlas.model.typedef.AtlasEntityDef;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
import org.apache.atlas.repository.util.UniqueList;
import org.apache.atlas.type.AtlasEntityType;
@@ -63,7 +63,7 @@ import static org.apache.atlas.model.impexp.AtlasExportRequest.*;
public class ExportService {
private static final Logger LOG = LoggerFactory.getLogger(ExportService.class);
- private static final String PROPERTY_GUID = "__guid";
+ public static final String PROPERTY_GUID = "__guid";
private static final String PROPERTY_IS_PROCESS = "isProcess";
@@ -75,6 +75,8 @@ public class ExportService {
private final AtlasGremlinQueryProvider gremlinQueryProvider;
private ExportTypeProcessor exportTypeProcessor;
private final HdfsPathEntityCreator hdfsPathEntityCreator;
+ private IncrementalExportEntityProvider incrementalExportEntityProvider;
+
@Inject
public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph atlasGraph,
AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator) {
@@ -88,12 +90,12 @@ public class ExportService {
public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
String requestingIP) throws AtlasBaseException {
- long startTime = System.currentTimeMillis();
- AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP,
+ long startTime = System.currentTimeMillis();
+ AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP,
hostName, startTime, getCurrentChangeMarker());
- ExportContext context = new ExportContext(atlasGraph, result, exportSink);
- exportTypeProcessor = new ExportTypeProcessor(typeRegistry, context);
+ ExportContext context = new ExportContext(atlasGraph, result, exportSink);
+ exportTypeProcessor = new ExportTypeProcessor(typeRegistry, context);
try {
LOG.info("==> export(user={}, from={})", userName, requestingIP);
@@ -107,9 +109,11 @@ public class ExportService {
LOG.error("Operation failed: ", ex);
} finally {
atlasGraph.releaseGremlinScriptEngine(context.scriptEngine);
- LOG.info("<== export(user={}, from={}): status {}", userName, requestingIP, context.result.getOperationStatus());
+ LOG.info("<== export(user={}, from={}): status {}: changeMarker: {}",
+ userName, requestingIP, context.result.getOperationStatus(), context.result.getChangeMarker());
context.clear();
result.clear();
+ incrementalExportEntityProvider = null;
}
return context.result;
@@ -170,7 +174,7 @@ public class ExportService {
}
}
- private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context) throws AtlasServiceException, AtlasException, AtlasBaseException {
+ private AtlasExportResult.OperationStatus[] processItems(AtlasExportRequest request, ExportContext context) {
AtlasExportResult.OperationStatus statuses[] = new AtlasExportResult.OperationStatus[request.getItemsToExport().size()];
List<AtlasObjectId> itemsToExport = request.getItemsToExport();
for (int i = 0; i < itemsToExport.size(); i++) {
@@ -204,13 +208,14 @@ public class ExportService {
}
for (String guid : entityGuids) {
- processEntity(guid, context);
+ processEntityGuid(guid, context);
+ populateEntitesForIncremental(guid, context);
}
while (!context.guidsToProcess.isEmpty()) {
while (!context.guidsToProcess.isEmpty()) {
String guid = context.guidsToProcess.remove(0);
- processEntity(guid, context);
+ processEntityGuid(guid, context);
}
if (!context.lineageToProcess.isEmpty()) {
@@ -246,55 +251,61 @@ public class ExportService {
if (StringUtils.isNotEmpty(item.getGuid())) {
ret = Collections.singletonList(item.getGuid());
} else if (StringUtils.equalsIgnoreCase(context.matchType, MATCH_TYPE_FOR_TYPE) && StringUtils.isNotEmpty(item.getTypeName())) {
- final String queryTemplate = getQueryTemplateForMatchType(context);
+ ret = getStartingEntityForMatchTypeForType(item, context);
+ } else if (StringUtils.isNotEmpty(item.getTypeName()) && MapUtils.isNotEmpty(item.getUniqueAttributes())) {
+ ret = getStartingEntityUsingQueryTemplate(item, context, ret);
+ }
- setupBindingsForTypeName(context, item.getTypeName());
+ if (ret == null) {
+ ret = Collections.emptyList();
+ }
- ret = executeGremlinQueryForGuids(queryTemplate, context);
- } else if (StringUtils.isNotEmpty(item.getTypeName()) && MapUtils.isNotEmpty(item.getUniqueAttributes())) {
- final String queryTemplate = getQueryTemplateForMatchType(context);
- final String typeName = item.getTypeName();
- final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
+ logInfoStartingEntitiesFound(item, context, ret);
+ return ret;
+ }
- if (entityType == null) {
- throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName);
- }
+ private List<String> getStartingEntityUsingQueryTemplate(AtlasObjectId item, ExportContext context, List<String> ret) throws AtlasBaseException {
+ final String queryTemplate = getQueryTemplateForMatchType(context);
+ final String typeName = item.getTypeName();
+ final AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName);
- for (Map.Entry<String, Object> e : item.getUniqueAttributes().entrySet()) {
- String attrName = e.getKey();
- Object attrValue = e.getValue();
+ if (entityType == null) {
+ throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_TYPENAME, typeName);
+ }
- AtlasAttribute attribute = entityType.getAttribute(attrName);
- if (attribute == null || attrValue == null) {
- continue;
- }
+ for (Map.Entry<String, Object> e : item.getUniqueAttributes().entrySet()) {
+ String attrName = e.getKey();
+ Object attrValue = e.getValue();
- setupBindingsForTypeNameAttrNameAttrValue(context, typeName, attrValue, attribute);
+ AtlasAttribute attribute = entityType.getAttribute(attrName);
+ if (attribute == null || attrValue == null) {
+ continue;
+ }
- List<String> guids = executeGremlinQueryForGuids(queryTemplate, context);
+ setupBindingsForTypeNameAttrNameAttrValue(context, typeName, attrValue, attribute);
- if (CollectionUtils.isNotEmpty(guids)) {
- if (ret == null) {
- ret = new ArrayList<>();
- }
+ List<String> guids = executeGremlinQueryForGuids(queryTemplate, context);
+
+ if (CollectionUtils.isNotEmpty(guids)) {
+ if (ret == null) {
+ ret = new ArrayList<>();
+ }
- for (String guid : guids) {
- if (!ret.contains(guid)) {
- ret.add(guid);
- }
+ for (String guid : guids) {
+ if (!ret.contains(guid)) {
+ ret.add(guid);
}
}
}
}
-
- if (ret == null) {
- ret = Collections.emptyList();
- }
-
- logInfoStartingEntitiesFound(item, context, ret);
return ret;
}
+ private List<String> getStartingEntityForMatchTypeForType(AtlasObjectId item, ExportContext context) {
+ setupBindingsForTypeName(context, item.getTypeName());
+ return executeGremlinQueryForGuids(getQueryTemplateForMatchType(context), context);
+ }
+
private void logInfoStartingEntitiesFound(AtlasObjectId item, ExportContext context, List<String> ret) {
LOG.info("export(item={}; matchType={}, fetchType={}): found {} entities: options: {}", item,
context.matchType, context.fetchType, ret.size(), AtlasType.toJson(context.result.getRequest()));
@@ -337,34 +348,43 @@ public class ExportService {
return gremlinQueryProvider.getQuery(AtlasGremlinQuery.EXPORT_TYPE_DEFAULT);
}
- private void processEntity(String guid, ExportContext context) throws AtlasBaseException {
- debugLog("==> processEntity({})", guid);
+ private void processEntityGuid(String guid, ExportContext context) throws AtlasBaseException {
+ debugLog("==> processEntityGuid({})", guid);
- if (!context.guidsProcessed.contains(guid)) {
- TraversalDirection direction = context.guidDirection.get(guid);
- AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
+ if (context.guidsProcessed.contains(guid)) {
+ return;
+ }
+
+ TraversalDirection direction = context.guidDirection.get(guid);
+ AtlasEntityWithExtInfo entityWithExtInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
+
+ processEntity(guid, entityWithExtInfo, context, direction);
+
+ debugLog("<== processEntityGuid({})", guid);
+ }
+
+ public void processEntity(String guid, AtlasEntityWithExtInfo entityWithExtInfo,
+ ExportContext context,
+ TraversalDirection direction) throws AtlasBaseException {
if (!context.lineageProcessed.contains(guid) && context.doesTimestampQualify(entityWithExtInfo.getEntity())) {
context.result.getData().getEntityCreationOrder().add(entityWithExtInfo.getEntity().getGuid());
}
- addEntity(entityWithExtInfo, context);
- exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
+ addEntity(entityWithExtInfo, context);
+ exportTypeProcessor.addTypes(entityWithExtInfo.getEntity(), context);
- context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
- getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction);
+ context.guidsProcessed.add(entityWithExtInfo.getEntity().getGuid());
+ getConntedEntitiesBasedOnOption(entityWithExtInfo.getEntity(), context, direction);
- if(entityWithExtInfo.getReferredEntities() != null) {
- for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
- exportTypeProcessor.addTypes(e, context);
- getConntedEntitiesBasedOnOption(e, context, direction);
- }
-
- context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet());
+ if (entityWithExtInfo.getReferredEntities() != null) {
+ for (AtlasEntity e : entityWithExtInfo.getReferredEntities().values()) {
+ exportTypeProcessor.addTypes(e, context);
+ getConntedEntitiesBasedOnOption(e, context, direction);
}
- }
- debugLog("<== processEntity({})", guid);
+ context.guidsProcessed.addAll(entityWithExtInfo.getReferredEntities().keySet());
+ }
}
private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
@@ -374,12 +394,25 @@ public class ExportService {
break;
case INCREMENTAL:
+ if(context.isHiveDBIncrementalSkipLineage()) {
+ break;
+ }
+
case FULL:
default:
getEntityGuidsForFullFetch(entity, context);
}
}
+ private void populateEntitesForIncremental(String topLevelEntityGuid, ExportContext context) throws AtlasBaseException {
+ if (context.isHiveDBIncrementalSkipLineage() == false || incrementalExportEntityProvider != null) {
+ return;
+ }
+
+ incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, context.scriptEngine);
+ incrementalExportEntityProvider.populate(topLevelEntityGuid, context.changeMarker, context.guidsToProcess);
+ }
+
private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
if (direction == null || direction == TraversalDirection.UNKNOWN) {
getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD);
@@ -533,13 +566,14 @@ public class ExportService {
}
}
- private enum TraversalDirection {
+ public enum TraversalDirection {
UNKNOWN,
INWARD,
OUTWARD,
BOTH;
}
+
public enum ExportFetchType {
FULL(FETCH_TYPE_FULL),
CONNECTED(FETCH_TYPE_CONNECTED),
@@ -563,9 +597,11 @@ public class ExportService {
static class ExportContext {
private static final int REPORTING_THREASHOLD = 1000;
+ private static final String ATLAS_TYPE_HIVE_DB = "hive_db";
+
final Set<String> guidsProcessed = new HashSet<>();
- final UniqueList<String> guidsToProcess = new UniqueList<>();
+ final private UniqueList<String> guidsToProcess = new UniqueList<>();
final UniqueList<String> lineageToProcess = new UniqueList<>();
final Set<String> lineageProcessed = new HashSet<>();
final Map<String, TraversalDirection> guidDirection = new HashMap<>();
@@ -581,7 +617,8 @@ public class ExportService {
private final ExportFetchType fetchType;
private final String matchType;
private final boolean skipLineage;
- private final long changeMarker;
+ private final long changeMarker;
+ private final boolean isHiveDBIncremental;
private int progressReportCount = 0;
@@ -594,16 +631,18 @@ public class ExportService {
fetchType = ExportFetchType.from(result.getRequest().getFetchTypeOptionValue());
matchType = result.getRequest().getMatchTypeOptionValue();
skipLineage = result.getRequest().getSkipLineageOptionValue();
- this.changeMarker = getChangeTokenFromOptions(fetchType, result.getRequest());
+ this.changeMarker = result.getRequest().getChangeTokenFromOptions();
+ this.isHiveDBIncremental = checkHiveDBIncrementalSkipLineage(result.getRequest());
}
- private long getChangeTokenFromOptions(ExportFetchType fetchType, AtlasExportRequest request) {
- if(fetchType == ExportFetchType.INCREMENTAL &&
- request.getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER)) {
- return Long.parseLong(request.getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER).toString());
+ private boolean checkHiveDBIncrementalSkipLineage(AtlasExportRequest request) {
+ if(request.getItemsToExport().size() == 0) {
+ return false;
}
- return 0L;
+ return request.getItemsToExport().get(0).getTypeName().equalsIgnoreCase(ATLAS_TYPE_HIVE_DB) &&
+ request.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL) &&
+ request.getSkipLineageOptionValue();
}
public List<AtlasEntity> getEntitiesWithModifiedTimestamp(AtlasEntityWithExtInfo entityWithExtInfo) {
@@ -633,12 +672,10 @@ public class ExportService {
}
public void addToBeProcessed(boolean isSuperTypeProcess, String guid, TraversalDirection direction) {
- if(!isSuperTypeProcess) {
- guidsToProcess.add(guid);
- }
-
if(isSuperTypeProcess) {
lineageToProcess.add(guid);
+ } else {
+ guidsToProcess.add(guid);
}
guidDirection.put(guid, direction);
@@ -667,5 +704,9 @@ public class ExportService {
public void addToSink(AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
sink.add(entityWithExtInfo);
}
+
+ public boolean isHiveDBIncrementalSkipLineage() {
+ return isHiveDBIncremental;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/1dc0622a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
new file mode 100644
index 0000000..4a0d1b2
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProvider.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.util.UniqueList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.inject.Inject;
+import javax.script.ScriptEngine;
+import javax.script.ScriptException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class IncrementalExportEntityProvider {
+ private static final Logger LOG = LoggerFactory.getLogger(IncrementalExportEntityProvider.class);
+
+ private static final String QUERY_PARAMETER_START_GUID = "startGuid";
+ private static final String QUERY_PARAMETER_MODIFICATION_TIMESTAMP = "modificationTimestamp";
+
+ private AtlasGraph atlasGraph;
+
+ private static final String QUERY_DB = "g.V.has('__guid', startGuid)";
+ private static final String QUERY_TABLE = QUERY_DB + ".in('__hive_table.db')";
+ private static final String QUERY_SD = QUERY_TABLE + ".out('__hive_table.sd')";
+ private static final String QUERY_COLUMN = QUERY_TABLE + ".out('__hive_table.columns')";
+ private static final String TRANSFORM_CLAUSE = ".transform{[__guid:it.__guid]}.toList()";
+
+ private static final String TIMESTAMP_CLAUSE = ".has('__modificationTimestamp', T.gt, modificationTimestamp)";
+
+ private ScriptEngine scriptEngine;
+
+ @Inject
+ public IncrementalExportEntityProvider(AtlasGraph atlasGraph, ScriptEngine scriptEngine) {
+ this.atlasGraph = atlasGraph;
+ this.scriptEngine = scriptEngine;
+ }
+
+ public void populate(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) {
+ if(timeStamp == 0L) {
+ full(dbEntityGuid, guidsToProcess);
+ } else {
+ partial(dbEntityGuid, timeStamp, guidsToProcess);
+ }
+ }
+
+ private void partial(String dbEntityGuid, long timeStamp, UniqueList<String> guidsToProcess) {
+ guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_TABLE, timeStamp));
+ guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_SD, timeStamp));
+ guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_COLUMN, timeStamp));
+ }
+
+ private void full(String dbEntityGuid, UniqueList<String> guidsToProcess) {
+ guidsToProcess.addAll(fetchGuids(dbEntityGuid, QUERY_TABLE, 0L));
+ }
+
+ private List<String> fetchGuids(final String dbEntityGuid, String query, long timeStamp) {
+ Map<String, Object> bindings = new HashMap<String, Object>() {{
+ put(QUERY_PARAMETER_START_GUID, dbEntityGuid);
+ }};
+
+ String queryWithClause = query;
+ if(timeStamp > 0L) {
+ bindings.put(QUERY_PARAMETER_MODIFICATION_TIMESTAMP, timeStamp);
+ queryWithClause = queryWithClause.concat(TIMESTAMP_CLAUSE);
+ }
+
+ return executeGremlinQuery(queryWithClause, bindings);
+ }
+
+ private List<String> executeGremlinQuery(String query, Map<String, Object> bindings) {
+ try {
+ List<String> guids = new ArrayList<>();
+ String queryWithTransform = query + TRANSFORM_CLAUSE;
+ List<HashMap<String, Object>> result = (List<HashMap<String, Object>>)
+ atlasGraph.executeGremlinScript(scriptEngine, bindings, queryWithTransform, false);
+ if (result == null) {
+ return guids;
+ }
+
+ for (HashMap<String, Object> item : result) {
+ guids.add((String) item.get(ExportService.PROPERTY_GUID));
+ }
+
+ return guids;
+
+ } catch (ScriptException e) {
+ LOG.error("error executing query: {}: bindings: {}", query, bindings, e);
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/1dc0622a/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java b/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java
index 9148ce0..eebbc4e 100644
--- a/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java
+++ b/repository/src/main/java/org/apache/atlas/repository/util/UniqueList.java
@@ -44,6 +44,12 @@ public class UniqueList<T> {
}
}
+ public void addAll(List<T> list) {
+ for (T item : list) {
+ add(item);
+ }
+ }
+
public T remove(int index) {
T e = list.remove(index);
set.remove(e);
http://git-wip-us.apache.org/repos/asf/atlas/blob/1dc0622a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
new file mode 100644
index 0000000..22ad66f
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/IncrementalExportEntityProviderTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.TestModules;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.v1.AtlasEntityStoreV1;
+import org.apache.atlas.repository.store.graph.v1.EntityGraphRetriever;
+import org.apache.atlas.repository.util.UniqueList;
+import org.apache.atlas.store.AtlasTypeDefStore;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import javax.inject.Inject;
+import javax.script.ScriptEngine;
+import java.io.IOException;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+@Guice(modules = TestModules.TestOnlyModule.class)
+public class IncrementalExportEntityProviderTest extends ExportImportTestBase {
+ @Inject
+ AtlasTypeRegistry typeRegistry;
+
+ @Inject
+ private AtlasTypeDefStore typeDefStore;
+
+ @Inject
+ private AtlasEntityStoreV1 entityStore;
+
+ @Inject
+ private AtlasGraph atlasGraph;
+
+ private IncrementalExportEntityProvider incrementalExportEntityProvider;
+ private ScriptEngine gremlinScriptEngine;
+
+ @BeforeClass
+ public void setup() throws IOException, AtlasBaseException {
+ basicSetup(typeDefStore, typeRegistry);
+ createEntities(entityStore, ENTITIES_SUB_DIR, new String[] { "db", "table-columns"});
+ final Object[] entityGuids = new Object[]{DB_GUID, TABLE_GUID};
+ verifyCreatedEntities(entityStore, entityGuids, 2);
+
+ gremlinScriptEngine = atlasGraph.getGremlinScriptEngine();
+ EntityGraphRetriever entityGraphRetriever = new EntityGraphRetriever(this.typeRegistry);
+ incrementalExportEntityProvider = new IncrementalExportEntityProvider(atlasGraph, gremlinScriptEngine);
+ }
+
+ @AfterClass
+ public void tearDown() {
+ if(gremlinScriptEngine != null) {
+ atlasGraph.releaseGremlinScriptEngine(gremlinScriptEngine);
+ }
+ }
+
+ @Test
+ public void verify() {
+ executeQueries(0L, 1);
+ executeQueries(1L, 9);
+ }
+
+ private void executeQueries(long timeStamp, int expectedEntityCount) {
+ UniqueList<String> uniqueList = new UniqueList<>();
+ incrementalExportEntityProvider.populate(DB_GUID, timeStamp, uniqueList);
+
+ for (String g : uniqueList.getList()) {
+ assertTrue(g instanceof String);
+ }
+
+ assertEquals(uniqueList.size(), expectedEntityCount);
+ }
+}
http://git-wip-us.apache.org/repos/asf/atlas/blob/1dc0622a/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
----------------------------------------------------------------------
diff --git a/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json b/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
index fdd3b01..ba125e7 100644
--- a/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
+++ b/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
@@ -6,6 +6,7 @@
],
"options": {
"fetchType": "incremental",
+ "skipLineage": "true",
"changeMarker": 0
}
}