Posted to issues@flink.apache.org by "chris snow (JIRA)" <ji...@apache.org> on 2018/02/01 13:28:00 UTC
[jira] [Updated] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
[ https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
chris snow updated FLINK-8543:
------------------------------
Attachment: Screen Shot 2018-01-30 at 18.34.51.png
Description:
I'm hitting an issue with my BucketingSink from a streaming job.
{code:java}
return new BucketingSink<Tuple2<String, Object>>(path)
.setWriter(writer)
.setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
{code}
I can see that a few files have run into issues with uploading to S3:
!Screen Shot 2018-01-30 at 18.34.51.png!
I've grabbed the S3AOutputStream class from my cluster and added some additional logging to the checkOpen() method to log the 'key' just before the exception is thrown:
{code:java}
/*
 * Decompiled with CFR.
 */
package org.apache.hadoop.fs.s3a;

import com.amazonaws.AmazonClientException;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.services.s3.transfer.model.UploadResult;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AOutputStream
extends OutputStream {
    private final OutputStream backupStream;
    private final File backupFile;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final String key;
    private final Progressable progress;
    private final S3AFileSystem fs;
    public static final Logger LOG = S3AFileSystem.LOG;

    public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, Progressable progress) throws IOException {
        this.key = key;
        this.progress = progress;
        this.fs = fs;
        this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
        LOG.debug("OutputStream for key '{}' writing to tempfile: {}", (Object)key, (Object)this.backupFile);
        this.backupStream = new BufferedOutputStream(new FileOutputStream(this.backupFile));
    }

    void checkOpen() throws IOException {
        if (!this.closed.get()) return;
        // vvvvvv-- Additional logging --vvvvvvv
        LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
        throw new IOException("Output Stream closed");
    }

    @Override
    public void flush() throws IOException {
        this.checkOpen();
        this.backupStream.flush();
    }

    @Override
    public void close() throws IOException {
        if (this.closed.getAndSet(true)) {
            return;
        }
        this.backupStream.close();
        LOG.debug("OutputStream for key '{}' closed. Now beginning upload", (Object)this.key);
        try {
            ObjectMetadata om = this.fs.newObjectMetadata(this.backupFile.length());
            Upload upload = this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
            ProgressableProgressListener listener = new ProgressableProgressListener(this.fs, this.key, upload, this.progress);
            upload.addProgressListener((ProgressListener)listener);
            upload.waitForUploadResult();
            listener.uploadCompleted();
            this.fs.finishedWrite(this.key);
        }
        catch (InterruptedException e) {
            throw (InterruptedIOException)new InterruptedIOException(e.toString()).initCause(e);
        }
        catch (AmazonClientException e) {
            throw S3AUtils.translateException("saving output", this.key, e);
        }
        finally {
            if (!this.backupFile.delete()) {
                LOG.warn("Could not delete temporary s3a file: {}", (Object)this.backupFile);
            }
            super.close();
        }
        LOG.debug("OutputStream for key '{}' upload complete", (Object)this.key);
    }

    @Override
    public void write(int b) throws IOException {
        this.checkOpen();
        this.backupStream.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        this.checkOpen();
        this.backupStream.write(b, off, len);
    }

    static {
    }
}
{code}
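As a side note, the close()/flush() bookkeeping above can be exercised in isolation. Below is a minimal, self-contained sketch (hypothetical class names, for illustration only, not part of Hadoop): because close() uses getAndSet, a second close() returns silently, but any subsequent flush() trips checkOpen():

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal stand-in for S3AOutputStream's open/closed bookkeeping
// (hypothetical class, for illustration only).
class ClosableStream extends OutputStream {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    void checkOpen() throws IOException {
        if (!closed.get()) return;
        throw new IOException("Output Stream closed");
    }

    @Override
    public void write(int b) throws IOException {
        checkOpen();
    }

    @Override
    public void flush() throws IOException {
        checkOpen();
    }

    @Override
    public void close() {
        // getAndSet makes a second close() a silent no-op,
        // mirroring the guard at the top of S3AOutputStream#close()
        closed.getAndSet(true);
    }
}

public class CheckOpenDemo {
    public static void main(String[] args) {
        ClosableStream s = new ClosableStream();
        s.close();
        s.close(); // no-op, no exception
        try {
            s.flush();
        } catch (IOException e) {
            System.out.println("flush after close: " + e.getMessage());
        }
    }
}
```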
You can see from this additional log output that the S3AOutputStream#close() method *appears* to be called before the S3AOutputStream#flush() method:
{code:java}
2018-02-01 12:42:20,698 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 128497 bytes
2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
2018-02-01 12:42:20,910 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress - PUT landingzone/2018-02-01--1240/_part-0-0.in-progress: 0 bytes
2018-02-01 12:42:20,911 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - Finished write to landingzone/2018-02-01--1240/_part-0-0.in-progress
2018-02-01 12:42:20,911 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - object_delete_requests += 1 -> 3
vvvvv- close() is called here? -vvvvv
2018-02-01 12:42:21,212 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem - OutputStream for key 'landingzone/2018-02-01--1240/_part-0-0.in-progress' upload complete
vvvvv- flush() is called here? -vvvvv
2018-02-01 12:42:21,212 ERROR org.apache.hadoop.fs.s3a.S3AFileSystem - OutputStream for key 'landingzone/2018-02-01--1240/_part-0-0.in-progress' closed.
2018-02-01 12:42:21,212 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: Custom Source -> Map -> Sink: Unnamed (1/2) (510c8316d3a249e5ea5b8d8e693f7beb).
2018-02-01 12:42:21,214 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Map -> Sink: Unnamed (1/2) (510c8316d3a249e5ea5b8d8e693f7beb) switched from RUNNING to FAILED.
TimerException{java.io.IOException: Output Stream closed}
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:252)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Output Stream closed
at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen(S3AOutputStream.java:83)
at org.apache.hadoop.fs.s3a.S3AOutputStream.flush(S3AOutputStream.java:89)
at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:141)
at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerFlush(BufferedBinaryEncoder.java:220)
at org.apache.avro.io.BufferedBinaryEncoder.flush(BufferedBinaryEncoder.java:85)
at org.apache.avro.file.DataFileWriter.flush(DataFileWriter.java:368)
at org.apache.avro.file.DataFileWriter.close(DataFileWriter.java:375)
at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter$AvroKeyValueWriter.close(AvroKeyValueSinkWriter.java:251)
at org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.close(AvroKeyValueSinkWriter.java:163)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:551)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.checkForInactiveBuckets(BucketingSink.java:493)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.onProcessingTime(BucketingSink.java:476)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:249)
... 7 more
{code}
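Reading the "Caused by" chain bottom-up, the IOException is raised from a flush() that runs inside a close() call: DataFileWriter.close() flushes before closing, and that flush lands on an S3AOutputStream that has already been closed. The following self-contained sketch (simplified, assumed stand-ins, not the real Avro/Flink/Hadoop classes) reproduces that shape:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified, assumed stand-ins for the classes in the stack trace;
// not the actual Avro/Flink/Hadoop sources.
class S3AStyleStream extends OutputStream {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    private void checkOpen() throws IOException {
        if (closed.get()) throw new IOException("Output Stream closed");
    }

    @Override public void write(int b) throws IOException { checkOpen(); }
    @Override public void flush() throws IOException { checkOpen(); }
    @Override public void close() { closed.getAndSet(true); }
}

// Like Avro's DataFileWriter, close() flushes the underlying stream first.
class FlushingWriter implements java.io.Closeable {
    private final OutputStream out;

    FlushingWriter(OutputStream out) { this.out = out; }

    @Override
    public void close() throws IOException {
        out.flush(); // the DataFileWriter.close() -> flush() step in the trace
        out.close();
    }
}

public class CloseRaceDemo {
    public static void main(String[] args) {
        S3AStyleStream stream = new S3AStyleStream();
        FlushingWriter writer = new FlushingWriter(stream);
        stream.close(); // the S3A stream is closed first, as in the log
        try {
            writer.close(); // the writer's close() then flushes -> IOException
        } catch (IOException e) {
            System.out.println("writer.close() failed: " + e.getMessage());
        }
    }
}
```

If this is what happens in the job, the exception surfacing from BucketingSink#closeCurrentPartFile would be a symptom of the underlying S3A stream being closed earlier, not of the flush itself.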
> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --------------------------------------------------------------------------
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following components are made available.
> Apache Spark 2.1.1
> Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2
> Jupyter Enterprise Gateway 0.5.0
> HBase 1.1.2 *
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 *
> Tez 0.7.0 *
> Pig 0.16.0 *
> Sqoop 1.4.6 *
> Slider 0.92.0 *
> Reporter: chris snow
> Priority: Major
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)