You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2018/10/11 22:34:57 UTC
[4/8] atlas git commit: ATLAS-2862: Incremental Export now uses
request context to determine change marker.
ATLAS-2862: Incremental Export now uses request context to determine change marker.
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/8903c9a6
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/8903c9a6
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/8903c9a6
Branch: refs/heads/master
Commit: 8903c9a642001b8aa75e744c741415beb735f241
Parents: 708e486
Author: Ashutosh Mestry <am...@hortonworks.com>
Authored: Mon Sep 10 11:51:49 2018 -0700
Committer: Ashutosh Mestry <am...@hortonworks.com>
Committed: Thu Oct 11 14:25:28 2018 -0700
----------------------------------------------------------------------
.../atlas/model/impexp/AtlasExportRequest.java | 69 ++++++++++++++----
.../atlas/model/impexp/AtlasExportResult.java | 42 +++--------
.../atlas/model/impexp/AtlasImportResult.java | 2 +-
.../atlas/repository/impexp/AuditsWriter.java | 4 +-
.../atlas/repository/impexp/ExportService.java | 77 +++++---------------
.../apache/atlas/repository/impexp/ZipSink.java | 5 +-
.../impexp/ExportIncrementalTest.java | 6 +-
.../impexp/ImportTransformsShaperTest.java | 1 +
.../impexp/ReplicationEntityAttributeTest.java | 2 +-
.../atlas/repository/impexp/ZipSinkTest.java | 2 +-
.../stocksDB-Entities/export-incremental.json | 2 +-
11 files changed, 99 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
index fc34847..7bb8b76 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.commons.collections.MapUtils;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@@ -46,19 +47,19 @@ public class AtlasExportRequest implements Serializable {
private static final long serialVersionUID = 1L;
- public static final String OPTION_FETCH_TYPE = "fetchType";
- public static final String OPTION_ATTR_MATCH_TYPE = "matchType";
- public static final String OPTION_SKIP_LINEAGE = "skipLineage";
- public static final String OPTION_KEY_REPLICATED_TO = "replicatedTo";
- public static final String FETCH_TYPE_FULL = "full";
- public static final String FETCH_TYPE_CONNECTED = "connected";
- public static final String FETCH_TYPE_INCREMENTAL = "incremental";
- public static final String FETCH_TYPE_INCREMENTAL_FROM_TIME = "fromTime";
- public static final String MATCH_TYPE_STARTS_WITH = "startsWith";
- public static final String MATCH_TYPE_ENDS_WITH = "endsWith";
- public static final String MATCH_TYPE_CONTAINS = "contains";
- public static final String MATCH_TYPE_MATCHES = "matches";
- public static final String MATCH_TYPE_FOR_TYPE = "forType";
+ public static final String OPTION_FETCH_TYPE = "fetchType";
+ public static final String OPTION_ATTR_MATCH_TYPE = "matchType";
+ public static final String OPTION_SKIP_LINEAGE = "skipLineage";
+ public static final String OPTION_KEY_REPLICATED_TO = "replicatedTo";
+ public static final String FETCH_TYPE_FULL = "full";
+ public static final String FETCH_TYPE_CONNECTED = "connected";
+ public static final String FETCH_TYPE_INCREMENTAL = "incremental";
+ public static final String FETCH_TYPE_INCREMENTAL_CHANGE_MARKER = "changeMarker";
+ public static final String MATCH_TYPE_STARTS_WITH = "startsWith";
+ public static final String MATCH_TYPE_ENDS_WITH = "endsWith";
+ public static final String MATCH_TYPE_CONTAINS = "contains";
+ public static final String MATCH_TYPE_MATCHES = "matches";
+ public static final String MATCH_TYPE_FOR_TYPE = "forType";
private List<AtlasObjectId> itemsToExport = new ArrayList<>();
private Map<String, Object> options = new HashMap<>();
@@ -79,6 +80,48 @@ public class AtlasExportRequest implements Serializable {
this.options = options;
}
+ public String getMatchTypeOptionValue() {
+ String matchType = null;
+
+ if (MapUtils.isNotEmpty(getOptions())) {
+ if (getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) {
+ matchType = getOptions().get(OPTION_ATTR_MATCH_TYPE).toString();
+ }
+ }
+
+ return matchType;
+ }
+
+ public String getFetchTypeOptionValue() {
+ if(getOptions() == null || !getOptions().containsKey(OPTION_FETCH_TYPE)) {
+ return FETCH_TYPE_FULL;
+ }
+
+ Object o = getOptions().get(OPTION_FETCH_TYPE);
+ if (o instanceof String) {
+ return (String) o;
+ }
+
+ return FETCH_TYPE_FULL;
+ }
+
+ public boolean getSkipLineageOptionValue() {
+ if(!getOptions().containsKey(AtlasExportRequest.OPTION_SKIP_LINEAGE)) {
+ return false;
+ }
+
+ Object o = getOptions().get(AtlasExportRequest.OPTION_SKIP_LINEAGE);
+ if(o instanceof String) {
+ return Boolean.parseBoolean((String) o);
+ }
+
+ if(o instanceof Boolean) {
+ return (Boolean) o;
+ }
+
+ return false;
+ }
+
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
index a5203c9..c143120 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
@@ -62,14 +62,15 @@ public class AtlasExportResult implements Serializable {
private AtlasExportData data;
private OperationStatus operationStatus;
private String sourceClusterName;
- private long lastModifiedTimestamp;
+ private long changeMarker;
public AtlasExportResult() {
- this(null, null, null, null, System.currentTimeMillis());
+ this(null, null, null, null, System.currentTimeMillis(), 0L);
}
public AtlasExportResult(AtlasExportRequest request,
- String userName, String clientIpAddress, String hostName, long timeStamp) {
+ String userName, String clientIpAddress, String hostName, long timeStamp,
+ long changeMarker) {
this.request = request;
this.userName = userName;
this.clientIpAddress = clientIpAddress;
@@ -78,6 +79,7 @@ public class AtlasExportResult implements Serializable {
this.metrics = new HashMap<>();
this.operationStatus = OperationStatus.FAIL;
this.data = new AtlasExportData();
+ this.changeMarker = changeMarker;
}
public AtlasExportRequest getRequest() {
@@ -136,12 +138,12 @@ public class AtlasExportResult implements Serializable {
this.data = data;
}
- public void setLastModifiedTimestamp(long lastModifiedTimestamp) {
- this.lastModifiedTimestamp = lastModifiedTimestamp;
+ public void setChangeMarker(long changeMarker) {
+ this.changeMarker = changeMarker;
}
- public long getLastModifiedTimestamp() {
- return this.lastModifiedTimestamp;
+ public long getChangeMarker() {
+ return this.changeMarker;
}
public OperationStatus getOperationStatus() {
@@ -173,22 +175,6 @@ public class AtlasExportResult implements Serializable {
metrics.put(key, currentValue + incrementBy);
}
- public AtlasExportResult shallowCopy() {
- AtlasExportResult result = new AtlasExportResult();
-
- result.setRequest(getRequest());
- result.setUserName(getUserName());
- result.setClientIpAddress(getClientIpAddress());
- result.setHostName(getHostName());
- result.setTimeStamp(getTimeStamp());
- result.setMetrics(getMetrics());
- result.setOperationStatus(getOperationStatus());
- result.setSourceClusterName(getSourceClusterName());
- result.setLastModifiedTimestamp(getLastModifiedTimestamp());
-
- return result;
- }
-
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
sb = new StringBuilder();
@@ -199,14 +185,13 @@ public class AtlasExportResult implements Serializable {
sb.append(", userName='").append(userName).append("'");
sb.append(", clientIpAddress='").append(clientIpAddress).append("'");
sb.append(", hostName='").append(hostName).append("'");
- sb.append(", lastModifiedTimestamp='").append(lastModifiedTimestamp).append("'");
+ sb.append(", changeMarker='").append(changeMarker).append("'");
sb.append(", sourceCluster='").append(sourceClusterName).append("'");
sb.append(", timeStamp='").append(timeStamp).append("'");
sb.append(", metrics={");
AtlasBaseTypeDef.dumpObjects(metrics, sb);
sb.append("}");
- sb.append(", data='").append(data).append("'");
sb.append(", operationStatus='").append(operationStatus).append("'");
sb.append("}");
@@ -237,13 +222,11 @@ public class AtlasExportResult implements Serializable {
private static final long serialVersionUID = 1L;
private AtlasTypesDef typesDef;
- private Map<String, AtlasEntity> entities;
private List<String> entityCreationOrder;
public AtlasExportData() {
typesDef = new AtlasTypesDef();
- entities = new HashMap<>();
entityCreationOrder = new ArrayList<>();
}
@@ -251,10 +234,6 @@ public class AtlasExportResult implements Serializable {
public void setTypesDef(AtlasTypesDef typesDef) { this.typesDef = typesDef; }
- public Map<String, AtlasEntity> getEntities() { return entities; }
-
- public void setEntities(Map<String, AtlasEntity> entities) { this.entities = entities; }
-
public List<String> getEntityCreationOrder() { return entityCreationOrder; }
public void setEntityCreationOrder(List<String> entityCreationOrder) { this.entityCreationOrder = entityCreationOrder; }
@@ -267,7 +246,6 @@ public class AtlasExportResult implements Serializable {
sb.append("AtlasExportData {");
sb.append(", typesDef={").append(typesDef).append("}");
sb.append(", entities={");
- AtlasBaseTypeDef.dumpObjects(entities, sb);
sb.append("}");
sb.append(", entityCreationOrder={");
AtlasBaseTypeDef.dumpObjects(entityCreationOrder, sb);
http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
index 30e93d5..212517b 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
@@ -149,7 +149,7 @@ public class AtlasImportResult {
}
public void setExportResult(AtlasExportResult exportResult) {
- this.exportResultWithoutData = exportResult.shallowCopy();
+ this.exportResultWithoutData = exportResult;
}
public StringBuilder toString(StringBuilder sb) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index 5b5d022..407b406 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -144,7 +144,7 @@ public class AuditsWriter {
}
updateReplicationAttribute(replicationOptionState, targetServerName,
- entityGuids, Constants.ATTR_NAME_REPLICATED_TO, result.getLastModifiedTimestamp());
+ entityGuids, Constants.ATTR_NAME_REPLICATED_TO, result.getChangeMarker());
}
private void saveServers() throws AtlasBaseException {
@@ -182,7 +182,7 @@ public class AuditsWriter {
}
updateReplicationAttribute(replicationOptionState, this.sourceServerName, entityGuids,
- Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getLastModifiedTimestamp());
+ Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker());
}
private void saveServers() throws AtlasBaseException {
http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 97c2123..d3cff78 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.atlas.AtlasErrorCode;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.impexp.AtlasExportRequest;
@@ -93,7 +94,9 @@ public class ExportService {
public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
String requestingIP) throws AtlasBaseException {
long startTime = System.currentTimeMillis();
- AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP, hostName, startTime);
+ AtlasExportResult result = new AtlasExportResult(request, userName, requestingIP,
+ hostName, startTime, getCurrentChangeMarker());
+
ExportContext context = new ExportContext(atlasGraph, result, exportSink);
exportTypeProcessor = new ExportTypeProcessor(typeRegistry, context);
@@ -117,6 +120,10 @@ public class ExportService {
return context.result;
}
+ private long getCurrentChangeMarker() {
+ return RequestContext.earliestActiveRequestTime();
+ }
+
private void updateSinkWithOperationMetrics(String userName, ExportContext context,
AtlasExportResult.OperationStatus[] statuses,
long startTime, long endTime) throws AtlasBaseException {
@@ -125,7 +132,6 @@ public class ExportService {
context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed);
context.sink.setExportOrder(context.result.getData().getEntityCreationOrder());
context.sink.setTypesDef(context.result.getData().getTypesDef());
- context.result.setLastModifiedTimestamp(context.newestLastModifiedTimestamp);
context.result.setOperationStatus(getOverallOperationStatus(statuses));
context.result.incrementMeticsCounter("duration", duration);
auditsWriter.write(userName, context.result, startTime, endTime, context.result.getData().getEntityCreationOrder());
@@ -362,7 +368,7 @@ public class ExportService {
debugLog("<== processEntity({})", guid);
}
- private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
+ private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
switch (context.fetchType) {
case CONNECTED:
getEntityGuidsForConnectedFetch(entity, context, direction);
@@ -375,7 +381,7 @@ public class ExportService {
}
}
- private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
+ private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
if (direction == null || direction == TraversalDirection.UNKNOWN) {
getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD);
} else {
@@ -688,8 +694,7 @@ public class ExportService {
private final ExportFetchType fetchType;
private final String matchType;
private final boolean skipLineage;
- private final long lastModifiedTimestampRequested;
- private long newestLastModifiedTimestamp;
+ private final long changeMarker;
private int progressReportCount = 0;
@@ -699,45 +704,16 @@ public class ExportService {
scriptEngine = atlasGraph.getGremlinScriptEngine();
bindings = new HashMap<>();
- fetchType = getFetchType(result.getRequest());
- matchType = getMatchType(result.getRequest());
- skipLineage = getOptionSkipLineage(result.getRequest());
- this.lastModifiedTimestampRequested = getLastModifiedTimestamp(fetchType, result.getRequest());
- this.newestLastModifiedTimestamp = 0;
+ fetchType = ExportFetchType.from(result.getRequest().getFetchTypeOptionValue());
+ matchType = result.getRequest().getMatchTypeOptionValue();
+ skipLineage = result.getRequest().getSkipLineageOptionValue();
+ this.changeMarker = getChangeTokenFromOptions(fetchType, result.getRequest());
}
- private ExportFetchType getFetchType(AtlasExportRequest request) {
- Object fetchOption = request.getOptions() != null ? request.getOptions().get(OPTION_FETCH_TYPE) : null;
-
- if (fetchOption instanceof String) {
- return ExportFetchType.from((String) fetchOption);
- } else if (fetchOption instanceof ExportFetchType) {
- return (ExportFetchType) fetchOption;
- }
-
- return ExportFetchType.FULL;
- }
-
- private String getMatchType(AtlasExportRequest request) {
- String matchType = null;
-
- if (MapUtils.isNotEmpty(request.getOptions())) {
- if (request.getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) {
- matchType = request.getOptions().get(OPTION_ATTR_MATCH_TYPE).toString();
- }
- }
-
- return matchType;
- }
-
- private boolean getOptionSkipLineage(AtlasExportRequest request) {
- return request.getOptions().containsKey(AtlasExportRequest.OPTION_SKIP_LINEAGE) &&
- (boolean) request.getOptions().get(AtlasExportRequest.OPTION_SKIP_LINEAGE);
- }
-
- private long getLastModifiedTimestamp(ExportFetchType fetchType, AtlasExportRequest request) {
- if(fetchType == ExportFetchType.INCREMENTAL && request.getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME)) {
- return Long.parseLong(request.getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME).toString());
+ private long getChangeTokenFromOptions(ExportFetchType fetchType, AtlasExportRequest request) {
+ if(fetchType == ExportFetchType.INCREMENTAL &&
+ request.getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER)) {
+ return Long.parseLong(request.getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER).toString());
}
return 0L;
@@ -794,19 +770,7 @@ public class ExportService {
return true;
}
- long entityModificationTimestamp = entity.getUpdateTime().getTime();
- updateNewestLastModifiedTimestamp(entityModificationTimestamp);
- return doesTimestampQualify(entityModificationTimestamp);
- }
-
- private void updateNewestLastModifiedTimestamp(long entityModificationTimestamp) {
- if(newestLastModifiedTimestamp < entityModificationTimestamp) {
- newestLastModifiedTimestamp = entityModificationTimestamp;
- }
- }
-
- private boolean doesTimestampQualify(long modificationTimestamp) {
- return lastModifiedTimestampRequested < modificationTimestamp;
+ return changeMarker <= entity.getUpdateTime().getTime();
}
public boolean getSkipLineage() {
@@ -814,7 +778,6 @@ public class ExportService {
}
public void addToSink(AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
- updateNewestLastModifiedTimestamp(entityWithExtInfo.getEntity().getUpdateTime().getTime());
sink.add(entityWithExtInfo);
}
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
index 17ebbf1..9928c54 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
@@ -36,10 +36,11 @@ import java.util.zip.ZipOutputStream;
public class ZipSink {
private static final Logger LOG = LoggerFactory.getLogger(ZipSink.class);
+ private static String FILE_EXTENSION_JSON = ".json";
+
private ZipOutputStream zipOutputStream;
final Set<String> guids = new HashSet<>();
-
public ZipSink(OutputStream outputStream) {
zipOutputStream = new ZipOutputStream(outputStream);
}
@@ -92,7 +93,7 @@ public class ZipSink {
private void saveToZip(String fileName, String jsonData) throws AtlasBaseException {
try {
- addToZipStream(fileName.toString() + ".json", jsonData);
+ addToZipStream(fileName.toString() + FILE_EXTENSION_JSON, jsonData);
} catch (IOException e) {
throw new AtlasBaseException(String.format("Error writing file %s.", fileName), e);
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
index b2dcf44..86afd7f 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
@@ -41,7 +41,7 @@ import javax.inject.Inject;
import java.io.IOException;
import java.util.Map;
-import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities;
import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
@@ -98,7 +98,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
}
private long updateTimesampForNextIncrementalExport(ZipSource source) throws AtlasBaseException {
- return source.getExportResult().getLastModifiedTimestamp();
+ return source.getExportResult().getChangeMarker();
}
@Test(dependsOnMethods = "atT0_ReturnsAllEntities")
@@ -161,7 +161,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
private AtlasExportRequest getIncrementalRequest(long timestamp) {
try {
AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_INCREMENTAL, AtlasExportRequest.class);
- request.getOptions().put(FETCH_TYPE_INCREMENTAL_FROM_TIME, timestamp);
+ request.getOptions().put(FETCH_TYPE_INCREMENTAL_CHANGE_MARKER, timestamp);
return request;
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
index f894553..06bdaa6 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
@@ -72,6 +72,7 @@ public class ImportTransformsShaperTest extends ExportImportTestBase {
assertNotNull(classification);
assertEntities(result.getProcessedEntities(), TAG_NAME);
}
+
private void assertEntities(List<String> entityGuids, String tagName) throws AtlasBaseException {
for (String guid : entityGuids) {
AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = this.entityStore.getById(guid);
http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
index 7ffef7f..79a5e05 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
@@ -159,7 +159,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
long actualLastModifiedTimestamp = (long) cluster.getAdditionalInfoRepl(entity.getEntity().getGuid());
assertTrue(cluster.getAdditionalInfo().size() > 0);
- assertEquals(actualLastModifiedTimestamp, importResult.getExportResult().getLastModifiedTimestamp());
+ assertEquals(actualLastModifiedTimestamp, importResult.getExportResult().getChangeMarker());
}
private AtlasExportRequest getUpdateMetaInfoUpdateRequest() {
http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
index e8bbeb5..0988a30 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
@@ -59,7 +59,7 @@ public class ZipSinkTest {
itemsToExport.add(new AtlasObjectId("hive_db", "qualifiedName", "default"));
request.setItemsToExport(itemsToExport);
- defaultExportResult = new AtlasExportResult(request, "admin", "1.0.0.0", "root", 100);
+ defaultExportResult = new AtlasExportResult(request, "admin", "1.0.0.0", "root", 100, 0L);
return defaultExportResult;
}
http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
----------------------------------------------------------------------
diff --git a/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json b/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
index c2bc867..fdd3b01 100644
--- a/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
+++ b/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
@@ -6,6 +6,6 @@
],
"options": {
"fetchType": "incremental",
- "fromTime": 0
+ "changeMarker": 0
}
}