Posted to commits@hudi.apache.org by "nsivabalan (via GitHub)" <gi...@apache.org> on 2023/03/21 06:00:53 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

nsivabalan commented on code in PR #8152:
URL: https://github.com/apache/hudi/pull/8152#discussion_r1142860752


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  public static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("The path for providing the checkpoints.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("Kafka topic name.");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")

Review Comment:
   Same question as for KAFKA_TOPIC: isn't this specific to the Kafka source?
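For reference, the documentation of `KAFKA_APPEND_OFFSETS` later in this diff says that, when enabled (default `"false"`), it appends the Kafka source offset, partition and timestamp to each record as `_hoodie_kafka_source_offset`, `_hoodie_kafka_source_partition` and `_hoodie_kafka_source_timestamp`. The following is a minimal sketch of that behavior, assuming a record can be modeled as a plain `Map`. The class and method names are hypothetical, and this is not Hudi's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch, not Hudi code: illustrates what enabling
// hoodie.deltastreamer.source.kafka.append.offsets is documented to do.
public class KafkaOffsetAppenderSketch {

  static final String APPEND_OFFSETS_KEY = "hoodie.deltastreamer.source.kafka.append.offsets";

  // Returns a copy of the record; Kafka metadata columns are added only when the flag is enabled.
  static Map<String, Object> maybeAppendOffsets(Properties props, Map<String, Object> record,
                                                long offset, int partition, long timestamp) {
    // The documented default is "false", so an unset key leaves records untouched.
    boolean enabled = Boolean.parseBoolean(props.getProperty(APPEND_OFFSETS_KEY, "false"));
    Map<String, Object> out = new HashMap<>(record);
    if (enabled) {
      out.put("_hoodie_kafka_source_offset", offset);
      out.put("_hoodie_kafka_source_partition", partition);
      out.put("_hoodie_kafka_source_timestamp", timestamp);
    }
    return out;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    Map<String, Object> record = new HashMap<>();
    record.put("id", 1);

    System.out.println(maybeAppendOffsets(props, record, 42L, 3, 1679378453000L).size()); // 1
    props.setProperty(APPEND_OFFSETS_KEY, "true");
    System.out.println(maybeAppendOffsets(props, record, 42L, 3, 1679378453000L).size()); // 4
  }
}
```

Returning a copy instead of mutating the input keeps the disabled path a pure no-op.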



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/BaseSchemaPostProcessorConfig.java:
##########
@@ -7,23 +7,33 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.utilities.schema.postprocessor.add;
+package org.apache.hudi.utilities.config;
 
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
 
 /**
  * Base configs to describe a primitive type column.
  */
-public class BaseSchemaPostProcessorConfig {
+@Immutable
+@ConfigClassProperty(name = "Schema Post Processor Config Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Base configs to describe a primitive type column.")

Review Comment:
   I guess we need to fix line 30 (the class Javadoc) and this description.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,88 @@
+  public static final ConfigProperty<String> KAFKA_TOPIC = ConfigProperty

Review Comment:
   Isn't this specific to the Kafka source? Why is it located in this file?
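One way to act on this, sketched under assumptions: move the Kafka-only keys into a Kafka-specific holder class while keeping the key strings identical, so existing property files keep working. `KafkaSourceConfigSketch` is a hypothetical name, and plain `String` constants stand in for Hudi's `ConfigProperty` so the snippet compiles on its own.

```java
// Hypothetical sketch: Kafka-only keys grouped in a Kafka-specific class.
// Plain strings stand in for Hudi's ConfigProperty to keep this self-contained.
public final class KafkaSourceConfigSketch {

  private KafkaSourceConfigSketch() {
  }

  // Same literal value as DELTA_STREAMER_CONFIG_PREFIX in the diff.
  static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
  static final String KAFKA_SOURCE_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.";

  // Relocating the constants does not change the user-facing keys.
  static final String KAFKA_TOPIC = KAFKA_SOURCE_PREFIX + "topic";
  static final String KAFKA_APPEND_OFFSETS = KAFKA_SOURCE_PREFIX + "append.offsets";

  public static void main(String[] args) {
    System.out.println(KAFKA_TOPIC);          // hoodie.deltastreamer.source.kafka.topic
    System.out.println(KAFKA_APPEND_OFFSETS); // hoodie.deltastreamer.source.kafka.append.offsets
  }
}
```

Because only the Java constant moves, users see no key change; the relocation is purely an organizational choice for the generated config docs and for code ownership.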



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,88 @@
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends Kafka offset info like source offset (_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default it's disabled and no Kafka offsets are added.");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema. "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are "
+          + "determined by the Avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+          + "a single deltastreamer for a table then you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides path determined using option `--base-path-prefix` for a table");

Review Comment:
   So for a single-table DeltaStreamer, this config value is ignored, right? Can we call that out?
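To make the question concrete: the documentation says `targetBasePath` is specific to `HoodieMultiTableDeltaStreamer` and overrides the path derived from `--base-path-prefix`. The sketch below assumes the multi-table tool derives a default path of the form `<base-path-prefix>/<database>/<table>` for each entry of `tablesToBeIngested`; that layout and the helpers `parseTables` and `resolveTargetBasePath` are assumptions for illustration, not Hudi's code. In this sketch a single-table run never reads the key, which is the behavior the comment asks to document.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch of per-table base path resolution in a multi-table run.
// The "<prefix>/<database>/<table>" default layout is an assumption.
public class TargetBasePathSketch {

  static final String TABLES_TO_BE_INGESTED = "hoodie.deltastreamer.ingestion.tablesToBeIngested";
  static final String TARGET_BASE_PATH = "hoodie.deltastreamer.ingestion.targetBasePath";

  // Splits "db1.table1,db1.table2" into {database, table} pairs.
  static List<String[]> parseTables(Properties props) {
    List<String[]> tables = new ArrayList<>();
    for (String entry : props.getProperty(TABLES_TO_BE_INGESTED, "").split(",")) {
      String trimmed = entry.trim();
      if (trimmed.isEmpty()) {
        continue;
      }
      int dot = trimmed.indexOf('.');
      if (dot <= 0 || dot == trimmed.length() - 1) {
        throw new IllegalArgumentException("Expected <database>.<table> but got: " + trimmed);
      }
      tables.add(new String[] {trimmed.substring(0, dot), trimmed.substring(dot + 1)});
    }
    return tables;
  }

  // An explicit targetBasePath wins; otherwise the path is derived from --base-path-prefix.
  static String resolveTargetBasePath(Properties tableProps, String basePathPrefix,
                                      String database, String table) {
    String explicit = tableProps.getProperty(TARGET_BASE_PATH);
    if (explicit != null && !explicit.isEmpty()) {
      return explicit;
    }
    String prefix = basePathPrefix.endsWith("/")
        ? basePathPrefix.substring(0, basePathPrefix.length() - 1)
        : basePathPrefix;
    return prefix + "/" + database + "/" + table;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(TABLES_TO_BE_INGESTED, "db1.table1,db1.table2");
    for (String[] t : parseTables(props)) {
      // Prints /data/hudi/db1/table1, then /data/hudi/db1/table2
      System.out.println(resolveTargetBasePath(new Properties(), "/data/hudi/", t[0], t[1]));
    }
  }
}
```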

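Related to the `source.sanitize.invalid.schema.field.names` option in the same hunk: the Avro specification requires names to start with `[A-Za-z_]` and to contain only `[A-Za-z0-9_]` afterwards. A minimal sketch of masking invalid characters with the configured sequence (default `"__"`) could look like the following. `sanitizeFieldName` is a hypothetical helper, and Hudi's actual implementation may treat edge cases, such as an invalid leading character, differently.

```java
// Hypothetical sketch, not Hudi's implementation: masks characters that are
// not allowed in Avro names, following the Avro specification's naming rules.
public class FieldNameSanitizerSketch {

  // Default of hoodie.deltastreamer.source.sanitize.invalid.char.mask in the diff.
  static final String DEFAULT_MASK = "__";

  static boolean isValidAvroNameChar(char c, boolean first) {
    boolean letterOrUnderscore = (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || c == '_';
    // Digits are allowed everywhere except the first position.
    return first ? letterOrUnderscore : letterOrUnderscore || (c >= '0' && c <= '9');
  }

  // Replaces every invalid character with the mask sequence.
  static String sanitizeFieldName(String name, String mask) {
    StringBuilder sb = new StringBuilder(name.length());
    for (int i = 0; i < name.length(); i++) {
      char c = name.charAt(i);
      if (isValidAvroNameChar(c, i == 0)) {
        sb.append(c);
      } else {
        sb.append(mask);
      }
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    System.out.println(sanitizeFieldName("1st-name", DEFAULT_MASK));   // __st__name
    System.out.println(sanitizeFieldName("user.email", DEFAULT_MASK)); // user__email
  }
}
```

Note that the mask itself must consist of valid Avro name characters; otherwise the "sanitized" name is still invalid.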


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ProtoClassBasedSchemaProviderConfig.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.utilities.config.HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX;
+
+/**
+ * JDBC-based Schema Provider Configs.
+ */
+@Immutable
+@ConfigClassProperty(name = "JDBC-based Schema Provider Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+    description = "Configurations for Proto schema provider.")
+public class ProtoClassBasedSchemaProviderConfig extends HoodieConfig {
+  private static final String PROTO_SCHEMA_PROVIDER_PREFIX = SCHEMAPROVIDER_CONFIG_PREFIX + "proto";

Review Comment:
   minor:
   In the previous classes, the prefix variables end with "." and the individual config keys don't start with ".", but this class does it the other way around. Can we standardize on one convention?
   
   For example,
   
   SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider."
   
   ends with a dot ("."),
   
   and each config is then named as
   SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor"
   
   with no dot (".") at the beginning.
   
   Whereas in this class, it's different:
   PROTO_SCHEMA_PROVIDER_PREFIX = SCHEMAPROVIDER_CONFIG_PREFIX + "proto";
   PROTO_SCHEMA_PROVIDER_PREFIX + ".class.name"
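Both conventions produce the same final key, which is exactly why mixing them is easy to get wrong: a prefix ending in "." combined with a suffix starting with "." yields a double dot. The sketch below uses plain strings with the constant values quoted in the comment; the class and constant names other than the two prefixes are hypothetical.

```java
// Sketch comparing the two prefix conventions (plain strings, not Hudi classes).
public class PrefixConventionSketch {

  static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
  static final String SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";

  // Convention in this class: prefix without a trailing dot, suffix with a leading dot.
  static final String PROTO_PREFIX_NO_DOT = SCHEMAPROVIDER_CONFIG_PREFIX + "proto";
  static final String CLASS_NAME_AS_IN_DIFF = PROTO_PREFIX_NO_DOT + ".class.name";

  // Convention elsewhere: prefix ends with a dot, suffix has no leading dot.
  static final String PROTO_PREFIX_WITH_DOT = SCHEMAPROVIDER_CONFIG_PREFIX + "proto.";
  static final String CLASS_NAME_STANDARDIZED = PROTO_PREFIX_WITH_DOT + "class.name";

  // Mixing the two conventions silently produces a malformed key containing "..".
  static final String CLASS_NAME_MIXED = PROTO_PREFIX_WITH_DOT + ".class.name";

  public static void main(String[] args) {
    System.out.println(CLASS_NAME_AS_IN_DIFF);   // hoodie.deltastreamer.schemaprovider.proto.class.name
    System.out.println(CLASS_NAME_STANDARDIZED); // hoodie.deltastreamer.schemaprovider.proto.class.name
    System.out.println(CLASS_NAME_MIXED);        // hoodie.deltastreamer.schemaprovider.proto..class.name
  }
}
```

Standardizing on trailing-dot prefixes means every key suffix can be read as a bare relative name, and a reviewer only has to check one side of each concatenation.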
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org