Posted to user@flink.apache.org by Rinat <r....@cleverdata.ru> on 2018/07/06 15:03:03 UTC

Re: [BucketingSink] incorrect indexing of part files, when part suffix is specified

Hi Minglei!

I’ve implemented a group of tests showing that the problem exists only when a part suffix is specified and a file in the pending state exists.

Here is the exception:

testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState(org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest)  Time elapsed: 0.018 sec  <<< ERROR!
java.io.IOException: File already exists: /var/folders/v9/r7ybtp9n4lj_6ybx5xnngyzm0000gn/T/junit8543902037302786417/junit2291904425846970077/part-0-0.my.in-progress
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:259)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
	at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:71)
	at org.apache.flink.streaming.connectors.fs.StringWriter.open(StringWriter.java:69)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:587)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
	at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:111)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testThatPartIndexIsIncremented(BucketingSinkTest.java:970)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState(BucketingSinkTest.java:909)


You could add the following test to org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest

   @Test // (expected = IOException.class)
   public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState()
      throws Exception {
      testThatPartIndexIsIncremented(".my", "part-0-0.my" + IN_PROGRESS_SUFFIX);
   }

   private void testThatPartIndexIsIncremented(String partSuffix, String existingPartFile) throws Exception {
      File outDir = tempFolder.newFolder();
      long inactivityInterval = 100;

      java.nio.file.Path bucket = Paths.get(outDir.getPath());
      Files.createFile(bucket.resolve(existingPartFile));

      String basePath = outDir.getAbsolutePath();
      BucketingSink<String> sink = new BucketingSink<String>(basePath)
         .setBucketer(new BasePathBucketer<>())
         .setInactiveBucketCheckInterval(inactivityInterval)
         .setInactiveBucketThreshold(inactivityInterval)
         .setPartPrefix(PART_PREFIX)
         .setInProgressPrefix("")
         .setPendingPrefix("")
         .setValidLengthPrefix("")
         .setInProgressSuffix(IN_PROGRESS_SUFFIX)
         .setPendingSuffix(PENDING_SUFFIX)
         .setValidLengthSuffix(VALID_LENGTH_SUFFIX)
         .setPartSuffix(partSuffix)
         .setBatchSize(0);

      OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0);
      testHarness.setup();
      testHarness.open();

      testHarness.setProcessingTime(0L);

      testHarness.processElement(new StreamRecord<>("test1", 1L));

      testHarness.setProcessingTime(101L);
      testHarness.snapshot(0, 0);
      testHarness.notifyOfCompletedCheckpoint(0);
      sink.close();

      String expectedFileName = partSuffix == null ? "part-0-1" : "part-0-1" + partSuffix;
//    assertThat(Files.exists(bucket.resolve(expectedFileName)), is(true));
   }

and check that the test fails.

This applies to the current master branch. I’ve also opened a PR that fixes the problem (https://github.com/apache/flink/pull/6176).
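
To illustrate the idea without Flink or Hadoop on the classpath, here is a minimal sketch of the index search. PartIndexSketch and its methods are hypothetical names, not code from BucketingSink or from the PR. It assumes empty in-progress and pending prefixes, as in the test above. The first search leaves the part suffix out of the checked name, which is the behaviour described in this thread. The second search includes it, which is the approach I believe the fix should take.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical stand-in for the part-index search; not Flink code. */
final class PartIndexSketch {

    static final String IN_PROGRESS_SUFFIX = ".in-progress";
    static final String PENDING_SUFFIX = ".pending";

    /** Builds "<prefix>-<subtask>-<counter>" and appends the part suffix if one is set. */
    static String partName(String prefix, int subtask, int counter, String partSuffix) {
        String name = prefix + "-" + subtask + "-" + counter;
        return partSuffix == null ? name : name + partSuffix;
    }

    /** True if the part file exists in final, pending or in-progress form. */
    static boolean isTaken(Path bucket, String name) {
        return Files.exists(bucket.resolve(name))
            || Files.exists(bucket.resolve(name + PENDING_SUFFIX))
            || Files.exists(bucket.resolve(name + IN_PROGRESS_SUFFIX));
    }

    /** Behaviour described in the thread: the suffix is left out of the checked name. */
    static int nextIndexIgnoringSuffix(Path bucket, String prefix, int subtask) {
        int counter = 0;
        while (isTaken(bucket, partName(prefix, subtask, counter, null))) {
            counter++;
        }
        return counter;
    }

    /** Sketch of the fix: the suffix is part of the name that is checked. */
    static int nextIndexWithSuffix(Path bucket, String prefix, int subtask, String partSuffix) {
        int counter = 0;
        while (isTaken(bucket, partName(prefix, subtask, counter, partSuffix))) {
            counter++;
        }
        return counter;
    }

    public static void main(String[] args) throws IOException {
        Path bucket = Files.createTempDirectory("bucket");
        // Same starting point as the test above: an in-progress file for index 0.
        Files.createFile(bucket.resolve("part-0-0.my" + IN_PROGRESS_SUFFIX));

        System.out.println(nextIndexIgnoringSuffix(bucket, "part", 0));     // prints 0
        System.out.println(nextIndexWithSuffix(bucket, "part", 0, ".my"));  // prints 1
    }
}
```

With the suffix left out, index 0 is reused and the writer then tries to create part-0-0.my.in-progress again, which matches the exception above.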

For some reason I still haven’t been able to compile the whole Flink repository to run your example locally from the IDE, but from my point of view the problem exists, and the test above shows its existence. Please have a look.

I’m still working on building the Flink project on my local machine …

Thx


> On 25 Jun 2018, at 10:44, Rinat <r....@cleverdata.ru> wrote:
> 
> Hi Minglei!
> 
> Thx for your reply. I really have no idea why everything works in your case; I have implemented unit tests in my PR which show that the problem exists. Please let me know which Flink version you use.
> The fix applies to the current master branch. Here is an example of a unit test that shows the problem:
> 
> @Test
> public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecified() throws Exception {
>    String partSuffix = ".my";
> 
>    File outDir = tempFolder.newFolder();
>    long inactivityInterval = 100;
> 
>    java.nio.file.Path bucket = Paths.get(outDir.getPath());
>    Files.createFile(bucket.resolve("part-0-0.my.pending"));
> 
>    String basePath = outDir.getAbsolutePath();
>    BucketingSink<String> sink = new BucketingSink<String>(basePath)
>       .setBucketer(new BasePathBucketer<>())
>       .setInactiveBucketCheckInterval(inactivityInterval)
>       .setInactiveBucketThreshold(inactivityInterval)
>       .setPartPrefix(PART_PREFIX)
>       .setInProgressPrefix("")
>       .setPendingPrefix("")
>       .setValidLengthPrefix("")
>       .setInProgressSuffix(IN_PROGRESS_SUFFIX)
>       .setPendingSuffix(PENDING_SUFFIX)
>       .setValidLengthSuffix(VALID_LENGTH_SUFFIX)
>       .setPartSuffix(partSuffix)
>       .setBatchSize(0);
> 
>    OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0);
>    testHarness.setup();
>    testHarness.open();
> 
>    testHarness.setProcessingTime(0L);
> 
>    testHarness.processElement(new StreamRecord<>("test1", 1L));
> 
>    testHarness.setProcessingTime(101L);
>    testHarness.snapshot(0, 0);
>    testHarness.notifyOfCompletedCheckpoint(0);
>    sink.close();
> 
>    assertThat(Files.exists(bucket.resolve("part-0-1" + partSuffix)), is(true));
> }
> 
>> On 24 Jun 2018, at 06:02, zhangminglei <18717838093@163.com> wrote:
>> 
>> Hi, Rinat
>> 
>> I tried the situation you described and it works fine for me. The partCounter is incremented as expected. When a new part file is created, I did not see any duplicate part index. Here is my code for that; you can take a look.
>> In my case, the highest finished part file is part-0-683PartSuffix; beyond that, all files remain pending as _part-0-684PartSuffix.pending, _part-0-685PartSuffix.pending and so on, since the checkpoint has not finished.
>> 
>> Cheers
>> Minglei.
>> 
>> public class TestSuffix {
>>    public static void main(String[] args) throws Exception {
>>       ParameterTool params = ParameterTool.fromArgs(args);
>>       String outputPath = params.getRequired("outputPath");
>> 
>>       StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>>       sEnv.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
>>       sEnv.enableCheckpointing(200);
>>       sEnv.setParallelism(1);
>> 
>>       BucketingSink<Tuple4<Integer, String, String, Integer>> sink =
>>          new BucketingSink<Tuple4<Integer, String, String, Integer>>(outputPath)
>>             .setInactiveBucketThreshold(1000)
>>             .setInactiveBucketCheckInterval(1000)
>>             .setPartSuffix("PartSuffix")
>>             .setBatchSize(500);
>> 
>>       sEnv.addSource(new DataGenerator())
>>          .keyBy(0)
>>          .map(new CountUpRichMap())
>>          .addSink(sink);
>> 
>>       sEnv.execute();
>>    }
>> 
>>    public static class CountUpRichMap extends RichMapFunction<Tuple3<Integer, String, String>, Tuple4<Integer, String, String, Integer>> {
>> 
>>       private ValueState<Integer> counter;
>> 
>>       @Override
>>       public void open(Configuration parameters) throws Exception {
>>          counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Types.INT));
>>       }
>> 
>>       @Override
>>       public Tuple4<Integer, String, String, Integer> map(Tuple3<Integer, String, String> value) throws Exception {
>>          Integer counterValue = counter.value();
>>          if (counterValue == null) {
>>             counterValue = 0;
>>          }
>>          counter.update(counterValue + 1);
>>          return Tuple4.of(value.f0, value.f1, value.f2, counterValue);
>>       }
>>    }
>> 
>>    public static class DataGenerator implements SourceFunction<Tuple3<Integer, String, String>> {
>> 
>>       public DataGenerator() {
>>       }
>> 
>>       @Override
>>       public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
>>          for (int i = 0; i < 10000; i++) {
>>             ctx.collect(Tuple3.of(i % 10, UUID.randomUUID().toString(), "Some payloads......"));
>>          }
>>       }
>> 
>>       @Override
>>       public void cancel() {
>> 
>>       }
>>    }
>> }
>> 
>> 
>> 
>> 
>>> On 16 Jun 2018, at 22:21, Rinat <r.sharipov@cleverdata.ru> wrote:
>>> 
>>> Hi mates, since the 1.5 release, BucketingSink can be configured with a suffix for part files. This is very useful when the file needs a specific extension.
>>> 
>>> While using it, I found an issue: when a new part file is created, it gets the same part index as the file that was just closed.
>>> So when Flink tries to move it into the final state, we get a FileAlreadyExistsException.
>>> 
>>> This problem is related to the following code.
>>> Here we try to find the first part-file index that does not yet exist in the bucket directory. The problem is that partSuffix is not included when the path is assembled, so the checked path never exists
>>> and partCounter is never incremented.
>>> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
>>> while (fs.exists(partPath) ||
>>>       fs.exists(getPendingPathFor(partPath)) ||
>>>       fs.exists(getInProgressPathFor(partPath))) {
>>>    bucketState.partCounter++;
>>>    partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
>>> }
>>> 
>>> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
>>> 
>>> Before creating the writer we append the partSuffix here, but it should already have been appended before the index checks:
>>> if (partSuffix != null) {
>>>    partPath = partPath.suffix(partSuffix);
>>> }
>>> I’ll create an issue and try to submit a fix
>>> 
>>> Sincerely yours,
>>> Rinat Sharipov
>>> Software Engineer at 1DMP CORE Team
>>> 
>>> email: r.sharipov@cleverdata.ru
>>> mobile: +7 (925) 416-37-26
>>> 
>>> CleverDATA
>>> make your data clever
>>> 
>> 
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.sharipov@cleverdata.ru
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever