Posted to issues@flink.apache.org by "chris snow (JIRA)" <ji...@apache.org> on 2018/02/01 13:28:00 UTC

[jira] [Updated] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

     [ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

chris snow updated FLINK-8543:
------------------------------
     Attachment: Screen Shot 2018-01-30 at 18.34.51.png
    Description: 
I'm hitting an issue with the BucketingSink in a streaming job.

 
{code:java}
return new BucketingSink<Tuple2<String, Object>>(path)
         .setWriter(writer)
         .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
{code}
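
For reference, the sink above is wired into the job roughly as follows. This is a simplified sketch, not the exact job code: the writer type is taken from the AvroKeyValueSinkWriter frames in the stack trace below, the format string matches the 2018-02-01--1240 bucket directories in the logs, and the schemas, bucket URI, and surrounding stream are placeholders:

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class SinkWiring {

    static void attachSink(DataStream<Tuple2<String, Object>> stream) {
        // Placeholder schemas: key and value are both treated as plain strings here.
        Map<String, String> properties = new HashMap<>();
        String stringSchema = Schema.create(Schema.Type.STRING).toString();
        properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_KEY_SCHEMA, stringSchema);
        properties.put(AvroKeyValueSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, stringSchema);

        // "yyyy-MM-dd--HHmm" produces bucket directories like the
        // landingzone/2018-02-01--1240/ paths seen in the logs below.
        BucketingSink<Tuple2<String, Object>> sink =
                new BucketingSink<Tuple2<String, Object>>("s3a://mybucket/landingzone")
                        .setWriter(new AvroKeyValueSinkWriter<String, Object>(properties))
                        .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>("yyyy-MM-dd--HHmm"));

        stream.addSink(sink);
    }
}
{code}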
 

I can see that a few part files have failed to upload to S3:

!Screen Shot 2018-01-30 at 18.34.51.png!   

I've grabbed the S3AOutputStream class from my cluster, decompiled it with CFR, and added logging to the checkOpen() method so that the 'key' is logged just before the exception is thrown:

 
{code:java}
/*
 * Decompiled with CFR.
 */
package org.apache.hadoop.fs.s3a;

import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.services.s3.transfer.model.UploadResult;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AOutputStream
extends OutputStream {
    private final OutputStream backupStream;
    private final File backupFile;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final String key;
    private final Progressable progress;
    private final S3AFileSystem fs;
    public static final Logger LOG = S3AFileSystem.LOG;

    public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, Progressable progress) throws IOException {
        this.key = key;
        this.progress = progress;
        this.fs = fs;
        this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
        LOG.debug("OutputStream for key '{}' writing to tempfile: {}", (Object)key, (Object)this.backupFile);
        this.backupStream = new BufferedOutputStream(new FileOutputStream(this.backupFile));
    }

    void checkOpen() throws IOException {
        if (!this.closed.get()) return;

        // vvvvvv-- Additional logging --vvvvvvv

        LOG.error("OutputStream for key '{}' closed.", (Object)this.key);


        throw new IOException("Output Stream closed");
    }

    @Override
    public void flush() throws IOException {
        this.checkOpen();
        this.backupStream.flush();
    }

    @Override
    public void close() throws IOException {
        if (this.closed.getAndSet(true)) {
            return;
        }
        this.backupStream.close();
        LOG.debug("OutputStream for key '{}' closed. Now beginning upload", (Object)this.key);
        try {
            ObjectMetadata om = this.fs.newObjectMetadata(this.backupFile.length());
            Upload upload = this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
            ProgressableProgressListener listener = new ProgressableProgressListener(this.fs, this.key, upload, this.progress);
            upload.addProgressListener((ProgressListener)listener);
            upload.waitForUploadResult();
            listener.uploadCompleted();
            this.fs.finishedWrite(this.key);
        }
        catch (InterruptedException e) {
            throw (InterruptedIOException)new InterruptedIOException(e.toString()).initCause(e);
        }
        catch (AmazonClientException e) {
            throw S3AUtils.translateException("saving output", this.key, e);
        }
        finally {
            if (!this.backupFile.delete()) {
                LOG.warn("Could not delete temporary s3a file: {}", (Object)this.backupFile);
            }
            super.close();
        }
        LOG.debug("OutputStream for key '{}' upload complete", (Object)this.key);
    }

    @Override
    public void write(int b) throws IOException {
        this.checkOpen();
        this.backupStream.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        this.checkOpen();
        this.backupStream.write(b, off, len);
    }

    static {
        // empty static initializer left behind by the decompiler
    }
}
{code}
 

You can see from this additional log output that the S3AOutputStream#close() method *appears* to be called before the S3AOutputStream#flush() method:

 
{code:java}
2018-02-01 12:42:20,698 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 128497 bytes
2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress               - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
2018-02-01 12:42:20,911 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                        - Finished write to landingzone/2018-02-01--1240/_part-0-0.in-progress
2018-02-01 12:42:20,911 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                        - object_delete_requests += 1  ->  3

vvvvv- close() is called here? -vvvvv

2018-02-01 12:42:21,212 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem                        - OutputStream for key 'landingzone/2018-02-01--1240/_part-0-0.in-progress' upload complete

vvvvv- flush() is called here? -vvvvv

2018-02-01 12:42:21,212 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem                        - OutputStream for key 'landingzone/2018-02-01--1240/_part-0-0.in-progress' closed.

2018-02-01 12:42:21,212 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Source: Custom Source -> Map -> Sink: Unnamed (1/2) (510c8316d3a249e5ea5b8d8e693f7beb).
2018-02-01 12:42:21,214 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Map -> Sink: Unnamed (1/2) (510c8316d3a249e5ea5b8d8e693f7beb) switched from RUNNING to FAILED.
TimerException{java.io.IOException: Output Stream closed}
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Output Stream closed
	at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:83)
	at org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:89)
	at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
	at java.io.DataOutputStream.flush(DataOutputStream.java:123)
	at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
	at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
	at org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
	at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:368)
	at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:375)
	at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter$AvroKeyValueWriter.close(AvroKeyValueSinkWriter.java:251)
	at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:163)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
	... 7 more
{code}
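
Reading the trace bottom-up: the processing-time timer fires BucketingSink#checkForInactiveBuckets(), which calls closeCurrentPartFile(), and DataFileWriter#close() then flushes an S3AOutputStream that has already been marked closed. That flush-after-close ordering is easy to reproduce in isolation. Here's a minimal stand-in (GuardedStream is a made-up class that mimics S3AOutputStream's checkOpen() guard, not actual Hadoop code):

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

public class FlushAfterCloseRepro {

    /** Made-up stand-in mimicking S3AOutputStream's checkOpen() guard. */
    static class GuardedStream extends OutputStream {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final OutputStream delegate = new ByteArrayOutputStream();

        private void checkOpen() throws IOException {
            if (closed.get()) {
                throw new IOException("Output Stream closed");
            }
        }

        @Override
        public void write(int b) throws IOException {
            checkOpen();
            delegate.write(b);
        }

        @Override
        public void flush() throws IOException {
            checkOpen();
            delegate.flush();
        }

        @Override
        public void close() throws IOException {
            closed.set(true); // once closed, any later flush()/write() must fail
            delegate.close();
        }
    }

    public static void main(String[] args) throws IOException {
        GuardedStream stream = new GuardedStream();
        stream.write(42);
        stream.close(); // first caller closes the stream (the real class starts the S3 upload here)
        stream.flush(); // second caller flushes afterwards -> IOException: Output Stream closed
    }
}
{code}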



> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --------------------------------------------------------------------------
>
>                 Key: FLINK-8543
>                 URL: https://issues.apache.org/jira/browse/FLINK-8543
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.4.0
>         Environment: IBM Analytics Engine - [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following components are made available:
> Apache Spark 2.1.1
> Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>            Reporter: chris snow
>            Priority: Major
>         Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)