You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2021/08/12 16:51:11 UTC
[gobblin] branch avro_1_9 updated: [Branch avro_1_9] Avro 1.9
upgrade compatible change - replaced deprecated public APIs with the
compatible APIs (#3349)
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch avro_1_9
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/avro_1_9 by this push:
new 77ef311 [Branch avro_1_9] Avro 1.9 upgrade compatible change - replaced deprecated public APIs with the compatible APIs (#3349)
77ef311 is described below
commit 77ef311cc8645e3049403bc511932b2fbc7ed0c3
Author: Abhishek Nath <an...@linkedin.com>
AuthorDate: Thu Aug 12 22:21:03 2021 +0530
[Branch avro_1_9] Avro 1.9 upgrade compatible change - replaced deprecated public APIs with the compatible APIs (#3349)
* Avro 1.9 upgrade compatible change - replaced deprecated public APIs with the compatible APIs.
* Avro 1.9 upgrade compatible change - Replaced guava library import from avro shaded with direct guava libraries
* Applied Gobblin codestyle formatting.
Co-authored-by: Lei <au...@users.noreply.github.com>
---
.../FieldAttributeBasedDeltaFieldsProvider.java | 5 +-
...FieldAttributeBasedDeltaFieldsProviderTest.java | 2 +-
.../converter/filter/AvroSchemaFieldRemover.java | 8 +-
.../apache/gobblin/test/SequentialTestSource.java | 2 +-
.../hive/query/HiveAvroORCQueryGenerator.java | 5 +-
.../copy/replication/ConfigBasedMultiDatasets.java | 2 +-
.../retention/dataset/CleanableIcebergDataset.java | 8 +-
.../management/conversion/hive/HiveSourceTest.java | 38 +++
.../copy/RecursiveCopyableDatasetTest.java | 4 +-
.../java/org/apache/gobblin/hive/HiveLock.java | 2 +-
.../hive/metastore/HiveMetaStoreBasedRegister.java | 4 +-
.../apache/gobblin/hive/writer/MetadataWriter.java | 10 +-
.../apache/gobblin/iceberg/Utils/IcebergUtils.java | 8 +-
.../gobblin/iceberg/writer/GobblinMCEWriter.java | 6 +
.../iceberg/writer/IcebergMetadataWriter.java | 260 ++++++++++++++-------
.../iceberg/writer/IcebergMetadataWriterTest.java | 18 +-
.../reporter/FileFailureEventReporterTest.java | 2 +-
.../azkaban/AzkabanGobblinYarnAppLauncher.java | 8 +
.../gobblin/writer/AvroOrcSchemaConverter.java | 5 +-
.../google/webmaster/GoogleWebmasterExtractor.java | 2 +-
.../GoogleWebmasterExtractorIterator.java | 2 +-
.../restli/throttling/ThrottlingClientTest.java | 2 +-
.../throttling/ConfigStoreBasedPolicyTest.java | 2 +-
.../throttling/LimiterServerResourceTest.java | 2 +-
...stHadoopKerberosKeytabAuthenticationPlugin.java | 2 +-
.../TestStandardGobblinInstanceDriver.java | 2 +-
.../instance/hadoop/TestHadoopConfigLoader.java | 2 +-
.../service/monitoring/KafkaJobStatusMonitor.java | 2 +-
.../org/apache/gobblin/util/AvroFlattener.java | 12 +-
.../org/apache/gobblin/util/AvroSchemaUtils.java | 67 ++++++
.../java/org/apache/gobblin/util/AvroUtils.java | 16 +-
.../org/apache/gobblin/util/AvroUtilsTest.java | 8 +-
.../gobblin/yarn/GobblinYarnAppLauncher.java | 88 ++++---
.../gobblin/yarn/GobblinYarnConfigurationKeys.java | 2 +
34 files changed, 439 insertions(+), 169 deletions(-)
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
index 5f2e9fd..0632045 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.util.AvroSchemaUtils;
import org.apache.hadoop.conf.Configuration;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParser;
@@ -79,7 +80,9 @@ public class FieldAttributeBasedDeltaFieldsProvider implements AvroDeltaFieldNam
private List<String> getDeltaFieldNamesForNewSchema(Schema originalSchema) {
List<String> deltaFields = new ArrayList<>();
for (Field field : originalSchema.getFields()) {
- String deltaAttributeField = field.getJsonProp(this.attributeField).getValueAsText();
+ // Avro 1.9 compatible change - replaced deprecated public api getJsonProp with getObjectProp
+ // Use AvroSchemaUtils to convert object to the string value
+ String deltaAttributeField = AvroSchemaUtils.getValueAsString(field, this.attributeField);
ObjectNode objectNode = getDeltaPropValue(deltaAttributeField);
if (objectNode == null || objectNode.get(this.deltaPropName) == null) {
continue;
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProviderTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProviderTest.java
index 8557a8c..225f339 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProviderTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProviderTest.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.testng.Assert;
import org.testng.annotations.Test;
-import avro.shaded.com.google.common.collect.Lists;
+import com.google.common.collect.Lists;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
index 87d4101..d8d227b 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroSchemaFieldRemover.java
@@ -22,7 +22,6 @@ import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
-import org.codehaus.jackson.JsonNode;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
@@ -108,7 +107,7 @@ public class AvroSchemaFieldRemover {
private Schema removeFieldsFromRecords(Schema schema, Map<String, Schema> schemaMap) {
Schema newRecord = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
- convertFieldToSchemaWithProps(schema.getJsonProps(), newRecord);
+ convertFieldToSchemaWithProps(schema.getObjectProps(), newRecord);
// Put an incomplete schema into schemaMap to avoid re-processing a recursive field.
// The fields in the incomplete schema will be populated once the current schema is completely processed.
@@ -125,8 +124,9 @@ public class AvroSchemaFieldRemover {
newField = new Field(field.name(), DO_NOTHING_INSTANCE.removeFields(field.schema(), schemaMap), field.doc(),
field.defaultValue());
}
- for (Map.Entry<String, JsonNode> stringJsonNodeEntry : field.getJsonProps().entrySet()) {
- newField.addProp(stringJsonNodeEntry.getKey(), stringJsonNodeEntry.getValue());
+ // Avro 1.9 compatible change - replaced deprecated public api getJsonProps with getObjectProps
+ for (Map.Entry<String, Object> objectEntry : field.getObjectProps().entrySet()) {
+ newField.addProp(objectEntry.getKey(), objectEntry.getValue());
}
newFields.add(newField);
}
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java b/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java
index 5872808..8aaf328 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/test/SequentialTestSource.java
@@ -29,7 +29,7 @@ import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import avro.shaded.com.google.common.base.Throwables;
+import com.google.common.base.Throwables;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.SourceState;
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
index fb2a0ec..5aaf73d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/query/HiveAvroORCQueryGenerator.java
@@ -29,6 +29,7 @@ import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.util.AvroSchemaUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde.serdeConstants;
@@ -538,7 +539,9 @@ public class HiveAvroORCQueryGenerator {
.equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
int maxLength = 0;
try {
- maxLength = schema.getJsonProp(AvroSerDe.AVRO_PROP_MAX_LENGTH).getValueAsInt();
+ // Avro 1.9 compatible change - replaced deprecated public api getJsonProp with getObjectProp
+ // Use AvroSchemaUtils to convert object to the integer value
+ maxLength = AvroSchemaUtils.getValueAsInteger(schema, AvroSerDe.AVRO_PROP_MAX_LENGTH);
} catch (Exception ex) {
throw new AvroSerdeException("Failed to obtain maxLength value from file schema: " + schema, ex);
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
index 3f9a57f..076b279 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java
@@ -17,7 +17,7 @@
package org.apache.gobblin.data.management.copy.replication;
-import avro.shaded.com.google.common.annotations.VisibleForTesting;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.gobblin.dataset.Dataset;
import java.io.IOException;
import java.net.URI;
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableIcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableIcebergDataset.java
index c351ef1..e4d4450 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableIcebergDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/CleanableIcebergDataset.java
@@ -48,8 +48,8 @@ import org.slf4j.LoggerFactory;
/**
* A subclass of {@link ConfigurableCleanableDataset} that overwrite the {@link ConfigurableCleanableDataset#cleanImpl(Collection)}
- * to firstly send gmce to delete dataset logically from iceberg and then
- * call {@link org.apache.iceberg.Table#expireSnapshots()} to do physically data and metadata retention
+ * to firstly send gmce to delete dataset logically from iceberg and then process GMCEs within metadata-ingestion pipeline
+ * by calling {@link org.apache.iceberg.Table#expireSnapshots()} to materialize data/metadata retention
*/
public class CleanableIcebergDataset<T extends FileSystemDatasetVersion> extends ConfigurableCleanableDataset<T> {
private final static String RETENTION_INTERVAL_TIME = "retention.interval.time";
@@ -127,6 +127,10 @@ public class CleanableIcebergDataset<T extends FileSystemDatasetVersion> extends
}
}
+ /**
+ * Only in charge of filing {@link org.apache.gobblin.metadata.GobblinMetadataChangeEvent}
+ * The processing of these events can be seen in {@link org.apache.gobblin.iceberg.writer.IcebergMetadataWriter}.
+ */
protected void cleanImpl(Collection<T> deletableVersions, Config retentionConfig) throws IOException {
List<String> deletablePrefix = new ArrayList<>();
for (T version : deletableVersions) {
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
index 2351fd3..e6e4a72 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/HiveSourceTest.java
@@ -22,6 +22,8 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.joda.time.DateTime;
import org.testng.Assert;
@@ -87,6 +89,42 @@ public class HiveSourceTest {
}
@Test
+ public void testAlreadyExistsPartition() throws Exception {
+ String dbName = "testdb";
+ String tableSdLoc = new File(this.tmpDir, TEST_TABLE_1).getAbsolutePath();
+
+ this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName, false, true, true);
+
+ Table tbl = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, TEST_TABLE_1, tableSdLoc, Optional.of("field"));
+
+ this.hiveMetastoreTestUtils.addTestPartition(tbl, ImmutableList.of("f1"), (int) System.currentTimeMillis());
+
+ try {
+ this.hiveMetastoreTestUtils.addTestPartition(tbl, ImmutableList.of("f1"), (int) System.currentTimeMillis());
+ } catch (AlreadyExistsException e) {
+ return;
+ }
+ Assert.fail();
+ }
+
+ @Test
+ public void testPartitionNotExists() throws Exception {
+ String dbName = "testdb1";
+ String tableSdLoc = new File(this.tmpDir, TEST_TABLE_1).getAbsolutePath();
+
+ this.hiveMetastoreTestUtils.getLocalMetastoreClient().dropDatabase(dbName, false, true, true);
+
+ Table tbl = this.hiveMetastoreTestUtils.createTestAvroTable(dbName, TEST_TABLE_1, tableSdLoc, Optional.of("field"));
+
+ try {
+ this.hiveMetastoreTestUtils.getLocalMetastoreClient().getPartition(tbl.getDbName(), tbl.getTableName(), "field");
+ } catch (NoSuchObjectException e) {
+ return;
+ }
+ Assert.fail();
+ }
+
+ @Test
public void testGetWorkUnitsForPartitions() throws Exception {
String dbName = "testdb3";
String tableSdLoc = new File(this.tmpDir, TEST_TABLE_3).getAbsolutePath();
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java
index 14011d2..d624f82 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDatasetTest.java
@@ -42,8 +42,8 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
-import avro.shaded.com.google.common.base.Predicate;
-import avro.shaded.com.google.common.collect.Iterables;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
import javax.annotation.Nullable;
import lombok.Data;
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLock.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLock.java
index 653ddcb..9442cd9 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLock.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveLock.java
@@ -35,7 +35,7 @@ import org.apache.gobblin.util.AutoCloseableLock;
*
* <p>
* Obtaining a table lock does <em>not</em> lock the database, which permits concurrent operations on different
- * tables in the same database. Similarly, obtianing a partition lock does not lock the table or the database.
+ * tables in the same database. Similarly, obtaining a partition lock does not lock the table or the database.
* </p>
*/
public class HiveLock {
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 1a8711e..d5b2393 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -586,7 +586,7 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
}
log.info(String.format("Added partition %s to table %s with location %s", stringifyPartition(nativePartition),
table.getTableName(), nativePartition.getSd().getLocation()));
- } catch (TException e) {
+ } catch (AlreadyExistsException e) {
try {
if (this.skipDiffComputation) {
onPartitionExistWithoutComputingDiff(table, nativePartition, e);
@@ -632,7 +632,7 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
onPartitionExist(client, table, partition, nativePartition, existedPartition);
}
}
- } catch (TException e) {
+ } catch (NoSuchObjectException e) {
try (Timer.Context context = this.metricContext.timer(ADD_PARTITION_TIMER).time()) {
client.add_partition(getPartitionWithCreateTimeNow(nativePartition));
}
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
index 054c8a9..6333688 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriter.java
@@ -36,8 +36,14 @@ public interface MetadataWriter extends Closeable {
String CACHE_EXPIRING_TIME = "GMCEWriter.cache.expiring.time.hours";
int DEFAULT_CACHE_EXPIRING_TIME = 1;
- /*
- Register the metadata of specific table to the metadata store
+ /**
+ * Register the metadata of specific table to the metadata store. This is a blocking method,
+ * meaning once it returns, as long as the underlying metadata storage is transactional (e.g. Mysql as for HMS),
+ * one could expect the metadata registration going through and being persisted already.
+ *
+ * @param dbName The db name of metadata-registration target.
+ * @param tableName The table name of metadata-registration target.
+ * @throws IOException
*/
void flush(String dbName, String tableName) throws IOException;
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java
index 53cec5f..65fb551 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java
@@ -75,7 +75,7 @@ public class IcebergUtils {
/**
* Given a avro schema string and a hive table,
* calculate the iceberg table schema and partition schema.
- * (Since we use 'datepartition' as partition column, which is not included inside the data schema,
+ * (E.g. we use 'datepartition' as the partition column, which is not included inside the data schema,
* we'll need to add that column to data schema to construct table schema
*/
public static IcebergDataAndPartitionSchema getIcebergSchema(String schema,
@@ -212,15 +212,15 @@ public class IcebergUtils {
* This method is mainly used to get the file to be deleted
*/
public static DataFile getIcebergDataFileWithoutMetric(String file, PartitionSpec partitionSpec,
- StructLike partition) {
+ StructLike partitionVal) {
//Use raw Path to support federation.
String rawPath = new Path(file).toUri().getRawPath();
//Just want to remove the old files, so set the record number and file size to a random value
DataFiles.Builder dataFileBuilder =
DataFiles.builder(partitionSpec).withPath(rawPath).withFileSizeInBytes(0).withRecordCount(0);
- if (partition != null) {
- dataFileBuilder.withPartition(partition);
+ if (partitionVal != null) {
+ dataFileBuilder.withPartition(partitionVal);
}
return dataFileBuilder.build();
}
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
index ebbf74b..a6036e3 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
@@ -266,6 +266,12 @@ public class GobblinMCEWriter implements DataWriter<GenericRecord> {
* Call the metadata writers to do flush each table metadata.
* Flush of metadata writer is the place that do real metadata
* registrations (e.g. for iceberg, this method will generate a snapshot)
+ * Flush of metadata writer is the place that does the real metadata
+ * registration (for iceberg, this method will generate a snapshot)
+ *
+ * Note that this is one of the places where the materialization of aggregated metadata happens.
+ * When there's a change of {@link OperationType}, it also interrupts metadata aggregation,
+ * and triggers materialization of metadata.
* @throws IOException
*/
@Override
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 59a8706..3abd491 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -104,7 +104,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor.KafkaWatermark;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.ClustersNames;
@@ -123,15 +123,16 @@ import org.apache.gobblin.util.WriterUtils;
@Slf4j
public class IcebergMetadataWriter implements MetadataWriter {
+ // Critical when there's dataset-level ACL enforced for both data and Iceberg metadata
public static final String USE_DATA_PATH_AS_TABLE_LOCATION = "use.data.path.as.table.location";
public static final String TABLE_LOCATION_SUFFIX = "/_iceberg_metadata/%s";
public static final String GMCE_HIGH_WATERMARK_KEY = "gmce.high.watermark.%s";
public static final String GMCE_LOW_WATERMARK_KEY = "gmce.low.watermark.%s";
private final static String EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "gobblin.iceberg.dataset.expire.snapshots.lookBackTime";
private final static String DEFAULT_EXPIRE_SNAPSHOTS_LOOKBACK_TIME = "3d";
- public static final String ICEBERG_REGISTRATION_BLACKLIST = "iceberg.registration.blacklist";
- public static final String ICEBERG_REGISTRATION_WHITELIST = "iceberg.registration.whitelist";
- public static final String ICEBERG_METADATA_FILE_PERMISSION = "iceberg.metadata.file.permission";
+ private static final String ICEBERG_REGISTRATION_BLACKLIST = "iceberg.registration.blacklist";
+ private static final String ICEBERG_REGISTRATION_WHITELIST = "iceberg.registration.whitelist";
+ private static final String ICEBERG_METADATA_FILE_PERMISSION = "iceberg.metadata.file.permission";
private static final String CLUSTER_IDENTIFIER_KEY_NAME = "clusterIdentifier";
private static final String CREATE_TABLE_TIME = "iceberg.create.table.time";
private static final String SCHEMA_CREATION_TIME_KEY = "schema.creation.time";
@@ -139,17 +140,23 @@ public class IcebergMetadataWriter implements MetadataWriter {
private static final int DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME = 1;
private static final String OFFSET_RANGE_KEY_PREFIX = "offset.range.";
private static final String OFFSET_RANGE_KEY_FORMAT = OFFSET_RANGE_KEY_PREFIX + "%s";
- private static final String ICEBERG_FILE_PATH_COLUMN = "file_path";
+
private static final String DEFAULT_CREATION_TIME = "0";
private static final String SNAPSHOT_EXPIRE_THREADS = "snapshot.expire.threads";
private static final long DEFAULT_WATERMARK = -1L;
+
+ /* one of the fields in DataFile entry to describe the location URI of a data file with FS Scheme */
+ private static final String ICEBERG_FILE_PATH_COLUMN = DataFile.FILE_PATH.name();
+
protected final MetricContext metricContext;
protected EventSubmitter eventSubmitter;
- private final WhitelistBlacklist whiteistBlacklist;
+ private final WhitelistBlacklist whitelistBlacklist;
private final Closer closer = Closer.create();
- private final Map<TableIdentifier, Long> tableCurrentWaterMarkMap;
- //Used to store the relationship between table and the gmce topicPartition
- private final Map<TableIdentifier, String> tableTopicpartitionMap;
+
+ // Mapping between table-id and currently processed watermark
+ private final Map<TableIdentifier, Long> tableCurrentWatermarkMap;
+ // Used to store the relationship between table and the gmce topicPartition
+ private final Map<TableIdentifier, String> tableTopicPartitionMap;
@Getter
private final KafkaSchemaRegistry schemaRegistry;
private final Map<TableIdentifier, TableMetadata> tableMetadataMap;
@@ -159,16 +166,16 @@ public class IcebergMetadataWriter implements MetadataWriter {
protected final ReadWriteLock readWriteLock;
private final HiveLock locks;
private final ParallelRunner parallelRunner;
- private final boolean useDataLoacationAsTableLocation;
+ private final boolean useDataLocationAsTableLocation;
private FsPermission permission;
public IcebergMetadataWriter(State state) throws IOException {
this.schemaRegistry = KafkaSchemaRegistry.get(state.getProperties());
conf = HadoopUtils.getConfFromState(state);
initializeCatalog();
- tableTopicpartitionMap = new HashMap<>();
+ tableTopicPartitionMap = new HashMap<>();
tableMetadataMap = new HashMap<>();
- tableCurrentWaterMarkMap = new HashMap<>();
+ tableCurrentWatermarkMap = new HashMap<>();
List<Tag<?>> tags = Lists.newArrayList();
String clusterIdentifier = ClustersNames.getInstance().getClusterName();
tags.add(new Tag<>(CLUSTER_IDENTIFIER_KEY_NAME, clusterIdentifier));
@@ -176,15 +183,17 @@ public class IcebergMetadataWriter implements MetadataWriter {
GobblinMetricsRegistry.getInstance().getMetricContext(state, IcebergMetadataWriter.class, tags));
this.eventSubmitter =
new EventSubmitter.Builder(this.metricContext, IcebergMCEMetadataKeys.METRICS_NAMESPACE_ICEBERG_WRITER).build();
- this.whiteistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_REGISTRATION_WHITELIST, ""),
+ this.whitelistBlacklist = new WhitelistBlacklist(state.getProp(ICEBERG_REGISTRATION_WHITELIST, ""),
state.getProp(ICEBERG_REGISTRATION_BLACKLIST, ""));
- // Use lock to make it safe when flush and write are called async
+
+ // Use rw-lock to make it thread-safe when flush and write (which essentially aggregates & reads metadata)
+ // are called in separate threads.
readWriteLock = new ReentrantReadWriteLock();
this.locks = new HiveLock(state.getProperties());
parallelRunner = closer.register(new ParallelRunner(state.getPropAsInt(SNAPSHOT_EXPIRE_THREADS, 20),
FileSystem.get(HadoopUtils.getConfFromState(state))));
- useDataLoacationAsTableLocation = state.getPropAsBoolean(USE_DATA_PATH_AS_TABLE_LOCATION, false);
- if (useDataLoacationAsTableLocation) {
+ useDataLocationAsTableLocation = state.getPropAsBoolean(USE_DATA_PATH_AS_TABLE_LOCATION, false);
+ if (useDataLocationAsTableLocation) {
permission =
HadoopUtils.deserializeFsPermission(state, ICEBERG_METADATA_FILE_PERMISSION,
FsPermission.getDefault());
@@ -204,12 +213,14 @@ public class IcebergMetadataWriter implements MetadataWriter {
}
/**
- * The method is used to get current watermark of the gmce topic partition for a table
+ * The method is used to get current watermark of the gmce topic partition for a table, and persist the value
+ * in the {@link #tableMetadataMap} as a side effect.
+ *
* Make the watermark config name contains topicPartition in case we change the gmce topic name for some reason
*/
- private Long getCurrentWaterMark(TableIdentifier tid, String topicPartition) {
- if (tableCurrentWaterMarkMap.containsKey(tid)) {
- return tableCurrentWaterMarkMap.get(tid);
+ private Long getAndPersistCurrentWatermark(TableIdentifier tid, String topicPartition) {
+ if (tableCurrentWatermarkMap.containsKey(tid)) {
+ return tableCurrentWatermarkMap.get(tid);
}
org.apache.iceberg.Table icebergTable;
Long currentWatermark = DEFAULT_WATERMARK;
@@ -229,13 +240,15 @@ public class IcebergMetadataWriter implements MetadataWriter {
}
/**
- *The write method will be responsible for process a given gmce and aggregate the metadata
+ * The write method will be responsible for processing gmce and aggregating the metadata.
* The logic of this function will be:
- * 1. Check whether a table exists, if not then create the iceberg table
+ * 1. Check whether a table exists, if not then create an iceberg table
* 2. Compute schema from the gmce and update the cache for candidate schemas
- * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile or dropFile. change_property means only
- * update the table level property but no data modification.
- * Note: this method only aggregate the metadata in cache without committing. The actual commit will be done in flush method
+ * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile, dropFile or change_property.
+ *
+ * Note: this method only aggregates the metadata in cache without committing,
+ * while the actual commit will be done in flush method (except rewrite and drop methods where preserving older file
+ * information increases the memory footprint, therefore we would like to flush them eagerly).
*/
public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap,
Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) throws IOException {
@@ -261,7 +274,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
}
}
computeCandidateSchema(gmce, tid, tableSpec);
- tableMetadata.transaction = Optional.of(tableMetadata.transaction.or(table::newTransaction));
+ tableMetadata.ensureTxnInit();
tableMetadata.lowestGMCEEmittedTime = Long.min(tableMetadata.lowestGMCEEmittedTime, gmce.getGMCEmittedTime());
switch (gmce.getOperationType()) {
case add_files: {
@@ -314,7 +327,11 @@ public class IcebergMetadataWriter implements MetadataWriter {
return offsets;
}
- private void mergeOffsets(GobblinMetadataChangeEvent gmce, TableIdentifier tid) throws IOException {
+ /**
+ * The side effect of this method is to update the offset-range of the table identified by
+ * the given {@link TableIdentifier} with the input {@link GobblinMetadataChangeEvent}
+ */
+ private void mergeOffsets(GobblinMetadataChangeEvent gmce, TableIdentifier tid) {
TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
tableMetadata.dataOffsetRange = Optional.of(tableMetadata.dataOffsetRange.or(() -> getLastOffset(tableMetadata)));
Map<String, List<Range>> offsets = tableMetadata.dataOffsetRange.get();
@@ -360,8 +377,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
* @param gmce
* @param tid
*/
- private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, TableIdentifier tid, HiveSpec spec)
- throws IOException {
+ private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, TableIdentifier tid, HiveSpec spec) {
Table table = getIcebergTable(tid);
TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
org.apache.hadoop.hive.metastore.api.Table hiveTable = HiveMetaStoreUtils.getTable(spec.getTable());
@@ -416,7 +432,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
PartitionSpec partitionSpec = IcebergUtils.getPartitionSpec(tableSchema, schemas.partitionSchema);
Table icebergTable = null;
String tableLocation = null;
- if (useDataLoacationAsTableLocation) {
+ if (useDataLocationAsTableLocation) {
tableLocation = gmce.getDatasetIdentifier().getNativeName() + String.format(TABLE_LOCATION_SUFFIX, table.getDbName());
//Set the path permission
Path tablePath = new Path(tableLocation);
@@ -435,8 +451,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
protected void rewriteFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap,
Map<String, Collection<HiveSpec>> oldSpecsMap, Table table, TableMetadata tableMetadata) throws IOException {
PartitionSpec partitionSpec = table.spec();
- tableMetadata.transaction = Optional.of(tableMetadata.transaction.or(table::newTransaction));
- Transaction transaction = tableMetadata.transaction.get();
+ tableMetadata.ensureTxnInit();
Set<DataFile> newDataFiles = new HashSet<>();
getIcebergDataFilesToBeAddedHelper(gmce, table, newSpecsMap, tableMetadata)
.forEach(dataFile -> {
@@ -444,22 +459,26 @@ public class IcebergMetadataWriter implements MetadataWriter {
tableMetadata.addedFiles.put(dataFile.path(), "");
});
Set<DataFile> oldDataFiles = getIcebergDataFilesToBeDeleted(gmce, table, newSpecsMap, oldSpecsMap, partitionSpec);
+
+ // Handle the case where the old files don't exist, in which case the rewrite is converted into either a noop or an AppendFiles operation.
if (oldDataFiles.isEmpty() && !newDataFiles.isEmpty()) {
- //We randomly check whether one of the new data files already exists in the db to avoid duplication of re-write events
- DataFile file = newDataFiles.iterator().next();
- Expression exp = Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, (String) file.path());
+ //We randomly check whether one of the new data files already exists in the db to avoid reprocessing re-write events
+ DataFile dataFile = newDataFiles.iterator().next();
+ Expression exp = Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, dataFile.path().toString());
if (FindFiles.in(table).withMetadataMatching(exp).collect().iterator().hasNext()) {
- //This means this re-write event is duplicated with the one we already handled, so directly return
+ //This means this re-write event is duplicated with the one we already handled, so noop.
return;
}
- //This is the case when the files to be deleted do not exist in table
- //So we directly call addFiles interface to add new files into the table.
- tableMetadata.appendFiles = Optional.of(tableMetadata.appendFiles.or(transaction::newAppend));
- AppendFiles appendFiles = tableMetadata.appendFiles.get();
+ // This is the case when the files to be deleted do not exist in table
+ // So we directly call addFiles interface to add new files into the table.
+ // Note that this AppendFiles won't be committed here, in contrast to a real rewrite operation
+ // where the commit will be called at once to reduce the memory footprint.
+ AppendFiles appendFiles = tableMetadata.getOrInitAppendFiles();
newDataFiles.forEach(appendFiles::appendFile);
return;
}
- transaction.newRewrite().rewriteFiles(oldDataFiles, newDataFiles).commit();
+
+ tableMetadata.transaction.get().newRewrite().rewriteFiles(oldDataFiles, newDataFiles).commit();
}
/**
@@ -478,15 +497,22 @@ public class IcebergMetadataWriter implements MetadataWriter {
return schemaWithOriginId;
}
+ /**
+ * Deals with both regular file deletions requested by the GMCE (aggregated, but not committed),
+ * and the expiration of older snapshots (committed).
+ */
protected void dropFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> oldSpecsMap, Table table,
TableMetadata tableMetadata, TableIdentifier tid) throws IOException {
PartitionSpec partitionSpec = table.spec();
- Transaction transaction = tableMetadata.transaction.get();
+
+ // Update DeleteFiles in tableMetadata: This is regular file deletion
+ DeleteFiles deleteFiles = tableMetadata.getOrInitDeleteFiles();
Set<DataFile> oldDataFiles =
getIcebergDataFilesToBeDeleted(gmce, table, new HashMap<>(), oldSpecsMap, partitionSpec);
- tableMetadata.deleteFiles = Optional.of(tableMetadata.deleteFiles.or(transaction::newDelete));
- DeleteFiles deleteFiles = tableMetadata.deleteFiles.get();
- oldDataFiles.stream().forEach(deleteFiles::deleteFile);
+ oldDataFiles.forEach(deleteFiles::deleteFile);
+
+ // Update ExpireSnapshots and commit the updates at once: This is for expiring snapshots that are
+ // beyond look-back allowance for time-travel.
parallelRunner.submitCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
@@ -495,6 +521,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
long start = System.currentTimeMillis();
ExpireSnapshots expireSnapshots = table.expireSnapshots();
final Table tmpTable = table;
+
expireSnapshots.deleteWith(new Consumer<String>() {
@Override
public void accept(String file) {
@@ -533,10 +560,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
}
protected void addFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap, Table table,
- TableMetadata tableMetadata) throws IOException {
- Transaction transaction = tableMetadata.transaction.get();
- tableMetadata.appendFiles = Optional.of(tableMetadata.appendFiles.or(transaction::newAppend));
- AppendFiles appendFiles = tableMetadata.appendFiles.get();
+ TableMetadata tableMetadata) {
+ AppendFiles appendFiles = tableMetadata.getOrInitAppendFiles();
getIcebergDataFilesToBeAddedHelper(gmce, table, newSpecsMap, tableMetadata)
.forEach(dataFile -> {
appendFiles.appendFile(dataFile);
@@ -544,18 +569,19 @@ public class IcebergMetadataWriter implements MetadataWriter {
});
}
- private Stream<DataFile> getIcebergDataFilesToBeAddedHelper(GobblinMetadataChangeEvent gmce, Table table, Map<String, Collection<HiveSpec>> newSpecsMap,
- TableMetadata tableMetadata) throws IOException {
+ private Stream<DataFile> getIcebergDataFilesToBeAddedHelper(GobblinMetadataChangeEvent gmce, Table table,
+ Map<String, Collection<HiveSpec>> newSpecsMap,
+ TableMetadata tableMetadata) {
return getIcebergDataFilesToBeAdded(gmce.getNewFiles(), table.spec(), newSpecsMap,
IcebergUtils.getSchemaIdMap(getSchemaWithOriginId(gmce), table.schema())).stream()
.filter(dataFile -> tableMetadata.addedFiles.getIfPresent(dataFile.path()) == null);
}
/**
- * Method to get dataFiles without metrics information
+ * Method to get a {@link DataFile} collection without metrics information
* This method is used to get files to be deleted from iceberg
* If oldFilePrefixes is specified in gmce, this method will use those prefixes to find old file in iceberg,
- * or the method will callmethod {IcebergUtils.getIcebergDataFileWithMetric} to get DataFile for specific file path
+ * or the method will call method {IcebergUtils.getIcebergDataFileWithMetric} to get DataFile for specific file path
*/
private Set<DataFile> getIcebergDataFilesToBeDeleted(GobblinMetadataChangeEvent gmce, Table table,
Map<String, Collection<HiveSpec>> newSpecsMap, Map<String, Collection<HiveSpec>> oldSpecsMap,
@@ -564,7 +590,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
if (gmce.getOldFilePrefixes() != null) {
Expression exp = Expressions.alwaysFalse();
for (String prefix : gmce.getOldFilePrefixes()) {
- //Use both full path and raw path to delete old files
+ // Use both full path and raw path to filter old files
exp = Expressions.or(exp, Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, prefix));
String rawPathPrefix = new Path(prefix).toUri().getRawPath();
exp = Expressions.or(exp, Expressions.startsWith(ICEBERG_FILE_PATH_COLUMN, rawPathPrefix));
@@ -572,16 +598,16 @@ public class IcebergMetadataWriter implements MetadataWriter {
long start = System.currentTimeMillis();
oldDataFiles.addAll(Sets.newHashSet(FindFiles.in(table).withMetadataMatching(exp).collect().iterator()));
//Use INFO level log here to get better estimate.
- //This shouldn't overwhelm the log since we receive less than 3 rewrite_file gmces for one table in one day
+ //This shouldn't overwhelm the log since we receive a limited number of rewrite_file gmces for one table in a day
log.info("Spent {}ms to query all old files in iceberg.", System.currentTimeMillis() - start);
} else {
for (String file : gmce.getOldFiles()) {
String specPath = new Path(file).getParent().toString();
// For the use case of recompaction, the old path may contains /daily path, in this case, we find the spec from newSpecsMap
- StructLike partition = getIcebergPartition(
+ StructLike partitionVal = getIcebergPartitionVal(
oldSpecsMap.containsKey(specPath) ? oldSpecsMap.get(specPath) : newSpecsMap.get(specPath), file,
partitionSpec);
- oldDataFiles.add(IcebergUtils.getIcebergDataFileWithoutMetric(file, partitionSpec, partition));
+ oldDataFiles.add(IcebergUtils.getIcebergDataFileWithoutMetric(file, partitionSpec, partitionVal));
}
}
return oldDataFiles;
@@ -593,12 +619,11 @@ public class IcebergMetadataWriter implements MetadataWriter {
* This method will call method {IcebergUtils.getIcebergDataFileWithMetric} to get DataFile for specific file path
*/
private Set<DataFile> getIcebergDataFilesToBeAdded(List<org.apache.gobblin.metadata.DataFile> files,
- PartitionSpec partitionSpec, Map<String, Collection<HiveSpec>> newSpecsMap, Map<Integer, Integer> schemaIdMap)
- throws IOException {
+ PartitionSpec partitionSpec, Map<String, Collection<HiveSpec>> newSpecsMap, Map<Integer, Integer> schemaIdMap) {
Set<DataFile> dataFiles = new HashSet<>();
for (org.apache.gobblin.metadata.DataFile file : files) {
try {
- StructLike partition = getIcebergPartition(newSpecsMap.get(new Path(file.getFilePath()).getParent().toString()),
+ StructLike partition = getIcebergPartitionVal(newSpecsMap.get(new Path(file.getFilePath()).getParent().toString()),
file.getFilePath(), partitionSpec);
dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file, partitionSpec, partition, conf, schemaIdMap));
} catch (Exception e) {
@@ -608,15 +633,23 @@ public class IcebergMetadataWriter implements MetadataWriter {
return dataFiles;
}
- private StructLike getIcebergPartition(Collection<HiveSpec> specs, String filePath, PartitionSpec partitionSpec)
+ /**
+ * Obtain Iceberg partition value with a collection of {@link HiveSpec}.
+ * @param specs A collection of {@link HiveSpec}s.
+ * @param filePath URI of file, used for logging purpose in this method.
+ * @param partitionSpec The scheme of partition.
+ * @return The value of partition based on the given {@link PartitionSpec}.
+ * @throws IOException
+ */
+ private StructLike getIcebergPartitionVal(Collection<HiveSpec> specs, String filePath, PartitionSpec partitionSpec)
throws IOException {
if (specs == null || specs.isEmpty()) {
throw new IOException("Cannot get hive spec for " + filePath);
}
HivePartition hivePartition = specs.iterator().next().getPartition().orNull();
- StructLike partition = hivePartition == null ? null
+ StructLike partitionVal = hivePartition == null ? null
: IcebergUtils.getPartition(partitionSpec.partitionType(), hivePartition.getValues());
- return partition;
+ return partitionVal;
}
/**
@@ -647,10 +680,10 @@ public class IcebergMetadataWriter implements MetadataWriter {
Map<String, String> props = tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
//Set high waterMark
- Long highWatermark = tableCurrentWaterMarkMap.get(tid);
- props.put(String.format(GMCE_HIGH_WATERMARK_KEY, tableTopicpartitionMap.get(tid)), highWatermark.toString());
+ Long highWatermark = tableCurrentWatermarkMap.get(tid);
+ props.put(String.format(GMCE_HIGH_WATERMARK_KEY, tableTopicPartitionMap.get(tid)), highWatermark.toString());
//Set low waterMark
- props.put(String.format(GMCE_LOW_WATERMARK_KEY, tableTopicpartitionMap.get(tid)),
+ props.put(String.format(GMCE_LOW_WATERMARK_KEY, tableTopicPartitionMap.get(tid)),
tableMetadata.lowWatermark.get().toString());
//Set whether to delete metadata files after commit
props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, Boolean.toString(
@@ -667,8 +700,9 @@ public class IcebergMetadataWriter implements MetadataWriter {
//In case the topic name is not the table name or the topic name contains '-'
topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
}
- //Update schema
+ //Update schema(commit)
updateSchema(tableMetadata, props, topicName);
+
//Update properties
UpdateProperties updateProperties = transaction.updateProperties();
props.forEach(updateProperties::set);
@@ -676,12 +710,17 @@ public class IcebergMetadataWriter implements MetadataWriter {
try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, tableName)) {
transaction.commitTransaction();
}
+
+ // Emit GTE for snapshot commits
Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
Map<String, String> currentProps = tableMetadata.table.get().properties();
submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, currentProps, highWatermark);
+
//Reset the table metadata for next accumulation period
tableMetadata.reset(currentProps, highWatermark);
- log.info(String.format("Finish flushing for table %s", tid.toString()));
+ log.info(String.format("Finish commit of new snapshot %s for table %s", snapshot.snapshotId(), tid.toString()));
+ } else {
+ log.info("There's no transaction initiated for the table {}", tid.toString());
}
} catch (RuntimeException e) {
throw new RuntimeException(String.format("Fail to flush table %s %s", dbName, tableName), e);
@@ -699,7 +738,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
long currentSnapshotID = snapshot.snapshotId();
long endToEndLag = System.currentTimeMillis() - tableMetadata.lowestGMCEEmittedTime;
TableIdentifier tid = TableIdentifier.of(dbName, tableName);
- String gmceTopicPartition = tableTopicpartitionMap.get(tid);
+ String gmceTopicPartition = tableTopicPartitionMap.get(tid);
//Add information to automatically trigger repair jon when data loss happen
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_NAME, gmceTopicPartition.split("-")[0]);
@@ -787,28 +826,27 @@ public class IcebergMetadataWriter implements MetadataWriter {
GenericRecord genericRecord = recordEnvelope.getRecord();
GobblinMetadataChangeEvent gmce =
(GobblinMetadataChangeEvent) SpecificData.get().deepCopy(genericRecord.getSchema(), genericRecord);
- if (whiteistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
+ if (whitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
TableIdentifier tid = TableIdentifier.of(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName());
- String topicPartition = tableTopicpartitionMap.computeIfAbsent(tid,
- t -> ((KafkaStreamingExtractor.KafkaWatermark) recordEnvelope.getWatermark()).getTopicPartition().toString());
- Long currentWatermark = getCurrentWaterMark(tid, topicPartition);
- Long currentOffset =
- ((KafkaStreamingExtractor.KafkaWatermark) recordEnvelope.getWatermark()).getLwm().getValue();
+ String topicPartition = tableTopicPartitionMap.computeIfAbsent(tid,
+ t -> ((KafkaWatermark) recordEnvelope.getWatermark()).getTopicPartition().toString());
+ Long currentWatermark = getAndPersistCurrentWatermark(tid, topicPartition);
+ Long currentOffset = ((KafkaWatermark) recordEnvelope.getWatermark()).getLwm().getValue();
+
if (currentOffset > currentWatermark) {
if (currentWatermark == DEFAULT_WATERMARK) {
//This means we haven't register this table or the GMCE topic partition changed, we need to reset the low watermark
tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata()).lowWatermark =
Optional.of(currentOffset - 1);
}
- tableMetadataMap.get(tid).datasetName = gmce.getDatasetIdentifier().getNativeName();
+ tableMetadataMap.get(tid).setDatasetName(gmce.getDatasetIdentifier().getNativeName());
write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
- tableCurrentWaterMarkMap.put(tid,
- ((KafkaStreamingExtractor.KafkaWatermark) recordEnvelope.getWatermark()).getLwm().getValue());
+ tableCurrentWatermarkMap.put(tid, currentOffset);
} else {
log.warn(String.format("Skip processing record %s since it has lower watermark", genericRecord.toString()));
}
} else {
- log.info(String.format("Skip table %s.%s since it's blacklisted", tableSpec.getTable().getDbName(),
+ log.info(String.format("Skip table %s.%s since it's not selected", tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName()));
}
} finally {
@@ -821,32 +859,86 @@ public class IcebergMetadataWriter implements MetadataWriter {
this.closer.close();
}
+ /**
+ * A collection of Iceberg metadata including the {@link Table} itself as well as
+ * a set of buffered objects (reflecting the table's {@link org.apache.iceberg.PendingUpdate}s) within the flush interval
+ * that aggregate metadata such as the locations of arriving / deleted files and the schema,
+ * along with other table-level metadata like watermark, offset-range, etc.
+ *
+ * Also note the difference with {@link org.apache.iceberg.TableMetadata}.
+ */
private class TableMetadata {
- Optional<Transaction> transaction = Optional.absent();
Optional<Table> table = Optional.absent();
- Optional<AppendFiles> appendFiles = Optional.absent();
- Optional<DeleteFiles> deleteFiles = Optional.absent();
+
+ /**
+ * The {@link Transaction} object holds the reference of a {@link org.apache.iceberg.BaseTransaction.TransactionTableOperations}
+ * that is shared by all individual operations (e.g. {@link AppendFiles}) to ensure atomicity even if the commit method
+ * is invoked from an individual operation.
+ */
+ Optional<Transaction> transaction = Optional.absent();
+ private Optional<AppendFiles> appendFiles = Optional.absent();
+ private Optional<DeleteFiles> deleteFiles = Optional.absent();
+
Optional<Map<String, String>> lastProperties = Optional.absent();
Optional<Map<String, String>> newProperties = Optional.absent();
Optional<Cache<String, Schema>> candidateSchemas = Optional.absent();
Optional<Map<String, List<Range>>> dataOffsetRange = Optional.absent();
Optional<String> lastSchemaVersion = Optional.absent();
Optional<Long> lowWatermark = Optional.absent();
+
+ @Setter
String datasetName;
+
Cache<CharSequence, String> addedFiles = CacheBuilder.newBuilder()
.expireAfterAccess(conf.getInt(ADDED_FILES_CACHE_EXPIRING_TIME, DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME),
TimeUnit.HOURS)
.build();
long lowestGMCEEmittedTime = Long.MAX_VALUE;
- public void reset(Map<String, String> props, Long lowWaterMark) {
+ /**
+ * Always use this method to obtain {@link AppendFiles} object within flush interval
+ * if clients want to have the {@link AppendFiles} committed along with other updates in a txn.
+ */
+ AppendFiles getOrInitAppendFiles() {
+ ensureTxnInit();
+ if (!this.appendFiles.isPresent()) {
+ this.appendFiles = Optional.of(this.transaction.get().newAppend());
+ }
+
+ return this.appendFiles.get();
+ }
+
+ DeleteFiles getOrInitDeleteFiles() {
+ ensureTxnInit();
+ if (!this.deleteFiles.isPresent()) {
+ this.deleteFiles = Optional.of(this.transaction.get().newDelete());
+ }
+
+ return this.deleteFiles.get();
+ }
+
+ /**
+ * Initializes the {@link Transaction} object within {@link TableMetadata} if it is not already present.
+ */
+ void ensureTxnInit() {
+ if (!this.transaction.isPresent()) {
+ this.transaction = Optional.of(table.get().newTransaction());
+ }
+ }
+
+ void reset(Map<String, String> props, Long lowWaterMark) {
this.lastProperties = Optional.of(props);
this.lastSchemaVersion = Optional.of(props.get(SCHEMA_CREATION_TIME_KEY));
- //clear cache
this.transaction = Optional.absent();
this.deleteFiles = Optional.absent();
this.appendFiles = Optional.absent();
+
+ // Clean cache and reset to eagerly release unreferenced objects.
+ if (this.candidateSchemas.isPresent()) {
+ this.candidateSchemas.get().cleanUp();
+ }
this.candidateSchemas = Optional.absent();
+
this.dataOffsetRange = Optional.absent();
this.newProperties = Optional.absent();
this.lowestGMCEEmittedTime = Long.MAX_VALUE;
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 066bcc28..2ae9859 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -154,13 +154,17 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
@Test ( priority = 0 )
public void testWriteAddFileGMCE() throws IOException {
- gobblinMCEWriterWithAcceptClusters.writeEnvelope(new RecordEnvelope<>(gmce,
+ // Creating a copy of gmce with static type in GenericRecord to work with the writeEnvelope method
+ // without risking running into type cast runtime error.
+ GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+
+ gobblinMCEWriterWithAcceptClusters.writeEnvelope(new RecordEnvelope<>(genericGmce,
new KafkaStreamingExtractor.KafkaWatermark(
new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(10L))));
//Test when accept clusters does not contain the gmce cluster, we will skip
Assert.assertEquals(catalog.listTables(Namespace.of(dbName)).size(), 0);
- gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
+ gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
new KafkaStreamingExtractor.KafkaWatermark(
new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(10L))));
@@ -169,8 +173,11 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
Assert.assertFalse(table.properties().containsKey("offset.range.testTopic-1"));
Assert.assertEquals(table.location(),
new File(tmpDir, "data/tracking/testIcebergTable/_iceberg_metadata/").getAbsolutePath() + "/" + dbName);
+
gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "1000-2000").build());
- gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
+ GenericRecord genericGmce_1000_2000 = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+
+ gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce_1000_2000,
new KafkaStreamingExtractor.KafkaWatermark(
new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(20L))));
@@ -189,7 +196,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
.setFileFormat("avro")
.setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
.build()));
- gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
+ GenericRecord genericGmce_2000_3000 = GenericData.get().deepCopy(gmce.getSchema(), gmce);
+ gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce_2000_3000,
new KafkaStreamingExtractor.KafkaWatermark(
new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(30L))));
@@ -202,7 +210,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
/* Test it will skip event with lower watermark*/
gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "3000-4000").build());
- gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
+ gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
new KafkaStreamingExtractor.KafkaWatermark(
new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(30L))));
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java
index 389e6ab..07f0172 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/reporter/FileFailureEventReporterTest.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.annotations.Test;
-import avro.shaded.com.google.common.collect.Maps;
+import com.google.common.collect.Maps;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
index 0104f43..5278092 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
@@ -64,6 +64,8 @@ public class AzkabanGobblinYarnAppLauncher extends AbstractJob {
throws IOException {
super(jobId, LOGGER);
+ addRuntimeProperties(gobblinProps);
+
Config gobblinConfig = ConfigUtils.propertiesToConfig(gobblinProps);
//Suppress logs from classes that emit Yarn application Id that Azkaban uses to kill the application.
@@ -98,6 +100,12 @@ public class AzkabanGobblinYarnAppLauncher extends AbstractJob {
return new YarnConfiguration();
}
+ /**
+ * Subclasses can override this method to add runtime properties.
+ */
+ protected void addRuntimeProperties(Properties gobblinProps) {
+ }
+
@Override
public void run() throws Exception {
this.gobblinYarnAppLauncher.launch();
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
index 2e1b113..a517f12 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
@@ -20,6 +20,7 @@ import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
+import org.apache.gobblin.util.AvroSchemaUtils;
import org.apache.orc.TypeDescription;
@@ -102,8 +103,8 @@ public class AvroOrcSchemaConverter {
*/
private static TypeDescription getTypeDescriptionForBinarySchema(Schema avroSchema) {
if ("decimal".equalsIgnoreCase(avroSchema.getProp("logicalType"))) {
- int scale = avroSchema.getJsonProp("scale").asInt(0);
- int precision = avroSchema.getJsonProp("precision").asInt();
+ int scale = AvroSchemaUtils.getValueAsInteger(avroSchema, "scale");
+ int precision = AvroSchemaUtils.getValueAsInteger(avroSchema, "precision");
return TypeDescription.createDecimal().withScale(scale).withPrecision(precision);
}
diff --git a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractor.java b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractor.java
index 557c337..1410f00 100644
--- a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractor.java
+++ b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractor.java
@@ -31,7 +31,7 @@ import com.google.api.services.webmasters.model.ApiDimensionFilter;
import com.google.common.base.Splitter;
import com.google.gson.JsonArray;
-import avro.shaded.com.google.common.collect.Iterables;
+import com.google.common.collect.Iterables;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
diff --git a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java
index 042bbae..df3e479 100644
--- a/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java
+++ b/gobblin-modules/google-ingestion/src/main/java/org/apache/gobblin/ingestion/google/webmaster/GoogleWebmasterExtractorIterator.java
@@ -41,7 +41,7 @@ import com.google.api.services.webmasters.model.ApiDimensionFilter;
import com.google.api.services.webmasters.model.SearchAnalyticsQueryResponse;
import com.google.common.base.Optional;
-import avro.shaded.com.google.common.base.Joiner;
+import com.google.common.base.Joiner;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.WorkUnitState;
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/restli/throttling/ThrottlingClientTest.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/restli/throttling/ThrottlingClientTest.java
index 135f462..10be171 100644
--- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/restli/throttling/ThrottlingClientTest.java
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-client/src/test/java/org/apache/gobblin/restli/throttling/ThrottlingClientTest.java
@@ -43,7 +43,7 @@ import org.apache.gobblin.broker.BrokerConfigurationKeyGenerator;
import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.util.limiter.broker.SharedLimiterKey;
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
public class ThrottlingClientTest {
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/ConfigStoreBasedPolicyTest.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/ConfigStoreBasedPolicyTest.java
index b0e6812..92e3336 100644
--- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/ConfigStoreBasedPolicyTest.java
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/ConfigStoreBasedPolicyTest.java
@@ -29,7 +29,7 @@ import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.util.limiter.broker.SharedLimiterKey;
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
public class ConfigStoreBasedPolicyTest {
diff --git a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
index bc538ec..15fcae4 100644
--- a/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
+++ b/gobblin-restli/gobblin-throttling-service/gobblin-throttling-service-server/src/test/java/org/apache/gobblin/restli/throttling/LimiterServerResourceTest.java
@@ -38,7 +38,7 @@ import org.apache.gobblin.broker.BrokerConfigurationKeyGenerator;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.util.limiter.broker.SharedLimiterKey;
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
public class LimiterServerResourceTest {
diff --git a/gobblin-runtime-hadoop/src/test/java/org/apache/gobblin/runtime/instance/plugin/hadoop/TestHadoopKerberosKeytabAuthenticationPlugin.java b/gobblin-runtime-hadoop/src/test/java/org/apache/gobblin/runtime/instance/plugin/hadoop/TestHadoopKerberosKeytabAuthenticationPlugin.java
index 0a567a1..bf29d05 100644
--- a/gobblin-runtime-hadoop/src/test/java/org/apache/gobblin/runtime/instance/plugin/hadoop/TestHadoopKerberosKeytabAuthenticationPlugin.java
+++ b/gobblin-runtime-hadoop/src/test/java/org/apache/gobblin/runtime/instance/plugin/hadoop/TestHadoopKerberosKeytabAuthenticationPlugin.java
@@ -27,7 +27,7 @@ import com.typesafe.config.ConfigFactory;
import org.apache.gobblin.runtime.api.GobblinInstanceDriver;
import org.apache.gobblin.runtime.std.DefaultConfigurableImpl;
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
/**
* Unit tests for {@link HadoopKerberosKeytabAuthenticationPlugin}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceDriver.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceDriver.java
index 8ad9093..a56410e 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceDriver.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/TestStandardGobblinInstanceDriver.java
@@ -36,7 +36,7 @@ import org.apache.gobblin.runtime.api.GobblinInstancePluginFactory;
import org.apache.gobblin.runtime.plugins.email.EmailNotificationPlugin;
import org.apache.gobblin.runtime.std.DefaultConfigurableImpl;
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
import lombok.AllArgsConstructor;
import lombok.Getter;
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/hadoop/TestHadoopConfigLoader.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/hadoop/TestHadoopConfigLoader.java
index ce9aa50..a85c85e 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/hadoop/TestHadoopConfigLoader.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/instance/hadoop/TestHadoopConfigLoader.java
@@ -23,7 +23,7 @@ import org.testng.annotations.Test;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import avro.shaded.com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap;
/**
* Unit tests for {@link HadoopConfigLoader}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index 9792b83..399bfb8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -33,7 +33,7 @@ import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import avro.shaded.com.google.common.annotations.VisibleForTesting;
+import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
index 771026c..7a15357 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java
@@ -26,7 +26,6 @@ import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
-import org.codehaus.jackson.JsonNode;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -417,7 +416,8 @@ public class AvroFlattener {
if (StringUtils.isNotBlank(flattenSource)) {
field.addProp(FLATTENED_SOURCE_KEY, flattenSource);
}
- for (Map.Entry<String, JsonNode> entry : f.getJsonProps().entrySet()) {
+ // Avro 1.9 compatible change - replaced deprecated public api getJsonProps with getObjectProps
+ for (Map.Entry<String, Object> entry : f.getObjectProps().entrySet()) {
field.addProp(entry.getKey(), entry.getValue());
}
flattenedFields.add(field);
@@ -466,8 +466,8 @@ public class AvroFlattener {
private static void copyProperties(Schema oldSchema, Schema newSchema) {
Preconditions.checkNotNull(oldSchema);
Preconditions.checkNotNull(newSchema);
-
- Map<String, JsonNode> props = oldSchema.getJsonProps();
+ // Avro 1.9 compatible change - replaced deprecated public api getJsonProps with getObjectProps
+ Map<String, Object> props = oldSchema.getObjectProps();
copyProperties(props, newSchema);
}
@@ -476,12 +476,12 @@ public class AvroFlattener {
* @param props Properties to copy to Avro Schema
* @param schema Avro Schema to copy properties to
*/
- private static void copyProperties(Map<String, JsonNode> props, Schema schema) {
+ private static void copyProperties(Map<String, Object> props, Schema schema) {
Preconditions.checkNotNull(schema);
// (if null, don't copy but do not throw exception)
if (null != props) {
- for (Map.Entry<String, JsonNode> prop : props.entrySet()) {
+ for (Map.Entry<String, Object> prop : props.entrySet()) {
schema.addProp(prop.getKey(), prop.getValue());
}
}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroSchemaUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroSchemaUtils.java
new file mode 100644
index 0000000..e1ad552
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroSchemaUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util;
+
+import org.apache.avro.Schema;
+
+
+/**
+ * Helpers for reading custom properties of Avro schemas and fields as typed Java values.
+ *
+ * <p>Properties are looked up through {@code getObjectProp}; each getter returns {@code null}
+ * when the looked-up value is not of a type that getter converts.
+ */
+public class AvroSchemaUtils {
+
+  private AvroSchemaUtils() {
+    // Static helpers only; not meant to be instantiated.
+  }
+
+  /**
+   * Reads the property {@code prop} of {@code schema} as an {@link Integer}.
+   *
+   * @param schema schema whose property is read
+   * @param prop name of the property
+   * @return the value itself when it is an {@link Integer}, the parsed value when it is a
+   *         {@link String}, otherwise {@code null}
+   * @throws NumberFormatException if the value is a {@link String} rejected by
+   *         {@link Integer#parseInt(String)}
+   */
+  public static Integer getValueAsInteger(final Schema schema, String prop) {
+    return toInteger(schema.getObjectProp(prop));
+  }
+
+  /**
+   * Reads the property {@code prop} of {@code field} as an {@link Integer}.
+   *
+   * @param field field whose property is read
+   * @param prop name of the property
+   * @return the value itself when it is an {@link Integer}, the parsed value when it is a
+   *         {@link String}, otherwise {@code null}
+   * @throws NumberFormatException if the value is a {@link String} rejected by
+   *         {@link Integer#parseInt(String)}
+   */
+  public static Integer getValueAsInteger(final Schema.Field field, String prop) {
+    return toInteger(field.getObjectProp(prop));
+  }
+
+  /**
+   * Reads the property {@code prop} of {@code schema} as a {@link String}.
+   *
+   * @param schema schema whose property is read
+   * @param prop name of the property
+   * @return the value when it is a {@link String}, otherwise {@code null}
+   */
+  public static String getValueAsString(final Schema schema, String prop) {
+    return toText(schema.getObjectProp(prop));
+  }
+
+  /**
+   * Reads the property {@code prop} of {@code field} as a {@link String}.
+   *
+   * @param field field whose property is read
+   * @param prop name of the property
+   * @return the value when it is a {@link String}, otherwise {@code null}
+   */
+  public static String getValueAsString(final Schema.Field field, String prop) {
+    return toText(field.getObjectProp(prop));
+  }
+
+  /** Converts a raw property value to an Integer: Strings are parsed, Integers pass through, anything else is null. */
+  private static Integer toInteger(Object rawValue) {
+    if (rawValue instanceof String) {
+      return Integer.parseInt((String) rawValue);
+    }
+    return rawValue instanceof Integer ? (Integer) rawValue : null;
+  }
+
+  /** Returns the raw property value when it is a String, otherwise null. */
+  private static String toText(Object rawValue) {
+    return rawValue instanceof String ? (String) rawValue : null;
+  }
+}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 3d2587f..d976ea7 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-import org.codehaus.jackson.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -147,9 +146,10 @@ public class AvroUtils {
* Common use cases for this method is in traversing {@link Schema} object into nested level and create {@link Schema}
* object for non-root level.
*/
- public static void convertFieldToSchemaWithProps(Map<String,JsonNode> fieldProps, Schema targetSchemaObj) {
- for (Map.Entry<String, JsonNode> stringJsonNodeEntry : fieldProps.entrySet()) {
- targetSchemaObj.addProp(stringJsonNodeEntry.getKey(), stringJsonNodeEntry.getValue());
+ public static void convertFieldToSchemaWithProps(Map<String,Object> fieldProps,
+ Schema targetSchemaObj) {
+ for (Map.Entry<String, Object> objectEntry : fieldProps.entrySet()) {
+ targetSchemaObj.addProp(objectEntry.getKey(), objectEntry.getValue());
}
}
@@ -830,8 +830,8 @@ public class AvroUtils {
private static void copyProperties(Schema oldSchema, Schema newSchema) {
Preconditions.checkNotNull(oldSchema);
Preconditions.checkNotNull(newSchema);
-
- Map<String, JsonNode> props = oldSchema.getJsonProps();
+ // Avro 1.9 compatible change - replaced deprecated public api getJsonProps with getObjectProps
+ Map<String, Object> props = oldSchema.getObjectProps();
copyProperties(props, newSchema);
}
@@ -840,12 +840,12 @@ public class AvroUtils {
* @param props Properties to copy to Avro Schema
* @param schema Avro Schema to copy properties to
*/
- private static void copyProperties(Map<String, JsonNode> props, Schema schema) {
+ private static void copyProperties(Map<String, Object> props, Schema schema) {
Preconditions.checkNotNull(schema);
// (if null, don't copy but do not throw exception)
if (null != props) {
- for (Map.Entry<String, JsonNode> prop : props.entrySet()) {
+ for (Map.Entry<String, Object> prop : props.entrySet()) {
schema.addProp(prop.getKey(), prop.getValue());
}
}
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index 11274af..6f18a35 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -45,6 +45,7 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.mapred.FsInput;
+import org.apache.avro.util.internal.JacksonUtils;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -620,10 +621,13 @@ public class AvroUtilsTest {
.getResourceAsStream("recursive_schemas/recursive_" + scenario + "_solution.avsc"));
// get the answer from the input schema (test author needs to provide this)
- ArrayNode foo = (ArrayNode) inputSchema.getJsonProp("recursive_fields");
+ // Avro 1.9 compatible change - replaced deprecated public api getJsonProp with getObjectProp
+ // Use internal JacksonUtils to convert object to the corresponding JsonNode (ArrayNode)
+ ArrayNode foo = (ArrayNode) JacksonUtils.toJsonNode(inputSchema.getObjectProp(
+ "recursive_fields"));
HashSet<String> answers = new HashSet<>();
for (JsonNode fieldsWithRecursion: foo) {
- answers.add(fieldsWithRecursion.getTextValue());
+ answers.add(fieldsWithRecursion.asText());
}
Pair<Schema, List<AvroUtils.SchemaEntry>> results = AvroUtils.dropRecursiveFields(inputSchema);
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 01cb964..c0cde51 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -23,6 +23,7 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -45,8 +46,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -191,7 +195,6 @@ public class GobblinYarnAppLauncher {
private final HelixManager helixManager;
private final Configuration yarnConfiguration;
- private final YarnClient yarnClient;
private final FileSystem fs;
private final EventBus eventBus = new EventBus(GobblinYarnAppLauncher.class.getSimpleName());
@@ -235,6 +238,9 @@ public class GobblinYarnAppLauncher {
private final String containerTimezone;
private final String appLauncherMode;
+ private final String originalYarnRMAddress;
+ private final Map<String, YarnClient> potentialYarnClients;
+ private YarnClient yarnClient;
public GobblinYarnAppLauncher(Config config, YarnConfiguration yarnConfiguration) throws IOException {
this.config = config;
@@ -253,8 +259,16 @@ public class GobblinYarnAppLauncher {
YarnHelixUtils.setYarnClassPath(config, this.yarnConfiguration);
YarnHelixUtils.setAdditionalYarnClassPath(config, this.yarnConfiguration);
this.yarnConfiguration.set("fs.automatic.close", "false");
- this.yarnClient = YarnClient.createYarnClient();
- this.yarnClient.init(this.yarnConfiguration);
+ this.originalYarnRMAddress = this.yarnConfiguration.get(GobblinYarnConfigurationKeys.YARN_RESOURCE_MANAGER_ADDRESS);
+ this.potentialYarnClients = new HashMap();
+ Set<String> potentialRMAddresses = new HashSet<>(ConfigUtils.getStringList(config, GobblinYarnConfigurationKeys.OTHER_YARN_RESOURCE_MANAGER_ADDRESSES));
+ potentialRMAddresses.add(originalYarnRMAddress);
+ for (String rmAddress : potentialRMAddresses) {
+ YarnClient tmpYarnClient = YarnClient.createYarnClient();
+ this.yarnConfiguration.set(GobblinYarnConfigurationKeys.YARN_RESOURCE_MANAGER_ADDRESS, rmAddress);
+ tmpYarnClient.init(new YarnConfiguration(this.yarnConfiguration));
+ potentialYarnClients.put(rmAddress, tmpYarnClient);
+ }
this.fs = GobblinClusterUtils.buildFileSystem(config, this.yarnConfiguration);
this.closer.register(this.fs);
@@ -340,20 +354,20 @@ public class GobblinYarnAppLauncher {
connectHelixManager();
- startYarnClient();
-
- Optional<ApplicationId> reconnectableApplicationId = getReconnectableApplicationId();
-
- boolean isReconnected = reconnectableApplicationId.isPresent();
- // Before setup application, first login to make sure ugi has the right token.
+ // Before connecting with the yarn client, we need to log in to get the token
if(ConfigUtils.getBoolean(config, GobblinYarnConfigurationKeys.ENABLE_KEY_MANAGEMENT, false)) {
- this.securityManager = Optional.of(buildSecurityManager(isReconnected));
+ this.securityManager = Optional.of(buildSecurityManager());
this.securityManager.get().loginAndScheduleTokenRenewal();
}
- if (!reconnectableApplicationId.isPresent()) {
+ startYarnClient();
+
+ this.applicationId = getReconnectableApplicationId();
+
+ if (!this.applicationId.isPresent()) {
disableLiveHelixInstances();
LOGGER.info("No reconnectable application found so submitting a new application");
+ this.yarnClient = potentialYarnClients.get(this.originalYarnRMAddress);
this.applicationId = Optional.of(setupAndSubmitApplication());
}
@@ -549,12 +563,16 @@ public class GobblinYarnAppLauncher {
@VisibleForTesting
void startYarnClient() {
- this.yarnClient.start();
+ for (YarnClient yarnClient : potentialYarnClients.values()) {
+ yarnClient.start();
+ }
}
@VisibleForTesting
void stopYarnClient() {
- this.yarnClient.stop();
+ for (YarnClient yarnClient : potentialYarnClients.values()) {
+ yarnClient.stop();
+ }
}
/**
@@ -574,19 +592,21 @@ public class GobblinYarnAppLauncher {
@VisibleForTesting
Optional<ApplicationId> getReconnectableApplicationId() throws YarnException, IOException {
- List<ApplicationReport> applicationReports =
- this.yarnClient.getApplications(APPLICATION_TYPES, RECONNECTABLE_APPLICATION_STATES);
- if (applicationReports == null || applicationReports.isEmpty()) {
- return Optional.absent();
- }
+ for (YarnClient yarnClient: potentialYarnClients.values()) {
+ List<ApplicationReport> applicationReports = yarnClient.getApplications(APPLICATION_TYPES, RECONNECTABLE_APPLICATION_STATES);
+ if (applicationReports == null || applicationReports.isEmpty()) {
+ continue;
+ }
- // Try to find an application with a matching application name
- for (ApplicationReport applicationReport : applicationReports) {
- if (this.applicationName.equals(applicationReport.getName())) {
- String applicationId = sanitizeApplicationId(applicationReport.getApplicationId().toString());
- LOGGER.info("Found reconnectable application with application ID: " + applicationId);
- LOGGER.info("Application Tracking URL: " + applicationReport.getTrackingUrl());
- return Optional.of(applicationReport.getApplicationId());
+ // Try to find an application with a matching application name
+ for (ApplicationReport applicationReport : applicationReports) {
+ if (this.applicationName.equals(applicationReport.getName())) {
+ String applicationId = sanitizeApplicationId(applicationReport.getApplicationId().toString());
+ LOGGER.info("Found reconnectable application with application ID: " + applicationId);
+ LOGGER.info("Application Tracking URL: " + applicationReport.getTrackingUrl());
+ this.yarnClient = yarnClient;
+ return Optional.of(applicationReport.getApplicationId());
+ }
}
}
@@ -835,14 +855,22 @@ public class GobblinYarnAppLauncher {
TokenUtils.getAllFSTokens(new Configuration(), credentials, renewerName,
Optional.absent(), ConfigUtils.getStringList(this.config, TokenUtils.OTHER_NAMENODES));
-
+ // Only pass tokens here and no secrets (there is no simple way to remove a single token or to get the secrets).
+ // For RM tokens, only pass the token for the current RM; otherwise the RM will fail to update the token.
+ Credentials finalCredentials = new Credentials();
+ for (Token<? extends TokenIdentifier> token: credentials.getAllTokens()) {
+ if (token.getKind().equals(new Text("RM_DELEGATION_TOKEN")) && !token.getService().equals(new Text(this.originalYarnRMAddress))) {
+ continue;
+ }
+ finalCredentials.addToken(token.getService(), token);
+ }
Closer closer = Closer.create();
try {
DataOutputBuffer dataOutputBuffer = closer.register(new DataOutputBuffer());
- credentials.writeTokenStorageToStream(dataOutputBuffer);
+ finalCredentials.writeTokenStorageToStream(dataOutputBuffer);
ByteBuffer fsTokens = ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength());
containerLaunchContext.setTokens(fsTokens);
- LOGGER.info("Setting containerLaunchContext with All credential tokens: " + credentials.getAllTokens());
+ LOGGER.info("Setting containerLaunchContext with All credential tokens: " + finalCredentials.getAllTokens());
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
@@ -872,7 +900,7 @@ public class GobblinYarnAppLauncher {
return logRootDir;
}
- private AbstractYarnAppSecurityManager buildSecurityManager(boolean isReconnected) throws IOException {
+ private AbstractYarnAppSecurityManager buildSecurityManager() throws IOException {
Path tokenFilePath = new Path(this.fs.getHomeDirectory(), this.applicationName + Path.SEPARATOR +
GobblinYarnConfigurationKeys.TOKEN_FILE_NAME);
@@ -881,7 +909,7 @@ public class GobblinYarnAppLauncher {
try {
return (AbstractYarnAppSecurityManager) GobblinConstructorUtils.invokeLongestConstructor(Class.forName(aliasResolver.resolve(
ConfigUtils.getString(config, GobblinYarnConfigurationKeys.SECURITY_MANAGER_CLASS, GobblinYarnConfigurationKeys.DEFAULT_SECURITY_MANAGER_CLASS))), this.config, this.helixManager, this.fs,
- tokenFilePath, isReconnected);
+ tokenFilePath);
} catch (ReflectiveOperationException e) {
throw new IOException(e);
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 956848e..fcc30d3 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -37,6 +37,8 @@ public class GobblinYarnConfigurationKeys {
public static final int DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS = 300;
public static final String APP_VIEW_ACL = GOBBLIN_YARN_PREFIX + "appViewAcl";
public static final String DEFAULT_APP_VIEW_ACL = "*";
+ public static final String YARN_RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.address";
+ public static final String OTHER_YARN_RESOURCE_MANAGER_ADDRESSES= "other.yarn.resourcemanager.addresses";
// Gobblin Yarn ApplicationMaster configuration properties.
public static final String APP_MASTER_MEMORY_MBS_KEY = GOBBLIN_YARN_PREFIX + "app.master.memory.mbs";