You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/09/10 19:02:51 UTC

[gobblin] branch master updated: [GOBBLIN-1533] Add completeness watermark to iceberg tables (#3385)

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 47707df  [GOBBLIN-1533] Add completeness watermark to iceberg tables (#3385)
47707df is described below

commit 47707df00a6884ada5974a5f5203408ce1efb890
Author: vbohra <vb...@linkedin.com>
AuthorDate: Fri Sep 10 12:02:42 2021 -0700

    [GOBBLIN-1533] Add completeness watermark to iceberg tables (#3385)
    
    * [GOBBLIN-1533] Add completeness watermark to iceberg tables
    
    * updated hive metadata writer test
    
    * Add apache header
    
    * Added correct default partition type
    
    * Fixed kafka audit url and logic to get topic name for iceberg table
    
    * Changes based on review
    
    * Make audit check granularity configurable
    
    * Added additional optimization to check for current hour during completion watermark calculation
    
    * optimization to skip audit check if its upto date by checking the seconds from epoch between current watermark and now
    
    * fixed test case
    
    * Replace hours from epoch with duration
    
    * Moved logging
    
    * Update partition spec with late field even when schema has been updated
---
 .../gobblin/compaction/audit/AuditCountClient.java |   4 +-
 .../compaction/audit/AuditCountClientFactory.java  |   2 +
 .../audit/KafkaAuditCountHttpClient.java           |   3 +
 .../audit/KafkaAuditCountHttpClientFactory.java    |   3 +
 .../verify/CompactionAuditCountVerifier.java       |   1 +
 .../build.gradle                                   |  22 ++-
 .../completeness}/audit/AuditCountClient.java      |  15 +-
 .../audit/AuditCountClientFactory.java             |   5 +-
 .../completeness/audit/AuditCountHttpClient.java   |  62 +++---
 .../audit/AuditCountHttpClientFactory.java         |  13 +-
 .../verifier/KafkaAuditCountVerifier.java          | 153 +++++++++++++++
 .../verifier/KafkaAuditCountVerifierTest.java      |  69 +++++++
 .../completeness/audit/TestAuditClient.java        |  27 ++-
 .../completeness/audit/TestAuditClientFactory.java |  13 +-
 gobblin-iceberg/build.gradle                       |   5 +-
 .../iceberg/writer/IcebergMetadataWriter.java      | 218 +++++++++++++++++++--
 .../writer/IcebergMetadataWriterConfigKeys.java    |  40 ++++
 .../iceberg/writer/HiveMetadataWriterTest.java     |   2 +-
 .../iceberg/writer/IcebergMetadataWriterTest.java  | 184 +++++++++++++++--
 .../java/org/apache/gobblin/time/TimeIterator.java |  39 +++-
 .../org/apache/gobblin/time/TimeIteratorTest.java  |   0
 gradle/scripts/defaultBuildProperties.gradle       |   2 +-
 settings.gradle                                    |   1 +
 23 files changed, 779 insertions(+), 104 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java
index b9179f3..5582f74 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java
@@ -22,7 +22,9 @@ import java.util.Map;
 
 /**
  * A type of client used to query the audit counts from Pinot backend
- */
+@Deprecated {@link org.apache.gobblin.completeness.audit.AuditCountClient}
+*/
+@Deprecated
 public interface AuditCountClient {
   Map<String, Long> fetch (String topic, long start, long end) throws IOException;
 }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java
index 48748db..3c812aa 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java
@@ -21,7 +21,9 @@ import org.apache.gobblin.configuration.State;
 
 /**
  * A factory class responsible for creating {@link AuditCountClient}
+ * @Deprecated {@link org.apache.gobblin.completeness.audit.AuditCountClientFactory}
  */
+@Deprecated
 public interface AuditCountClientFactory {
   String AUDIT_COUNT_CLIENT_FACTORY = "audit.count.client.factory";
   AuditCountClient createAuditCountClient (State state);
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
index 04988fe..f75cc4d 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
@@ -40,14 +40,17 @@ import com.google.gson.JsonParser;
 import javax.annotation.concurrent.ThreadSafe;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.completeness.audit.AuditCountHttpClient;
 import org.apache.gobblin.configuration.State;
 
 /**
  * A {@link AuditCountClient} which uses {@link org.apache.http.client.HttpClient}
  * to perform audit count query.
+ * @Deprecated {@link AuditCountHttpClient}
  */
 @Slf4j
 @ThreadSafe
+@Deprecated
 public class KafkaAuditCountHttpClient implements AuditCountClient {
 
   // Keys
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java
index 21f7104..1b63031 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java
@@ -18,12 +18,15 @@
 package org.apache.gobblin.compaction.audit;
 
 import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.completeness.audit.AuditCountHttpClientFactory;
 import org.apache.gobblin.configuration.State;
 
 /**
  * Factory to create an instance of type {@link KafkaAuditCountHttpClient}
+ * @Deprecated {@link AuditCountHttpClientFactory}
  */
 @Alias("KafkaAuditCountHttpClientFactory")
+@Deprecated
 public class KafkaAuditCountHttpClientFactory implements AuditCountClientFactory {
 
   public KafkaAuditCountHttpClient createAuditCountClient (State state)  {
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
index e581bdf..c33be4c 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.util.ClassAliasResolver;
  * Use {@link AuditCountClient} to retrieve all record count across different tiers
  * Compare one specific tier (gobblin-tier) with all other refernce tiers and determine
  * if verification should be passed based on a pre-defined threshold.
+ * @TODO: 8/31/21 "Use @{@link org.apache.gobblin.completeness.verifier.KafkaAuditCountVerifier}"
  */
 @Slf4j
 public class CompactionAuditCountVerifier implements CompactionVerifier<FileSystemDataset> {
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java b/gobblin-completeness/build.gradle
similarity index 72%
copy from gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java
copy to gobblin-completeness/build.gradle
index b9179f3..4a9a20e 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java
+++ b/gobblin-completeness/build.gradle
@@ -15,14 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.compaction.audit;
+apply plugin: 'java'
+apply plugin: 'java-test-fixtures'
 
-import java.io.IOException;
-import java.util.Map;
+dependencies {
+  compile project(":gobblin-api")
+  compile externalDependency.httpclient
+
+  testCompile externalDependency.testng
+  testCompile externalDependency.mockito
 
-/**
- * A type of client used to query the audit counts from Pinot backend
- */
-public interface AuditCountClient {
-  Map<String, Long> fetch (String topic, long start, long end) throws IOException;
 }
+
+configurations {
+  compile { transitive = true }
+}
+
+ext.classification="library"
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountClient.java
similarity index 68%
copy from gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java
copy to gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountClient.java
index b9179f3..4b0401a 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java
+++ b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountClient.java
@@ -15,14 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.compaction.audit;
+package org.apache.gobblin.completeness.audit;
 
 import java.io.IOException;
 import java.util.Map;
 
+
 /**
- * A type of client used to query the audit counts from Pinot backend
+ * A type of client used to query audit counts
  */
 public interface AuditCountClient {
-  Map<String, Long> fetch (String topic, long start, long end) throws IOException;
+  /**
+   *
+   * @param datasetName query dataset
+   * @param start start timestamp in millis from epoch
+   * @param end end timestamp in millis from epoch
+   * @return a map of <tier, counts>
+   * @throws IOException
+   */
+  Map<String, Long> fetch(String datasetName, long start, long end) throws IOException;
 }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountClientFactory.java
similarity index 90%
copy from gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java
copy to gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountClientFactory.java
index 48748db..b65d2b0 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java
+++ b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountClientFactory.java
@@ -15,14 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.compaction.audit;
+package org.apache.gobblin.completeness.audit;
 
 import org.apache.gobblin.configuration.State;
 
+
 /**
  * A factory class responsible for creating {@link AuditCountClient}
  */
 public interface AuditCountClientFactory {
   String AUDIT_COUNT_CLIENT_FACTORY = "audit.count.client.factory";
-  AuditCountClient createAuditCountClient (State state);
+  AuditCountClient createAuditCountClient(State state);
 }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountHttpClient.java
similarity index 66%
copy from gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
copy to gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountHttpClient.java
index 04988fe..bf70d99 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
+++ b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountHttpClient.java
@@ -15,14 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.compaction.audit;
+package org.apache.gobblin.completeness.audit;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpUriRequest;
@@ -42,28 +40,29 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.State;
 
+
 /**
  * A {@link AuditCountClient} which uses {@link org.apache.http.client.HttpClient}
  * to perform audit count query.
  */
 @Slf4j
 @ThreadSafe
-public class KafkaAuditCountHttpClient implements AuditCountClient {
+public class AuditCountHttpClient implements AuditCountClient {
 
   // Keys
-  public static final String KAFKA_AUDIT_HTTP = "kafka.audit.http";
-  public static final String CONNECTION_MAX_TOTAL = KAFKA_AUDIT_HTTP + "max.total";
+  public static final String AUDIT_HTTP_PREFIX = "audit.http";
+  public static final String CONNECTION_MAX_TOTAL = AUDIT_HTTP_PREFIX + "max.total";
   public static final int DEFAULT_CONNECTION_MAX_TOTAL = 10;
-  public static final String MAX_PER_ROUTE = KAFKA_AUDIT_HTTP + "max.per.route";
+  public static final String MAX_PER_ROUTE = AUDIT_HTTP_PREFIX + "max.per.route";
   public static final int DEFAULT_MAX_PER_ROUTE = 10;
 
 
-  public static final String KAFKA_AUDIT_REST_BASE_URL = "kafka.audit.rest.base.url";
-  public static final String KAFKA_AUDIT_REST_MAX_TRIES = "kafka.audit.rest.max.tries";
-  public static final String KAFKA_AUDIT_REST_START_QUERYSTRING_KEY = "kafka.audit.rest.querystring.start";
-  public static final String KAFKA_AUDIT_REST_END_QUERYSTRING_KEY = "kafka.audit.rest.querystring.end";
-  public static final String KAFKA_AUDIT_REST_START_QUERYSTRING_DEFAULT = "begin";
-  public static final String KAFKA_AUDIT_REST_END_QUERYSTRING_DEFAULT = "end";
+  public static final String AUDIT_REST_BASE_URL = "audit.rest.base.url";
+  public static final String AUDIT_REST_MAX_TRIES = "audit.rest.max.tries";
+  public static final String AUDIT_REST_START_QUERYSTRING_KEY = "audit.rest.querystring.start";
+  public static final String AUDIT_REST_END_QUERYSTRING_KEY = "audit.rest.querystring.end";
+  public static final String AUDIT_REST_START_QUERYSTRING_DEFAULT = "start";
+  public static final String AUDIT_REST_END_QUERYSTRING_DEFAULT = "end";
 
 
   // Http Client
@@ -74,11 +73,12 @@ public class KafkaAuditCountHttpClient implements AuditCountClient {
   private final String baseUrl;
   private final String startQueryString;
   private final String endQueryString;
+  private String topicQueryString = "topic";
   private final int maxNumTries;
   /**
    * Constructor
    */
-  public KafkaAuditCountHttpClient (State state) {
+  public AuditCountHttpClient(State state) {
     int maxTotal = state.getPropAsInt(CONNECTION_MAX_TOTAL, DEFAULT_CONNECTION_MAX_TOTAL);
     int maxPerRoute = state.getPropAsInt(MAX_PER_ROUTE, DEFAULT_MAX_PER_ROUTE);
 
@@ -89,20 +89,20 @@ public class KafkaAuditCountHttpClient implements AuditCountClient {
             .setConnectionManager(cm)
             .build();
 
-    this.baseUrl = state.getProp(KAFKA_AUDIT_REST_BASE_URL);
-    this.maxNumTries = state.getPropAsInt(KAFKA_AUDIT_REST_MAX_TRIES, 5);
-    this.startQueryString = state.getProp(KAFKA_AUDIT_REST_START_QUERYSTRING_KEY, KAFKA_AUDIT_REST_START_QUERYSTRING_DEFAULT);
-    this.endQueryString = state.getProp(KAFKA_AUDIT_REST_END_QUERYSTRING_KEY, KAFKA_AUDIT_REST_END_QUERYSTRING_DEFAULT);
+    this.baseUrl = state.getProp(AUDIT_REST_BASE_URL);
+    this.maxNumTries = state.getPropAsInt(AUDIT_REST_MAX_TRIES, 5);
+    this.startQueryString = state.getProp(AUDIT_REST_START_QUERYSTRING_KEY, AUDIT_REST_START_QUERYSTRING_DEFAULT);
+    this.endQueryString = state.getProp(AUDIT_REST_END_QUERYSTRING_KEY, AUDIT_REST_END_QUERYSTRING_DEFAULT);
   }
 
 
-  public Map<String, Long> fetch (String datasetName, long start, long end)  throws IOException {
-    String fullUrl =
-            (this.baseUrl.endsWith("/") ? this.baseUrl : this.baseUrl + "/") + StringUtils.replaceChars(datasetName, '/', '.')
-                    + "?" + this.startQueryString + "=" + start + "&" + this.endQueryString + "=" + end;
+  public Map<String, Long> fetch (String topic, long start, long end)  throws IOException {
+    String fullUrl = (this.baseUrl.endsWith("/") ? this.baseUrl.substring(0, this.baseUrl.length() - 1)
+        : this.baseUrl) + "?" + this.topicQueryString + "=" + topic
+        + "&" + this.startQueryString + "=" + start + "&" + this.endQueryString + "=" + end;
     log.info("Full URL is " + fullUrl);
     String response = getHttpResponse(fullUrl);
-   return parseResponse (fullUrl, response, datasetName);
+   return parseResponse (fullUrl, response, topic);
   }
 
 
@@ -113,12 +113,8 @@ public class KafkaAuditCountHttpClient implements AuditCountClient {
    * <pre>
    * {
    *  "result": {
-   *     "hadoop-tracking-lva1tarock-08": 79341895,
-   *     "hadoop-tracking-uno-08": 79341892,
-   *     "kafka-08-tracking-local": 79341968,
-   *     "kafka-corp-lca1-tracking-agg": 79341968,
-   *     "kafka-corp-ltx1-tracking-agg": 79341968,
-   *     "producer": 69483513
+   *     "tier1": 79341895,
+   *     "tier2": 79341892,
    *   }
    * }
    * </pre>
@@ -130,15 +126,13 @@ public class KafkaAuditCountHttpClient implements AuditCountClient {
     try {
       JsonObject jsonObj = PARSER.parse(response).getAsJsonObject();
 
-      countsPerTier = jsonObj.getAsJsonObject("result");
+      countsPerTier = jsonObj.getAsJsonObject("totalsPerTier");
     } catch (Exception e) {
       throw new IOException(String.format("Unable to parse JSON response: %s for request url: %s ", response,
               fullUrl), e);
     }
 
-    Set<Map.Entry<String, JsonElement>> entrySet = countsPerTier.entrySet();
-
-    for(Map.Entry<String, JsonElement> entry : entrySet) {
+    for(Map.Entry<String, JsonElement> entry : countsPerTier.entrySet()) {
       String tier = entry.getKey();
       long count = Long.parseLong(entry.getValue().getAsString());
       result.put(tier, count);
@@ -147,8 +141,6 @@ public class KafkaAuditCountHttpClient implements AuditCountClient {
     return result;
   }
 
-
-
   private String getHttpResponse(String fullUrl) throws IOException {
     HttpUriRequest req = new HttpGet(fullUrl);
 
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountHttpClientFactory.java
similarity index 71%
copy from gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java
copy to gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountHttpClientFactory.java
index 21f7104..9c25bce 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java
+++ b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountHttpClientFactory.java
@@ -15,18 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.compaction.audit;
+package org.apache.gobblin.completeness.audit;
 
 import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.configuration.State;
 
+
 /**
- * Factory to create an instance of type {@link KafkaAuditCountHttpClient}
+ * Factory to create an instance of type {@link AuditCountHttpClient}
  */
-@Alias("KafkaAuditCountHttpClientFactory")
-public class KafkaAuditCountHttpClientFactory implements AuditCountClientFactory {
+@Alias("AuditCountHttpClientFactory")
+public class AuditCountHttpClientFactory implements AuditCountClientFactory {
 
-  public KafkaAuditCountHttpClient createAuditCountClient (State state)  {
-    return new KafkaAuditCountHttpClient(state);
+  public AuditCountHttpClient createAuditCountClient (State state)  {
+    return new AuditCountHttpClient(state);
   }
 }
diff --git a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
new file mode 100644
index 0000000..04d49a1
--- /dev/null
+++ b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.completeness.verifier;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.completeness.audit.AuditCountClient;
+import org.apache.gobblin.completeness.audit.AuditCountClientFactory;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ClassAliasResolver;
+
+
+/**
+ * Use {@link AuditCountClient} to retrieve all record count across different tiers
+ * Compare one source tier against all other reference tiers and determine
+ * if verification should be passed based on a pre-defined threshold.
+ * source tier is the tier being compared against single/multiple reference tiers
+ */
+@Slf4j
+public class KafkaAuditCountVerifier {
+  public static final String COMPLETENESS_PREFIX = "completeness.";
+  public static final String SOURCE_TIER = COMPLETENESS_PREFIX + "source.tier";
+  public static final String REFERENCE_TIERS = COMPLETENESS_PREFIX + "reference.tiers";
+  public static final String THRESHOLD = COMPLETENESS_PREFIX + "threshold";
+  private static final double DEFAULT_THRESHOLD = 0.999;
+
+  private final AuditCountClient auditCountClient;
+  private final String srcTier;
+  private final Collection<String> refTiers;
+  private final double threshold;
+
+  /**
+   * Constructor with audit count client from state
+   */
+  public KafkaAuditCountVerifier(State state) {
+    this(state, getAuditClient(state));
+  }
+
+  /**
+   * Constructor with user specified audit count client
+   */
+  public KafkaAuditCountVerifier(State state, AuditCountClient client) {
+    this.auditCountClient = client;
+    this.threshold =
+        state.getPropAsDouble(THRESHOLD, DEFAULT_THRESHOLD);
+    this.srcTier = state.getProp(SOURCE_TIER);
+    this.refTiers = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(state.getProp(REFERENCE_TIERS));
+  }
+
+  /**
+   * Obtain an {@link AuditCountClient} using a {@link AuditCountClientFactory}
+   * @param state job state
+   * @return {@link AuditCountClient}
+   */
+  private static AuditCountClient getAuditClient(State state) {
+    Preconditions.checkArgument(state.contains(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY),
+        String.format("Audit count factory %s not set ", AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY));
+    try {
+      String factoryName = state.getProp(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY);
+      ClassAliasResolver<AuditCountClientFactory> conditionClassAliasResolver = new ClassAliasResolver<>(AuditCountClientFactory.class);
+      AuditCountClientFactory factory = conditionClassAliasResolver.resolveClass(factoryName).newInstance();
+      return factory.createAuditCountClient(state);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Compare source tier against reference tiers.
+   * Compute completion percentage by srcCount/refCount. Return true iff the highest percentages is greater than threshold.
+   *
+   * @param datasetName A dataset short name like 'PageViewEvent'
+   * @param beginInMillis Unix timestamp in milliseconds
+   * @param endInMillis Unix timestamp in milliseconds
+   * @param threshold User defined threshold
+   */
+  public boolean isComplete(String datasetName, long beginInMillis, long endInMillis, double threshold)
+      throws IOException {
+    return getCompletenessPercentage(datasetName, beginInMillis, endInMillis) > threshold;
+  }
+
+  public boolean isComplete(String datasetName, long beginInMillis, long endInMillis)
+      throws IOException {
+    return isComplete(datasetName, beginInMillis, endInMillis, this.threshold);
+  }
+
+  /**
+   * Compare source tier against reference tiers. For each reference tier, calculates percentage by srcCount/refCount.
+   *
+   * @param datasetName A dataset short name like 'PageViewEvent'
+   * @param beginInMillis Unix timestamp in milliseconds
+   * @param endInMillis Unix timestamp in milliseconds
+   *
+   * @return The highest percentage value
+   */
+  private double getCompletenessPercentage(String datasetName, long beginInMillis, long endInMillis) throws IOException {
+    Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
+    log.info(String.format("Audit counts map for %s for range [%s,%s]", datasetName, beginInMillis, endInMillis));
+    countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+    double percent = -1;
+    if (!countsByTier.containsKey(this.srcTier)) {
+      throw new IOException(String.format("Source tier %s audit count cannot be retrieved for dataset %s between %s and %s", this.srcTier, datasetName, beginInMillis, endInMillis));
+    }
+
+    for (String refTier: this.refTiers) {
+      if (!countsByTier.containsKey(refTier)) {
+        throw new IOException(String.format("Reference tier %s audit count cannot be retrieved for dataset %s between %s and %s", refTier, datasetName, beginInMillis, endInMillis));
+      }
+      long refCount = countsByTier.get(refTier);
+      if(refCount <= 0) {
+        throw new IOException(String.format("Reference tier %s count cannot be less than or equal to zero", refTier));
+      }
+      long srcCount = countsByTier.get(this.srcTier);
+
+      percent = Double.max(percent, (double) srcCount / (double) refCount);
+    }
+
+    if (percent < 0) {
+      throw new IOException("Cannot calculate completion percentage");
+    }
+
+    return percent;
+  }
+
+  /**
+   * Fetch all <tier-count> pairs for a given dataset between a time range
+   */
+  private Map<String, Long> getTierAndCount(String datasetName, long beginInMillis, long endInMillis) throws IOException {
+    return auditCountClient.fetch(datasetName, beginInMillis, endInMillis);
+  }
+}
\ No newline at end of file
diff --git a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
new file mode 100644
index 0000000..7a005c6
--- /dev/null
+++ b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.completeness.verifier;
+
+import java.io.IOException;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.gobblin.completeness.audit.TestAuditClient;
+import org.apache.gobblin.configuration.State;
+
+@Test
+public class KafkaAuditCountVerifierTest {
+
+  public static final String SOURCE_TIER = "gobblin";
+  public static final String REFERENCE_TIERS = "producer";
+
+  public void testFetch() throws IOException {
+    final String topic = "testTopic";
+    State props = new State();
+    props.setProp(KafkaAuditCountVerifier.SOURCE_TIER, SOURCE_TIER);
+    props.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, REFERENCE_TIERS);
+    props.setProp(KafkaAuditCountVerifier.THRESHOLD, ".99");
+    TestAuditClient client = new TestAuditClient(props);
+    KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(props, client);
+
+    // All complete
+    client.setTierCounts(ImmutableMap.of(
+        SOURCE_TIER, 1000L,
+        REFERENCE_TIERS,   1000L
+    ));
+    // Default threshold
+    Assert.assertTrue(verifier.isComplete(topic, 0L, 0L));
+
+    // 99.999 % complete
+    client.setTierCounts(ImmutableMap.of(
+        SOURCE_TIER, 999L,
+        REFERENCE_TIERS,   1000L
+    ));
+    Assert.assertTrue(verifier.isComplete(topic, 0L, 0L));
+
+    // <= 99% complete
+    client.setTierCounts(ImmutableMap.of(
+        SOURCE_TIER, 990L,
+        REFERENCE_TIERS,   1000L
+    ));
+    Assert.assertFalse(verifier.isComplete(topic, 0L, 0L));
+  }
+
+
+}
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java b/gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClient.java
similarity index 64%
copy from gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java
copy to gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClient.java
index 21f7104..4224bc5 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java
+++ b/gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClient.java
@@ -15,18 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.compaction.audit;
+package org.apache.gobblin.completeness.audit;
+
+import java.util.HashMap;
+import java.util.Map;
 
-import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.configuration.State;
 
-/**
- * Factory to create an instance of type {@link KafkaAuditCountHttpClient}
- */
-@Alias("KafkaAuditCountHttpClientFactory")
-public class KafkaAuditCountHttpClientFactory implements AuditCountClientFactory {
 
-  public KafkaAuditCountHttpClient createAuditCountClient (State state)  {
-    return new KafkaAuditCountHttpClient(state);
+public class TestAuditClient implements AuditCountClient {
+  Map<String, Long> tierCounts;
+
+  public TestAuditClient(State state) {
+    tierCounts = new HashMap<>();
+  }
+
+  public void setTierCounts(Map<String, Long> tierCounts) {
+    this.tierCounts = tierCounts;
+  }
+
+  @Override
+  public Map<String, Long> fetch(String topic, long start, long end) {
+    return tierCounts;
   }
 }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java b/gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClientFactory.java
similarity index 74%
copy from gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java
copy to gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClientFactory.java
index 48748db..05c4f4d 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java
+++ b/gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClientFactory.java
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.compaction.audit;
+package org.apache.gobblin.completeness.audit;
 
 import org.apache.gobblin.configuration.State;
 
-/**
- * A factory class responsible for creating {@link AuditCountClient}
- */
-public interface AuditCountClientFactory {
-  String AUDIT_COUNT_CLIENT_FACTORY = "audit.count.client.factory";
-  AuditCountClient createAuditCountClient (State state);
+public class TestAuditClientFactory implements AuditCountClientFactory {
+  @Override
+  public AuditCountClient createAuditCountClient(State state) {
+    return new TestAuditClient(state);
+  }
 }
diff --git a/gobblin-iceberg/build.gradle b/gobblin-iceberg/build.gradle
index fc1ae37..fe030f8 100644
--- a/gobblin-iceberg/build.gradle
+++ b/gobblin-iceberg/build.gradle
@@ -19,6 +19,7 @@ apply plugin: 'java'
 
 dependencies {
     compile project(":gobblin-api")
+    compile project(":gobblin-completeness")
     compile project(":gobblin-core")
     compile project(":gobblin-hive-registration")
     compile project(":gobblin-metrics-libs:gobblin-metrics")
@@ -47,9 +48,11 @@ dependencies {
     compile externalDependency.findBugsAnnotations
     compile externalDependency.avroMapredH2
 
-    testCompile(group: 'org.apache.iceberg', name: 'iceberg-hive-metastore', version: '0.10.0', classifier: 'tests') {
+    testCompile(group: 'org.apache.iceberg', name: 'iceberg-hive-metastore', version: '0.11.1', classifier: 'tests') {
         transitive = false
     }
+    testCompile('org.apache.hadoop:hadoop-common:2.6.0')
+    testImplementation(testFixtures(project(":gobblin-completeness")))
     testCompile project(path: ':gobblin-modules:gobblin-kafka-common', configuration: 'tests')
     testCompile externalDependency.testng
     testCompile externalDependency.mockito
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 3abd491..f67e614 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -18,6 +18,11 @@
 package org.apache.gobblin.iceberg.writer;
 
 import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -28,6 +33,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -64,11 +71,13 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.iceberg.types.Types;
 import org.joda.time.DateTime;
 import org.joda.time.format.PeriodFormatter;
 import org.joda.time.format.PeriodFormatterBuilder;
 
 import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -85,6 +94,7 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.completeness.verifier.KafkaAuditCountVerifier;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
@@ -106,12 +116,13 @@ import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor.KafkaWatermark;
 import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.time.TimeIterator;
 import org.apache.gobblin.util.AvroUtils;
 import org.apache.gobblin.util.ClustersNames;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.util.WriterUtils;
-
+import static org.apache.gobblin.iceberg.writer.IcebergMetadataWriterConfigKeys.*;
 
 /**
  * This writer is used to calculate iceberg metadata from GMCE and register to iceberg
@@ -148,6 +159,15 @@ public class IcebergMetadataWriter implements MetadataWriter {
   /* one of the fields in DataFile entry to describe the location URI of a data file with FS Scheme */
   private static final String ICEBERG_FILE_PATH_COLUMN = DataFile.FILE_PATH.name();
 
+  private final boolean completenessEnabled;
+  private final WhitelistBlacklist completenessWhitelistBlacklist;
+  private final String timeZone;
+  private final DateTimeFormatter HOURLY_DATEPARTITION_FORMAT;
+  private final String newPartitionColumn;
+  private final String newPartitionColumnType;
+  private Optional<KafkaAuditCountVerifier> auditCountVerifier;
+  private final String auditCheckGranularity;
+
   protected final MetricContext metricContext;
   protected EventSubmitter eventSubmitter;
   private final WhitelistBlacklist whitelistBlacklist;
@@ -165,8 +185,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
   protected final Configuration conf;
   protected final ReadWriteLock readWriteLock;
   private final HiveLock locks;
-  private final ParallelRunner parallelRunner;
   private final boolean useDataLocationAsTableLocation;
+  private final ParallelRunner parallelRunner;
   private FsPermission permission;
 
   public IcebergMetadataWriter(State state) throws IOException {
@@ -198,6 +218,21 @@ public class IcebergMetadataWriter implements MetadataWriter {
           HadoopUtils.deserializeFsPermission(state, ICEBERG_METADATA_FILE_PERMISSION,
               FsPermission.getDefault());
     }
+    this.completenessEnabled = state.getPropAsBoolean(ICEBERG_COMPLETENESS_ENABLED, DEFAULT_ICEBERG_COMPLETENESS);
+    this.completenessWhitelistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_COMPLETENESS_WHITELIST, ""),
+        state.getProp(ICEBERG_COMPLETENESS_BLACKLIST, ""));
+    this.timeZone = state.getProp(TIME_ZONE_KEY, DEFAULT_TIME_ZONE);
+    this.HOURLY_DATEPARTITION_FORMAT = DateTimeFormatter.ofPattern(DATEPARTITION_FORMAT)
+        .withZone(ZoneId.of(this.timeZone));
+    this.auditCountVerifier = Optional.fromNullable(this.completenessEnabled ? new KafkaAuditCountVerifier(state) : null);
+    this.newPartitionColumn = state.getProp(NEW_PARTITION_KEY, DEFAULT_NEW_PARTITION);
+    this.newPartitionColumnType = state.getProp(NEW_PARTITION_TYPE_KEY, DEFAULT_PARTITION_COLUMN_TYPE);
+    this.auditCheckGranularity = state.getProp(AUDIT_CHECK_GRANULARITY, DEFAULT_AUDIT_CHECK_GRANULARITY);
+  }
+
+  @VisibleForTesting
+  protected void setAuditCountVerifier(KafkaAuditCountVerifier verifier) {
+    this.auditCountVerifier = Optional.of(verifier);
   }
 
   protected void initializeCatalog() {
@@ -243,6 +278,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
    * The write method will be responsible for processing gmce and aggregating the metadata.
    * The logic of this function will be:
    * 1. Check whether a table exists, if not then create a iceberg table
+   *    - If completeness is enabled, Add new parititon column to
+   *      table {#NEW_PARTITION_KEY}
    * 2. Compute schema from the gmce and update the cache for candidate schemas
    * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile, dropFile or change_property.
    *
@@ -283,6 +320,12 @@ public class IcebergMetadataWriter implements MetadataWriter {
         if (gmce.getTopicPartitionOffsetsRange() != null) {
           mergeOffsets(gmce, tid);
         }
+        //compute topic name
+        if(!tableMetadata.newProperties.get().containsKey(TOPIC_NAME_KEY) &&
+            tableMetadata.dataOffsetRange.isPresent() && !tableMetadata.dataOffsetRange.get().isEmpty()) {
+          String topicPartition = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
+          tableMetadata.newProperties.get().put(TOPIC_NAME_KEY, topicPartition.substring(0, topicPartition.lastIndexOf("-")));
+        }
         break;
       }
       case rewrite_files: {
@@ -364,8 +407,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
 
   private void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid) {
     org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(tableSpec.getTable());
-    tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata()).newProperties =
-        Optional.of(IcebergUtils.getTableProperties(table));
+    TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
+    tableMetadata.newProperties = Optional.of(IcebergUtils.getTableProperties(table));
   }
 
   /**
@@ -422,6 +465,23 @@ public class IcebergMetadataWriter implements MetadataWriter {
     }
   }
 
+  /**
+   * Add a partition column to the schema and partition spec
+   * @param table incoming iceberg table
+   * @param fieldName name of partition column
+   * @param type datatype of partition column
+   * @return table with updated schema and partition spec
+   */
+  private Table addPartitionToIcebergTable(Table table, String fieldName, String type) {
+    if(!table.schema().columns().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
+      table.updateSchema().addColumn(fieldName, Types.fromPrimitiveString(type)).commit();
+    }
+    if(!table.spec().fields().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
+      table.updateSpec().addField(fieldName).commit();
+    }
+    return table;
+  }
+
   protected Table createTable(GobblinMetadataChangeEvent gmce, HiveSpec spec) throws IOException {
     String schema = gmce.getTableSchema();
     org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(spec.getTable());
@@ -572,7 +632,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
   private Stream<DataFile> getIcebergDataFilesToBeAddedHelper(GobblinMetadataChangeEvent gmce, Table table,
       Map<String, Collection<HiveSpec>> newSpecsMap,
       TableMetadata tableMetadata) {
-    return getIcebergDataFilesToBeAdded(gmce.getNewFiles(), table.spec(), newSpecsMap,
+    return getIcebergDataFilesToBeAdded(table, tableMetadata, gmce, gmce.getNewFiles(), table.spec(), newSpecsMap,
         IcebergUtils.getSchemaIdMap(getSchemaWithOriginId(gmce), table.schema())).stream()
         .filter(dataFile -> tableMetadata.addedFiles.getIfPresent(dataFile.path()) == null);
   }
@@ -616,16 +676,29 @@ public class IcebergMetadataWriter implements MetadataWriter {
   /**
    * Method to get dataFiles with metrics information
    * This method is used to get files to be added to iceberg
+   * if completeness is enabled a new field (late) is added to table schema and partition spec
+   * computed based on datepartition and completion watermark
    * This method will call method {IcebergUtils.getIcebergDataFileWithMetric} to get DataFile for specific file path
    */
-  private Set<DataFile> getIcebergDataFilesToBeAdded(List<org.apache.gobblin.metadata.DataFile> files,
+  private Set<DataFile> getIcebergDataFilesToBeAdded(Table table, TableMetadata tableMetadata, GobblinMetadataChangeEvent gmce, List<org.apache.gobblin.metadata.DataFile> files,
       PartitionSpec partitionSpec, Map<String, Collection<HiveSpec>> newSpecsMap, Map<Integer, Integer> schemaIdMap) {
     Set<DataFile> dataFiles = new HashSet<>();
     for (org.apache.gobblin.metadata.DataFile file : files) {
       try {
-        StructLike partition = getIcebergPartitionVal(newSpecsMap.get(new Path(file.getFilePath()).getParent().toString()),
-            file.getFilePath(), partitionSpec);
-        dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file, partitionSpec, partition, conf, schemaIdMap));
+        Collection<HiveSpec> hiveSpecs = newSpecsMap.get(new Path(file.getFilePath()).getParent().toString());
+        StructLike partition = getIcebergPartitionVal(hiveSpecs, file.getFilePath(), partitionSpec);
+
+        if(tableMetadata.completenessEnabled && gmce.getOperationType() == OperationType.add_files) {
+          tableMetadata.prevCompletenessWatermark = Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
+              String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
+          // Assumes first partition value to be partitioned by date
+          // TODO Find better way to determine a partition value
+          String datepartition = partition.get(0, null);
+          partition = addLatePartitionValueToIcebergTable(table, tableMetadata,
+              hiveSpecs.iterator().next().getPartition().get(), datepartition);
+          tableMetadata.datePartitions.add(getDateTimeFromDatepartitionString(datepartition));
+        }
+        dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file, table.spec(), partition, conf, schemaIdMap));
       } catch (Exception e) {
         log.warn("Cannot get DataFile for {} dur to {}", file.getFilePath(), e);
       }
@@ -634,6 +707,46 @@ public class IcebergMetadataWriter implements MetadataWriter {
   }
 
   /**
+   * 1. Add "late" partition column to iceberg table if not exists
+   * 2. compute "late" partition value based on datepartition and completion watermark
+   * @param table
+   * @param tableMetadata
+   * @param hivePartition
+   * @param datepartition
+   * @return new iceberg partition value for file
+   */
+  private StructLike addLatePartitionValueToIcebergTable(Table table, TableMetadata tableMetadata, HivePartition hivePartition, String datepartition) {
+    table = addPartitionToIcebergTable(table, newPartitionColumn, newPartitionColumnType);
+    table.refresh();
+    PartitionSpec partitionSpec = table.spec();
+    long prevCompletenessWatermark = tableMetadata.prevCompletenessWatermark;
+    int late = isLate(datepartition, prevCompletenessWatermark);
+    List<String> partitionValues = new ArrayList<>(hivePartition.getValues());
+    partitionValues.add(String.valueOf(late));
+    return IcebergUtils.getPartition(partitionSpec.partitionType(), partitionValues);
+  }
+
+  private int isLate(String datepartition, long previousWatermark) {
+    ZonedDateTime partitionDateTime = ZonedDateTime.parse(datepartition, HOURLY_DATEPARTITION_FORMAT);
+    long partitionEpochTime = partitionDateTime.toInstant().toEpochMilli();
+    if(partitionEpochTime > previousWatermark) {
+      return 0;
+    } else if(partitionEpochTime <= previousWatermark && partitionDateTime.toLocalDate().equals(getDateFromEpochMillis(previousWatermark))) {
+      return 1;
+    } else {
+      return 2;
+    }
+  }
+
+  private LocalDate getDateFromEpochMillis(long epochMillis) {
+    return ZonedDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneId.of(timeZone)).toLocalDate();
+  }
+
+  private ZonedDateTime getDateTimeFromDatepartitionString(String datepartition) {
+    return ZonedDateTime.parse(datepartition, HOURLY_DATEPARTITION_FORMAT);
+  }
+
+  /**
    * Obtain Iceberg partition value with a collection of {@link HiveSpec}.
    * @param specs A collection of {@link HiveSpec}s.
    * @param filePath URI of file, used for logging purpose in this method.
@@ -671,14 +784,30 @@ public class IcebergMetadataWriter implements MetadataWriter {
       TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new TableMetadata());
       if (tableMetadata.transaction.isPresent()) {
         Transaction transaction = tableMetadata.transaction.get();
+        Map<String, String> props = tableMetadata.newProperties.or(
+            Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
         if (tableMetadata.appendFiles.isPresent()) {
           tableMetadata.appendFiles.get().commit();
+          if(tableMetadata.completenessEnabled) {
+            String topicName = props.get(TOPIC_NAME_KEY);
+            if(topicName == null) {
+              log.error(String.format("Not performing audit check. %s is null. Please set as table property of %s.%s",
+                  TOPIC_NAME_KEY, dbName, tableName));
+            } else {
+              long newCompletenessWatermark =
+                  computeCompletenessWatermark(topicName, tableMetadata.datePartitions, tableMetadata.prevCompletenessWatermark);
+              if(newCompletenessWatermark > tableMetadata.prevCompletenessWatermark) {
+                log.info(String.format("Updating %s for %s.%s to %s", COMPLETION_WATERMARK_KEY, dbName, tableName, newCompletenessWatermark));
+                props.put(COMPLETION_WATERMARK_KEY, String.valueOf(newCompletenessWatermark));
+                props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
+                tableMetadata.newCompletenessWatermark = newCompletenessWatermark;
+              }
+            }
+          }
         }
         if (tableMetadata.deleteFiles.isPresent()) {
           tableMetadata.deleteFiles.get().commit();
         }
-        Map<String, String> props = tableMetadata.newProperties.or(
-            Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
         //Set high waterMark
         Long highWatermark = tableCurrentWatermarkMap.get(tid);
         props.put(String.format(GMCE_HIGH_WATERMARK_KEY, tableTopicPartitionMap.get(tid)), highWatermark.toString());
@@ -717,7 +846,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
         submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);
 
         //Reset the table metadata for next accumulation period
-        tableMetadata.reset(currentProps, highWatermark);
+        tableMetadata.reset(currentProps, highWatermark, tableMetadata.newCompletenessWatermark);
         log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid.toString()));
       } else {
         log.info("There's no transaction initiated for the table {}", tid.toString());
@@ -731,6 +860,57 @@ public class IcebergMetadataWriter implements MetadataWriter {
     }
   }
 
+  /**
+   * For each timestamp in sorted collection of timestamps in descending order
+   * if timestamp is greater than previousWatermark
+   * and hour(now) > hour(prevWatermark) + 1
+   *    check audit counts for completeness between
+   *    a source and reference tier for [timestamp, timstamp + 1 unit of granularity]
+   *    If the audit count matches update the watermark to the timestamp and break
+   *    else continue
+   * else
+   *  break
+   * Using a {@link TimeIterator} that operates over a range of time in 1 unit
+   * given the start, end and granularity
+   * @param table
+   * @param timestamps a sorted set of timestamps in decreasing order
+   * @param previousWatermark previous completion watermark for the table
+   * @return updated completion watermark
+   */
+  private long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime> timestamps, long previousWatermark) {
+    log.info(String.format("Compute completion watermark for %s and timestamps %s with previous watermark %s", table, timestamps, previousWatermark));
+    long completionWatermark = previousWatermark;
+    ZonedDateTime now = ZonedDateTime.now(ZoneId.of(this.timeZone));
+    try {
+      if(timestamps == null || timestamps.size() <= 0) {
+        log.error("Cannot create time iterator. Empty for null timestamps");
+        return previousWatermark;
+      }
+      TimeIterator.Granularity granularity = TimeIterator.Granularity.valueOf(this.auditCheckGranularity);
+      ZonedDateTime prevWatermarkDT = Instant.ofEpochMilli(previousWatermark)
+          .atZone(ZoneId.of(this.timeZone));
+      ZonedDateTime startDT = timestamps.first();
+      ZonedDateTime endDT = timestamps.last();
+      TimeIterator iterator = new TimeIterator(startDT, endDT, granularity, true);
+      while (iterator.hasNext()) {
+        ZonedDateTime timestampDT = iterator.next();
+        if (timestampDT.isAfter(prevWatermarkDT)
+            && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 1) {
+          long timestampMillis = timestampDT.toInstant().toEpochMilli();
+          if(auditCountVerifier.get().isComplete(table, timestampMillis, TimeIterator.inc(timestampDT, granularity, 1).toInstant().toEpochMilli())) {
+            completionWatermark = timestampMillis;
+            break;
+          }
+        } else {
+          break;
+        }
+      }
+    } catch (IOException e) {
+      log.warn("Exception during audit count check: ", e);
+    }
+    return completionWatermark;
+  }
+
   private void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata tableMetadata, String dbName,
       String tableName, Map<String, String> props, Long highWaterMark) {
     GobblinEventBuilder gobblinTrackingEvent =
@@ -840,6 +1020,10 @@ public class IcebergMetadataWriter implements MetadataWriter {
                 Optional.of(currentOffset - 1);
           }
           tableMetadataMap.get(tid).setDatasetName(gmce.getDatasetIdentifier().getNativeName());
+          if(this.completenessEnabled && this.completenessWhitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
+            tableMetadataMap.get(tid).setCompletenessEnabled(true);
+          }
+
           write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
           tableCurrentWatermarkMap.put(tid, currentOffset);
         } else {
@@ -885,9 +1069,14 @@ public class IcebergMetadataWriter implements MetadataWriter {
     Optional<Map<String, List<Range>>> dataOffsetRange = Optional.absent();
     Optional<String> lastSchemaVersion = Optional.absent();
     Optional<Long> lowWatermark = Optional.absent();
+    long prevCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
+    long newCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
+    SortedSet<ZonedDateTime> datePartitions = new TreeSet<>(Collections.reverseOrder());
 
     @Setter
     String datasetName;
+    @Setter
+    boolean completenessEnabled;
 
     Cache<CharSequence, String> addedFiles = CacheBuilder.newBuilder()
         .expireAfterAccess(conf.getInt(ADDED_FILES_CACHE_EXPIRING_TIME, DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME),
@@ -926,7 +1115,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
       }
     }
 
-    void reset(Map<String, String> props, Long lowWaterMark) {
+    void reset(Map<String, String> props, Long lowWaterMark, long newCompletionWatermark) {
       this.lastProperties = Optional.of(props);
       this.lastSchemaVersion = Optional.of(props.get(SCHEMA_CREATION_TIME_KEY));
       this.transaction = Optional.absent();
@@ -943,6 +1132,9 @@ public class IcebergMetadataWriter implements MetadataWriter {
       this.newProperties = Optional.absent();
       this.lowestGMCEEmittedTime = Long.MAX_VALUE;
       this.lowWatermark = Optional.of(lowWaterMark);
+      this.prevCompletenessWatermark = newCompletionWatermark;
+      this.newCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
+      this.datePartitions.clear();
     }
   }
 }
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
new file mode 100644
index 0000000..65c4e57
--- /dev/null
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.writer;
+
+public class IcebergMetadataWriterConfigKeys {
+
+  public static final String ICEBERG_COMPLETENESS_ENABLED = "iceberg.completeness.enabled";
+  public static final boolean DEFAULT_ICEBERG_COMPLETENESS = false;
+  public static final String ICEBERG_COMPLETENESS_WHITELIST = "iceberg.completeness.whitelist";
+  public static final String ICEBERG_COMPLETENESS_BLACKLIST = "iceberg.completeness.blacklist";
+  public static final String COMPLETION_WATERMARK_KEY = "completionWatermark";
+  public static final String COMPLETION_WATERMARK_TIMEZONE_KEY = "completionWatermarkTimezone";
+  public static final long DEFAULT_COMPLETION_WATERMARK = -1L;
+  public static final String TIME_ZONE_KEY = "iceberg.completeness.timezone";
+  public static final String DEFAULT_TIME_ZONE = "America/Los_Angeles";
+  public static final String DATEPARTITION_FORMAT = "yyyy-MM-dd-HH";
+  public static final String NEW_PARTITION_KEY = "iceberg.completeness.add.partition";
+  public static final String DEFAULT_NEW_PARTITION = "late";
+  public static final String NEW_PARTITION_TYPE_KEY = "iceberg.completeness.add.partition.type";
+  public static final String DEFAULT_PARTITION_COLUMN_TYPE = "string";
+  public static final String TOPIC_NAME_KEY = "topic.name";
+  public static final String AUDIT_CHECK_GRANULARITY = "iceberg.completeness.audit.check.granularity";
+  public static final String DEFAULT_AUDIT_CHECK_GRANULARITY = "HOUR";
+
+}
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index 11756dd..70a3a2b 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -178,7 +178,7 @@ public class HiveMetadataWriterTest extends HiveMetastoreTest {
     gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
   }
 
-  @Test( priority = 3 )
+  @Test(dependsOnGroups={"icebergMetadataWriterTest"})
   public void testHiveWriteAddFileGMCE() throws IOException {
     gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
         new KafkaStreamingExtractor.KafkaWatermark(
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 2ae9859..c234675 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.file.DataFileWriter;
@@ -29,26 +30,35 @@ import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.iceberg.FindFiles;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hive.HiveMetastoreTest;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
+import org.apache.gobblin.completeness.audit.AuditCountClient;
+import org.apache.gobblin.completeness.audit.AuditCountClientFactory;
+import org.apache.gobblin.completeness.audit.TestAuditClientFactory;
+import org.apache.gobblin.completeness.verifier.KafkaAuditCountVerifier;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.hive.HivePartition;
+import org.apache.gobblin.hive.HiveRegistrationUnit;
 import org.apache.gobblin.hive.HiveTable;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
 import org.apache.gobblin.metadata.DataFile;
@@ -66,7 +76,7 @@ import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.util.ClustersNames;
 import org.apache.gobblin.util.ConfigUtils;
-
+import static org.apache.gobblin.iceberg.writer.IcebergMetadataWriterConfigKeys.*;
 
 public class IcebergMetadataWriterTest extends HiveMetastoreTest {
 
@@ -85,6 +95,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
   private String dbName = "hivedb";
 
   private GobblinMCEWriter gobblinMCEWriter;
+  private GobblinMCEWriter gobblinMCEWriterWithCompletness;
   private GobblinMCEWriter gobblinMCEWriterWithAcceptClusters;
 
   GobblinMetadataChangeEvent gmce;
@@ -104,7 +115,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
   public void setUp() throws Exception {
     Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
     startMetastore();
-    State state = ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties()));
+
     tmpDir = Files.createTempDir();
     hourlyDataFile_1 = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2020/03/17/08/data.avro");
     Files.createParentDirs(hourlyDataFile_1);
@@ -138,21 +149,46 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
         .setRegistrationPolicy(TestHiveRegistrationPolicyForIceberg.class.getName())
         .setRegistrationProperties(ImmutableMap.<String, String>builder().put("hive.database.name", dbName).build())
         .build();
-    state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS,
-        KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
-    state.setProp("default.hive.registration.policy",
-        TestHiveRegistrationPolicyForIceberg.class.getName());
-    state.setProp("use.data.path.as.table.location", true);
+
+    State state = getState();
     gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
-    state.setProp(GobblinMCEWriter.ACCEPTED_CLUSTER_NAMES, "randomCluster");
-    gobblinMCEWriterWithAcceptClusters = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
     ((IcebergMetadataWriter) gobblinMCEWriter.getMetadataWriters().iterator().next()).setCatalog(
         HiveMetastoreTest.catalog);
+
+    State stateWithCompletenessConfig = getStateWithCompletenessConfig();
+    gobblinMCEWriterWithCompletness = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), stateWithCompletenessConfig);
+    ((IcebergMetadataWriter) gobblinMCEWriterWithCompletness.getMetadataWriters().iterator().next()).setCatalog(
+        HiveMetastoreTest.catalog);
+
+    state.setProp(GobblinMCEWriter.ACCEPTED_CLUSTER_NAMES, "randomCluster");
+    gobblinMCEWriterWithAcceptClusters = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
+
     _avroPartitionSchema =
         SchemaBuilder.record("partitionTest").fields().name("ds").type().optional().stringType().endRecord();
   }
 
-  @Test ( priority = 0 )
+  private State getState() {
+    State state = ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties()));
+    state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS,
+        KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
+    state.setProp("default.hive.registration.policy",
+        TestHiveRegistrationPolicyForIceberg.class.getName());
+    state.setProp("use.data.path.as.table.location", true);
+    return state;
+  }
+
+  private State getStateWithCompletenessConfig() {
+    State state = getState();
+    state.setProp(ICEBERG_COMPLETENESS_ENABLED, true);
+    state.setProp(NEW_PARTITION_KEY, "late");
+    state.setProp(NEW_PARTITION_TYPE_KEY, "int");
+    state.setProp(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY, TestAuditClientFactory.class.getName());
+    state.setProp(KafkaAuditCountVerifier.SOURCE_TIER, "gobblin");
+    state.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, "producer");
+    return state;
+  }
+
+  @Test
   public void testWriteAddFileGMCE() throws IOException {
     // Creating a copy of gmce with static type in GenericRecord to work with writeEnvelop method
     // without risking running into type cast runtime error.
@@ -221,7 +257,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
   }
 
   //Make sure hive test execute later and close the metastore
-  @Test( priority = 1 )
+  @Test(dependsOnMethods={"testWriteAddFileGMCE"}, groups={"icebergMetadataWriterTest"})
   public void testWriteRewriteFileGMCE() throws IOException {
     gmce.setTopicPartitionOffsetsRange(null);
     FileSystem fs = FileSystem.get(new Configuration());
@@ -240,7 +276,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
         result = FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", filePath_1)).collect().iterator();
     Assert.assertEquals(table.currentSnapshot().allManifests().size(), 2);
     Assert.assertTrue(result.hasNext());
-    gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
+    GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+    gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
         new KafkaStreamingExtractor.KafkaWatermark(
             new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(40L))));
@@ -256,7 +293,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Assert.assertFalse(result.hasNext());
   }
 
-  @Test( priority = 2 )
+  @Test(dependsOnMethods={"testWriteRewriteFileGMCE"}, groups={"icebergMetadataWriterTest"} )
   public void testChangeProperty() throws IOException {
     Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), "0-3000");
@@ -273,7 +310,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     gmce.setNewFiles(Lists.newArrayList(dailyFile));
     gmce.setOperationType(OperationType.change_property);
     gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "2000-4000").build());
-    gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
+    GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+    gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
         new KafkaStreamingExtractor.KafkaWatermark(
             new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(45L))));
@@ -287,6 +325,97 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"), "45");
   }
 
+  @Test(dependsOnMethods={"testChangeProperty"}, groups={"icebergMetadataWriterTest"})
+  public void testWriteAddFileGMCECompleteness() throws IOException {
+    // Creating a copy of gmce with static type in GenericRecord to work with writeEnvelop method
+    // without risking running into type cast runtime error.
+    gmce.setOperationType(OperationType.add_files);
+    File hourlyFile = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2020/03/17/10/data.avro");
+    long timestampMillis = 1584464400000L;
+    Files.createParentDirs(hourlyFile);
+    writeRecord(hourlyFile);
+    gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
+        .setFilePath(hourlyFile.toString())
+        .setFileFormat("avro")
+        .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
+        .build()));
+    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "3000-4000").build());
+    GenericRecord genericGmce_3000_4000 = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+    gobblinMCEWriterWithCompletness.writeEnvelope(new RecordEnvelope<>(genericGmce_3000_4000,
+        new KafkaStreamingExtractor.KafkaWatermark(
+            new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+            new LongWatermark(50L))));
+
+    Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+    Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), "0-4000");
+    Assert.assertTrue(table.spec().fields().size() == 2);
+    Assert.assertEquals(table.spec().fields().get(1).name(), "late");
+
+    // Test when completeness watermark < "2020-03-17-10"
+    KafkaAuditCountVerifier verifier = Mockito.mock(TestAuditCountVerifier.class);
+    Mockito.when(verifier.isComplete("testTopic", timestampMillis, timestampMillis + TimeUnit.HOURS.toMillis(1))).thenReturn(true);
+    ((IcebergMetadataWriter) gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
+    gobblinMCEWriterWithCompletness.flush();
+    table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+    //completeness watermark = "2020-03-17-10"
+    Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
+    Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), "America/Los_Angeles");
+    Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(timestampMillis));
+
+    Iterator<org.apache.iceberg.DataFile> dfl = FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", hourlyFile.getAbsolutePath())).collect().iterator();
+    Assert.assertTrue(dfl.hasNext());
+
+    // Test when completeness watermark is still "2020-03-17-10" but have a late file for "2020-03-17-10"
+    File hourlyFile1 = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2020/03/17/10/data1.avro");
+    Files.createParentDirs(hourlyFile1);
+    writeRecord(hourlyFile1);
+    gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
+        .setFilePath(hourlyFile1.toString())
+        .setFileFormat("avro")
+        .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
+        .build()));
+    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "4000-5000").build());
+    GenericRecord genericGmce_4000_5000 = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+    gobblinMCEWriterWithCompletness.writeEnvelope(new RecordEnvelope<>(genericGmce_4000_5000,
+        new KafkaStreamingExtractor.KafkaWatermark(
+            new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+            new LongWatermark(55L))));
+    gobblinMCEWriterWithCompletness.flush();
+    table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+    Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(timestampMillis));
+
+    dfl = FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", hourlyFile1.getAbsolutePath())).collect().iterator();
+    Assert.assertTrue(dfl.hasNext());
+    Assert.assertTrue(dfl.next().partition().get(1, Integer.class) == 1);
+
+    // Test when completeness watermark will advance to "2020-03-17-11"
+    File hourlyFile2 = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2020/03/17/11/data.avro");
+    long timestampMillis1 = timestampMillis + TimeUnit.HOURS.toMillis(1);
+    Files.createParentDirs(hourlyFile2);
+    writeRecord(hourlyFile2);
+    gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
+        .setFilePath(hourlyFile2.toString())
+        .setFileFormat("avro")
+        .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
+        .build()));
+    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "5000-6000").build());
+    GenericRecord genericGmce_5000_6000 = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+    gobblinMCEWriterWithCompletness.writeEnvelope(new RecordEnvelope<>(genericGmce_5000_6000,
+        new KafkaStreamingExtractor.KafkaWatermark(
+            new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+            new LongWatermark(60L))));
+
+    Mockito.when(verifier.isComplete("testTopic", timestampMillis1, timestampMillis1 + TimeUnit.HOURS.toMillis(1))).thenReturn(true);
+    gobblinMCEWriterWithCompletness.flush();
+    table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+    Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(timestampMillis1));
+
+    dfl = FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", hourlyFile2.getAbsolutePath())).collect().iterator();
+    Assert.assertTrue(dfl.hasNext());
+    Assert.assertTrue(dfl.next().partition().get(1, Integer.class) == 0);
+
+  }
+
   private String writeRecord(File file) throws IOException {
     GenericData.Record record = new GenericData.Record(avroDataSchema);
     record.put("id", 1L);
@@ -311,12 +440,28 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
         partitionValue = "2020-03-17-08";
       } else if (path.toString().contains("hourly/2020/03/17/09")) {
         partitionValue = "2020-03-17-09";
+      } else if (path.toString().contains("hourly/2020/03/17/10")) {
+        partitionValue = "2020-03-17-10";
+      }  else if (path.toString().contains("hourly/2020/03/17/11")) {
+        partitionValue = "2020-03-17-11";
+      } else if (path.toString().contains("hourly/2020/03/17/12")) {
+        partitionValue = "2020-03-17-12";
       } else if (path.toString().contains("daily/2020/03/17")) {
         partitionValue = "2020-03-17-00";
       }
       return Optional.of(new HivePartition.Builder().withPartitionValues(Lists.newArrayList(partitionValue))
           .withDbName("hivedb").withTableName("testIcebergTable").build());
     }
+    @Override
+    protected List<HiveTable> getTables(Path path) throws IOException {
+      List<HiveTable> tables = super.getTables(path);
+      for (HiveTable table : tables) {
+        table.setPartitionKeys(ImmutableList.<HiveRegistrationUnit.Column>of(
+            new HiveRegistrationUnit.Column("datepartition", serdeConstants.STRING_TYPE_NAME, StringUtils.EMPTY)));
+        //table.setLocation(tmpDir.getAbsolutePath());
+      }
+      return tables;
+    }
     protected Iterable<String> getDatabaseNames(Path path) {
       return Lists.newArrayList("hivedb");
     }
@@ -324,5 +469,16 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
       return Lists.newArrayList("testIcebergTable");
     }
   }
+
+  static class TestAuditCountVerifier extends KafkaAuditCountVerifier {
+
+    public TestAuditCountVerifier(State state) {
+      super(state);
+    }
+
+    public TestAuditCountVerifier(State state, AuditCountClient client) {
+      super(state, client);
+    }
+  }
 }
 
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java b/gobblin-utility/src/main/java/org/apache/gobblin/time/TimeIterator.java
similarity index 69%
rename from gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
rename to gobblin-utility/src/main/java/org/apache/gobblin/time/TimeIterator.java
index a9ef4d4..63f7cfe 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/time/TimeIterator.java
@@ -17,10 +17,14 @@
 
 package org.apache.gobblin.time;
 
+import java.time.Duration;
 import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
+import lombok.Getter;
+
 
 /**
  * A {@link TimeIterator} iterates over time points within [{@code startTime}, {@code endTime}]. It
@@ -32,28 +36,35 @@ public class TimeIterator implements Iterator {
     MINUTE, HOUR, DAY, MONTH
   }
 
+  @Getter
   private ZonedDateTime startTime;
   private ZonedDateTime endTime;
   private Granularity granularity;
+  private boolean reverse;
 
   public TimeIterator(ZonedDateTime startTime, ZonedDateTime endTime, Granularity granularity) {
+    this(startTime, endTime, granularity, false);
+  }
+
+  public TimeIterator(ZonedDateTime startTime, ZonedDateTime endTime, Granularity granularity, boolean reverse) {
     this.startTime = startTime;
     this.endTime = endTime;
     this.granularity = granularity;
+    this.reverse = reverse;
   }
 
   @Override
   public boolean hasNext() {
-    return !startTime.isAfter(endTime);
+    return (reverse) ? !endTime.isAfter(startTime) : !startTime.isAfter(endTime);
   }
 
   @Override
   public ZonedDateTime next() {
-    if (startTime.isAfter(endTime)) {
+    if ((!reverse && startTime.isAfter(endTime) || (reverse && endTime.isAfter(startTime)))) {
       throw new NoSuchElementException();
     }
     ZonedDateTime dateTime = startTime;
-    startTime = inc(startTime, granularity, 1);
+    startTime = (reverse) ? dec(startTime, granularity, 1) : inc(startTime, granularity, 1);
     return dateTime;
   }
 
@@ -95,4 +106,26 @@ public class TimeIterator implements Iterator {
     }
     throw new RuntimeException("Unsupported granularity: " + granularity);
   }
+
+  /**
+   * Return duration as long between 2 datetime objects based on granularity
+   * @param d1
+   * @param d2
+   * @param granularity
+   * @return a long representing the duration
+   */
+  public static long durationBetween(ZonedDateTime d1, ZonedDateTime d2, Granularity granularity) {
+    switch (granularity) {
+      case HOUR:
+        return Duration.between(d1, d2).toHours();
+      case MINUTE:
+        return Duration.between(d1, d2).toMinutes();
+      case DAY:
+        return Duration.between(d1, d2).toDays();
+      case MONTH:
+        return ChronoUnit.MONTHS.between(d1, d2);
+    }
+    throw new RuntimeException("Unsupported granularity: " + granularity);
+  }
+
 }
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java
similarity index 100%
rename from gobblin-data-management/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java
rename to gobblin-utility/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java
diff --git a/gradle/scripts/defaultBuildProperties.gradle b/gradle/scripts/defaultBuildProperties.gradle
index fec6ef9..2d92216 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -31,7 +31,7 @@ def BuildProperties BUILD_PROPERTIES = new BuildProperties(project)
     .register(new BuildProperty("gobblinFlavor", "standard", "Build flavor (see http://gobblin.readthedocs.io/en/latest/developer-guide/GobblinModules/)"))
     .register(new BuildProperty("hadoopVersion", "2.3.0", "Hadoop dependencies version"))
     .register(new BuildProperty("hiveVersion", "1.0.1", "Hive dependencies version"))
-    .register(new BuildProperty("icebergVersion", "0.10.0", "Iceberg dependencies version"))
+    .register(new BuildProperty("icebergVersion", "0.11.1", "Iceberg dependencies version"))
     .register(new BuildProperty("jdkVersion", JavaVersion.VERSION_1_8.toString(),
     "Java languange compatibility; supported versions: " + JavaVersion.VERSION_1_8))
     .register(new BuildProperty("kafka08Version", "0.8.2.2", "Kafka 0.8 dependencies version"))
diff --git a/settings.gradle b/settings.gradle
index 210f5b2..5aa42f9 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -19,6 +19,7 @@ def modules = ['gobblin-admin',
                'gobblin-all',
                'gobblin-api',
                'gobblin-compaction',
+               'gobblin-completeness',
                'gobblin-config-management',
                'gobblin-core',
                'gobblin-core-base',