You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by am...@apache.org on 2018/10/11 22:34:57 UTC

[4/8] atlas git commit: ATLAS-2862: Incremental Export now uses request context to determine change marker.

ATLAS-2862: Incremental Export now uses request context to determine change marker.


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/8903c9a6
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/8903c9a6
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/8903c9a6

Branch: refs/heads/master
Commit: 8903c9a642001b8aa75e744c741415beb735f241
Parents: 708e486
Author: Ashutosh Mestry <am...@hortonworks.com>
Authored: Mon Sep 10 11:51:49 2018 -0700
Committer: Ashutosh Mestry <am...@hortonworks.com>
Committed: Thu Oct 11 14:25:28 2018 -0700

----------------------------------------------------------------------
 .../atlas/model/impexp/AtlasExportRequest.java  | 69 ++++++++++++++----
 .../atlas/model/impexp/AtlasExportResult.java   | 42 +++--------
 .../atlas/model/impexp/AtlasImportResult.java   |  2 +-
 .../atlas/repository/impexp/AuditsWriter.java   |  4 +-
 .../atlas/repository/impexp/ExportService.java  | 77 +++++---------------
 .../apache/atlas/repository/impexp/ZipSink.java |  5 +-
 .../impexp/ExportIncrementalTest.java           |  6 +-
 .../impexp/ImportTransformsShaperTest.java      |  1 +
 .../impexp/ReplicationEntityAttributeTest.java  |  2 +-
 .../atlas/repository/impexp/ZipSinkTest.java    |  2 +-
 .../stocksDB-Entities/export-incremental.json   |  2 +-
 11 files changed, 99 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
index fc34847..7bb8b76 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportRequest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.commons.collections.MapUtils;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -46,19 +47,19 @@ public class AtlasExportRequest implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    public static final String OPTION_FETCH_TYPE                = "fetchType";
-    public static final String OPTION_ATTR_MATCH_TYPE           = "matchType";
-    public static final String OPTION_SKIP_LINEAGE              = "skipLineage";
-    public static final String OPTION_KEY_REPLICATED_TO         = "replicatedTo";
-    public static final String FETCH_TYPE_FULL                  = "full";
-    public static final String FETCH_TYPE_CONNECTED             = "connected";
-    public static final String FETCH_TYPE_INCREMENTAL           = "incremental";
-    public static final String FETCH_TYPE_INCREMENTAL_FROM_TIME = "fromTime";
-    public static final String MATCH_TYPE_STARTS_WITH           = "startsWith";
-    public static final String MATCH_TYPE_ENDS_WITH             = "endsWith";
-    public static final String MATCH_TYPE_CONTAINS              = "contains";
-    public static final String MATCH_TYPE_MATCHES               = "matches";
-    public static final String MATCH_TYPE_FOR_TYPE              = "forType";
+    public static final String OPTION_FETCH_TYPE                    = "fetchType";
+    public static final String OPTION_ATTR_MATCH_TYPE               = "matchType";
+    public static final String OPTION_SKIP_LINEAGE                  = "skipLineage";
+    public static final String OPTION_KEY_REPLICATED_TO             = "replicatedTo";
+    public static final String FETCH_TYPE_FULL                      = "full";
+    public static final String FETCH_TYPE_CONNECTED                 = "connected";
+    public static final String FETCH_TYPE_INCREMENTAL               = "incremental";
+    public static final String FETCH_TYPE_INCREMENTAL_CHANGE_MARKER = "changeMarker";
+    public static final String MATCH_TYPE_STARTS_WITH               = "startsWith";
+    public static final String MATCH_TYPE_ENDS_WITH                 = "endsWith";
+    public static final String MATCH_TYPE_CONTAINS                  = "contains";
+    public static final String MATCH_TYPE_MATCHES                   = "matches";
+    public static final String MATCH_TYPE_FOR_TYPE                  = "forType";
 
     private List<AtlasObjectId> itemsToExport = new ArrayList<>();
     private Map<String, Object> options       = new HashMap<>();
@@ -79,6 +80,48 @@ public class AtlasExportRequest implements Serializable {
         this.options = options;
     }
 
+    /** Returns the "matchType" option rendered via toString(), or null when options are empty or the value is null. */
+    public String getMatchTypeOptionValue() {
+        // No options at all (null or empty map) means there is no match type to report.
+        if (MapUtils.isEmpty(getOptions())) {
+            return null;
+        }
+
+        Object matchTypeOption = getOptions().get(OPTION_ATTR_MATCH_TYPE);
+
+        return (matchTypeOption == null) ? null : matchTypeOption.toString();
+    }
+
+    /**
+     * Returns the "fetchType" option when it is present as a String;
+     * otherwise (null options, missing key, non-String value) returns FETCH_TYPE_FULL.
+     */
+    public String getFetchTypeOptionValue() {
+        // Map.get() yields null for an absent key, which falls through to the default below.
+        Object fetchTypeOption = (getOptions() == null) ? null : getOptions().get(OPTION_FETCH_TYPE);
+
+        return (fetchTypeOption instanceof String)
+                ? (String) fetchTypeOption
+                : FETCH_TYPE_FULL;
+    }
+
+    /**
+     * Returns the "skipLineage" option: a Boolean is used as-is, a String is parsed with
+     * Boolean.parseBoolean(); null options, a missing key or any other value type yield false.
+     */
+    public boolean getSkipLineageOptionValue() {
+        if(getOptions() == null || !getOptions().containsKey(AtlasExportRequest.OPTION_SKIP_LINEAGE)) {
+            return false;
+        }
+
+        Object o = getOptions().get(AtlasExportRequest.OPTION_SKIP_LINEAGE);
+        if(o instanceof String) {
+            return Boolean.parseBoolean((String) o);
+        }
+
+        return (o instanceof Boolean) && (Boolean) o;
+    }
+
     public StringBuilder toString(StringBuilder sb) {
         if (sb == null) {
             sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
index a5203c9..c143120 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasExportResult.java
@@ -62,14 +62,15 @@ public class AtlasExportResult implements Serializable {
     private AtlasExportData      data;
     private OperationStatus      operationStatus;
     private String               sourceClusterName;
-    private long                 lastModifiedTimestamp;
+    private long                 changeMarker;
 
     public AtlasExportResult() {
-        this(null, null, null, null, System.currentTimeMillis());
+        this(null, null, null, null, System.currentTimeMillis(), 0L);
     }
 
     public AtlasExportResult(AtlasExportRequest request,
-                             String userName, String clientIpAddress, String hostName, long timeStamp) {
+                             String userName, String clientIpAddress, String hostName, long timeStamp,
+                              long changeMarker) {
         this.request         = request;
         this.userName        = userName;
         this.clientIpAddress = clientIpAddress;
@@ -78,6 +79,7 @@ public class AtlasExportResult implements Serializable {
         this.metrics         = new HashMap<>();
         this.operationStatus = OperationStatus.FAIL;
         this.data            = new AtlasExportData();
+        this.changeMarker    = changeMarker;
     }
 
     public AtlasExportRequest getRequest() {
@@ -136,12 +138,12 @@ public class AtlasExportResult implements Serializable {
         this.data = data;
     }
 
-    public void setLastModifiedTimestamp(long lastModifiedTimestamp) {
-        this.lastModifiedTimestamp = lastModifiedTimestamp;
+    public void setChangeMarker(long changeMarker) {
+        this.changeMarker = changeMarker;
     }
 
-    public long getLastModifiedTimestamp() {
-        return this.lastModifiedTimestamp;
+    public long getChangeMarker() {
+        return this.changeMarker;
     }
 
     public OperationStatus getOperationStatus() {
@@ -173,22 +175,6 @@ public class AtlasExportResult implements Serializable {
         metrics.put(key, currentValue + incrementBy);
     }
 
-    public AtlasExportResult shallowCopy() {
-        AtlasExportResult result  = new AtlasExportResult();
-
-        result.setRequest(getRequest());
-        result.setUserName(getUserName());
-        result.setClientIpAddress(getClientIpAddress());
-        result.setHostName(getHostName());
-        result.setTimeStamp(getTimeStamp());
-        result.setMetrics(getMetrics());
-        result.setOperationStatus(getOperationStatus());
-        result.setSourceClusterName(getSourceClusterName());
-        result.setLastModifiedTimestamp(getLastModifiedTimestamp());
-
-        return result;
-    }
-
     public StringBuilder toString(StringBuilder sb) {
         if (sb == null) {
             sb = new StringBuilder();
@@ -199,14 +185,13 @@ public class AtlasExportResult implements Serializable {
         sb.append(", userName='").append(userName).append("'");
         sb.append(", clientIpAddress='").append(clientIpAddress).append("'");
         sb.append(", hostName='").append(hostName).append("'");
-        sb.append(", lastModifiedTimestamp='").append(lastModifiedTimestamp).append("'");
+        sb.append(", changeMarker='").append(changeMarker).append("'");
         sb.append(", sourceCluster='").append(sourceClusterName).append("'");
         sb.append(", timeStamp='").append(timeStamp).append("'");
         sb.append(", metrics={");
         AtlasBaseTypeDef.dumpObjects(metrics, sb);
         sb.append("}");
 
-        sb.append(", data='").append(data).append("'");
         sb.append(", operationStatus='").append(operationStatus).append("'");
         sb.append("}");
 
@@ -237,13 +222,11 @@ public class AtlasExportResult implements Serializable {
         private static final long serialVersionUID = 1L;
 
         private AtlasTypesDef            typesDef;
-        private Map<String, AtlasEntity> entities;
         private List<String>             entityCreationOrder;
 
 
         public AtlasExportData() {
             typesDef            = new AtlasTypesDef();
-            entities            = new HashMap<>();
             entityCreationOrder = new ArrayList<>();
         }
 
@@ -251,10 +234,6 @@ public class AtlasExportResult implements Serializable {
 
         public void setTypesDef(AtlasTypesDef typesDef) { this.typesDef = typesDef; }
 
-        public Map<String, AtlasEntity> getEntities() { return entities; }
-
-        public void setEntities(Map<String, AtlasEntity> entities) { this.entities = entities; }
-
         public List<String> getEntityCreationOrder() { return entityCreationOrder; }
 
         public void setEntityCreationOrder(List<String> entityCreationOrder) { this.entityCreationOrder = entityCreationOrder; }
@@ -267,7 +246,6 @@ public class AtlasExportResult implements Serializable {
             sb.append("AtlasExportData {");
             sb.append(", typesDef={").append(typesDef).append("}");
             sb.append(", entities={");
-            AtlasBaseTypeDef.dumpObjects(entities, sb);
             sb.append("}");
             sb.append(", entityCreationOrder={");
             AtlasBaseTypeDef.dumpObjects(entityCreationOrder, sb);

http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
index 30e93d5..212517b 100644
--- a/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
+++ b/intg/src/main/java/org/apache/atlas/model/impexp/AtlasImportResult.java
@@ -149,7 +149,7 @@ public class AtlasImportResult {
     }
 
     public void setExportResult(AtlasExportResult exportResult) {
-        this.exportResultWithoutData = exportResult.shallowCopy();
+        this.exportResultWithoutData = exportResult;
     }
 
     public StringBuilder toString(StringBuilder sb) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
index 5b5d022..407b406 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/AuditsWriter.java
@@ -144,7 +144,7 @@ public class AuditsWriter {
             }
 
             updateReplicationAttribute(replicationOptionState, targetServerName,
-                    entityGuids, Constants.ATTR_NAME_REPLICATED_TO, result.getLastModifiedTimestamp());
+                    entityGuids, Constants.ATTR_NAME_REPLICATED_TO, result.getChangeMarker());
         }
 
         private void saveServers() throws AtlasBaseException {
@@ -182,7 +182,7 @@ public class AuditsWriter {
             }
 
             updateReplicationAttribute(replicationOptionState, this.sourceServerName, entityGuids,
-                    Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getLastModifiedTimestamp());
+                    Constants.ATTR_NAME_REPLICATED_FROM, result.getExportResult().getChangeMarker());
         }
 
         private void saveServers() throws AtlasBaseException {

http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 97c2123..d3cff78 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.impexp.AtlasExportRequest;
@@ -93,7 +94,9 @@ public class ExportService {
     public AtlasExportResult run(ZipSink exportSink, AtlasExportRequest request, String userName, String hostName,
                                  String requestingIP) throws AtlasBaseException {
         long              startTime = System.currentTimeMillis();
-        AtlasExportResult result    = new AtlasExportResult(request, userName, requestingIP, hostName, startTime);
+        AtlasExportResult result    = new AtlasExportResult(request, userName, requestingIP,
+                hostName, startTime, getCurrentChangeMarker());
+
         ExportContext     context   = new ExportContext(atlasGraph, result, exportSink);
                     exportTypeProcessor = new ExportTypeProcessor(typeRegistry, context);
 
@@ -117,6 +120,10 @@ public class ExportService {
         return context.result;
     }
 
+    private long getCurrentChangeMarker() {
+        return RequestContext.earliestActiveRequestTime(); // run() passes this to AtlasExportResult as its changeMarker
+    }
+
     private void updateSinkWithOperationMetrics(String userName, ExportContext context,
                                                 AtlasExportResult.OperationStatus[] statuses,
                                                 long startTime, long endTime) throws AtlasBaseException {
@@ -125,7 +132,6 @@ public class ExportService {
         context.result.getData().getEntityCreationOrder().addAll(context.lineageProcessed);
         context.sink.setExportOrder(context.result.getData().getEntityCreationOrder());
         context.sink.setTypesDef(context.result.getData().getTypesDef());
-        context.result.setLastModifiedTimestamp(context.newestLastModifiedTimestamp);
         context.result.setOperationStatus(getOverallOperationStatus(statuses));
         context.result.incrementMeticsCounter("duration", duration);
         auditsWriter.write(userName, context.result, startTime, endTime, context.result.getData().getEntityCreationOrder());
@@ -362,7 +368,7 @@ public class ExportService {
         debugLog("<== processEntity({})", guid);
     }
 
-    private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
+    private void getConntedEntitiesBasedOnOption(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
         switch (context.fetchType) {
             case CONNECTED:
                 getEntityGuidsForConnectedFetch(entity, context, direction);
@@ -375,7 +381,7 @@ public class ExportService {
         }
     }
 
-    private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) throws AtlasBaseException {
+    private void getEntityGuidsForConnectedFetch(AtlasEntity entity, ExportContext context, TraversalDirection direction) {
         if (direction == null || direction == TraversalDirection.UNKNOWN) {
             getConnectedEntityGuids(entity, context, TraversalDirection.OUTWARD, TraversalDirection.INWARD);
         } else {
@@ -688,8 +694,7 @@ public class ExportService {
         private final ExportFetchType     fetchType;
         private final String              matchType;
         private final boolean             skipLineage;
-        private final long                lastModifiedTimestampRequested;
-        private       long                newestLastModifiedTimestamp;
+        private final long changeMarker;
 
         private       int                 progressReportCount = 0;
 
@@ -699,45 +704,16 @@ public class ExportService {
 
             scriptEngine = atlasGraph.getGremlinScriptEngine();
             bindings     = new HashMap<>();
-            fetchType    = getFetchType(result.getRequest());
-            matchType    = getMatchType(result.getRequest());
-            skipLineage  = getOptionSkipLineage(result.getRequest());
-            this.lastModifiedTimestampRequested = getLastModifiedTimestamp(fetchType, result.getRequest());
-            this.newestLastModifiedTimestamp = 0;
+            fetchType    = ExportFetchType.from(result.getRequest().getFetchTypeOptionValue());
+            matchType    = result.getRequest().getMatchTypeOptionValue();
+            skipLineage  = result.getRequest().getSkipLineageOptionValue();
+            this.changeMarker = getChangeTokenFromOptions(fetchType, result.getRequest());
         }
 
-        private ExportFetchType getFetchType(AtlasExportRequest request) {
-            Object fetchOption = request.getOptions() != null ? request.getOptions().get(OPTION_FETCH_TYPE) : null;
-
-            if (fetchOption instanceof String) {
-                return ExportFetchType.from((String) fetchOption);
-            } else if (fetchOption instanceof ExportFetchType) {
-                return (ExportFetchType) fetchOption;
-            }
-
-            return ExportFetchType.FULL;
-        }
-
-        private String getMatchType(AtlasExportRequest request) {
-            String matchType = null;
-
-            if (MapUtils.isNotEmpty(request.getOptions())) {
-                if (request.getOptions().get(OPTION_ATTR_MATCH_TYPE) != null) {
-                    matchType = request.getOptions().get(OPTION_ATTR_MATCH_TYPE).toString();
-                }
-            }
-
-            return matchType;
-        }
-
-        private boolean getOptionSkipLineage(AtlasExportRequest request) {
-            return request.getOptions().containsKey(AtlasExportRequest.OPTION_SKIP_LINEAGE) &&
-                    (boolean) request.getOptions().get(AtlasExportRequest.OPTION_SKIP_LINEAGE);
-        }
-
-        private long getLastModifiedTimestamp(ExportFetchType fetchType, AtlasExportRequest request) {
-            if(fetchType == ExportFetchType.INCREMENTAL && request.getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME)) {
-                return Long.parseLong(request.getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME).toString());
+        private long getChangeTokenFromOptions(ExportFetchType fetchType, AtlasExportRequest request) {
+            if(fetchType == ExportFetchType.INCREMENTAL &&
+                    request.getOptions().containsKey(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER)) {
+                return Long.parseLong(request.getOptions().get(AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER).toString());
             }
 
             return 0L;
@@ -794,19 +770,7 @@ public class ExportService {
                 return true;
             }
 
-            long entityModificationTimestamp = entity.getUpdateTime().getTime();
-            updateNewestLastModifiedTimestamp(entityModificationTimestamp);
-            return doesTimestampQualify(entityModificationTimestamp);
-        }
-
-        private void updateNewestLastModifiedTimestamp(long entityModificationTimestamp) {
-            if(newestLastModifiedTimestamp < entityModificationTimestamp) {
-                newestLastModifiedTimestamp = entityModificationTimestamp;
-            }
-        }
-
-        private boolean doesTimestampQualify(long modificationTimestamp) {
-            return lastModifiedTimestampRequested < modificationTimestamp;
+            return changeMarker <= entity.getUpdateTime().getTime();
         }
 
         public boolean getSkipLineage() {
@@ -814,7 +778,6 @@ public class ExportService {
         }
 
         public void addToSink(AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasBaseException {
-            updateNewestLastModifiedTimestamp(entityWithExtInfo.getEntity().getUpdateTime().getTime());
             sink.add(entityWithExtInfo);
         }
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
index 17ebbf1..9928c54 100644
--- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
+++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSink.java
@@ -36,10 +36,11 @@ import java.util.zip.ZipOutputStream;
 public class ZipSink {
     private static final Logger LOG = LoggerFactory.getLogger(ZipSink.class);
 
+    private static final String FILE_EXTENSION_JSON = ".json"; // suffix saveToZip() appends to the entry file name
+
     private ZipOutputStream zipOutputStream;
     final Set<String>       guids = new HashSet<>();
 
-
     public ZipSink(OutputStream outputStream) {
         zipOutputStream = new ZipOutputStream(outputStream);
     }
@@ -92,7 +93,7 @@ public class ZipSink {
 
     private void saveToZip(String fileName, String jsonData) throws AtlasBaseException {
         try {
-            addToZipStream(fileName.toString() + ".json", jsonData);
+            addToZipStream(fileName.toString() + FILE_EXTENSION_JSON, jsonData);
         } catch (IOException e) {
             throw new AtlasBaseException(String.format("Error writing file %s.", fileName), e);
         }

http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
index b2dcf44..86afd7f 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
@@ -41,7 +41,7 @@ import javax.inject.Inject;
 import java.io.IOException;
 import java.util.Map;
 
-import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_FROM_TIME;
+import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL_CHANGE_MARKER;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.createTypes;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.getEntities;
 import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runExportWithParameters;
@@ -98,7 +98,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     }
 
     private long updateTimesampForNextIncrementalExport(ZipSource source) throws AtlasBaseException {
-        return source.getExportResult().getLastModifiedTimestamp();
+        return source.getExportResult().getChangeMarker();
     }
 
     @Test(dependsOnMethods = "atT0_ReturnsAllEntities")
@@ -161,7 +161,7 @@ public class ExportIncrementalTest extends ExportImportTestBase {
     private AtlasExportRequest getIncrementalRequest(long timestamp) {
         try {
             AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(ENTITIES_SUB_DIR, EXPORT_REQUEST_INCREMENTAL, AtlasExportRequest.class);
-            request.getOptions().put(FETCH_TYPE_INCREMENTAL_FROM_TIME, timestamp);
+            request.getOptions().put(FETCH_TYPE_INCREMENTAL_CHANGE_MARKER, timestamp);
 
             return request;
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
index f894553..06bdaa6 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsShaperTest.java
@@ -72,6 +72,7 @@ public class ImportTransformsShaperTest extends ExportImportTestBase {
         assertNotNull(classification);
         assertEntities(result.getProcessedEntities(), TAG_NAME);
     }
+
     private void assertEntities(List<String> entityGuids, String tagName) throws AtlasBaseException {
         for (String guid : entityGuids) {
             AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = this.entityStore.getById(guid);

http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
index 7ffef7f..79a5e05 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ReplicationEntityAttributeTest.java
@@ -159,7 +159,7 @@ public class ReplicationEntityAttributeTest extends ExportImportTestBase {
         long actualLastModifiedTimestamp = (long) cluster.getAdditionalInfoRepl(entity.getEntity().getGuid());
 
         assertTrue(cluster.getAdditionalInfo().size() > 0);
-        assertEquals(actualLastModifiedTimestamp, importResult.getExportResult().getLastModifiedTimestamp());
+        assertEquals(actualLastModifiedTimestamp, importResult.getExportResult().getChangeMarker());
     }
 
     private AtlasExportRequest getUpdateMetaInfoUpdateRequest() {

http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
index e8bbeb5..0988a30 100644
--- a/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
+++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ZipSinkTest.java
@@ -59,7 +59,7 @@ public class ZipSinkTest {
         itemsToExport.add(new AtlasObjectId("hive_db", "qualifiedName", "default"));
         request.setItemsToExport(itemsToExport);
 
-        defaultExportResult = new AtlasExportResult(request, "admin", "1.0.0.0", "root", 100);
+        defaultExportResult = new AtlasExportResult(request, "admin", "1.0.0.0", "root", 100, 0L);
         return defaultExportResult;
     }
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/8903c9a6/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
----------------------------------------------------------------------
diff --git a/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json b/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
index c2bc867..fdd3b01 100644
--- a/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
+++ b/repository/src/test/resources/json/stocksDB-Entities/export-incremental.json
@@ -6,6 +6,6 @@
   ],
   "options": {
     "fetchType": "incremental",
-    "fromTime": 0
+    "changeMarker": 0
   }
 }