You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/01/14 05:11:31 UTC

[GitHub] [incubator-gobblin] ZihanLi58 commented on a change in pull request #3172: [GOBBLIN-1335] Publish GMCE(GobblinMetadataChangeEvent) publisher and iceberg retention job to Gobblin OSS

ZihanLi58 commented on a change in pull request #3172:
URL: https://github.com/apache/incubator-gobblin/pull/3172#discussion_r557047553



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg;
+
+import azkaban.jobExecutor.AbstractJob;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
+import org.apache.gobblin.iceberg.publisher.GobblinMCEPublisher;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metadata.DataFile;
+import org.apache.gobblin.metadata.DataMetrics;
+import org.apache.gobblin.metadata.DataOrigin;
+import org.apache.gobblin.metadata.DatasetIdentifier;
+import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
+import org.apache.gobblin.metadata.IntegerBytesPair;
+import org.apache.gobblin.metadata.IntegerLongPair;
+import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metadata.SchemaSource;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.util.ClustersNames;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.writer.PartitionedDataWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+
+
+/**
+ * A class for emitting GobblinMCEs (Gobblin Metadata Change Events), which carry information about file metadata
+ * changes, i.e., added or deleted files and the column min/max values of the added files.
+ * GMCEs are consumed by the metadata pipeline to register/de-register Hive/Iceberg metadata.
+ */
+@Slf4j
+public abstract class GobblinMCEProducer<D> implements Closeable {
+
+  // NOTE(review): the dotted-string constants below are presumably property keys looked up in the job State;
+  // their usages are not visible in this excerpt -- confirm against the rest of the class.
+  // Key whose value names the GobblinMCEProducer implementation class.
+  public static final String GMCE_PRODUCER_CLASS = "GobblinMCEProducer.class.name";
+  // Key for the Hive registration policy applied to old files.
+  public static final String OLD_FILES_HIVE_REGISTRATION_KEY = "old.files.hive.registration.policy";
+  // Key for the writer's output format.
+  public static final String FORMAT_KEY = "writer.output.format";
+  // Key for the dataset directory.
+  public static final String DATASET_DIR = "dataset.dir";
+  // Key for the Hive partition name.
+  public static final String HIVE_PARTITION_NAME = "hive.partition.name";
+  // Data-platform URN for HDFS (presumably used when building the dataset identifier -- confirm).
+  private static final String HDFS_PLATFORM_URN = "urn:li:dataPlatform:hdfs";
+  // Key for the dataset origin; DEFAULT_DATASET_ORIGIN is presumably the fallback when the key is unset.
+  private static final String DATASET_ORIGIN_KEY = "dataset.origin";
+  private static final String DEFAULT_DATASET_ORIGIN = "PROD";
+
+  // Job state; Lombok's @Setter generates setState(State), so it can be replaced after construction.
+  @Setter
+  private State state;
+  // Metric context created in the constructor from the state and the concrete producer class.
+  private MetricContext metricContext;
+
+  /**
+   * Creates a producer bound to the given state.
+   * @param state the state stored on this producer; it is also used, together with the concrete
+   *              producer class, to obtain this producer's {@link MetricContext}
+   */
+  public GobblinMCEProducer(State state) {
+    Class<?> producerClass = getClass();
+    this.metricContext = Instrumented.getMetricContext(state, producerClass);
+    this.state = state;
+  }
+
+
+  /**
+   * Builds a single GMCE (GobblinMetadataChangeEvent) describing the given file changes and sends it via
+   * {@link #underlyingSendGMCE(GobblinMetadataChangeEvent)}, so the metadata ingestion pipeline can use the
+   * information to register metadata. The event is constructed by {@code getGobblinMetadataChangeEvent},
+   * which presumably derives the table and dataset names from the files -- confirm there.
+   * @param newFiles map from the path of each newly added file to its Iceberg {@link Metrics}
+   * @param oldFiles paths of the old files to be dropped
+   * @param oldFilePrefixs path prefixes of old files to be dropped (presumably covering every file under each
+   *                       prefix -- confirm in {@code getGobblinMetadataChangeEvent})
+   * @param offsetRange offset range of the new data; may be {@code null}
+   * @param operationType the opcode of the GMCE emitted by this method
+   * @param schemaSource the {@link SchemaSource} passed through to the GMCE builder
+   * @throws IOException if an I/O error occurs while building the GMCE
+   */
+  public void sendGMCE(Map<Path, Metrics> newFiles, List<String> oldFiles, List<String> oldFilePrefixs,
+      Map<String, String> offsetRange, OperationType operationType, SchemaSource schemaSource) throws IOException {
+    GobblinMetadataChangeEvent gmce =
+        getGobblinMetadataChangeEvent(newFiles, oldFiles, oldFilePrefixs, offsetRange, operationType, schemaSource);
+    underlyingSendGMCE(gmce);
+  }
+
+  /**
+   * Sends the given GMCE using the underlying producer; called by {@code sendGMCE} once the event is built.
+   * Implementations must guarantee at-least-once, in-order delivery (e.g. use a Kafka producer configured
+   * for at-least-once delivery).
+   * @param gmce GMCE that contains the information of the metadata change
+   */
+  public abstract void underlyingSendGMCE(GobblinMetadataChangeEvent gmce);

Review comment:
       Yes, I have already created a ticket for that; it will be addressed in a separate PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org