Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/01/06 07:48:51 UTC

[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #3172: [GOBBLIN-1335] Publish GMCE(GobblinMetadataChangeEvent) publisher and iceberg retention job to Gobblin OSS

autumnust commented on a change in pull request #3172:
URL: https://github.com/apache/incubator-gobblin/pull/3172#discussion_r552269352



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg;
+
+import azkaban.jobExecutor.AbstractJob;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
+import org.apache.gobblin.iceberg.publisher.GobblinMCEPublisher;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metadata.DataFile;
+import org.apache.gobblin.metadata.DataMetrics;
+import org.apache.gobblin.metadata.DataOrigin;
+import org.apache.gobblin.metadata.DatasetIdentifier;
+import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
+import org.apache.gobblin.metadata.IntegerBytesPair;
+import org.apache.gobblin.metadata.IntegerLongPair;
+import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metadata.SchemaSource;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.util.ClustersNames;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+
+
+/**
+ * A class for emitting GobblinMCE (Gobblin Metadata Change Event that includes the information of the file metadata change,

Review comment:
       Unless you are planning to have a separate wiki page explaining the usage of these constructs, let's add a bit more detail here (I notice there are more detailed comments in each method; maybe consolidate them a bit?), e.g.:
   - We need a pub/sub system like Kafka in the middle.
   - This class runs alongside the data ingestor and requires an additional metadata ingestor to process and write the Iceberg metadata (see the sketch below).
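
       For illustration, a minimal sketch of what such an expanded class comment could look like; the wording and deployment notes below are only a suggestion, not text from the PR:

```java
import java.io.Closeable;

/**
 * A class for emitting GobblinMCE (Gobblin Metadata Change Event), which captures file-level
 * metadata changes (files added or deleted, plus the column min/max values of added files).
 *
 * Deployment notes:
 *  - A pub/sub system such as Kafka must sit between this producer and the metadata pipeline.
 *  - This class runs alongside the data-ingestion job; a separate metadata-ingestion job is
 *    required to consume the GMCEs and register/de-register the Hive/Iceberg metadata.
 */
public abstract class GobblinMCEProducer<D> implements Closeable { /* ... */ }
```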

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg;
+
+import azkaban.jobExecutor.AbstractJob;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
+import org.apache.gobblin.iceberg.publisher.GobblinMCEPublisher;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metadata.DataFile;
+import org.apache.gobblin.metadata.DataMetrics;
+import org.apache.gobblin.metadata.DataOrigin;
+import org.apache.gobblin.metadata.DatasetIdentifier;
+import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
+import org.apache.gobblin.metadata.IntegerBytesPair;
+import org.apache.gobblin.metadata.IntegerLongPair;
+import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metadata.SchemaSource;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.util.ClustersNames;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+
+
+/**
+ * A class for emitting GobblinMCE (Gobblin Metadata Change Event that includes the information of the file metadata change,
+ * i.e., add or delete file, and the column min/max value of the added file.
+ * GMCE will be consumed by metadata pipeline to register/de-register hive/iceberg metadata)
+ */
+@Slf4j
+public abstract class GobblinMCEProducer<D> implements Closeable {
+
+  public static final String GMCE_PRODUCER_CLASS = "GobblinMCEProducer.class.name";
+  public static final String OLD_FILES_HIVE_REGISTRATION_KEY = "old.files.hive.registration.policy";
+  public static final String FORMAT_KEY = "writer.output.format";
+  public static final String DATASET_DIR = "dataset.dir";
+  public static final String HIVE_PARTITION_NAME = "hive.partition.name";
+  private static final String HDFS_PLATFORM_URN = "urn:li:dataPlatform:hdfs";
+  private static final String DATASET_ORIGIN_KEY = "dataset.origin";
+  private static final String DEFAULT_DATASET_ORIGIN = "PROD";
+
+  @Setter
+  private State state;
+  private MetricContext metricContext;
+
+  public GobblinMCEProducer(State state) {
+    this.state = state;
+    this.metricContext = Instrumented.getMetricContext(state, this.getClass());
+  }
+
+
+  /**
+   * This method will use the files to compute the table name and dataset name, for each table it will generate one GMCE and send that to kafka so
+   * the metadata ingestion pipeline can use the information to register metadata
+   * @param newFiles The map of new files' path and metrics
+   * @param oldFiles the list of old file to be dropped
+   * @param offsetRange offset rang of the new data, can be null
+   * @param operationType The opcode of gmce emitted by this method.
+   * @throws IOException
+   */
+  public void sendGMCE(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixs,

Review comment:
       typo: oldFilePrefixes

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.publisher;
+
+import com.google.common.io.Closer;
+import org.apache.gobblin.iceberg.GobblinMCEProducer;
+import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metadata.SchemaSource;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.orc.OrcMetrics;
+
+
+/**
+ *  A {@link DataPublisher} that compute and emit GobblinMetadataChangeEvent to kafka and rely on metadata ingestion pipeline
+ *  to register metadata.
+ *
+ * <p>
+ *   This publisher is not responsible for publishing data, and it relies on another publisher
+ *   to document the published paths in property {@link NEW_FILES_LIST}.
+ *   This publisher will use {@link GobblinMCEProducer} to emit GMCE events.
+ * </p>
+ */
+
+@Slf4j
+public class GobblinMCEPublisher extends DataPublisher {
+  public static final String OFFSET_RANGE_KEY = "offset.range";
+  public static final String MAP_DELIMITER_KEY = ":";
+  public static final String NEW_FILES_LIST = "new.files.list";
+  public static final String AVRO_SCHEMA_WITH_ICEBERG_ID = "avro.schema.with.iceberg.id";
+  private final GobblinMCEProducer producer;
+
+  private final Closer closer = Closer.create();
+  private final Configuration conf;
+  private static final PathFilter HIDDEN_FILES_FILTER = p -> {

Review comment:
       `org.apache.gobblin.util.filters.HiddenFilter` seems to serve the same purpose. 
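
       For illustration, a minimal sketch of that swap, assuming `HiddenFilter` has a no-arg constructor, implements Hadoop's `PathFilter`, and rejects paths starting with `.` or `_` just like the lambda below:

```java
import org.apache.gobblin.util.filters.HiddenFilter;
import org.apache.hadoop.fs.PathFilter;

public class GobblinMCEPublisherSketch {
  // Reuse the shared filter instead of re-implementing it with a local lambda.
  private static final PathFilter HIDDEN_FILES_FILTER = new HiddenFilter();
}
```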

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg;
+
+import azkaban.jobExecutor.AbstractJob;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
+import org.apache.gobblin.iceberg.publisher.GobblinMCEPublisher;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metadata.DataFile;
+import org.apache.gobblin.metadata.DataMetrics;
+import org.apache.gobblin.metadata.DataOrigin;
+import org.apache.gobblin.metadata.DatasetIdentifier;
+import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
+import org.apache.gobblin.metadata.IntegerBytesPair;
+import org.apache.gobblin.metadata.IntegerLongPair;
+import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metadata.SchemaSource;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.util.ClustersNames;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+
+
+/**
+ * A class for emitting GobblinMCE (Gobblin Metadata Change Event that includes the information of the file metadata change,
+ * i.e., add or delete file, and the column min/max value of the added file.
+ * GMCE will be consumed by metadata pipeline to register/de-register hive/iceberg metadata)
+ */
+@Slf4j
+public abstract class GobblinMCEProducer<D> implements Closeable {
+
+  public static final String GMCE_PRODUCER_CLASS = "GobblinMCEProducer.class.name";
+  public static final String OLD_FILES_HIVE_REGISTRATION_KEY = "old.files.hive.registration.policy";
+  public static final String FORMAT_KEY = "writer.output.format";
+  public static final String DATASET_DIR = "dataset.dir";
+  public static final String HIVE_PARTITION_NAME = "hive.partition.name";
+  private static final String HDFS_PLATFORM_URN = "urn:li:dataPlatform:hdfs";
+  private static final String DATASET_ORIGIN_KEY = "dataset.origin";
+  private static final String DEFAULT_DATASET_ORIGIN = "PROD";
+
+  @Setter
+  private State state;
+  private MetricContext metricContext;
+
+  public GobblinMCEProducer(State state) {
+    this.state = state;
+    this.metricContext = Instrumented.getMetricContext(state, this.getClass());
+  }
+
+
+  /**
+   * This method will use the files to compute the table name and dataset name, for each table it will generate one GMCE and send that to kafka so
+   * the metadata ingestion pipeline can use the information to register metadata
+   * @param newFiles The map of new files' path and metrics
+   * @param oldFiles the list of old file to be dropped
+   * @param offsetRange offset rang of the new data, can be null
+   * @param operationType The opcode of gmce emitted by this method.
+   * @throws IOException
+   */
+  public void sendGMCE(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixs,
+      Map<String, String> offsetRange, OperationType operationType, SchemaSource schemaSource) throws IOException {
+    GobblinMetadataChangeEvent gmce =
+        getGobblinMetadataChangeEvent(newFiles, oldFiles, oldFilePrefixs, offsetRange, operationType, schemaSource);
+    underlyingSendGMCE(gmce);
+  }
+
+  /**
+   * Use the producer to send GMCE, the implementation need to make sure the emitting is at-least once in-order delivery
+   * (i.e. use kafka producer to send event and config it to be at-least once delivery)
+   * @param gmce GMCE that contains information of the metadata change
+   */
+  public abstract void underlyingSendGMCE(GobblinMetadataChangeEvent gmce);

Review comment:
       Would you consider providing a non-LinkedIn Kafka producer as a default implementation here? It doesn't have to be in this PR, but from a user's perspective, having this as the only abstract method seems odd.
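
       To make the suggestion concrete, here is a hedged sketch of a default implementation backed by the plain Apache Kafka client. The class name, the `gmce.kafka.*` property keys, and the schema-registry-free Avro serialization are illustrative assumptions, not part of the PR:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.iceberg.GobblinMCEProducer;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;

public class SimpleKafkaGobblinMCEProducer extends GobblinMCEProducer<Object> {

  private final KafkaProducer<String, byte[]> kafkaProducer;
  private final String topic;

  public SimpleKafkaGobblinMCEProducer(State state) {
    super(state);
    // Property names below are illustrative placeholders, not keys defined by the PR.
    this.topic = state.getProp("gmce.kafka.topic", "GobblinMetadataChangeEvent");
    Properties props = new Properties();
    props.put("bootstrap.servers", state.getProp("gmce.kafka.brokers"));
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    // At-least-once: wait for all replicas and retry aggressively; a single in-flight request
    // per connection preserves per-partition ordering across retries.
    props.put("acks", "all");
    props.put("retries", String.valueOf(Integer.MAX_VALUE));
    props.put("max.in.flight.requests.per.connection", "1");
    this.kafkaProducer = new KafkaProducer<>(props);
  }

  @Override
  public void underlyingSendGMCE(GobblinMetadataChangeEvent gmce) {
    try {
      // Key by dataset so all events for one dataset land on one partition and stay ordered.
      String key = gmce.getDatasetIdentifier().getNativeName();
      kafkaProducer.send(new ProducerRecord<>(topic, key, serialize(gmce))).get();
    } catch (Exception e) {
      throw new RuntimeException("Failed to send GMCE to Kafka", e);
    }
  }

  private static byte[] serialize(GobblinMetadataChangeEvent gmce) throws IOException {
    // Plain binary Avro encoding; a schema-registry-aware serializer could be swapped in.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new SpecificDatumWriter<>(GobblinMetadataChangeEvent.class).write(gmce, encoder);
    encoder.flush();
    return out.toByteArray();
  }

  @Override
  public void close() throws IOException {
    kafkaProducer.close();
  }
}
```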

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.publisher;
+
+import com.google.common.io.Closer;
+import org.apache.gobblin.iceberg.GobblinMCEProducer;
+import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metadata.SchemaSource;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.orc.OrcMetrics;
+
+
+/**
+ *  A {@link DataPublisher} that compute and emit GobblinMetadataChangeEvent to kafka and rely on metadata ingestion pipeline
+ *  to register metadata.
+ *
+ * <p>
+ *   This publisher is not responsible for publishing data, and it relies on another publisher
+ *   to document the published paths in property {@link NEW_FILES_LIST}.
+ *   This publisher will use {@link GobblinMCEProducer} to emit GMCE events.
+ * </p>
+ */
+
+@Slf4j
+public class GobblinMCEPublisher extends DataPublisher {
+  public static final String OFFSET_RANGE_KEY = "offset.range";
+  public static final String MAP_DELIMITER_KEY = ":";
+  public static final String NEW_FILES_LIST = "new.files.list";
+  public static final String AVRO_SCHEMA_WITH_ICEBERG_ID = "avro.schema.with.iceberg.id";
+  private final GobblinMCEProducer producer;
+
+  private final Closer closer = Closer.create();
+  private final Configuration conf;
+  private static final PathFilter HIDDEN_FILES_FILTER = p -> {
+    String name = p.getName();
+    return !name.startsWith("_") && !name.startsWith(".");
+  };
+
+  public GobblinMCEPublisher(State state) throws IOException {
+
+    this(state, GobblinMCEProducer.getGobblinMCEProducer(state));
+  }
+
+  public GobblinMCEPublisher(State state, GobblinMCEProducer producer) {
+    super(state);
+    this.producer = this.closer.register(producer);
+    conf = HadoopUtils.getConfFromState(state);
+  }
+
+  public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
+    // First aggregate the new files by partition
+    for (State state : states) {
+      Map<Path, Metrics> newFiles = computeFileMetrics(state);
+      Map<String, String> offsetRange = getPartitionOffsetRange(OFFSET_RANGE_KEY);
+      this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
+    }
+  }
+
+  private Map<String, String> getPartitionOffsetRange(String offsetKey) {
+    return state.getPropAsList(offsetKey)
+        .stream()
+        .collect(Collectors.toMap(s -> s.split(MAP_DELIMITER_KEY)[0], s -> s.split(MAP_DELIMITER_KEY)[1]));
+  }
+
+  /**
+   * For each publish  path, get all the data files under path
+   * and calculate the hive spec for each datafile and submit the task to register that datafile
+   * @throws IOException
+   */
+  private Map<Path, Metrics> computeFileMetrics(State state) throws IOException {
+    Map<Path, Metrics> newFiles = new HashMap<>();
+    NameMapping mapping = getNameMapping();
+
+    for (final String pathString : state.getPropAsList(NEW_FILES_LIST, "")) {
+      Path path = new Path(pathString);
+      FileSystem fs = path.getFileSystem(conf);

Review comment:
       Should this be moved outside the for loop, since creating a FileSystem object can be quite expensive?
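
       A minimal sketch of that change as a fragment of `computeFileMetrics` (assuming every path in `NEW_FILES_LIST` resolves to the same FileSystem; for federated namespaces a per-scheme/authority cache, or the existing per-path lookup, would still be needed):

```java
private Map<Path, Metrics> computeFileMetrics(State state) throws IOException {
  Map<Path, Metrics> newFiles = new HashMap<>();
  NameMapping mapping = getNameMapping();
  // Resolve the FileSystem once instead of once per new file.
  FileSystem fs = FileSystem.get(conf);
  for (final String pathString : state.getPropAsList(NEW_FILES_LIST, "")) {
    Path path = new Path(pathString);
    // ... same per-file metrics computation as the original loop body, reusing `fs` ...
  }
  return newFiles;
}
```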

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg;
+
+import azkaban.jobExecutor.AbstractJob;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
+import org.apache.gobblin.iceberg.publisher.GobblinMCEPublisher;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metadata.DataFile;
+import org.apache.gobblin.metadata.DataMetrics;
+import org.apache.gobblin.metadata.DataOrigin;
+import org.apache.gobblin.metadata.DatasetIdentifier;
+import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
+import org.apache.gobblin.metadata.IntegerBytesPair;
+import org.apache.gobblin.metadata.IntegerLongPair;
+import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metadata.SchemaSource;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.util.ClustersNames;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+
+
+/**
+ * A class for emitting GobblinMCE (Gobblin Metadata Change Event that includes the information of the file metadata change,
+ * i.e., add or delete file, and the column min/max value of the added file.
+ * GMCE will be consumed by metadata pipeline to register/de-register hive/iceberg metadata)
+ */
+@Slf4j
+public abstract class GobblinMCEProducer<D> implements Closeable {
+
+  public static final String GMCE_PRODUCER_CLASS = "GobblinMCEProducer.class.name";
+  public static final String OLD_FILES_HIVE_REGISTRATION_KEY = "old.files.hive.registration.policy";
+  public static final String FORMAT_KEY = "writer.output.format";
+  public static final String DATASET_DIR = "dataset.dir";
+  public static final String HIVE_PARTITION_NAME = "hive.partition.name";
+  private static final String HDFS_PLATFORM_URN = "urn:li:dataPlatform:hdfs";
+  private static final String DATASET_ORIGIN_KEY = "dataset.origin";
+  private static final String DEFAULT_DATASET_ORIGIN = "PROD";
+
+  @Setter
+  private State state;
+  private MetricContext metricContext;
+
+  public GobblinMCEProducer(State state) {
+    this.state = state;
+    this.metricContext = Instrumented.getMetricContext(state, this.getClass());
+  }
+
+
+  /**
+   * This method will use the files to compute the table name and dataset name, for each table it will generate one GMCE and send that to kafka so
+   * the metadata ingestion pipeline can use the information to register metadata
+   * @param newFiles The map of new files' path and metrics
+   * @param oldFiles the list of old file to be dropped
+   * @param offsetRange offset rang of the new data, can be null

Review comment:
       typo: offset range

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg;
+
+import azkaban.jobExecutor.AbstractJob;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
+import org.apache.gobblin.iceberg.publisher.GobblinMCEPublisher;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metadata.DataFile;
+import org.apache.gobblin.metadata.DataMetrics;
+import org.apache.gobblin.metadata.DataOrigin;
+import org.apache.gobblin.metadata.DatasetIdentifier;
+import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
+import org.apache.gobblin.metadata.IntegerBytesPair;
+import org.apache.gobblin.metadata.IntegerLongPair;
+import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metadata.SchemaSource;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.util.ClustersNames;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+
+
+/**
+ * A class for emitting GobblinMCE (Gobblin Metadata Change Event that includes the information of the file metadata change,
+ * i.e., add or delete file, and the column min/max value of the added file.
+ * GMCE will be consumed by metadata pipeline to register/de-register hive/iceberg metadata)
+ */
+@Slf4j
+public abstract class GobblinMCEProducer<D> implements Closeable {
+
+  public static final String GMCE_PRODUCER_CLASS = "GobblinMCEProducer.class.name";
+  public static final String OLD_FILES_HIVE_REGISTRATION_KEY = "old.files.hive.registration.policy";
+  public static final String FORMAT_KEY = "writer.output.format";
+  public static final String DATASET_DIR = "dataset.dir";
+  public static final String HIVE_PARTITION_NAME = "hive.partition.name";
+  private static final String HDFS_PLATFORM_URN = "urn:li:dataPlatform:hdfs";
+  private static final String DATASET_ORIGIN_KEY = "dataset.origin";
+  private static final String DEFAULT_DATASET_ORIGIN = "PROD";
+
+  @Setter
+  private State state;
+  private MetricContext metricContext;
+
+  public GobblinMCEProducer(State state) {
+    this.state = state;
+    this.metricContext = Instrumented.getMetricContext(state, this.getClass());
+  }
+
+
+  /**
+   * This method will use the files to compute the table name and dataset name, for each table it will generate one GMCE and send that to kafka so
+   * the metadata ingestion pipeline can use the information to register metadata
+   * @param newFiles The map of new files' path and metrics
+   * @param oldFiles the list of old file to be dropped
+   * @param offsetRange offset rang of the new data, can be null
+   * @param operationType The opcode of gmce emitted by this method.
+   * @throws IOException
+   */
+  public void sendGMCE(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixs,
+      Map<String, String> offsetRange, OperationType operationType, SchemaSource schemaSource) throws IOException {
+    GobblinMetadataChangeEvent gmce =
+        getGobblinMetadataChangeEvent(newFiles, oldFiles, oldFilePrefixs, offsetRange, operationType, schemaSource);
+    underlyingSendGMCE(gmce);
+  }
+
+  /**
+   * Use the producer to send GMCE, the implementation need to make sure the emitting is at-least once in-order delivery
+   * (i.e. use kafka producer to send event and config it to be at-least once delivery)
+   * @param gmce GMCE that contains information of the metadata change
+   */
+  public abstract void underlyingSendGMCE(GobblinMetadataChangeEvent gmce);
+
+  private void setBasicInformationForGMCE(GobblinMetadataChangeEvent.Builder gmceBuilder,
+      Map<String, String> offsetRange, SchemaSource schemaSource) {
+    String origin = state.getProp(DATASET_ORIGIN_KEY, DEFAULT_DATASET_ORIGIN);
+    gmceBuilder.setDatasetIdentifier(DatasetIdentifier.newBuilder()
+        .setDataPlatformUrn(HDFS_PLATFORM_URN)
+        .setDataOrigin(DataOrigin.valueOf(origin))
+        .setNativeName(state.getProp(DATASET_DIR))
+        .build());
+    gmceBuilder.setCluster(ClustersNames.getInstance().getClusterName());
+    //retention job does not have job.id
+    gmceBuilder.setFlowId(
+        state.getProp(AbstractJob.JOB_ID, new Configuration().get(ConfigurationKeys.AZKABAN_FLOW_ID)));
+    gmceBuilder.setRegistrationPolicy(state.getProp(ConfigurationKeys.HIVE_REGISTRATION_POLICY));
+    gmceBuilder.setSchemaSource(schemaSource);
+    gmceBuilder.setPartitionColumns(Lists.newArrayList(state.getProp(HIVE_PARTITION_NAME, "")));
+    if (offsetRange != null) {
+      gmceBuilder.setTopicPartitionOffsetsRange(offsetRange);
+    }
+    String schemaString = state.getProp(PartitionedDataWriter.WRITER_LATEST_SCHEMA);
+    if (schemaString != null) {
+      gmceBuilder.setTableSchema(schemaString);
+    }
+    if (state.contains(GobblinMCEPublisher.AVRO_SCHEMA_WITH_ICEBERG_ID)) {
+      gmceBuilder.setAvroSchemaWithIcebergSchemaID(state.getProp(GobblinMCEPublisher.AVRO_SCHEMA_WITH_ICEBERG_ID));
+    }
+    if (state.contains(OLD_FILES_HIVE_REGISTRATION_KEY)) {
+      gmceBuilder.setRegistrationPolicyForOldData(state.getProp(OLD_FILES_HIVE_REGISTRATION_KEY));
+    } else {
+      log.warn(
+          "properties {} does not set, if it's for rewrite/drop operation, there may be trouble to get partition value for old data",
+          OLD_FILES_HIVE_REGISTRATION_KEY);
+    }
+    Map<String, String> regProperties = new HashMap<>();
+    if (state.contains(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME)) {
+      regProperties.put(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME,
+          state.getProp(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME));
+    }
+    if (state.contains(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES)) {
+      regProperties.put(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES,
+          state.getProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES));
+    }
+    if (state.contains(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_TABLE_NAMES)) {
+      regProperties.put(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_TABLE_NAMES,
+          state.getProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_TABLE_NAMES));
+    }
+    if (!regProperties.isEmpty()) {
+      gmceBuilder.setRegistrationProperties(regProperties);
+    }
+  }
+
+  public GobblinMetadataChangeEvent getGobblinMetadataChangeEvent(Map<Path, Metrics> newFiles, List<String> oldFiles,
+      List<String> oldFilePrefixes, Map<String, String> offsetRange, OperationType operationType,
+      SchemaSource schemaSource) {
+    if (!verifyInput(newFiles, oldFiles, oldFilePrefixes, operationType)) {
+      return null;
+    }
+    GobblinMetadataChangeEvent.Builder gmceBuilder = GobblinMetadataChangeEvent.newBuilder();
+    setBasicInformationForGMCE(gmceBuilder, offsetRange, schemaSource);
+    if (newFiles != null && !newFiles.isEmpty()) {
+      gmceBuilder.setNewFiles(toGobblinDataFileList(newFiles));
+    }
+    if (oldFiles != null && !oldFiles.isEmpty()) {
+      gmceBuilder.setOldFiles(oldFiles);
+    }
+    if (oldFilePrefixes != null && !oldFilePrefixes.isEmpty()) {
+      gmceBuilder.setOldFilePrefixes(oldFilePrefixes);
+    }
+    gmceBuilder.setOperationType(operationType);
+    return gmceBuilder.build();
+  }
+
+  private boolean verifyInput(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixes,

Review comment:
       Let's add more logging for each invalid case so that it becomes easier to figure out where the verification failed.
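
       A minimal sketch of the suggestion, mirroring the existing checks with one warning per failing branch (`log` comes from the class's `@Slf4j` annotation; the messages are illustrative):

```java
private boolean verifyInput(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixes,
    OperationType operationType) {
  boolean noOldFiles =
      (oldFiles == null || oldFiles.isEmpty()) && (oldFilePrefixes == null || oldFilePrefixes.isEmpty());
  switch (operationType) {
    case rewrite_files:
      if (newFiles == null || newFiles.isEmpty() || noOldFiles) {
        log.warn("Invalid {} GMCE: both newFiles and oldFiles/oldFilePrefixes must be non-empty", operationType);
        return false;
      }
      return true;
    case add_files:
      if (newFiles == null || newFiles.isEmpty()) {
        log.warn("Invalid {} GMCE: newFiles must be non-empty", operationType);
        return false;
      }
      return true;
    case drop_files:
      if (noOldFiles) {
        log.warn("Invalid {} GMCE: either oldFiles or oldFilePrefixes must be non-empty", operationType);
        return false;
      }
      return true;
    default:
      log.warn("Unsupported operation type {}", operationType);
      return false;
  }
}
```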

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.metadata.IntegerBytesPair;
+import org.apache.gobblin.metadata.IntegerLongPair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+
+
+@Slf4j
+public class IcebergUtils {
+
+  private static final String AVRO_SCHEMA_URL = "avro.schema.url";
+  private static final String AVRO_SCHEMA_LITERAL = "avro.schema.literal";
+  private static final String[] RESTRICTED_PROPERTIES =
+      new String[]{AVRO_SCHEMA_URL, AVRO_SCHEMA_LITERAL};
+
+  private IcebergUtils() {
+  }
+  /**
+   * Calculate the {@Link PartitionSpec} used to create iceberg table
+   */
+  public static PartitionSpec getPartitionSpec(Schema tableSchema, Schema partitionSchema) {
+    //TODO: Add more information into partition spec e.g. day, year, month, kafka partition ids, offset ranges for better consuming
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(tableSchema);
+    partitionSchema.asStruct().fields().forEach(f -> builder.identity(f.name()));
+    return builder.build();
+  }
+
+  /**
+   * Given a avro schema string and a hive table,
+   * calculate the iceberg table schema and partition schema.
+   * (Since we use 'datepartition' as partition column, which is not included inside the data schema,
+   * we'll need to add that column to data schema to construct table schema
+   */
+  public static IcebergDataAndPartitionSchema getIcebergSchema(String schema,
+      org.apache.hadoop.hive.metastore.api.Table table) {
+
+    org.apache.iceberg.shaded.org.apache.avro.Schema icebergDataSchema =
+        new org.apache.iceberg.shaded.org.apache.avro.Schema.Parser().parse(schema);
+    Types.StructType dataStructType = AvroSchemaUtil.convert(icebergDataSchema).asStructType();
+    List<Types.NestedField> dataFields = Lists.newArrayList(dataStructType.fields());
+    org.apache.iceberg.shaded.org.apache.avro.Schema icebergPartitionSchema =
+        parseSchemaFromCols(table.getPartitionKeys(), table.getDbName(), table.getTableName(), true);
+    Types.StructType partitionStructType = AvroSchemaUtil.convert(icebergPartitionSchema).asStructType();
+    List<Types.NestedField> partitionFields = partitionStructType.fields();
+    Preconditions.checkArgument(partitionFields.stream().allMatch(f -> f.type().isPrimitiveType()),
+        "Only primitive fields are supported for partition columns");
+    dataFields.addAll(partitionFields);
+    Types.StructType updatedStructType = Types.StructType.of(dataFields);
+    updatedStructType =
+        (Types.StructType) TypeUtil.assignFreshIds(updatedStructType, new AtomicInteger(0)::incrementAndGet);
+    return new IcebergDataAndPartitionSchema(new org.apache.iceberg.Schema(updatedStructType.fields()),
+        new org.apache.iceberg.Schema(partitionFields));
+  }
+
+  private static org.apache.iceberg.shaded.org.apache.avro.Schema parseSchemaFromCols(List<FieldSchema> cols,
+      String namespace, String recordName, boolean mkFieldsOptional) {
+    final List<String> colNames = new ArrayList<>(cols.size());
+    final List<TypeInfo> colsTypeInfo = new ArrayList<>(cols.size());
+    cols.forEach(fs -> {
+      colNames.add(fs.getName());
+      colsTypeInfo.add(TypeInfoUtils.getTypeInfoFromTypeString(fs.getType()));
+    });
+    final TypeInfoToSchemaParser parser =
+        new TypeInfoToSchemaParser(namespace, mkFieldsOptional, Collections.emptyMap());
+    return new org.apache.iceberg.shaded.org.apache.avro.Schema.Parser().parse(
+        parser.parseSchemaFromFieldsTypeInfo("", recordName, colNames, colsTypeInfo).toString());
+  }
+
+  /**
+   * Given a Hive table, get all the properties of the table, and drop unneeded ones and transfer to a map
+   */
+  public static Map<String, String> getTableProperties(org.apache.hadoop.hive.metastore.api.Table table) {
+    final Map<String, String> parameters = getRawTableProperties(table);
+    // drop unneeded parameters
+    for (String k : RESTRICTED_PROPERTIES) {
+      parameters.remove(k);
+    }
+    return parameters;
+  }
+
+  private static Map<String, String> getRawTableProperties(org.apache.hadoop.hive.metastore.api.Table table) {
+    final Map<String, String> parameters = new HashMap<>();
+    // lowest to highest priority of updating tableProperties
+    parameters.putAll(table.getSd().getSerdeInfo().getParameters());
+    parameters.putAll(table.getSd().getParameters());
+    parameters.putAll(table.getParameters());
+    return parameters;
+  }
+
+  /**
+   * Get the iceberg partition value for given partition strings
+   */
+  public static StructLike getPartition(Types.StructType partitionType, List<String> partitionValues) {
+    //TODO parse partitionValue as per partitionSchema
+    return new StructLike() {
+      @Override
+      public int size() {
+        return partitionValues.size();
+      }
+
+      @Override
+      public <T> T get(int pos, Class<T> javaClass) {
+        return partitionValue(partitionType.fields().get(pos), partitionValues.get(pos));
+      }
+
+      @Override
+      public <T> void set(int pos, T value) {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  private static <T> T partitionValue(Types.NestedField partitionField, String colAsString) {
+    Preconditions.checkState(partitionField.type().isPrimitiveType(), "Partition column {} is not of primitive type",
+        partitionField);
+    return (T) Conversions.fromPartitionString(partitionField.type(), colAsString);
+  }
+
+  /**
+   * Transfer list of {@Link IntegerLongPair} from origin id to long, to Map<Integer, Long> from real column id to long
+   * This method is mainly used to get parse the file metrics from GMCE
+   * @param list list of {@Link IntegerLongPair}
+   * @param schemaIdMap A map from origin ID (defined by data pipeline) to the real iceberg table column id
+   * @return A map from real id to long as the file metrics
+   */
+  public static Map<Integer, Long> getMapFromIntegerLongPairs(
+      List<IntegerLongPair> list, Map<Integer, Integer> schemaIdMap) {
+    //If schemaIdMap is not set, we directly return null to avoid set wrong file metrics
+    if (list == null || list.size() == 0 || schemaIdMap == null) {
+      return null;
+    }
+    try {
+      return list.stream().collect(Collectors.toMap(t -> schemaIdMap.get(t.getKey()), IntegerLongPair::getValue));
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  /**
+   * Transfer list of {@Link IntegerBytesPair} from origin id to bytes, to Map<Integer, ByteBuffer> from real column id to ByteBuffer
+   * This method is mainly used to get parse the file metrics from GMCE
+   * @param list list of {@Link IntegerBytesPair} from origin id to bytes
+   * @param schemaIdMap A map from origin ID (defined by data pipeline) to the real iceberg table column id
+   * @return A map from real id to ByteBuffer as the file metrics
+   */
+  public static Map<Integer, ByteBuffer> getMapFromIntegerBytesPairs(
+      List<IntegerBytesPair> list, Map<Integer, Integer> schemaIdMap) {
+    //If schemaWithOriginId is not set, we directly return null to avoid set wrong file metrics
+    if (list == null || list.size() == 0 || schemaIdMap == null) {
+      return null;
+    }
+    try {
+      return list.stream().collect(Collectors.toMap(t -> schemaIdMap.get(t.getKey()), IntegerBytesPair::getValue));
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  /**
+   * Method to get DataFile without format and metrics information
+   * This method is mainly used to get the file to be deleted
+   */
+  public static DataFile getIcebergDataFileWithoutMetric(String file, PartitionSpec partitionSpec,
+      StructLike partition) {
+    //Use raw Path to support federation.
+    String rawPath = new Path(file).toUri().getRawPath();
+    //Just want to remove the old files, so set the record number and file size to a random value
+    DataFiles.Builder dataFileBuilder =
+        DataFiles.builder(partitionSpec).withPath(rawPath).withFileSizeInBytes(0).withRecordCount(0);
+
+    if (partition != null) {
+      dataFileBuilder.withPartition(partition);
+    }
+    return dataFileBuilder.build();
+  }
+
+  /**
+   * Method to get DataFile with format and metrics information
+   * This method is mainly used to get the file to be added
+   */
+  public static DataFile getIcebergDataFileWithMetric(org.apache.gobblin.metadata.DataFile file,

Review comment:
       Shall we keep the import convention consistent with the other files?

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg;
+
+import azkaban.jobExecutor.AbstractJob;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
+import org.apache.gobblin.iceberg.publisher.GobblinMCEPublisher;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metadata.DataFile;
+import org.apache.gobblin.metadata.DataMetrics;
+import org.apache.gobblin.metadata.DataOrigin;
+import org.apache.gobblin.metadata.DatasetIdentifier;
+import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
+import org.apache.gobblin.metadata.IntegerBytesPair;
+import org.apache.gobblin.metadata.IntegerLongPair;
+import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metadata.SchemaSource;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.util.ClustersNames;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+
+
+/**
+ * A class for emitting GobblinMCE (Gobblin Metadata Change Event that includes the information of the file metadata change,
+ * i.e., add or delete file, and the column min/max value of the added file.
+ * GMCE will be consumed by metadata pipeline to register/de-register hive/iceberg metadata)
+ */
+@Slf4j
+public abstract class GobblinMCEProducer<D> implements Closeable {
+
+  public static final String GMCE_PRODUCER_CLASS = "GobblinMCEProducer.class.name";
+  public static final String OLD_FILES_HIVE_REGISTRATION_KEY = "old.files.hive.registration.policy";
+  public static final String FORMAT_KEY = "writer.output.format";
+  public static final String DATASET_DIR = "dataset.dir";
+  public static final String HIVE_PARTITION_NAME = "hive.partition.name";
+  private static final String HDFS_PLATFORM_URN = "urn:li:dataPlatform:hdfs";
+  private static final String DATASET_ORIGIN_KEY = "dataset.origin";
+  private static final String DEFAULT_DATASET_ORIGIN = "PROD";
+
+  @Setter
+  private State state;
+  private MetricContext metricContext;
+
+  public GobblinMCEProducer(State state) {
+    this.state = state;
+    this.metricContext = Instrumented.getMetricContext(state, this.getClass());
+  }
+
+
+  /**
+   * This method will use the files to compute the table name and dataset name, for each table it will generate one GMCE and send that to kafka so
+   * the metadata ingestion pipeline can use the information to register metadata
+   * @param newFiles The map of new files' path and metrics
+   * @param oldFiles the list of old file to be dropped
+   * @param offsetRange offset rang of the new data, can be null
+   * @param operationType The opcode of gmce emitted by this method.
+   * @throws IOException
+   */
+  public void sendGMCE(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixs,
+      Map<String, String> offsetRange, OperationType operationType, SchemaSource schemaSource) throws IOException {
+    GobblinMetadataChangeEvent gmce =
+        getGobblinMetadataChangeEvent(newFiles, oldFiles, oldFilePrefixs, offsetRange, operationType, schemaSource);
+    underlyingSendGMCE(gmce);
+  }
+
+  /**
+   * Use the producer to send GMCE, the implementation need to make sure the emitting is at-least once in-order delivery
+   * (i.e. use kafka producer to send event and config it to be at-least once delivery)
+   * @param gmce GMCE that contains information of the metadata change
+   */
+  public abstract void underlyingSendGMCE(GobblinMetadataChangeEvent gmce);
+
+  private void setBasicInformationForGMCE(GobblinMetadataChangeEvent.Builder gmceBuilder,
+      Map<String, String> offsetRange, SchemaSource schemaSource) {
+    String origin = state.getProp(DATASET_ORIGIN_KEY, DEFAULT_DATASET_ORIGIN);
+    gmceBuilder.setDatasetIdentifier(DatasetIdentifier.newBuilder()
+        .setDataPlatformUrn(HDFS_PLATFORM_URN)
+        .setDataOrigin(DataOrigin.valueOf(origin))
+        .setNativeName(state.getProp(DATASET_DIR))
+        .build());
+    gmceBuilder.setCluster(ClustersNames.getInstance().getClusterName());
+    //retention job does not have job.id
+    gmceBuilder.setFlowId(
+        state.getProp(AbstractJob.JOB_ID, new Configuration().get(ConfigurationKeys.AZKABAN_FLOW_ID)));
+    gmceBuilder.setRegistrationPolicy(state.getProp(ConfigurationKeys.HIVE_REGISTRATION_POLICY));
+    gmceBuilder.setSchemaSource(schemaSource);
+    gmceBuilder.setPartitionColumns(Lists.newArrayList(state.getProp(HIVE_PARTITION_NAME, "")));
+    if (offsetRange != null) {
+      gmceBuilder.setTopicPartitionOffsetsRange(offsetRange);
+    }
+    String schemaString = state.getProp(PartitionedDataWriter.WRITER_LATEST_SCHEMA);
+    if (schemaString != null) {
+      gmceBuilder.setTableSchema(schemaString);
+    }
+    if (state.contains(GobblinMCEPublisher.AVRO_SCHEMA_WITH_ICEBERG_ID)) {
+      gmceBuilder.setAvroSchemaWithIcebergSchemaID(state.getProp(GobblinMCEPublisher.AVRO_SCHEMA_WITH_ICEBERG_ID));
+    }
+    if (state.contains(OLD_FILES_HIVE_REGISTRATION_KEY)) {
+      gmceBuilder.setRegistrationPolicyForOldData(state.getProp(OLD_FILES_HIVE_REGISTRATION_KEY));
+    } else {
+      log.warn(
+          "properties {} does not set, if it's for rewrite/drop operation, there may be trouble to get partition value for old data",
+          OLD_FILES_HIVE_REGISTRATION_KEY);
+    }
+    Map<String, String> regProperties = new HashMap<>();
+    if (state.contains(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME)) {
+      regProperties.put(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME,
+          state.getProp(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME));
+    }
+    if (state.contains(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES)) {
+      regProperties.put(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES,
+          state.getProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES));
+    }
+    if (state.contains(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_TABLE_NAMES)) {
+      regProperties.put(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_TABLE_NAMES,
+          state.getProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_TABLE_NAMES));
+    }
+    if (!regProperties.isEmpty()) {
+      gmceBuilder.setRegistrationProperties(regProperties);
+    }
+  }
+
+  public GobblinMetadataChangeEvent getGobblinMetadataChangeEvent(Map<Path, Metrics> newFiles, List<String> oldFiles,
+      List<String> oldFilePrefixes, Map<String, String> offsetRange, OperationType operationType,
+      SchemaSource schemaSource) {
+    if (!verifyInput(newFiles, oldFiles, oldFilePrefixes, operationType)) {
+      return null;
+    }
+    GobblinMetadataChangeEvent.Builder gmceBuilder = GobblinMetadataChangeEvent.newBuilder();
+    setBasicInformationForGMCE(gmceBuilder, offsetRange, schemaSource);
+    if (newFiles != null && !newFiles.isEmpty()) {
+      gmceBuilder.setNewFiles(toGobblinDataFileList(newFiles));
+    }
+    if (oldFiles != null && !oldFiles.isEmpty()) {
+      gmceBuilder.setOldFiles(oldFiles);
+    }
+    if (oldFilePrefixes != null && !oldFilePrefixes.isEmpty()) {
+      gmceBuilder.setOldFilePrefixes(oldFilePrefixes);
+    }
+    gmceBuilder.setOperationType(operationType);
+    return gmceBuilder.build();
+  }
+
+  private boolean verifyInput(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixes,
+      OperationType operationType) {
+    switch (operationType) {
+      case rewrite_files: {
+        if (newFiles == null || ((oldFiles == null || oldFiles.isEmpty()) && (oldFilePrefixes == null || oldFilePrefixes
+            .isEmpty())) || newFiles.isEmpty()) {
+          return false;
+        }
+        break;
+      }
+      case add_files: {
+        if (newFiles == null || newFiles.isEmpty()) {
+          return false;
+        }
+        break;
+      }
+      case drop_files: {
+        if ((oldFiles == null || oldFiles.isEmpty()) && (oldFilePrefixes == null || oldFilePrefixes.isEmpty())) {
+          return false;
+        }
+        break;
+      }
+      default: {
+        //unsupported operation
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public static FileFormat getIcebergFormat(State state) {

Review comment:
       Consider moving it into IcebergUtils?
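
       A hedged sketch of that move. Only the relocated method is shown, and its body is an assumption based on the `writer.output.format` property and Iceberg's `FileFormat` enum, since the actual implementation is truncated from this quote:

```java
package org.apache.gobblin.iceberg.Utils;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.iceberg.GobblinMCEProducer;
import org.apache.iceberg.FileFormat;

public class IcebergUtils {

  /** Maps Gobblin's writer.output.format value onto Iceberg's FileFormat enum. */
  public static FileFormat getIcebergFormat(State state) {
    String format = state.getProp(GobblinMCEProducer.FORMAT_KEY);
    if ("AVRO".equalsIgnoreCase(format)) {
      return FileFormat.AVRO;
    } else if ("ORC".equalsIgnoreCase(format)) {
      return FileFormat.ORC;
    }
    throw new IllegalArgumentException("Unsupported writer output format: " + format);
  }
}
```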




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org