Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/01/13 08:39:21 UTC

[GitHub] [hudi] yanghua commented on a change in pull request #2430: [HUDI-1522] Add a new pipeline for Flink writer

yanghua commented on a change in pull request #2430:
URL: https://github.com/apache/hudi/pull/2430#discussion_r556310823



##########
File path: hudi-flink/src/main/java/org/apache/hudi/streamer/OperationConverter.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.streamer;
+
+import org.apache.hudi.common.model.WriteOperationType;
+
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.ParameterException;
+
+/** Converter that converts a string into enum WriteOperationType. */

Review comment:
       Can we keep consistency by making this class-level comment follow the same style as the existing classes?
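
       For illustration only: the existing classes in this module use the multi-line Javadoc
       form for class-level comments, so the aligned version would presumably look something
       like the sketch below (the class declaration is assumed from the imports, it is not
       shown in the quoted diff):

           /**
            * Converter that converts a string into the enum {@link WriteOperationType}.
            */
           public class OperationConverter implements IStringConverter<WriteOperationType> {
             // ...
           }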

##########
File path: hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.streamer;
+
+import org.apache.hudi.operator.HoodieOptions;
+import org.apache.hudi.operator.StreamWriteOperatorFactory;
+import org.apache.hudi.util.StreamerUtil;
+
+import com.beust.jcommander.JCommander;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.TimestampFormat;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Properties;
+
+/**
+ * An Utility which can incrementally consume data from Kafka and apply it to the target table.
+ * currently, it only support COW table and insert, upsert operation.
+ */
+public class HoodieFlinkStreamerV2 {
+  public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    final Config cfg = new Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    env.enableCheckpointing(cfg.checkpointInterval);
+    env.getConfig().setGlobalJobParameters(cfg);
+    // We use checkpoint to trigger write operation, including instant generating and committing,
+    // There can only be one checkpoint at one time.
+    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+    env.disableOperatorChaining();
+
+    if (cfg.flinkCheckPointPath != null) {
+      env.setStateBackend(new FsStateBackend(cfg.flinkCheckPointPath));
+    }
+
+    Properties kafkaProps = StreamerUtil.getKafkaProps(cfg);

Review comment:
       And can we extract the logic that is shared between v1 and v2 into a common place?
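
       For illustration, the shared environment setup could be extracted into a helper along
       these lines (a sketch only; the helper name setUpEnvironment, its parameters, and its
       placement are assumptions, not part of the PR):

           // Hypothetical helper shared by both streamer entry points.
           // Assumes the same Flink imports already used in HoodieFlinkStreamerV2.
           public static void setUpEnvironment(StreamExecutionEnvironment env,
                                               long checkpointInterval,
                                               String flinkCheckPointPath) {
             env.enableCheckpointing(checkpointInterval);
             // Checkpoints drive instant generation and committing, so allow only one at a time.
             env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
             env.disableOperatorChaining();
             if (flinkCheckPointPath != null) {
               env.setStateBackend(new FsStateBackend(flinkCheckPointPath));
             }
           }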

##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -81,16 +103,50 @@ public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath,
     return conf;
   }
 
-  public static Configuration getHadoopConf() {
-    return new Configuration();
+  public static org.apache.hadoop.conf.Configuration getHadoopConf() {
+    // create HiveConf from hadoop configuration with hadoop conf directory configured.
+    org.apache.hadoop.conf.Configuration hadoopConf = null;
+    for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new Configuration())) {

Review comment:
       Do we really need to search for a possible config file whose contents we do not even know? Would simply creating a new Configuration object be enough?
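
       If probing the Hadoop conf directories is not needed, the simpler alternative hinted at
       here would be just (a sketch, not the PR's code):

           public static org.apache.hadoop.conf.Configuration getHadoopConf() {
             // Rely on whatever configuration is on the classpath instead of scanning conf directories.
             return new org.apache.hadoop.conf.Configuration();
           }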

##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -81,16 +103,50 @@ public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path cfgPath,
     return conf;
   }
 
-  public static Configuration getHadoopConf() {
-    return new Configuration();
+  public static org.apache.hadoop.conf.Configuration getHadoopConf() {
+    // create HiveConf from hadoop configuration with hadoop conf directory configured.
+    org.apache.hadoop.conf.Configuration hadoopConf = null;
+    for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new Configuration())) {
+      hadoopConf = getHadoopConfiguration(possibleHadoopConfPath);
+      if (hadoopConf != null) {
+        break;
+      }
+    }
+    if (hadoopConf == null) {
+      hadoopConf = new org.apache.hadoop.conf.Configuration();
+    }
+    return hadoopConf;
   }
 
-  public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
-    checkPropNames.forEach(prop -> {
-      if (!props.containsKey(prop)) {
-        throw new HoodieNotSupportedException("Required property " + prop + " is missing");
+  /**
+   * Returns a new Hadoop Configuration object using the path to the hadoop conf configured.
+   *
+   * @param hadoopConfDir Hadoop conf directory path.
+   * @return A Hadoop configuration instance.
+   */
+  private static org.apache.hadoop.conf.Configuration getHadoopConfiguration(String hadoopConfDir) {
+    if (new File(hadoopConfDir).exists()) {

Review comment:
       Can we use `Files.exists()` to test?
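
       A sketch of the suggested NIO-based check (using java.nio.file.Files and java.nio.file.Paths):

           if (Files.exists(Paths.get(hadoopConfDir))) {
             // ... load core-site.xml / hdfs-site.xml from this directory as before
           }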

##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -122,21 +193,62 @@ public static HoodieRecordPayload createPayload(String payloadClass, GenericReco
     }
   }
 
-  public static HoodieWriteConfig getHoodieClientConfig(HoodieFlinkStreamer.Config cfg) {
-    FileSystem fs = FSUtils.getFs(cfg.targetBasePath, getHadoopConf());
+  /**
+   * Create a payload class via reflection, do not ordering/precombine value.
+   */
+  public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record)
+      throws IOException {
+    try {
+      return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
+          new Class<?>[] {Option.class}, Option.of(record));
+    } catch (Throwable e) {
+      throw new IOException("Could not create payload for class: " + payloadClass, e);
+    }
+  }
+
+  public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
     HoodieWriteConfig.Builder builder =
-        HoodieWriteConfig.newBuilder().withEngineType(EngineType.FLINK).withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
-            .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
-                .build())
-            .forTable(cfg.targetTableName)
-            .withAutoCommit(false)
-            .withProps(readConfig(fs, new Path(cfg.propsFilePath), cfg.configs)
-                .getConfig());
-
-    builder = builder.withSchema(new FilebasedSchemaProvider(getProps(cfg)).getTargetSchema().toString());
-    HoodieWriteConfig config = builder.build();
-    return config;
+            HoodieWriteConfig.newBuilder()
+                    .withEngineType(EngineType.FLINK)
+                    .withPath(conf.getString(HoodieOptions.PATH))
+                    .combineInput(conf.getBoolean(HoodieOptions.INSERT_DROP_DUPS), true)
+                    .withCompactionConfig(
+                        HoodieCompactionConfig.newBuilder()
+                            .withPayloadClass(conf.getString(HoodieOptions.PAYLOAD_CLASS))
+                            .build())
+                    .forTable(conf.getString(HoodieOptions.TABLE_NAME))
+                    .withAutoCommit(false)
+                    .withProps(flinkConf2TypedProperties(HoodieOptions.flatOptions(conf)));
+
+    builder = builder.withSchema(getSourceSchema(conf).toString());
+    return builder.build();
+  }
+
+  /**
+   * Converts the give {@link Configuration} to {@link TypedProperties}.
+   * The default values are also set up.
+   *
+   * @param conf The flink configuration
+   * @return a TypedProperties instance
+   */
+  public static TypedProperties flinkConf2TypedProperties(Configuration conf) {
+    Properties properties = new Properties();
+    // put all the set up options
+    conf.addAllToProperties(properties);
+    // put all the default options
+    for (ConfigOption<?> option : HoodieOptions.OPTIONAL_OPTIONS) {
+      if (!conf.contains(option)) {
+        properties.put(option.key(), option.defaultValue());
+      }
+    }
+    return new TypedProperties(properties);
   }
 
+  public static void checkRequiredProperties(TypedProperties props, List<String> checkPropNames) {
+    checkPropNames.forEach(prop -> {
+      if (!props.containsKey(prop)) {
+        throw new HoodieNotSupportedException("Required property " + prop + " is missing");

Review comment:
       Why does it throw `HoodieNotSupportedException`? Isn't this an illegal config option case?
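
       For illustration, the check could throw a more fitting exception, e.g. as below (the
       exact exception type is only a suggestion, not something settled in the PR):

           checkPropNames.forEach(prop -> {
             if (!props.containsKey(prop)) {
               // HoodieException (or IllegalArgumentException) reads more like a config error
               // than an unsupported operation.
               throw new HoodieException("Required property " + prop + " is missing");
             }
           });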

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -79,6 +79,11 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
    * @returns {@code true} if record represents a delete record. {@code false} otherwise.
    */
   protected boolean isDeleteRecord(GenericRecord genericRecord) {
+    final String isDeleteKey = "_hoodie_is_deleted";
+    // Modify to be compatible with old version Avro.
+    if (genericRecord.getSchema().getField(isDeleteKey) == null) {
+      return false;
+    }
     Object deleteMarker = genericRecord.get("_hoodie_is_deleted");

Review comment:
       Why not change the literal to the `isDeleteKey` constant (`_hoodie_is_deleted`) that you defined above?
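
       That is, the lookup would presumably reuse the constant instead of repeating the literal:

           Object deleteMarker = genericRecord.get(isDeleteKey);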

##########
File path: hudi-flink/pom.xml
##########
@@ -124,28 +124,77 @@
       <artifactId>kafka-clients</artifactId>
       <version>${kafka.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-avro</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-json</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-common</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>

Review comment:
       I am thinking about one question: how do we serve users who only use the DataStream API? Do we define only a single `${flink.version}`? This change introduces some artifacts that exist only in higher Flink versions.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/HoodieOptions.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.streamer.Config;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Hoodie Flink config options.
+ *
+ * <p>It has the options for Hoodie table read and write. It also defines some utilities.
+ */
+public class HoodieOptions {

Review comment:
       Does `HoodieFlinkConfigOptions` sound better?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/streamer/OperationConverter.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.streamer;
+
+import org.apache.hudi.common.model.WriteOperationType;
+
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.ParameterException;
+
+/** Converter that converts a string into enum WriteOperationType. */

Review comment:
       There is already an `OperationConverter` in `org.apache.hudi.utilities.deltastreamer`; let us use a single one.

##########
File path: hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
##########
@@ -18,47 +18,69 @@
 
 package org.apache.hudi.util;
 
-import org.apache.hudi.HoodieFlinkStreamer;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
+import org.apache.hudi.streamer.Config;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.keygen.KeyGenerator;
-import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
+import org.apache.hudi.operator.HoodieOptions;
 import org.apache.hudi.schema.FilebasedSchemaProvider;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
+import java.io.File;
 import java.io.IOException;
 import java.io.StringReader;
 import java.util.List;
+import java.util.Properties;
 
+/** Utilities for Flink stream read and write. */

Review comment:
       ditto

##########
File path: hudi-flink/pom.xml
##########
@@ -174,48 +229,34 @@
       <version>0.9.7</version>
     </dependency>
 
-    <!-- Junit Test Suite -->
-    <dependency>
-      <groupId>org.junit.jupiter</groupId>
-      <artifactId>junit-jupiter-api</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.junit.jupiter</groupId>
-      <artifactId>junit-jupiter-engine</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.junit.vintage</groupId>
-      <artifactId>junit-vintage-engine</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.junit.jupiter</groupId>
-      <artifactId>junit-jupiter-params</artifactId>
-      <scope>test</scope>
-    </dependency>
+    <!-- test dependencies -->
     <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-junit-jupiter</artifactId>
+      <groupId>org.apache.flink</groupId>

Review comment:
       Can you make the changes in this file cleaner?

##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -217,7 +217,7 @@ public static String addMetadataColumnTypes(String hiveColumnTypes) {
 
   private static Schema initRecordKeySchema() {
     Schema.Field recordKeyField =
-        new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());

Review comment:
       Other places also use `NullNode.getInstance()`; we need to keep consistency. Either revert this or change all of them.
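
       For context, the difference is presumably between the JsonNode-based `Schema.Field`
       constructor (Avro <= 1.8, with `NullNode` from org.codehaus.jackson) and the
       Object-based constructor added in Avro 1.9. The PR's replacement line is not shown in
       the quoted diff, so the second form below is only an example of the newer style, not
       necessarily what the PR uses:

           // Older style, JsonNode-based default (as in the removed line):
           new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", NullNode.getInstance());
           // Newer style, Object-based default (Avro 1.9+):
           new Schema.Field(HoodieRecord.RECORD_KEY_METADATA_FIELD, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE);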

##########
File path: hudi-common/src/main/avro/HoodieRollbackMetadata.avsc
##########
@@ -51,10 +51,10 @@
      /* overlaps with 'commitsRollback' field. Adding this to track action type for all the instants being rolled back. */
      {
        "name": "instantsRollback",
-       "default": null,
+       "default": [],
        "type": {
           "type": "array",
-          "default": null,
+          "default": [],
           "items": "HoodieInstantInfo"

Review comment:
       Will this change affect versions lower than 1.10?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org