Posted to commits@gobblin.apache.org by su...@apache.org on 2021/09/10 19:02:51 UTC
[gobblin] branch master updated: [GOBBLIN-1533] Add completeness
watermark to iceberg tables (#3385)
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 47707df [GOBBLIN-1533] Add completeness watermark to iceberg tables (#3385)
47707df is described below
commit 47707df00a6884ada5974a5f5203408ce1efb890
Author: vbohra <vb...@linkedin.com>
AuthorDate: Fri Sep 10 12:02:42 2021 -0700
[GOBBLIN-1533] Add completeness watermark to iceberg tables (#3385)
* [GOBBLIN-1533] Add completeness watermark to iceberg tables
* updated hive metadata writer test
* Add apache header
* Added correct default partition type
* Fixed kafka audit url and logic to get topic name for iceberg table
* Changes based on review
* Make audit check granularity configurable
* Added additional optimization to check for current hour during completion watermark calculation
* Optimization to skip the audit check if it's up to date, by comparing epoch seconds between the current watermark and now
* Fixed test case
* Replace hours from epoch with duration
* Moved logging
* Update partition spec with late field even when schema has been updated
---
.../gobblin/compaction/audit/AuditCountClient.java | 4 +-
.../compaction/audit/AuditCountClientFactory.java | 2 +
.../audit/KafkaAuditCountHttpClient.java | 3 +
.../audit/KafkaAuditCountHttpClientFactory.java | 3 +
.../verify/CompactionAuditCountVerifier.java | 1 +
.../build.gradle | 22 ++-
.../completeness}/audit/AuditCountClient.java | 15 +-
.../audit/AuditCountClientFactory.java | 5 +-
.../completeness/audit/AuditCountHttpClient.java | 62 +++---
.../audit/AuditCountHttpClientFactory.java | 13 +-
.../verifier/KafkaAuditCountVerifier.java | 153 +++++++++++++++
.../verifier/KafkaAuditCountVerifierTest.java | 69 +++++++
.../completeness/audit/TestAuditClient.java | 27 ++-
.../completeness/audit/TestAuditClientFactory.java | 13 +-
gobblin-iceberg/build.gradle | 5 +-
.../iceberg/writer/IcebergMetadataWriter.java | 218 +++++++++++++++++++--
.../writer/IcebergMetadataWriterConfigKeys.java | 40 ++++
.../iceberg/writer/HiveMetadataWriterTest.java | 2 +-
.../iceberg/writer/IcebergMetadataWriterTest.java | 184 +++++++++++++++--
.../java/org/apache/gobblin/time/TimeIterator.java | 39 +++-
.../org/apache/gobblin/time/TimeIteratorTest.java | 0
gradle/scripts/defaultBuildProperties.gradle | 2 +-
settings.gradle | 1 +
23 files changed, 779 insertions(+), 104 deletions(-)
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java
index b9179f3..5582f74 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java
@@ -22,7 +22,9 @@ import java.util.Map;
/**
* A type of client used to query the audit counts from Pinot backend
- */
+ * @Deprecated {@link org.apache.gobblin.completeness.audit.AuditCountClient}
+ */
+@Deprecated
public interface AuditCountClient {
Map<String, Long> fetch (String topic, long start, long end) throws IOException;
}
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java
index 48748db..3c812aa 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java
@@ -21,7 +21,9 @@ import org.apache.gobblin.configuration.State;
/**
* A factory class responsible for creating {@link AuditCountClient}
+ * @Deprecated {@link org.apache.gobblin.completeness.audit.AuditCountClientFactory}
*/
+@Deprecated
public interface AuditCountClientFactory {
String AUDIT_COUNT_CLIENT_FACTORY = "audit.count.client.factory";
AuditCountClient createAuditCountClient (State state);
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
index 04988fe..f75cc4d 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
@@ -40,14 +40,17 @@ import com.google.gson.JsonParser;
import javax.annotation.concurrent.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.completeness.audit.AuditCountHttpClient;
import org.apache.gobblin.configuration.State;
/**
* A {@link AuditCountClient} which uses {@link org.apache.http.client.HttpClient}
* to perform audit count query.
+ * @Deprecated {@link AuditCountHttpClient}
*/
@Slf4j
@ThreadSafe
+@Deprecated
public class KafkaAuditCountHttpClient implements AuditCountClient {
// Keys
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java
index 21f7104..1b63031 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java
@@ -18,12 +18,15 @@
package org.apache.gobblin.compaction.audit;
import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.completeness.audit.AuditCountHttpClientFactory;
import org.apache.gobblin.configuration.State;
/**
* Factory to create an instance of type {@link KafkaAuditCountHttpClient}
+ * @Deprecated {@link AuditCountHttpClientFactory}
*/
@Alias("KafkaAuditCountHttpClientFactory")
+@Deprecated
public class KafkaAuditCountHttpClientFactory implements AuditCountClientFactory {
public KafkaAuditCountHttpClient createAuditCountClient (State state) {
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
index e581bdf..c33be4c 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
@@ -44,6 +44,7 @@ import org.apache.gobblin.util.ClassAliasResolver;
* Use {@link AuditCountClient} to retrieve all record count across different tiers
* Compare one specific tier (gobblin-tier) with all other reference tiers and determine
* if verification should be passed based on a pre-defined threshold.
+ * @TODO: 8/31/21 "Use @{@link org.apache.gobblin.completeness.verifier.KafkaAuditCountVerifier}"
*/
@Slf4j
public class CompactionAuditCountVerifier implements CompactionVerifier<FileSystemDataset> {
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java b/gobblin-completeness/build.gradle
similarity index 72%
copy from gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java
copy to gobblin-completeness/build.gradle
index b9179f3..4a9a20e 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java
+++ b/gobblin-completeness/build.gradle
@@ -15,14 +15,20 @@
* limitations under the License.
*/
-package org.apache.gobblin.compaction.audit;
+apply plugin: 'java'
+apply plugin: 'java-test-fixtures'
-import java.io.IOException;
-import java.util.Map;
+dependencies {
+ compile project(":gobblin-api")
+ compile externalDependency.httpclient
+
+ testCompile externalDependency.testng
+ testCompile externalDependency.mockito
-/**
- * A type of client used to query the audit counts from Pinot backend
- */
-public interface AuditCountClient {
- Map<String, Long> fetch (String topic, long start, long end) throws IOException;
}
+
+configurations {
+ compile { transitive = true }
+}
+
+ext.classification="library"
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountClient.java
similarity index 68%
copy from gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java
copy to gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountClient.java
index b9179f3..4b0401a 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClient.java
+++ b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountClient.java
@@ -15,14 +15,23 @@
* limitations under the License.
*/
-package org.apache.gobblin.compaction.audit;
+package org.apache.gobblin.completeness.audit;
import java.io.IOException;
import java.util.Map;
+
/**
- * A type of client used to query the audit counts from Pinot backend
+ * A type of client used to query audit counts
*/
public interface AuditCountClient {
- Map<String, Long> fetch (String topic, long start, long end) throws IOException;
+ /**
+ *
+ * @param datasetName query dataset
+ * @param start start timestamp in millis from epoch
+ * @param end end timestamp in millis from epoch
+ * @return a map of <tier, counts>
+ * @throws IOException
+ */
+ Map<String, Long> fetch(String datasetName, long start, long end) throws IOException;
}
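
For illustration, a minimal implementation of the relocated interface could simply return canned per-tier counts; the class name here is hypothetical (the TestAuditClient fixture later in this commit takes the same approach), and a real client would query an audit service instead:

import java.io.IOException;
import java.util.Map;

import com.google.common.collect.ImmutableMap;

import org.apache.gobblin.completeness.audit.AuditCountClient;

// Hypothetical sketch: fixed per-tier counts for any dataset and time range.
public class StaticAuditCountClient implements AuditCountClient {
  @Override
  public Map<String, Long> fetch(String datasetName, long start, long end) throws IOException {
    return ImmutableMap.of("producer", 1000L, "gobblin", 999L);
  }
}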
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountClientFactory.java
similarity index 90%
copy from gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java
copy to gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountClientFactory.java
index 48748db..b65d2b0 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java
+++ b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountClientFactory.java
@@ -15,14 +15,15 @@
* limitations under the License.
*/
-package org.apache.gobblin.compaction.audit;
+package org.apache.gobblin.completeness.audit;
import org.apache.gobblin.configuration.State;
+
/**
* A factory class responsible for creating {@link AuditCountClient}
*/
public interface AuditCountClientFactory {
String AUDIT_COUNT_CLIENT_FACTORY = "audit.count.client.factory";
- AuditCountClient createAuditCountClient (State state);
+ AuditCountClient createAuditCountClient(State state);
}
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountHttpClient.java
similarity index 66%
copy from gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
copy to gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountHttpClient.java
index 04988fe..bf70d99 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java
+++ b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountHttpClient.java
@@ -15,14 +15,12 @@
* limitations under the License.
*/
-package org.apache.gobblin.compaction.audit;
+package org.apache.gobblin.completeness.audit;
import java.io.IOException;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
@@ -42,28 +40,29 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.State;
+
/**
* A {@link AuditCountClient} which uses {@link org.apache.http.client.HttpClient}
* to perform audit count query.
*/
@Slf4j
@ThreadSafe
-public class KafkaAuditCountHttpClient implements AuditCountClient {
+public class AuditCountHttpClient implements AuditCountClient {
// Keys
- public static final String KAFKA_AUDIT_HTTP = "kafka.audit.http";
- public static final String CONNECTION_MAX_TOTAL = KAFKA_AUDIT_HTTP + "max.total";
+ public static final String AUDIT_HTTP_PREFIX = "audit.http";
+ public static final String CONNECTION_MAX_TOTAL = AUDIT_HTTP_PREFIX + "max.total";
public static final int DEFAULT_CONNECTION_MAX_TOTAL = 10;
- public static final String MAX_PER_ROUTE = KAFKA_AUDIT_HTTP + "max.per.route";
+ public static final String MAX_PER_ROUTE = AUDIT_HTTP_PREFIX + "max.per.route";
public static final int DEFAULT_MAX_PER_ROUTE = 10;
- public static final String KAFKA_AUDIT_REST_BASE_URL = "kafka.audit.rest.base.url";
- public static final String KAFKA_AUDIT_REST_MAX_TRIES = "kafka.audit.rest.max.tries";
- public static final String KAFKA_AUDIT_REST_START_QUERYSTRING_KEY = "kafka.audit.rest.querystring.start";
- public static final String KAFKA_AUDIT_REST_END_QUERYSTRING_KEY = "kafka.audit.rest.querystring.end";
- public static final String KAFKA_AUDIT_REST_START_QUERYSTRING_DEFAULT = "begin";
- public static final String KAFKA_AUDIT_REST_END_QUERYSTRING_DEFAULT = "end";
+ public static final String AUDIT_REST_BASE_URL = "audit.rest.base.url";
+ public static final String AUDIT_REST_MAX_TRIES = "audit.rest.max.tries";
+ public static final String AUDIT_REST_START_QUERYSTRING_KEY = "audit.rest.querystring.start";
+ public static final String AUDIT_REST_END_QUERYSTRING_KEY = "audit.rest.querystring.end";
+ public static final String AUDIT_REST_START_QUERYSTRING_DEFAULT = "start";
+ public static final String AUDIT_REST_END_QUERYSTRING_DEFAULT = "end";
// Http Client
@@ -74,11 +73,12 @@ public class KafkaAuditCountHttpClient implements AuditCountClient {
private final String baseUrl;
private final String startQueryString;
private final String endQueryString;
+ private String topicQueryString = "topic";
private final int maxNumTries;
/**
* Constructor
*/
- public KafkaAuditCountHttpClient (State state) {
+ public AuditCountHttpClient(State state) {
int maxTotal = state.getPropAsInt(CONNECTION_MAX_TOTAL, DEFAULT_CONNECTION_MAX_TOTAL);
int maxPerRoute = state.getPropAsInt(MAX_PER_ROUTE, DEFAULT_MAX_PER_ROUTE);
@@ -89,20 +89,20 @@ public class KafkaAuditCountHttpClient implements AuditCountClient {
.setConnectionManager(cm)
.build();
- this.baseUrl = state.getProp(KAFKA_AUDIT_REST_BASE_URL);
- this.maxNumTries = state.getPropAsInt(KAFKA_AUDIT_REST_MAX_TRIES, 5);
- this.startQueryString = state.getProp(KAFKA_AUDIT_REST_START_QUERYSTRING_KEY, KAFKA_AUDIT_REST_START_QUERYSTRING_DEFAULT);
- this.endQueryString = state.getProp(KAFKA_AUDIT_REST_END_QUERYSTRING_KEY, KAFKA_AUDIT_REST_END_QUERYSTRING_DEFAULT);
+ this.baseUrl = state.getProp(AUDIT_REST_BASE_URL);
+ this.maxNumTries = state.getPropAsInt(AUDIT_REST_MAX_TRIES, 5);
+ this.startQueryString = state.getProp(AUDIT_REST_START_QUERYSTRING_KEY, AUDIT_REST_START_QUERYSTRING_DEFAULT);
+ this.endQueryString = state.getProp(AUDIT_REST_END_QUERYSTRING_KEY, AUDIT_REST_END_QUERYSTRING_DEFAULT);
}
- public Map<String, Long> fetch (String datasetName, long start, long end) throws IOException {
- String fullUrl =
- (this.baseUrl.endsWith("/") ? this.baseUrl : this.baseUrl + "/") + StringUtils.replaceChars(datasetName, '/', '.')
- + "?" + this.startQueryString + "=" + start + "&" + this.endQueryString + "=" + end;
+ public Map<String, Long> fetch (String topic, long start, long end) throws IOException {
+ String fullUrl = (this.baseUrl.endsWith("/") ? this.baseUrl.substring(0, this.baseUrl.length() - 1)
+ : this.baseUrl) + "?" + this.topicQueryString + "=" + topic
+ + "&" + this.startQueryString + "=" + start + "&" + this.endQueryString + "=" + end;
log.info("Full URL is " + fullUrl);
String response = getHttpResponse(fullUrl);
- return parseResponse (fullUrl, response, datasetName);
+ return parseResponse (fullUrl, response, topic);
}
@@ -113,12 +113,8 @@ public class KafkaAuditCountHttpClient implements AuditCountClient {
* <pre>
* {
* "result": {
- * "hadoop-tracking-lva1tarock-08": 79341895,
- * "hadoop-tracking-uno-08": 79341892,
- * "kafka-08-tracking-local": 79341968,
- * "kafka-corp-lca1-tracking-agg": 79341968,
- * "kafka-corp-ltx1-tracking-agg": 79341968,
- * "producer": 69483513
+ * "tier1": 79341895,
+ * "tier2": 79341892,
* }
* }
* </pre>
@@ -130,15 +126,13 @@ public class KafkaAuditCountHttpClient implements AuditCountClient {
try {
JsonObject jsonObj = PARSER.parse(response).getAsJsonObject();
- countsPerTier = jsonObj.getAsJsonObject("result");
+ countsPerTier = jsonObj.getAsJsonObject("totalsPerTier");
} catch (Exception e) {
throw new IOException(String.format("Unable to parse JSON response: %s for request url: %s ", response,
fullUrl), e);
}
- Set<Map.Entry<String, JsonElement>> entrySet = countsPerTier.entrySet();
-
- for(Map.Entry<String, JsonElement> entry : entrySet) {
+ for(Map.Entry<String, JsonElement> entry : countsPerTier.entrySet()) {
String tier = entry.getKey();
long count = Long.parseLong(entry.getValue().getAsString());
result.put(tier, count);
@@ -147,8 +141,6 @@ public class KafkaAuditCountHttpClient implements AuditCountClient {
return result;
}
-
-
private String getHttpResponse(String fullUrl) throws IOException {
HttpUriRequest req = new HttpGet(fullUrl);
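
A usage sketch of the renamed client, assuming a placeholder audit endpoint: per the code above, fetch() issues a GET with topic/start/end query parameters and parses the "totalsPerTier" object in the JSON response.

import java.io.IOException;
import java.util.Map;

import org.apache.gobblin.completeness.audit.AuditCountHttpClient;
import org.apache.gobblin.configuration.State;

public class AuditCountHttpClientExample {
  public static void main(String[] args) throws IOException {
    State state = new State();
    // Placeholder endpoint; substitute the real audit service URL.
    state.setProp(AuditCountHttpClient.AUDIT_REST_BASE_URL, "http://audit.example.com/counts");
    AuditCountHttpClient client = new AuditCountHttpClient(state);
    // Issues GET http://audit.example.com/counts?topic=PageViewEvent&start=...&end=...
    Map<String, Long> countsPerTier = client.fetch("PageViewEvent", 1631232000000L, 1631235600000L);
    countsPerTier.forEach((tier, count) -> System.out.println(tier + " : " + count));
  }
}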
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountHttpClientFactory.java
similarity index 71%
copy from gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java
copy to gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountHttpClientFactory.java
index 21f7104..9c25bce 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java
+++ b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/audit/AuditCountHttpClientFactory.java
@@ -15,18 +15,19 @@
* limitations under the License.
*/
-package org.apache.gobblin.compaction.audit;
+package org.apache.gobblin.completeness.audit;
import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.configuration.State;
+
/**
- * Factory to create an instance of type {@link KafkaAuditCountHttpClient}
+ * Factory to create an instance of type {@link AuditCountHttpClient}
*/
-@Alias("KafkaAuditCountHttpClientFactory")
-public class KafkaAuditCountHttpClientFactory implements AuditCountClientFactory {
+@Alias("AuditCountHttpClientFactory")
+public class AuditCountHttpClientFactory implements AuditCountClientFactory {
- public KafkaAuditCountHttpClient createAuditCountClient (State state) {
- return new KafkaAuditCountHttpClient(state);
+ public AuditCountHttpClient createAuditCountClient (State state) {
+ return new AuditCountHttpClient(state);
}
}
diff --git a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
new file mode 100644
index 0000000..04d49a1
--- /dev/null
+++ b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.completeness.verifier;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.completeness.audit.AuditCountClient;
+import org.apache.gobblin.completeness.audit.AuditCountClientFactory;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ClassAliasResolver;
+
+
+/**
+ * Use {@link AuditCountClient} to retrieve all record counts across different tiers.
+ * Compare one source tier against all other reference tiers and determine
+ * if verification should be passed based on a pre-defined threshold.
+ * The source tier is the tier being compared against single/multiple reference tiers.
+ */
+@Slf4j
+public class KafkaAuditCountVerifier {
+ public static final String COMPLETENESS_PREFIX = "completeness.";
+ public static final String SOURCE_TIER = COMPLETENESS_PREFIX + "source.tier";
+ public static final String REFERENCE_TIERS = COMPLETENESS_PREFIX + "reference.tiers";
+ public static final String THRESHOLD = COMPLETENESS_PREFIX + "threshold";
+ private static final double DEFAULT_THRESHOLD = 0.999;
+
+ private final AuditCountClient auditCountClient;
+ private final String srcTier;
+ private final Collection<String> refTiers;
+ private final double threshold;
+
+ /**
+ * Constructor with audit count client from state
+ */
+ public KafkaAuditCountVerifier(State state) {
+ this(state, getAuditClient(state));
+ }
+
+ /**
+ * Constructor with user specified audit count client
+ */
+ public KafkaAuditCountVerifier(State state, AuditCountClient client) {
+ this.auditCountClient = client;
+ this.threshold =
+ state.getPropAsDouble(THRESHOLD, DEFAULT_THRESHOLD);
+ this.srcTier = state.getProp(SOURCE_TIER);
+ this.refTiers = Splitter.on(",").omitEmptyStrings().trimResults().splitToList(state.getProp(REFERENCE_TIERS));
+ }
+
+ /**
+ * Obtain an {@link AuditCountClient} using a {@link AuditCountClientFactory}
+ * @param state job state
+ * @return {@link AuditCountClient}
+ */
+ private static AuditCountClient getAuditClient(State state) {
+ Preconditions.checkArgument(state.contains(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY),
+ String.format("Audit count factory %s not set ", AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY));
+ try {
+ String factoryName = state.getProp(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY);
+ ClassAliasResolver<AuditCountClientFactory> conditionClassAliasResolver = new ClassAliasResolver<>(AuditCountClientFactory.class);
+ AuditCountClientFactory factory = conditionClassAliasResolver.resolveClass(factoryName).newInstance();
+ return factory.createAuditCountClient(state);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Compare source tier against reference tiers.
+ * Compute completion percentage by srcCount/refCount. Return true iff the highest percentage is greater than the threshold.
+ *
+ * @param datasetName A dataset short name like 'PageViewEvent'
+ * @param beginInMillis Unix timestamp in milliseconds
+ * @param endInMillis Unix timestamp in milliseconds
+ * @param threshold User defined threshold
+ */
+ public boolean isComplete(String datasetName, long beginInMillis, long endInMillis, double threshold)
+ throws IOException {
+ return getCompletenessPercentage(datasetName, beginInMillis, endInMillis) > threshold;
+ }
+
+ public boolean isComplete(String datasetName, long beginInMillis, long endInMillis)
+ throws IOException {
+ return isComplete(datasetName, beginInMillis, endInMillis, this.threshold);
+ }
+
+ /**
+ * Compare source tier against reference tiers. For each reference tier, calculates percentage by srcCount/refCount.
+ *
+ * @param datasetName A dataset short name like 'PageViewEvent'
+ * @param beginInMillis Unix timestamp in milliseconds
+ * @param endInMillis Unix timestamp in milliseconds
+ *
+ * @return The highest percentage value
+ */
+ private double getCompletenessPercentage(String datasetName, long beginInMillis, long endInMillis) throws IOException {
+ Map<String, Long> countsByTier = getTierAndCount(datasetName, beginInMillis, endInMillis);
+ log.info(String.format("Audit counts map for %s for range [%s,%s]", datasetName, beginInMillis, endInMillis));
+ countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+ double percent = -1;
+ if (!countsByTier.containsKey(this.srcTier)) {
+ throw new IOException(String.format("Source tier %s audit count cannot be retrieved for dataset %s between %s and %s", this.srcTier, datasetName, beginInMillis, endInMillis));
+ }
+
+ for (String refTier: this.refTiers) {
+ if (!countsByTier.containsKey(refTier)) {
+ throw new IOException(String.format("Reference tier %s audit count cannot be retrieved for dataset %s between %s and %s", refTier, datasetName, beginInMillis, endInMillis));
+ }
+ long refCount = countsByTier.get(refTier);
+ if(refCount <= 0) {
+ throw new IOException(String.format("Reference tier %s count cannot be less than or equal to zero", refTier));
+ }
+ long srcCount = countsByTier.get(this.srcTier);
+
+ percent = Double.max(percent, (double) srcCount / (double) refCount);
+ }
+
+ if (percent < 0) {
+ throw new IOException("Cannot calculate completion percentage");
+ }
+
+ return percent;
+ }
+
+ /**
+ * Fetch all <tier-count> pairs for a given dataset between a time range
+ */
+ private Map<String, Long> getTierAndCount(String datasetName, long beginInMillis, long endInMillis) throws IOException {
+ return auditCountClient.fetch(datasetName, beginInMillis, endInMillis);
+ }
+}
\ No newline at end of file
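
A sketch of wiring the verifier end to end, under stated assumptions: the factory alias is the one declared on AuditCountHttpClientFactory in this commit, the endpoint is a placeholder, and the tier names mirror the test below. isComplete() passes iff the best srcCount/refCount ratio exceeds the threshold.

import java.io.IOException;

import org.apache.gobblin.completeness.audit.AuditCountClientFactory;
import org.apache.gobblin.completeness.audit.AuditCountHttpClient;
import org.apache.gobblin.completeness.verifier.KafkaAuditCountVerifier;
import org.apache.gobblin.configuration.State;

public class KafkaAuditCountVerifierExample {
  public static void main(String[] args) throws IOException {
    State state = new State();
    // Resolve the AuditCountClient through its factory alias.
    state.setProp(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY, "AuditCountHttpClientFactory");
    state.setProp(AuditCountHttpClient.AUDIT_REST_BASE_URL, "http://audit.example.com/counts"); // placeholder
    state.setProp(KafkaAuditCountVerifier.SOURCE_TIER, "gobblin");
    state.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, "producer");
    state.setProp(KafkaAuditCountVerifier.THRESHOLD, "0.999");

    KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(state);
    // True iff gobblin-tier count / producer-tier count > 0.999 for the window.
    System.out.println(verifier.isComplete("PageViewEvent", 1631232000000L, 1631235600000L));
  }
}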
diff --git a/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
new file mode 100644
index 0000000..7a005c6
--- /dev/null
+++ b/gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.completeness.verifier;
+
+import java.io.IOException;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.gobblin.completeness.audit.TestAuditClient;
+import org.apache.gobblin.configuration.State;
+
+@Test
+public class KafkaAuditCountVerifierTest {
+
+ public static final String SOURCE_TIER = "gobblin";
+ public static final String REFERENCE_TIERS = "producer";
+
+ public void testFetch() throws IOException {
+ final String topic = "testTopic";
+ State props = new State();
+ props.setProp(KafkaAuditCountVerifier.SOURCE_TIER, SOURCE_TIER);
+ props.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, REFERENCE_TIERS);
+ props.setProp(KafkaAuditCountVerifier.THRESHOLD, ".99");
+ TestAuditClient client = new TestAuditClient(props);
+ KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(props, client);
+
+ // All complete
+ client.setTierCounts(ImmutableMap.of(
+ SOURCE_TIER, 1000L,
+ REFERENCE_TIERS, 1000L
+ ));
+ // Default threshold
+ Assert.assertTrue(verifier.isComplete(topic, 0L, 0L));
+
+ // 99.9 % complete
+ client.setTierCounts(ImmutableMap.of(
+ SOURCE_TIER, 999L,
+ REFERENCE_TIERS, 1000L
+ ));
+ Assert.assertTrue(verifier.isComplete(topic, 0L, 0L));
+
+ // <= 99% complete
+ client.setTierCounts(ImmutableMap.of(
+ SOURCE_TIER, 990L,
+ REFERENCE_TIERS, 1000L
+ ));
+ Assert.assertFalse(verifier.isComplete(topic, 0L, 0L));
+ }
+
+
+}
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java b/gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClient.java
similarity index 64%
copy from gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java
copy to gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClient.java
index 21f7104..4224bc5 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClientFactory.java
+++ b/gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClient.java
@@ -15,18 +15,27 @@
* limitations under the License.
*/
-package org.apache.gobblin.compaction.audit;
+package org.apache.gobblin.completeness.audit;
+
+import java.util.HashMap;
+import java.util.Map;
-import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.configuration.State;
-/**
- * Factory to create an instance of type {@link KafkaAuditCountHttpClient}
- */
-@Alias("KafkaAuditCountHttpClientFactory")
-public class KafkaAuditCountHttpClientFactory implements AuditCountClientFactory {
- public KafkaAuditCountHttpClient createAuditCountClient (State state) {
- return new KafkaAuditCountHttpClient(state);
+public class TestAuditClient implements AuditCountClient {
+ Map<String, Long> tierCounts;
+
+ public TestAuditClient(State state) {
+ tierCounts = new HashMap<>();
+ }
+
+ public void setTierCounts(Map<String, Long> tierCounts) {
+ this.tierCounts = tierCounts;
+ }
+
+ @Override
+ public Map<String, Long> fetch(String topic, long start, long end) {
+ return tierCounts;
}
}
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java b/gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClientFactory.java
similarity index 74%
copy from gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java
copy to gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClientFactory.java
index 48748db..05c4f4d 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/AuditCountClientFactory.java
+++ b/gobblin-completeness/src/testFixtures/java/org/apache/gobblin/completeness/audit/TestAuditClientFactory.java
@@ -15,14 +15,13 @@
* limitations under the License.
*/
-package org.apache.gobblin.compaction.audit;
+package org.apache.gobblin.completeness.audit;
import org.apache.gobblin.configuration.State;
-/**
- * A factory class responsible for creating {@link AuditCountClient}
- */
-public interface AuditCountClientFactory {
- String AUDIT_COUNT_CLIENT_FACTORY = "audit.count.client.factory";
- AuditCountClient createAuditCountClient (State state);
+public class TestAuditClientFactory implements AuditCountClientFactory {
+ @Override
+ public AuditCountClient createAuditCountClient(State state) {
+ return new TestAuditClient(state);
+ }
}
diff --git a/gobblin-iceberg/build.gradle b/gobblin-iceberg/build.gradle
index fc1ae37..fe030f8 100644
--- a/gobblin-iceberg/build.gradle
+++ b/gobblin-iceberg/build.gradle
@@ -19,6 +19,7 @@ apply plugin: 'java'
dependencies {
compile project(":gobblin-api")
+ compile project(":gobblin-completeness")
compile project(":gobblin-core")
compile project(":gobblin-hive-registration")
compile project(":gobblin-metrics-libs:gobblin-metrics")
@@ -47,9 +48,11 @@ dependencies {
compile externalDependency.findBugsAnnotations
compile externalDependency.avroMapredH2
- testCompile(group: 'org.apache.iceberg', name: 'iceberg-hive-metastore', version: '0.10.0', classifier: 'tests') {
+ testCompile(group: 'org.apache.iceberg', name: 'iceberg-hive-metastore', version: '0.11.1', classifier: 'tests') {
transitive = false
}
+ testCompile('org.apache.hadoop:hadoop-common:2.6.0')
+ testImplementation(testFixtures(project(":gobblin-completeness")))
testCompile project(path: ':gobblin-modules:gobblin-kafka-common', configuration: 'tests')
testCompile externalDependency.testng
testCompile externalDependency.mockito
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 3abd491..f67e614 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -18,6 +18,11 @@
package org.apache.gobblin.iceberg.writer;
import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -28,6 +33,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@@ -64,11 +71,13 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.iceberg.types.Types;
import org.joda.time.DateTime;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -85,6 +94,7 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.completeness.verifier.KafkaAuditCountVerifier;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
@@ -106,12 +116,13 @@ import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor.KafkaWatermark;
import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.time.TimeIterator;
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.WriterUtils;
-
+import static org.apache.gobblin.iceberg.writer.IcebergMetadataWriterConfigKeys.*;
/**
* This writer is used to calculate iceberg metadata from GMCE and register to iceberg
@@ -148,6 +159,15 @@ public class IcebergMetadataWriter implements MetadataWriter {
/* one of the fields in DataFile entry to describe the location URI of a data file with FS Scheme */
private static final String ICEBERG_FILE_PATH_COLUMN = DataFile.FILE_PATH.name();
+ private final boolean completenessEnabled;
+ private final WhitelistBlacklist completenessWhitelistBlacklist;
+ private final String timeZone;
+ private final DateTimeFormatter HOURLY_DATEPARTITION_FORMAT;
+ private final String newPartitionColumn;
+ private final String newPartitionColumnType;
+ private Optional<KafkaAuditCountVerifier> auditCountVerifier;
+ private final String auditCheckGranularity;
+
protected final MetricContext metricContext;
protected EventSubmitter eventSubmitter;
private final WhitelistBlacklist whitelistBlacklist;
@@ -165,8 +185,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
protected final Configuration conf;
protected final ReadWriteLock readWriteLock;
private final HiveLock locks;
- private final ParallelRunner parallelRunner;
private final boolean useDataLocationAsTableLocation;
+ private final ParallelRunner parallelRunner;
private FsPermission permission;
public IcebergMetadataWriter(State state) throws IOException {
@@ -198,6 +218,21 @@ public class IcebergMetadataWriter implements MetadataWriter {
HadoopUtils.deserializeFsPermission(state, ICEBERG_METADATA_FILE_PERMISSION,
FsPermission.getDefault());
}
+ this.completenessEnabled = state.getPropAsBoolean(ICEBERG_COMPLETENESS_ENABLED, DEFAULT_ICEBERG_COMPLETENESS);
+ this.completenessWhitelistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_COMPLETENESS_WHITELIST, ""),
+ state.getProp(ICEBERG_COMPLETENESS_BLACKLIST, ""));
+ this.timeZone = state.getProp(TIME_ZONE_KEY, DEFAULT_TIME_ZONE);
+ this.HOURLY_DATEPARTITION_FORMAT = DateTimeFormatter.ofPattern(DATEPARTITION_FORMAT)
+ .withZone(ZoneId.of(this.timeZone));
+ this.auditCountVerifier = Optional.fromNullable(this.completenessEnabled ? new KafkaAuditCountVerifier(state) : null);
+ this.newPartitionColumn = state.getProp(NEW_PARTITION_KEY, DEFAULT_NEW_PARTITION);
+ this.newPartitionColumnType = state.getProp(NEW_PARTITION_TYPE_KEY, DEFAULT_PARTITION_COLUMN_TYPE);
+ this.auditCheckGranularity = state.getProp(AUDIT_CHECK_GRANULARITY, DEFAULT_AUDIT_CHECK_GRANULARITY);
+ }
+
+ @VisibleForTesting
+ protected void setAuditCountVerifier(KafkaAuditCountVerifier verifier) {
+ this.auditCountVerifier = Optional.of(verifier);
}
protected void initializeCatalog() {
@@ -243,6 +278,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
* The write method will be responsible for processing gmce and aggregating the metadata.
* The logic of this function will be:
* 1. Check whether a table exists, if not then create a iceberg table
+ * - If completeness is enabled, add the new partition column
+ * {#NEW_PARTITION_KEY} to the table
* 2. Compute schema from the gmce and update the cache for candidate schemas
* 3. Do the required operation of the gmce, i.e. addFile, rewriteFile, dropFile or change_property.
*
@@ -283,6 +320,12 @@ public class IcebergMetadataWriter implements MetadataWriter {
if (gmce.getTopicPartitionOffsetsRange() != null) {
mergeOffsets(gmce, tid);
}
+ //compute topic name
+ if(!tableMetadata.newProperties.get().containsKey(TOPIC_NAME_KEY) &&
+ tableMetadata.dataOffsetRange.isPresent() && !tableMetadata.dataOffsetRange.get().isEmpty()) {
+ String topicPartition = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
+ tableMetadata.newProperties.get().put(TOPIC_NAME_KEY, topicPartition.substring(0, topicPartition.lastIndexOf("-")));
+ }
break;
}
case rewrite_files: {
@@ -364,8 +407,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
private void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid) {
org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(tableSpec.getTable());
- tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata()).newProperties =
- Optional.of(IcebergUtils.getTableProperties(table));
+ TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
+ tableMetadata.newProperties = Optional.of(IcebergUtils.getTableProperties(table));
}
/**
@@ -422,6 +465,23 @@ public class IcebergMetadataWriter implements MetadataWriter {
}
}
+ /**
+ * Add a partition column to the schema and partition spec
+ * @param table incoming iceberg table
+ * @param fieldName name of partition column
+ * @param type datatype of partition column
+ * @return table with updated schema and partition spec
+ */
+ private Table addPartitionToIcebergTable(Table table, String fieldName, String type) {
+ if(!table.schema().columns().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
+ table.updateSchema().addColumn(fieldName, Types.fromPrimitiveString(type)).commit();
+ }
+ if(!table.spec().fields().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
+ table.updateSpec().addField(fieldName).commit();
+ }
+ return table;
+ }
+
protected Table createTable(GobblinMetadataChangeEvent gmce, HiveSpec spec) throws IOException {
String schema = gmce.getTableSchema();
org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(spec.getTable());
@@ -572,7 +632,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
private Stream<DataFile> getIcebergDataFilesToBeAddedHelper(GobblinMetadataChangeEvent gmce, Table table,
Map<String, Collection<HiveSpec>> newSpecsMap,
TableMetadata tableMetadata) {
- return getIcebergDataFilesToBeAdded(gmce.getNewFiles(), table.spec(), newSpecsMap,
+ return getIcebergDataFilesToBeAdded(table, tableMetadata, gmce, gmce.getNewFiles(), table.spec(), newSpecsMap,
IcebergUtils.getSchemaIdMap(getSchemaWithOriginId(gmce), table.schema())).stream()
.filter(dataFile -> tableMetadata.addedFiles.getIfPresent(dataFile.path()) == null);
}
@@ -616,16 +676,29 @@ public class IcebergMetadataWriter implements MetadataWriter {
/**
* Method to get dataFiles with metrics information
* This method is used to get files to be added to iceberg
+ * If completeness is enabled, a new field (late) is added to the table schema and partition spec,
+ * computed based on the datepartition and the completion watermark
* This method will call method {IcebergUtils.getIcebergDataFileWithMetric} to get DataFile for specific file path
*/
- private Set<DataFile> getIcebergDataFilesToBeAdded(List<org.apache.gobblin.metadata.DataFile> files,
+ private Set<DataFile> getIcebergDataFilesToBeAdded(Table table, TableMetadata tableMetadata, GobblinMetadataChangeEvent gmce, List<org.apache.gobblin.metadata.DataFile> files,
PartitionSpec partitionSpec, Map<String, Collection<HiveSpec>> newSpecsMap, Map<Integer, Integer> schemaIdMap) {
Set<DataFile> dataFiles = new HashSet<>();
for (org.apache.gobblin.metadata.DataFile file : files) {
try {
- StructLike partition = getIcebergPartitionVal(newSpecsMap.get(new Path(file.getFilePath()).getParent().toString()),
- file.getFilePath(), partitionSpec);
- dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file, partitionSpec, partition, conf, schemaIdMap));
+ Collection<HiveSpec> hiveSpecs = newSpecsMap.get(new Path(file.getFilePath()).getParent().toString());
+ StructLike partition = getIcebergPartitionVal(hiveSpecs, file.getFilePath(), partitionSpec);
+
+ if(tableMetadata.completenessEnabled && gmce.getOperationType() == OperationType.add_files) {
+ tableMetadata.prevCompletenessWatermark = Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
+ String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
+ // Assumes first partition value to be partitioned by date
+ // TODO Find better way to determine a partition value
+ String datepartition = partition.get(0, null);
+ partition = addLatePartitionValueToIcebergTable(table, tableMetadata,
+ hiveSpecs.iterator().next().getPartition().get(), datepartition);
+ tableMetadata.datePartitions.add(getDateTimeFromDatepartitionString(datepartition));
+ }
+ dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file, table.spec(), partition, conf, schemaIdMap));
} catch (Exception e) {
log.warn("Cannot get DataFile for {} dur to {}", file.getFilePath(), e);
}
@@ -634,6 +707,46 @@ public class IcebergMetadataWriter implements MetadataWriter {
}
/**
+ * 1. Add "late" partition column to iceberg table if not exists
+ * 2. compute "late" partition value based on datepartition and completion watermark
+ * @param table
+ * @param tableMetadata
+ * @param hivePartition
+ * @param datepartition
+ * @return new iceberg partition value for file
+ */
+ private StructLike addLatePartitionValueToIcebergTable(Table table, TableMetadata tableMetadata, HivePartition hivePartition, String datepartition) {
+ table = addPartitionToIcebergTable(table, newPartitionColumn, newPartitionColumnType);
+ table.refresh();
+ PartitionSpec partitionSpec = table.spec();
+ long prevCompletenessWatermark = tableMetadata.prevCompletenessWatermark;
+ int late = isLate(datepartition, prevCompletenessWatermark);
+ List<String> partitionValues = new ArrayList<>(hivePartition.getValues());
+ partitionValues.add(String.valueOf(late));
+ return IcebergUtils.getPartition(partitionSpec.partitionType(), partitionValues);
+ }
+
+ private int isLate(String datepartition, long previousWatermark) {
+ ZonedDateTime partitionDateTime = ZonedDateTime.parse(datepartition, HOURLY_DATEPARTITION_FORMAT);
+ long partitionEpochTime = partitionDateTime.toInstant().toEpochMilli();
+ if(partitionEpochTime > previousWatermark) {
+ return 0;
+ } else if(partitionEpochTime <= previousWatermark && partitionDateTime.toLocalDate().equals(getDateFromEpochMillis(previousWatermark))) {
+ return 1;
+ } else {
+ return 2;
+ }
+ }
+
+ private LocalDate getDateFromEpochMillis(long epochMillis) {
+ return ZonedDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneId.of(timeZone)).toLocalDate();
+ }
+
+ private ZonedDateTime getDateTimeFromDatepartitionString(String datepartition) {
+ return ZonedDateTime.parse(datepartition, HOURLY_DATEPARTITION_FORMAT);
+ }
+
+ /**
* Obtain Iceberg partition value with a collection of {@link HiveSpec}.
* @param specs A collection of {@link HiveSpec}s.
* @param filePath URI of file, used for logging purpose in this method.
@@ -671,14 +784,30 @@ public class IcebergMetadataWriter implements MetadataWriter {
TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new TableMetadata());
if (tableMetadata.transaction.isPresent()) {
Transaction transaction = tableMetadata.transaction.get();
+ Map<String, String> props = tableMetadata.newProperties.or(
+ Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
+ if(tableMetadata.completenessEnabled) {
+ String topicName = props.get(TOPIC_NAME_KEY);
+ if(topicName == null) {
+ log.error(String.format("Not performing audit check. %s is null. Please set as table property of %s.%s",
+ TOPIC_NAME_KEY, dbName, tableName));
+ } else {
+ long newCompletenessWatermark =
+ computeCompletenessWatermark(topicName, tableMetadata.datePartitions, tableMetadata.prevCompletenessWatermark);
+ if(newCompletenessWatermark > tableMetadata.prevCompletenessWatermark) {
+ log.info(String.format("Updating %s for %s.%s to %s", COMPLETION_WATERMARK_KEY, dbName, tableName, newCompletenessWatermark));
+ props.put(COMPLETION_WATERMARK_KEY, String.valueOf(newCompletenessWatermark));
+ props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
+ tableMetadata.newCompletenessWatermark = newCompletenessWatermark;
+ }
+ }
+ }
}
if (tableMetadata.deleteFiles.isPresent()) {
tableMetadata.deleteFiles.get().commit();
}
- Map<String, String> props = tableMetadata.newProperties.or(
- Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
//Set high waterMark
Long highWatermark = tableCurrentWatermarkMap.get(tid);
props.put(String.format(GMCE_HIGH_WATERMARK_KEY, tableTopicPartitionMap.get(tid)), highWatermark.toString());
@@ -717,7 +846,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);
//Reset the table metadata for next accumulation period
- tableMetadata.reset(currentProps, highWatermark);
+ tableMetadata.reset(currentProps, highWatermark, tableMetadata.newCompletenessWatermark);
log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid.toString()));
} else {
log.info("There's no transaction initiated for the table {}", tid.toString());
@@ -731,6 +860,57 @@ public class IcebergMetadataWriter implements MetadataWriter {
}
}
+ /**
+ * For each timestamp in sorted collection of timestamps in descending order
+ * if timestamp is greater than previousWatermark
+ * and hour(now) > hour(prevWatermark) + 1
+ * check audit counts for completeness between
+ * a source and reference tier for [timestamp, timestamp + 1 unit of granularity]
+ * If the audit count matches update the watermark to the timestamp and break
+ * else continue
+ * else
+ * break
+ * Using a {@link TimeIterator} that operates over a range of time in 1 unit
+ * given the start, end and granularity
+ * @param table
+ * @param timestamps a sorted set of timestamps in decreasing order
+ * @param previousWatermark previous completion watermark for the table
+ * @return updated completion watermark
+ */
+ private long computeCompletenessWatermark(String table, SortedSet<ZonedDateTime> timestamps, long previousWatermark) {
+ log.info(String.format("Compute completion watermark for %s and timestamps %s with previous watermark %s", table, timestamps, previousWatermark));
+ long completionWatermark = previousWatermark;
+ ZonedDateTime now = ZonedDateTime.now(ZoneId.of(this.timeZone));
+ try {
+ if(timestamps == null || timestamps.size() <= 0) {
+ log.error("Cannot create time iterator. Empty for null timestamps");
+ return previousWatermark;
+ }
+ TimeIterator.Granularity granularity = TimeIterator.Granularity.valueOf(this.auditCheckGranularity);
+ ZonedDateTime prevWatermarkDT = Instant.ofEpochMilli(previousWatermark)
+ .atZone(ZoneId.of(this.timeZone));
+ ZonedDateTime startDT = timestamps.first();
+ ZonedDateTime endDT = timestamps.last();
+ TimeIterator iterator = new TimeIterator(startDT, endDT, granularity, true);
+ while (iterator.hasNext()) {
+ ZonedDateTime timestampDT = iterator.next();
+ if (timestampDT.isAfter(prevWatermarkDT)
+ && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) > 1) {
+ long timestampMillis = timestampDT.toInstant().toEpochMilli();
+ if(auditCountVerifier.get().isComplete(table, timestampMillis, TimeIterator.inc(timestampDT, granularity, 1).toInstant().toEpochMilli())) {
+ completionWatermark = timestampMillis;
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+ } catch (IOException e) {
+ log.warn("Exception during audit count check: ", e);
+ }
+ return completionWatermark;
+ }
+
private void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata tableMetadata, String dbName,
String tableName, Map<String, String> props, Long highWaterMark) {
GobblinEventBuilder gobblinTrackingEvent =
@@ -840,6 +1020,10 @@ public class IcebergMetadataWriter implements MetadataWriter {
Optional.of(currentOffset - 1);
}
tableMetadataMap.get(tid).setDatasetName(gmce.getDatasetIdentifier().getNativeName());
+ if(this.completenessEnabled && this.completenessWhitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
+ tableMetadataMap.get(tid).setCompletenessEnabled(true);
+ }
+
write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
tableCurrentWatermarkMap.put(tid, currentOffset);
} else {
@@ -885,9 +1069,14 @@ public class IcebergMetadataWriter implements MetadataWriter {
Optional<Map<String, List<Range>>> dataOffsetRange = Optional.absent();
Optional<String> lastSchemaVersion = Optional.absent();
Optional<Long> lowWatermark = Optional.absent();
+ long prevCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
+ long newCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
+ SortedSet<ZonedDateTime> datePartitions = new TreeSet<>(Collections.reverseOrder());
@Setter
String datasetName;
+ @Setter
+ boolean completenessEnabled;
Cache<CharSequence, String> addedFiles = CacheBuilder.newBuilder()
.expireAfterAccess(conf.getInt(ADDED_FILES_CACHE_EXPIRING_TIME, DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME),
@@ -926,7 +1115,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
}
}
- void reset(Map<String, String> props, Long lowWaterMark) {
+ void reset(Map<String, String> props, Long lowWaterMark, long newCompletionWatermark) {
this.lastProperties = Optional.of(props);
this.lastSchemaVersion = Optional.of(props.get(SCHEMA_CREATION_TIME_KEY));
this.transaction = Optional.absent();
@@ -943,6 +1132,9 @@ public class IcebergMetadataWriter implements MetadataWriter {
this.newProperties = Optional.absent();
this.lowestGMCEEmittedTime = Long.MAX_VALUE;
this.lowWatermark = Optional.of(lowWaterMark);
+ this.prevCompletenessWatermark = newCompletionWatermark;
+ this.newCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
+ this.datePartitions.clear();
}
}
}
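
The three-valued "late" classification above can be illustrated standalone. A minimal sketch, assuming the default time zone (America/Los_Angeles) and datepartition format (yyyy-MM-dd-HH) from IcebergMetadataWriterConfigKeys below: 0 means the partition hour is ahead of the completion watermark (on time), 1 means it is at or behind the watermark but on the watermark's calendar day, and 2 means it lands on an earlier day.

import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class LateClassificationSketch {
  static final ZoneId ZONE = ZoneId.of("America/Los_Angeles");      // DEFAULT_TIME_ZONE
  static final DateTimeFormatter FORMAT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd-HH").withZone(ZONE);  // DATEPARTITION_FORMAT

  static int isLate(String datepartition, long previousWatermarkMillis) {
    ZonedDateTime partition = ZonedDateTime.parse(datepartition, FORMAT);
    long partitionMillis = partition.toInstant().toEpochMilli();
    LocalDate watermarkDay =
        ZonedDateTime.ofInstant(Instant.ofEpochMilli(previousWatermarkMillis), ZONE).toLocalDate();
    if (partitionMillis > previousWatermarkMillis) {
      return 0; // partition is ahead of the watermark
    }
    return partition.toLocalDate().equals(watermarkDay) ? 1 : 2;
  }

  public static void main(String[] args) {
    long watermark = ZonedDateTime.parse("2020-03-17-08", FORMAT).toInstant().toEpochMilli();
    System.out.println(isLate("2020-03-17-09", watermark)); // 0: ahead of the watermark
    System.out.println(isLate("2020-03-17-07", watermark)); // 1: behind, same day
    System.out.println(isLate("2020-03-16-23", watermark)); // 2: behind, earlier day
  }
}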
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
new file mode 100644
index 0000000..65c4e57
--- /dev/null
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.writer;
+
+public class IcebergMetadataWriterConfigKeys {
+
+ public static final String ICEBERG_COMPLETENESS_ENABLED = "iceberg.completeness.enabled";
+ public static final boolean DEFAULT_ICEBERG_COMPLETENESS = false;
+ public static final String ICEBERG_COMPLETENESS_WHITELIST = "iceberg.completeness.whitelist";
+ public static final String ICEBERG_COMPLETENESS_BLACKLIST = "iceberg.completeness.blacklist";
+ public static final String COMPLETION_WATERMARK_KEY = "completionWatermark";
+ public static final String COMPLETION_WATERMARK_TIMEZONE_KEY = "completionWatermarkTimezone";
+ public static final long DEFAULT_COMPLETION_WATERMARK = -1L;
+ public static final String TIME_ZONE_KEY = "iceberg.completeness.timezone";
+ public static final String DEFAULT_TIME_ZONE = "America/Los_Angeles";
+ public static final String DATEPARTITION_FORMAT = "yyyy-MM-dd-HH";
+ public static final String NEW_PARTITION_KEY = "iceberg.completeness.add.partition";
+ public static final String DEFAULT_NEW_PARTITION = "late";
+ public static final String NEW_PARTITION_TYPE_KEY = "iceberg.completeness.add.partition.type";
+ public static final String DEFAULT_PARTITION_COLUMN_TYPE = "string";
+ public static final String TOPIC_NAME_KEY = "topic.name";
+ public static final String AUDIT_CHECK_GRANULARITY = "iceberg.completeness.audit.check.granularity";
+ public static final String DEFAULT_AUDIT_CHECK_GRANULARITY = "HOUR";
+
+}
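
Putting these keys together, a sketch of the writer-side configuration that would enable the feature; the database name, endpoint, and tier names are placeholders:

import org.apache.gobblin.completeness.audit.AuditCountClientFactory;
import org.apache.gobblin.completeness.audit.AuditCountHttpClient;
import org.apache.gobblin.completeness.verifier.KafkaAuditCountVerifier;
import org.apache.gobblin.configuration.State;

public class CompletenessConfigExample {
  public static State completenessState() {
    State state = new State();
    state.setProp("iceberg.completeness.enabled", "true");              // ICEBERG_COMPLETENESS_ENABLED
    state.setProp("iceberg.completeness.whitelist", "hivedb");          // placeholder database
    state.setProp("iceberg.completeness.audit.check.granularity", "HOUR");
    state.setProp(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY, "AuditCountHttpClientFactory");
    state.setProp(AuditCountHttpClient.AUDIT_REST_BASE_URL, "http://audit.example.com/counts"); // placeholder
    state.setProp(KafkaAuditCountVerifier.SOURCE_TIER, "gobblin");
    state.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, "producer");
    return state;
  }
}

With a state like this, the writer adds the "late" partition column to whitelisted tables and, after each successful audit check, stamps the completionWatermark and completionWatermarkTimezone table properties.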
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index 11756dd..70a3a2b 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -178,7 +178,7 @@ public class HiveMetadataWriterTest extends HiveMetastoreTest {
gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
}
- @Test( priority = 3 )
+ @Test(dependsOnGroups={"icebergMetadataWriterTest"})
public void testHiveWriteAddFileGMCE() throws IOException {
gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
new KafkaStreamingExtractor.KafkaWatermark(
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 2ae9859..c234675 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
@@ -29,26 +30,35 @@ import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.iceberg.FindFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hive.HiveMetastoreTest;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
+import org.apache.gobblin.completeness.audit.AuditCountClient;
+import org.apache.gobblin.completeness.audit.AuditCountClientFactory;
+import org.apache.gobblin.completeness.audit.TestAuditClientFactory;
+import org.apache.gobblin.completeness.verifier.KafkaAuditCountVerifier;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.hive.HivePartition;
+import org.apache.gobblin.hive.HiveRegistrationUnit;
import org.apache.gobblin.hive.HiveTable;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
import org.apache.gobblin.metadata.DataFile;
@@ -66,7 +76,7 @@ import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.ConfigUtils;
-
+import static org.apache.gobblin.iceberg.writer.IcebergMetadataWriterConfigKeys.*;
public class IcebergMetadataWriterTest extends HiveMetastoreTest {
@@ -85,6 +95,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
private String dbName = "hivedb";
private GobblinMCEWriter gobblinMCEWriter;
+ private GobblinMCEWriter gobblinMCEWriterWithCompletness;
private GobblinMCEWriter gobblinMCEWriterWithAcceptClusters;
GobblinMetadataChangeEvent gmce;
@@ -104,7 +115,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
public void setUp() throws Exception {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
startMetastore();
- State state = ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties()));
+
tmpDir = Files.createTempDir();
hourlyDataFile_1 = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2020/03/17/08/data.avro");
Files.createParentDirs(hourlyDataFile_1);
@@ -138,21 +149,46 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
.setRegistrationPolicy(TestHiveRegistrationPolicyForIceberg.class.getName())
.setRegistrationProperties(ImmutableMap.<String, String>builder().put("hive.database.name", dbName).build())
.build();
- state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS,
- KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
- state.setProp("default.hive.registration.policy",
- TestHiveRegistrationPolicyForIceberg.class.getName());
- state.setProp("use.data.path.as.table.location", true);
+
+ State state = getState();
gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
- state.setProp(GobblinMCEWriter.ACCEPTED_CLUSTER_NAMES, "randomCluster");
- gobblinMCEWriterWithAcceptClusters = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
((IcebergMetadataWriter) gobblinMCEWriter.getMetadataWriters().iterator().next()).setCatalog(
HiveMetastoreTest.catalog);
+
+ State stateWithCompletenessConfig = getStateWithCompletenessConfig();
+ gobblinMCEWriterWithCompletness = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), stateWithCompletenessConfig);
+ ((IcebergMetadataWriter) gobblinMCEWriterWithCompletness.getMetadataWriters().iterator().next()).setCatalog(
+ HiveMetastoreTest.catalog);
+
+ state.setProp(GobblinMCEWriter.ACCEPTED_CLUSTER_NAMES, "randomCluster");
+ gobblinMCEWriterWithAcceptClusters = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), state);
+
_avroPartitionSchema =
SchemaBuilder.record("partitionTest").fields().name("ds").type().optional().stringType().endRecord();
}
- @Test ( priority = 0 )
+ private State getState() {
+ State state = ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties()));
+ state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS,
+ KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
+ state.setProp("default.hive.registration.policy",
+ TestHiveRegistrationPolicyForIceberg.class.getName());
+ state.setProp("use.data.path.as.table.location", true);
+ return state;
+ }
+
+ private State getStateWithCompletenessConfig() {
+ State state = getState();
+ state.setProp(ICEBERG_COMPLETENESS_ENABLED, true);
+ state.setProp(NEW_PARTITION_KEY, "late");
+ state.setProp(NEW_PARTITION_TYPE_KEY, "int");
+ state.setProp(AuditCountClientFactory.AUDIT_COUNT_CLIENT_FACTORY, TestAuditClientFactory.class.getName());
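+ // Completeness compares audit counts between the source tier and the reference tiers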
+ state.setProp(KafkaAuditCountVerifier.SOURCE_TIER, "gobblin");
+ state.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, "producer");
+ return state;
+ }
+
+ @Test
public void testWriteAddFileGMCE() throws IOException {
// Creating a copy of gmce with a static GenericRecord type to work with the writeEnvelope method
// without risking a type-cast error at runtime.
@@ -221,7 +257,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
}
// Make sure the Hive test executes later and closes the metastore
- @Test( priority = 1 )
+ @Test(dependsOnMethods={"testWriteAddFileGMCE"}, groups={"icebergMetadataWriterTest"})
public void testWriteRewriteFileGMCE() throws IOException {
gmce.setTopicPartitionOffsetsRange(null);
FileSystem fs = FileSystem.get(new Configuration());
@@ -240,7 +276,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
result = FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", filePath_1)).collect().iterator();
Assert.assertEquals(table.currentSnapshot().allManifests().size(), 2);
Assert.assertTrue(result.hasNext());
- gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
+ GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+ gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
new KafkaStreamingExtractor.KafkaWatermark(
new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(40L))));
@@ -256,7 +293,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
Assert.assertFalse(result.hasNext());
}
- @Test( priority = 2 )
+ @Test(dependsOnMethods={"testWriteRewriteFileGMCE"}, groups={"icebergMetadataWriterTest"} )
public void testChangeProperty() throws IOException {
Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), "0-3000");
@@ -273,7 +310,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
gmce.setNewFiles(Lists.newArrayList(dailyFile));
gmce.setOperationType(OperationType.change_property);
gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "2000-4000").build());
- gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
+ GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+ gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
new KafkaStreamingExtractor.KafkaWatermark(
new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(45L))));
@@ -287,6 +325,97 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"), "45");
}
+ @Test(dependsOnMethods={"testChangeProperty"}, groups={"icebergMetadataWriterTest"})
+ public void testWriteAddFileGMCECompleteness() throws IOException {
+ // Creating a copy of gmce with a static GenericRecord type to work with the writeEnvelope method
+ // without risking a type-cast error at runtime.
+ gmce.setOperationType(OperationType.add_files);
+ File hourlyFile = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2020/03/17/10/data.avro");
+ long timestampMillis = 1584464400000L;
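+ // 1584464400000L == 2020-03-17T10:00 America/Los_Angeles, the hour of the data file above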
+ Files.createParentDirs(hourlyFile);
+ writeRecord(hourlyFile);
+ gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
+ .setFilePath(hourlyFile.toString())
+ .setFileFormat("avro")
+ .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
+ .build()));
+ gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "3000-4000").build());
+ GenericRecord genericGmce_3000_4000 = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+ gobblinMCEWriterWithCompletness.writeEnvelope(new RecordEnvelope<>(genericGmce_3000_4000,
+ new KafkaStreamingExtractor.KafkaWatermark(
+ new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+ new LongWatermark(50L))));
+
+ Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+ Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), "0-4000");
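+ // The completeness feature appends a "late" partition column (NEW_PARTITION_KEY / NEW_PARTITION_TYPE_KEY above)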
+ Assert.assertTrue(table.spec().fields().size() == 2);
+ Assert.assertEquals(table.spec().fields().get(1).name(), "late");
+
+ // Test the case where the completeness watermark is still below "2020-03-17-10"
+ KafkaAuditCountVerifier verifier = Mockito.mock(TestAuditCountVerifier.class);
+ Mockito.when(verifier.isComplete("testTopic", timestampMillis, timestampMillis + TimeUnit.HOURS.toMillis(1))).thenReturn(true);
+ ((IcebergMetadataWriter) gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
+ gobblinMCEWriterWithCompletness.flush();
+ table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+ // completeness watermark = "2020-03-17-10"
+ Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
+ Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), "America/Los_Angeles");
+ Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(timestampMillis));
+
+ Iterator<org.apache.iceberg.DataFile> dfl = FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", hourlyFile.getAbsolutePath())).collect().iterator();
+ Assert.assertTrue(dfl.hasNext());
+
+ // Test that the completeness watermark stays at "2020-03-17-10" when a late file arrives for that hour
+ File hourlyFile1 = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2020/03/17/10/data1.avro");
+ Files.createParentDirs(hourlyFile1);
+ writeRecord(hourlyFile1);
+ gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
+ .setFilePath(hourlyFile1.toString())
+ .setFileFormat("avro")
+ .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
+ .build()));
+ gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "4000-5000").build());
+ GenericRecord genericGmce_4000_5000 = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+ gobblinMCEWriterWithCompletness.writeEnvelope(new RecordEnvelope<>(genericGmce_4000_5000,
+ new KafkaStreamingExtractor.KafkaWatermark(
+ new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+ new LongWatermark(55L))));
+ gobblinMCEWriterWithCompletness.flush();
+ table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+ Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(timestampMillis));
+
+ dfl = FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", hourlyFile1.getAbsolutePath())).collect().iterator();
+ Assert.assertTrue(dfl.hasNext());
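+ // A file landing in an hour already covered by the watermark is flagged late (late == 1)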
+ Assert.assertTrue(dfl.next().partition().get(1, Integer.class) == 1);
+
+ // Test that the completeness watermark advances to "2020-03-17-11"
+ File hourlyFile2 = new File(tmpDir, "data/tracking/testIcebergTable/hourly/2020/03/17/11/data.avro");
+ long timestampMillis1 = timestampMillis + TimeUnit.HOURS.toMillis(1);
+ Files.createParentDirs(hourlyFile2);
+ writeRecord(hourlyFile2);
+ gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
+ .setFilePath(hourlyFile2.toString())
+ .setFileFormat("avro")
+ .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
+ .build()));
+ gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "5000-6000").build());
+ GenericRecord genericGmce_5000_6000 = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+ gobblinMCEWriterWithCompletness.writeEnvelope(new RecordEnvelope<>(genericGmce_5000_6000,
+ new KafkaStreamingExtractor.KafkaWatermark(
+ new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+ new LongWatermark(60L))));
+
+ Mockito.when(verifier.isComplete("testTopic", timestampMillis1, timestampMillis1 + TimeUnit.HOURS.toMillis(1))).thenReturn(true);
+ gobblinMCEWriterWithCompletness.flush();
+ table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+ Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(timestampMillis1));
+
+ dfl = FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", hourlyFile2.getAbsolutePath())).collect().iterator();
+ Assert.assertTrue(dfl.hasNext());
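+ // A file for the newly completed hour is not flagged late (late == 0)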
+ Assert.assertTrue(dfl.next().partition().get(1, Integer.class) == 0);
+
+ }
+
private String writeRecord(File file) throws IOException {
GenericData.Record record = new GenericData.Record(avroDataSchema);
record.put("id", 1L);
@@ -311,12 +440,28 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
partitionValue = "2020-03-17-08";
} else if (path.toString().contains("hourly/2020/03/17/09")) {
partitionValue = "2020-03-17-09";
+ } else if (path.toString().contains("hourly/2020/03/17/10")) {
+ partitionValue = "2020-03-17-10";
+ } else if (path.toString().contains("hourly/2020/03/17/11")) {
+ partitionValue = "2020-03-17-11";
+ } else if (path.toString().contains("hourly/2020/03/17/12")) {
+ partitionValue = "2020-03-17-12";
} else if (path.toString().contains("daily/2020/03/17")) {
partitionValue = "2020-03-17-00";
}
return Optional.of(new HivePartition.Builder().withPartitionValues(Lists.newArrayList(partitionValue))
.withDbName("hivedb").withTableName("testIcebergTable").build());
}
+ @Override
+ protected List<HiveTable> getTables(Path path) throws IOException {
+ List<HiveTable> tables = super.getTables(path);
+ for (HiveTable table : tables) {
+ table.setPartitionKeys(ImmutableList.<HiveRegistrationUnit.Column>of(
+ new HiveRegistrationUnit.Column("datepartition", serdeConstants.STRING_TYPE_NAME, StringUtils.EMPTY)));
+ }
+ return tables;
+ }
protected Iterable<String> getDatabaseNames(Path path) {
return Lists.newArrayList("hivedb");
}
@@ -324,5 +469,16 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
return Lists.newArrayList("testIcebergTable");
}
}
+
+ static class TestAuditCountVerifier extends KafkaAuditCountVerifier {
+
+ public TestAuditCountVerifier(State state) {
+ super(state);
+ }
+
+ public TestAuditCountVerifier(State state, AuditCountClient client) {
+ super(state, client);
+ }
+ }
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java b/gobblin-utility/src/main/java/org/apache/gobblin/time/TimeIterator.java
similarity index 69%
rename from gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
rename to gobblin-utility/src/main/java/org/apache/gobblin/time/TimeIterator.java
index a9ef4d4..63f7cfe 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/time/TimeIterator.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/time/TimeIterator.java
@@ -17,10 +17,14 @@
package org.apache.gobblin.time;
+import java.time.Duration;
import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import lombok.Getter;
+
/**
* A {@link TimeIterator} iterates over time points within [{@code startTime}, {@code endTime}]. It
@@ -32,28 +36,35 @@ public class TimeIterator implements Iterator {
MINUTE, HOUR, DAY, MONTH
}
+ @Getter
private ZonedDateTime startTime;
private ZonedDateTime endTime;
private Granularity granularity;
+ private boolean reverse;
public TimeIterator(ZonedDateTime startTime, ZonedDateTime endTime, Granularity granularity) {
+ this(startTime, endTime, granularity, false);
+ }
+
+ public TimeIterator(ZonedDateTime startTime, ZonedDateTime endTime, Granularity granularity, boolean reverse) {
this.startTime = startTime;
this.endTime = endTime;
this.granularity = granularity;
+ this.reverse = reverse;
}
@Override
public boolean hasNext() {
- return !startTime.isAfter(endTime);
+ return (reverse) ? !endTime.isAfter(startTime) : !startTime.isAfter(endTime);
}
@Override
public ZonedDateTime next() {
- if (startTime.isAfter(endTime)) {
+ if ((!reverse && startTime.isAfter(endTime)) || (reverse && endTime.isAfter(startTime))) {
throw new NoSuchElementException();
}
ZonedDateTime dateTime = startTime;
- startTime = inc(startTime, granularity, 1);
+ startTime = (reverse) ? dec(startTime, granularity, 1) : inc(startTime, granularity, 1);
return dateTime;
}
@@ -95,4 +106,26 @@ public class TimeIterator implements Iterator {
}
throw new RuntimeException("Unsupported granularity: " + granularity);
}
+
+ /**
+ * Return the duration between two datetime objects, measured in whole units of the given granularity
+ * @param d1 the start datetime
+ * @param d2 the end datetime
+ * @param granularity the time unit in which to measure the duration
+ * @return the duration from {@code d1} to {@code d2} as a long
+ */
+ public static long durationBetween(ZonedDateTime d1, ZonedDateTime d2, Granularity granularity) {
+ switch (granularity) {
+ case HOUR:
+ return Duration.between(d1, d2).toHours();
+ case MINUTE:
+ return Duration.between(d1, d2).toMinutes();
+ case DAY:
+ return Duration.between(d1, d2).toDays();
+ case MONTH:
+ return ChronoUnit.MONTHS.between(d1, d2);
+ }
+ throw new RuntimeException("Unsupported granularity: " + granularity);
+ }
+
}
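A minimal usage sketch of the relocated TimeIterator with the new reverse flag and durationBetween helper (dates are arbitrary; expected output noted in comments):

    import java.time.ZoneId;
    import java.time.ZonedDateTime;
    import org.apache.gobblin.time.TimeIterator;

    class TimeIteratorSketch {
      public static void main(String[] args) {
        ZoneId zone = ZoneId.of("America/Los_Angeles");
        ZonedDateTime newest = ZonedDateTime.of(2020, 3, 17, 11, 0, 0, 0, zone);
        ZonedDateTime oldest = ZonedDateTime.of(2020, 3, 17, 8, 0, 0, 0, zone);

        // reverse == true walks newest-first: 11:00, 10:00, 09:00, 08:00
        TimeIterator it = new TimeIterator(newest, oldest, TimeIterator.Granularity.HOUR, true);
        while (it.hasNext()) {
          System.out.println(it.next());
        }

        // Whole hours between the two instants: prints 3
        System.out.println(TimeIterator.durationBetween(oldest, newest, TimeIterator.Granularity.HOUR));
      }
    }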
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java
similarity index 100%
rename from gobblin-data-management/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java
rename to gobblin-utility/src/test/java/org/apache/gobblin/time/TimeIteratorTest.java
diff --git a/gradle/scripts/defaultBuildProperties.gradle b/gradle/scripts/defaultBuildProperties.gradle
index fec6ef9..2d92216 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -31,7 +31,7 @@ def BuildProperties BUILD_PROPERTIES = new BuildProperties(project)
.register(new BuildProperty("gobblinFlavor", "standard", "Build flavor (see http://gobblin.readthedocs.io/en/latest/developer-guide/GobblinModules/)"))
.register(new BuildProperty("hadoopVersion", "2.3.0", "Hadoop dependencies version"))
.register(new BuildProperty("hiveVersion", "1.0.1", "Hive dependencies version"))
- .register(new BuildProperty("icebergVersion", "0.10.0", "Iceberg dependencies version"))
+ .register(new BuildProperty("icebergVersion", "0.11.1", "Iceberg dependencies version"))
.register(new BuildProperty("jdkVersion", JavaVersion.VERSION_1_8.toString(),
"Java languange compatibility; supported versions: " + JavaVersion.VERSION_1_8))
.register(new BuildProperty("kafka08Version", "0.8.2.2", "Kafka 0.8 dependencies version"))
diff --git a/settings.gradle b/settings.gradle
index 210f5b2..5aa42f9 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -19,6 +19,7 @@ def modules = ['gobblin-admin',
'gobblin-all',
'gobblin-api',
'gobblin-compaction',
+ 'gobblin-completeness',
'gobblin-config-management',
'gobblin-core',
'gobblin-core-base',