Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/08/04 03:04:27 UTC

[GitHub] [hudi] danny0405 commented on a change in pull request #3390: [HUDI-2087] Support Append only in Flink stream

danny0405 commented on a change in pull request #3390:
URL: https://github.com/apache/hudi/pull/3390#discussion_r682250961



##########
File path: hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
##########
@@ -394,6 +394,48 @@ public static void checkWrittenData(
     }
   }
 
+  /**
+   * Checks the source data set are written as expected.
+   * Different with {@link #checkWrittenData}, it reads all the data files.
+   *
+   * <p>Note: Replace it with the Flink reader when it is supported.
+   *
+   * @param baseFile   The file base to check, should be a directory
+   * @param expected   The expected results mapping, the key should be the partition path
+   *                   and value should be values list with the key partition
+   * @param partitions The expected partition number
+   */
+  public static void checkWrittenAllData(
+      File baseFile,
+      Map<String, String> expected,
+      int partitions) throws IOException {
+    assert baseFile.isDirectory();
+    FileFilter filter = file -> !file.getName().startsWith(".");
+    File[] partitionDirs = baseFile.listFiles(filter);
+
+    assertNotNull(partitionDirs);
+    assertThat(partitionDirs.length, is(partitions));
+
+    for (File partitionDir : partitionDirs) {
+      File[] dataFiles = partitionDir.listFiles(filter);
+      assertNotNull(dataFiles);
+
+      List<String> readBuffer = new ArrayList<>();
+      for (File dataFile : dataFiles) {
+        ParquetReader<GenericRecord> reader = AvroParquetReader
+            .<GenericRecord>builder(new Path(dataFile.getAbsolutePath())).build();
+        GenericRecord nextRecord = reader.read();
+        while (nextRecord != null) {
+          readBuffer.add(filterOutVariables(nextRecord));
+          nextRecord = reader.read();
+        }
+        readBuffer.sort(Comparator.naturalOrder());
+      }

Review comment:
       `readBuffer.sort(Comparator.naturalOrder());` can be moved out of the inner `for` loop, I guess; sorting once after all the data files are read is enough.
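
       A minimal sketch of the suggested shape, not the actual patch: the Parquet reading is replaced by a hypothetical stand-in (each "data file" is just a list of strings), and the class and method names are made up for illustration. The only point is that the loop accumulates and the sort runs once afterwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SortOnceSketch {

  /**
   * Hypothetical stand-in for reading all records of one partition.
   * Each inner list plays the role of one data file's records.
   */
  static List<String> readPartition(List<List<String>> dataFiles) {
    List<String> readBuffer = new ArrayList<>();
    for (List<String> dataFile : dataFiles) {
      // Inside the loop: only accumulate the records.
      readBuffer.addAll(dataFile);
    }
    // Outside the loop: sort once, after every file has been read.
    readBuffer.sort(Comparator.naturalOrder());
    return readBuffer;
  }

  public static void main(String[] args) {
    List<List<String>> files = Arrays.asList(
        Arrays.asList("id3", "id1"),
        Arrays.asList("id2"));
    System.out.println(readPartition(files)); // prints [id1, id2, id3]
  }
}
```

       The result is the same as sorting after every file, since only the final order is asserted on; the buffer is just not re-sorted on each iteration.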

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
##########
@@ -117,21 +115,25 @@ public BucketAssignFunction(Configuration conf) {
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf);
-    this.hadoopConf = StreamerUtil.getHadoopConf();
     HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
-        new SerializableConfiguration(this.hadoopConf),
+        new SerializableConfiguration(StreamerUtil.getHadoopConf()),
         new FlinkTaskContextSupplier(getRuntimeContext()));
     this.bucketAssigner = BucketAssigners.create(
         getRuntimeContext().getIndexOfThisSubtask(),
         getRuntimeContext().getMaxNumberOfParallelSubtasks(),
         getRuntimeContext().getNumberOfParallelSubtasks(),
-        WriteOperationType.isOverwrite(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))),
+        ignoreSmallFiles(),
         HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
         context,
         writeConfig);
     this.payloadCreation = PayloadCreation.instance(this.conf);
   }
 
+  private boolean ignoreSmallFiles() {
+    WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+    return WriteOperationType.isOverwrite(operationType) || StreamerUtil.allowDuplicateInserts(conf);
+  }

Review comment:
       `StreamerUtil.allowDuplicateInserts(conf)` can be replaced by `writeConfig.allowDuplicateInserts`.
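
       One possible shape for this, sketched under assumptions: in the hunk above `writeConfig` is a local variable of `open()`, so the helper would need it passed in (or kept as a field). `WriteConfigStub` below is a hypothetical stand-in, not the real `HoodieWriteConfig`, and the accessor is assumed to be a no-argument method named as in the comment; the overwrite check is reduced to a plain boolean.

```java
public class IgnoreSmallFilesSketch {

  /** Hypothetical stand-in for the write config; only the one accessor is modeled. */
  static final class WriteConfigStub {
    private final boolean allowDuplicateInserts;

    WriteConfigStub(boolean allowDuplicateInserts) {
      this.allowDuplicateInserts = allowDuplicateInserts;
    }

    // Assumed accessor name, taken from the review comment.
    boolean allowDuplicateInserts() {
      return allowDuplicateInserts;
    }
  }

  /**
   * Small files are ignored when the operation is an overwrite
   * or when duplicate inserts are allowed by the write config.
   */
  static boolean ignoreSmallFiles(boolean isOverwrite, WriteConfigStub writeConfig) {
    return isOverwrite || writeConfig.allowDuplicateInserts();
  }

  public static void main(String[] args) {
    System.out.println(ignoreSmallFiles(false, new WriteConfigStub(true)));  // prints true
    System.out.println(ignoreSmallFiles(false, new WriteConfigStub(false))); // prints false
  }
}
```

       Passing the config in keeps the helper free of the raw Flink `conf` lookup, which seems to be the intent of the suggestion.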



