You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/10/19 16:39:56 UTC

[GitHub] [iceberg] rdblue commented on a change in pull request #1627: Minor MicroBatch Class Cleanup and Return to Spark Structured Streaming Reader

rdblue commented on a change in pull request #1627:
URL: https://github.com/apache/iceberg/pull/1627#discussion_r507896593



##########
File path: core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
##########
@@ -140,13 +141,102 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
     Assert.assertTrue(batch5.lastIndexOfSnapshot());
   }
 
-  private static DataFile file(String name) {
+  @Test
+  public void testMicroBatchRespectsRequestedMaximumSize() {
+    // Add files A-E, all of 10kb, and process in multiple microbatches of varying sizes,
+    // emulating perhaps a dynamically growing source (assuming the total batch is being
+    // built by one process, as in Flink or on the driver in Spark).
+    add(table.newAppend(), files("A", "B", "C", "D", "E"));
+
+    // Request 10kb - Receive file A.
+    MicroBatch batch0 = MicroBatches.from(table.snapshot(1L), table.io())
+            .specsById(table.specs())
+            .generate(0, 10L, true);
+    Assert.assertEquals(batch0.snapshotId(), 1L);
+    Assert.assertEquals(batch0.startFileIndex(), 0);
+    Assert.assertEquals(batch0.endFileIndex(), 1);
+    Assert.assertEquals(batch0.sizeInBytes(), 10);
+    Assert.assertFalse(batch0.lastIndexOfSnapshot());
+    filesMatch(Lists.newArrayList("A"), filesToScan(batch0.tasks()));
+
+    // Request 30kb. Receive B, C, and D.
+    MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io())
+            .specsById(table.specs())
+            .generate(batch0.endFileIndex(), 35L, false);
+    Assert.assertEquals(batch1.startFileIndex(), 1);
+    Assert.assertEquals(batch1.endFileIndex(), 4);
+    Assert.assertEquals(batch1.sizeInBytes(), 30);
+    filesMatch(Lists.newArrayList("B", "C", "D"), filesToScan(batch1.tasks()));
+    Assert.assertFalse(batch1.lastIndexOfSnapshot());
+
+    // Request 35kb - Receive File E which is the end of the input, only 10kb.
+    MicroBatch batch2 = MicroBatches.from(table.snapshot(1L), table.io())
+            .specsById(table.specs())
+            .generate(batch1.endFileIndex(), 35L, false);
+    Assert.assertEquals(4, batch2.startFileIndex());
+    Assert.assertEquals(5, batch2.endFileIndex());
+    Assert.assertEquals(10, batch2.sizeInBytes());
+    filesMatch(Lists.newArrayList("E"), filesToScan(batch2.tasks()));
+  }
+
+  @Test
+  public void testReadingSnapshotIsNotInterruptedByChildSnapshot() {
+    // Add files A-E, all of 10kb, and process the single generated snapshot
+    // in multiple microbatches.
+    add(table.newAppend(), files("A", "B", "C", "D", "E"));
+    Assert.assertEquals(1L, table.currentSnapshot().snapshotId());
+
+    // Request a batch of 40kb - Reads in A, B, C, and D.
+    MicroBatch batch0 = MicroBatches.from(table.snapshot(1L), table.io())
+            .specsById(table.specs())
+            .generate(0, 40L, false);
+    Assert.assertEquals(0, batch0.startFileIndex());
+    Assert.assertEquals(4, batch0.endFileIndex());
+    Assert.assertEquals(40, batch0.sizeInBytes());
+    filesMatch(Lists.newArrayList("A", "B", "C", "D"), filesToScan(batch0.tasks()));
+    Assert.assertFalse(batch0.lastIndexOfSnapshot());
+
+    // Concurrent write sometime after the start of the last batch and before the next batch.
+    final long sizeOfFileF = 25L;
+    add(table.newAppend(),
+            Collections.singletonList(fileWithSize("F", sizeOfFileF)));
+    Assert.assertEquals(2L, table.currentSnapshot().snapshotId());
+
+    // Read the last 10kb of Snapshot 1.
+    // Simulates desired stream behavior for example on Trigger.Once().
+    MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io())
+            .specsById(table.specs())
+            .generate(batch0.endFileIndex(), 40L, false);
+    Assert.assertEquals(4, batch1.startFileIndex());
+    Assert.assertEquals(5, batch1.endFileIndex());
+    Assert.assertEquals(10, batch1.sizeInBytes());
+    filesMatch(Lists.newArrayList("E"), filesToScan(batch1.tasks()));
+    Assert.assertTrue(batch1.lastIndexOfSnapshot());
+
+    // Show that the next batch / snapshot can be read.
+    MicroBatch batch2 = MicroBatches.from(table.currentSnapshot(), table.io())
+            .specsById(table.specs())
+            .generate(0, 40L, true);
+    Assert.assertEquals(0, batch2.startFileIndex());
+    Assert.assertEquals(1, batch2.endFileIndex());
+    Assert.assertEquals(sizeOfFileF, batch2.sizeInBytes());
+    filesMatch(Lists.newArrayList("F"), filesToScan(batch2.tasks()));
+    Assert.assertTrue(batch2.lastIndexOfSnapshot());
+
+  }
+
+  private static DataFile fileWithSize(String name, long newFileSizeInBytes) {
     return DataFiles.builder(SPEC)
-        .withPath(name + ".parquet")
-        .withFileSizeInBytes(10)
-        .withPartitionPath("data_bucket=0") // easy way to set partition data for now
-        .withRecordCount(1)
-        .build();
+            .withPath(name + ".parquet")
+            .withFileSizeInBytes(newFileSizeInBytes)
+            .withPartitionPath("data_bucket=0") // easy way to set partition data for now
+            .withRecordCount(1)
+            .build();

Review comment:
       Looks like your indentation settings are off. This should be 2 indents (4 spaces) from the start of `return`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org