Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/02/24 05:47:07 UTC

[GitHub] [hudi] lamber-ken commented on a change in pull request #2593: [HUDI-1632] Supports merge on read write mode for Flink writer

lamber-ken commented on a change in pull request #2593:
URL: https://github.com/apache/hudi/pull/2593#discussion_r581642265



##########
File path: hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
##########
@@ -219,4 +226,81 @@ public void testWriteToHoodieLegacy() throws Exception {
 
     TestData.checkWrittenFullData(tempFile, EXPECTED);
   }
+
+  @Test
+  public void testMergeOnReadWriteWithCompaction() throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+    conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(4);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    // Read from file source
+    RowType rowType =
+        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+            .getLogicalType();
+    StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
+        new StreamWriteOperatorFactory<>(conf);
+
+    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+        rowType,
+        new RowDataTypeInfo(rowType),
+        false,
+        true,
+        TimestampFormat.ISO_8601
+    );
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+        .getContextClassLoader().getResource("test_source.data")).toString();
+
+    TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+    format.setFilesFilter(FilePathFilter.createDefaultFilter());
+    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+    format.setCharsetName("UTF-8");
+
+    DataStreamSink<CompactCommitEvent> dataStream = execEnv
+        // use PROCESS_CONTINUOUSLY mode to trigger checkpoints
+        .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+        .setParallelism(4)
+        .map(new RowDataToHoodieFunction<>(rowType, conf), TypeInformation.of(HoodieRecord.class))
+        // Key by partition path, to avoid multiple subtasks writing to the same partition at the same time
+        .keyBy(HoodieRecord::getPartitionPath)
+        .transform(
+            "bucket_assigner",
+            TypeInformation.of(HoodieRecord.class),
+            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
+        .uid("uid_bucket_assigner")
+        // shuffle by fileId (bucket id)
+        .keyBy(record -> record.getCurrentLocation().getFileId())
+        .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
+        .uid("uid_hoodie_stream_write")
+        .transform("compact_plan_generate",
+            TypeInformation.of(CompactEvent.class),
+            new CompactionPlanOperator(conf))
+        .uid("uid_compact_plan_generate")
+            .setParallelism(1) // plan generation must run as a singleton

Review comment:
       we can add the following statements to `CompactionPlanOperator#open`:
   ```
       ValidationUtils.checkArgument(getRuntimeContext().getNumberOfParallelSubtasks() == 1);
       ValidationUtils.checkArgument(getRuntimeContext().getMaxNumberOfParallelSubtasks() == 1);
   ```
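
       As a minimal sketch (hypothetical; this assumes `CompactionPlanOperator` extends Flink's `AbstractStreamOperator` and that Hudi's `ValidationUtils.checkArgument(boolean, String)` overload is available), the guard could look like:
   ```
       @Override
       public void open() throws Exception {
         super.open();
         // Plan generation must run as a singleton; fail fast on any other deployment.
         ValidationUtils.checkArgument(
             getRuntimeContext().getNumberOfParallelSubtasks() == 1,
             "CompactionPlanOperator expects parallelism 1");
         ValidationUtils.checkArgument(
             getRuntimeContext().getMaxNumberOfParallelSubtasks() == 1,
             "CompactionPlanOperator expects max parallelism 1");
       }
   ```
       That way a misconfigured `setParallelism` on this operator fails at job startup instead of silently producing duplicate compaction plans.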




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org