Posted to commits@flink.apache.org by kk...@apache.org on 2018/12/05 14:11:39 UTC

[flink] branch master updated (3eac419 -> 2e1cbf2)

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3eac419  [FLINK-11045][table] Set correct UserCodeClassLoader for RuntimeUDFContext in CollectionExecutor
     new 600bd9c  [FLINK-10522][fs-connector] Check if RecoverableWriter supportsResume() and act accordingly.
     new 2e1cbf2  [hotfix] Fixing the broken code examples

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/connectors/streamfile_sink.md             |  20 +--
 .../api/functions/sink/filesystem/Bucket.java      |  14 +-
 .../api/functions/sink/filesystem/BucketTest.java  | 185 ++++++++++++++++++++-
 .../sink/filesystem/utils/NoOpCommitter.java       |  35 ++--
 .../sink/filesystem/utils/NoOpRecoverable.java     |  19 ++-
 .../utils/NoOpRecoverableFsDataOutputStream.java   |  48 +++---
 .../filesystem/utils/NoOpRecoverableWriter.java    |  75 +++++++++
 7 files changed, 327 insertions(+), 69 deletions(-)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java => flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpCommitter.java (57%)
 copy flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/SelectOneReducer.java => flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverable.java (67%)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java => flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableFsDataOutputStream.java (55%)
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java


[flink] 01/02: [FLINK-10522][fs-connector] Check if RecoverableWriter supportsResume() and act accordingly.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 600bd9cdbb42b709cfa6fa313648fcea317ad781
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Dec 4 17:35:42 2018 +0100

    [FLINK-10522][fs-connector] Check if RecoverableWriter supportsResume() and act accordingly.
    
    This closes #7047.
---
 .../api/functions/sink/filesystem/Bucket.java      |  14 +-
 .../api/functions/sink/filesystem/BucketTest.java  | 185 ++++++++++++++++++++-
 .../sink/filesystem/utils/NoOpCommitter.java       |  49 ++++++
 .../sink/filesystem/utils/NoOpRecoverable.java     |  32 ++++
 .../utils/NoOpRecoverableFsDataOutputStream.java   |  67 ++++++++
 .../filesystem/utils/NoOpRecoverableWriter.java    |  75 +++++++++
 6 files changed, 418 insertions(+), 4 deletions(-)
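
Editor's note: the heart of this change is the restore path in Bucket.java shown in the diff below. The following is only a sketch of the new decision logic, not Flink's actual Bucket code (which also rebuilds the in-progress part file via the part-file factory); it assumes nothing beyond the RecoverableWriter calls that appear in the diff, and the class name RestoreSketch is illustrative only.

    import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
    import org.apache.flink.core.fs.RecoverableWriter;
    import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;

    import java.io.IOException;

    /** Sketch only: the resume-or-commit decision introduced by FLINK-10522. */
    final class RestoreSketch {

        static RecoverableFsDataOutputStream resumeOrCommit(
                RecoverableWriter fsWriter,
                ResumeRecoverable resumable) throws IOException {

            if (fsWriter.supportsResume()) {
                // keep appending to the previously in-progress part file
                return fsWriter.recover(resumable);
            }

            // the writer cannot resume, so close out the in-progress part by
            // committing it, exactly as is done for pending files
            fsWriter.recoverForCommit(resumable).commitAfterRecovery();
            return null;
        }
    }

The BucketTest changes in this commit assert exactly these two paths through the call counters of StubResumableWriter and StubNonResumableWriter.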

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index b59c84e..3252d9c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -135,9 +135,17 @@ public class Bucket<IN, BucketID> {
 
 		// we try to resume the previous in-progress file
 		final ResumeRecoverable resumable = state.getInProgressResumableFile();
-		final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable);
-		inProgressPart = partFileFactory.resumeFrom(
-				bucketId, stream, resumable, state.getInProgressFileCreationTime());
+
+		if (fsWriter.supportsResume()) {
+			final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable);
+			inProgressPart = partFileFactory.resumeFrom(
+					bucketId, stream, resumable, state.getInProgressFileCreationTime());
+		} else {
+			// if the writer does not support resume, then we close the
+			// in-progress part and commit it, as done in the case of pending files.
+
+			fsWriter.recoverForCommit(resumable).commitAfterRecovery();
+		}
 
 		if (fsWriter.requiresCleanupOfRecoverableState()) {
 			fsWriter.cleanupRecoverableState(resumable);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
index f328fd7..308bc31 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -21,25 +21,36 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.core.fs.local.LocalRecoverableWriter;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpCommitter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverable;
+import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverableFsDataOutputStream;
+import org.apache.flink.streaming.api.functions.sink.filesystem.utils.NoOpRecoverableWriter;
 
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeMatcher;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for the {@link Bucket}.
+ * Tests for the {@code Bucket}.
  */
 public class BucketTest {
 
@@ -128,6 +139,37 @@ public class BucketTest {
 		assertThat(recoverableWriter, hasCalledDiscard(0)); // we have no in-progress file.
 	}
 
+	// --------------------------- Checking Restore ---------------------------
+
+	@Test
+	public void inProgressFileShouldBeCommittedIfWriterDoesNotSupportResume() throws IOException {
+		final StubNonResumableWriter nonResumableWriter = new StubNonResumableWriter();
+		final Bucket<String, String> bucket = getRestoredBucketWithOnlyInProgressPart(nonResumableWriter);
+
+		Assert.assertThat(nonResumableWriter, hasMethodCallCountersEqualTo(1, 0, 1));
+		Assert.assertThat(bucket, hasNullInProgressFile(true));
+	}
+
+	@Test
+	public void inProgressFileShouldBeRestoredIfWriterSupportsResume() throws IOException {
+		final StubResumableWriter resumableWriter = new StubResumableWriter();
+		final Bucket<String, String> bucket = getRestoredBucketWithOnlyInProgressPart(resumableWriter);
+
+		Assert.assertThat(resumableWriter, hasMethodCallCountersEqualTo(1, 1, 0));
+		Assert.assertThat(bucket, hasNullInProgressFile(false));
+	}
+
+	@Test
+	public void pendingFilesShouldBeRestored() throws IOException {
+		final int expectedRecoverForCommitCounter = 10;
+
+		final StubNonResumableWriter writer = new StubNonResumableWriter();
+		final Bucket<String, String> bucket = getRestoredBucketWithOnlyPendingParts(writer, expectedRecoverForCommitCounter);
+
+		Assert.assertThat(writer, hasMethodCallCountersEqualTo(0, 0, expectedRecoverForCommitCounter));
+		Assert.assertThat(bucket, hasNullInProgressFile(true));
+	}
+
 	// ------------------------------- Matchers --------------------------------
 
 	private static TypeSafeMatcher<TestRecoverableWriter> hasCalledDiscard(int times) {
@@ -175,6 +217,47 @@ public class BucketTest {
 		};
 	}
 
+	private static TypeSafeMatcher<Bucket<String, String>> hasNullInProgressFile(final boolean isNull) {
+
+		return new TypeSafeMatcher<Bucket<String, String>>() {
+			@Override
+			protected boolean matchesSafely(Bucket<String, String> bucket) {
+				final PartFileWriter<String, String> inProgressPart = bucket.getInProgressPart();
+				return isNull == (inProgressPart == null);
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("a Bucket with its inProgressPart being ")
+						.appendText(isNull ? " null." : " not null.");
+			}
+		};
+	}
+
+	private static TypeSafeMatcher<BaseStubWriter> hasMethodCallCountersEqualTo(
+			final int supportsResumeCalls,
+			final int recoverCalls,
+			final int recoverForCommitCalls) {
+
+		return new TypeSafeMatcher<BaseStubWriter>() {
+			@Override
+			protected boolean matchesSafely(BaseStubWriter writer) {
+				return writer.getSupportsResumeCallCounter() == supportsResumeCalls &&
+						writer.getRecoverCallCounter() == recoverCalls &&
+						writer.getRecoverForCommitCallCounter() == recoverForCommitCalls;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("a Writer where:")
+						.appendText(" supportsResume was called ").appendValue(supportsResumeCalls).appendText(" times,")
+						.appendText(" recover was called ").appendValue(recoverCalls).appendText(" times,")
+						.appendText(" and recoverForCommit was called ").appendValue(recoverForCommitCalls).appendText(" times.")
+						.appendText("'");
+			}
+		};
+	}
+
 	// ------------------------------- Mock Classes --------------------------------
 
 	private static class TestRecoverableWriter extends LocalRecoverableWriter {
@@ -207,6 +290,81 @@ public class BucketTest {
 		}
 	}
 
+	/**
+	 * A test implementation of a {@link RecoverableWriter} that supports
+	 * resuming, i.e. continuing to write to the in-progress file from the
+	 * point reached before the failure.
+	 */
+	private static class StubResumableWriter extends BaseStubWriter {
+
+		StubResumableWriter() {
+			super(true);
+		}
+	}
+
+	/**
+	 * A test implementation of a {@link RecoverableWriter} that does not support
+	 * resuming, i.e. continuing to write to the in-progress file from the
+	 * point reached before the failure.
+	 */
+	private static class StubNonResumableWriter extends BaseStubWriter {
+
+		StubNonResumableWriter() {
+			super(false);
+		}
+	}
+
+	/**
+	 * A base test implementation of a {@link RecoverableWriter} that counts the
+	 * calls to supportsResume(), recover() and recoverForCommit(), and whose
+	 * support for resuming is configurable.
+	 */
+	private static class BaseStubWriter extends NoOpRecoverableWriter {
+
+		private final boolean supportsResume;
+
+		private int supportsResumeCallCounter = 0;
+
+		private int recoverCallCounter = 0;
+
+		private int recoverForCommitCallCounter = 0;
+
+		private BaseStubWriter(final boolean supportsResume) {
+			this.supportsResume = supportsResume;
+		}
+
+		int getSupportsResumeCallCounter() {
+			return supportsResumeCallCounter;
+		}
+
+		int getRecoverCallCounter() {
+			return recoverCallCounter;
+		}
+
+		int getRecoverForCommitCallCounter() {
+			return recoverForCommitCallCounter;
+		}
+
+		@Override
+		public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException {
+			recoverCallCounter++;
+			return new NoOpRecoverableFsDataOutputStream();
+		}
+
+		@Override
+		public RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable) throws IOException {
+			checkArgument(resumable instanceof NoOpRecoverable);
+			recoverForCommitCallCounter++;
+			return new NoOpCommitter();
+		}
+
+		@Override
+		public boolean supportsResume() {
+			supportsResumeCallCounter++;
+			return supportsResume;
+		}
+	}
+
 	// ------------------------------- Utility Methods --------------------------------
 
 	private static final String bucketId = "testing-bucket";
@@ -260,4 +418,29 @@ public class BucketTest {
 		}
 		return null;
 	}
+
+	private Bucket<String, String> getRestoredBucketWithOnlyInProgressPart(final BaseStubWriter writer) throws IOException {
+		final BucketState<String> stateWithOnlyInProgressFile =
+				new BucketState<>("test", new Path(), 12345L, new NoOpRecoverable(), new HashMap<>());
+		return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, stateWithOnlyInProgressFile);
+	}
+
+	private Bucket<String, String> getRestoredBucketWithOnlyPendingParts(final BaseStubWriter writer, final int numberOfPendingParts) throws IOException {
+		final Map<Long, List<RecoverableWriter.CommitRecoverable>> completePartsPerCheckpoint =
+				createPendingPartsPerCheckpoint(numberOfPendingParts);
+
+		final BucketState<String> initStateWithOnlyInProgressFile =
+				new BucketState<>("test", new Path(), 12345L, null, completePartsPerCheckpoint);
+		return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, initStateWithOnlyInProgressFile);
+	}
+
+	private Map<Long, List<RecoverableWriter.CommitRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
+		final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommittablesPerCheckpoint = new HashMap<>();
+		for (int checkpointId = 0; checkpointId < noOfCheckpoints; checkpointId++) {
+			final List<RecoverableWriter.CommitRecoverable> pending = new ArrayList<>();
+			pending.add(new NoOpRecoverable());
+			pendingCommittablesPerCheckpoint.put((long) checkpointId, pending);
+		}
+		return pendingCommittablesPerCheckpoint;
+	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpCommitter.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpCommitter.java
new file mode 100644
index 0000000..06005a1
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpCommitter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.utils;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import java.io.IOException;
+
+/**
+ * An implementation of the {@link RecoverableFsDataOutputStream.Committer committer}
+ * that does nothing.
+ *
+ * <p>This avoids having to implement all methods for every implementation
+ * used in tests.
+ */
+public class NoOpCommitter implements RecoverableFsDataOutputStream.Committer {
+
+	@Override
+	public void commit() throws IOException {
+
+	}
+
+	@Override
+	public void commitAfterRecovery() throws IOException {
+
+	}
+
+	@Override
+	public RecoverableWriter.CommitRecoverable getRecoverable() {
+		return null;
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverable.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverable.java
new file mode 100644
index 0000000..e00d4c4
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverable.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.utils;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+
+/**
+ * An implementation of the {@link RecoverableWriter.ResumeRecoverable ResumeRecoverable}
+ * that does nothing.
+ *
+ * <p>This avoids having to implement all methods for every implementation
+ * used in tests.
+ */
+public class NoOpRecoverable implements RecoverableWriter.ResumeRecoverable {
+
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableFsDataOutputStream.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableFsDataOutputStream.java
new file mode 100644
index 0000000..a549896
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableFsDataOutputStream.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.utils;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import java.io.IOException;
+
+/**
+ * A default implementation of the {@link RecoverableFsDataOutputStream} that does nothing.
+ *
+ * <p>This avoids having to implement all methods for every implementation
+ * used in tests.
+ */
+public class NoOpRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
+	@Override
+	public RecoverableWriter.ResumeRecoverable persist() throws IOException {
+		return null;
+	}
+
+	@Override
+	public Committer closeForCommit() throws IOException {
+		return null;
+	}
+
+	@Override
+	public void close() throws IOException {
+
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		return 0;
+	}
+
+	@Override
+	public void flush() throws IOException {
+
+	}
+
+	@Override
+	public void sync() throws IOException {
+
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java
new file mode 100644
index 0000000..e21da2a
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem.utils;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+/**
+ * A default implementation of the {@link RecoverableWriter} that does nothing.
+ *
+ * <p>This avoids having to implement all methods for every implementation
+ * used in tests.
+ */
+public class NoOpRecoverableWriter implements RecoverableWriter {
+
+	@Override
+	public RecoverableFsDataOutputStream open(Path path) throws IOException {
+		return null;
+	}
+
+	@Override
+	public RecoverableFsDataOutputStream recover(RecoverableWriter.ResumeRecoverable resumable) throws IOException {
+		return null;
+	}
+
+	@Override
+	public boolean requiresCleanupOfRecoverableState() {
+		return false;
+	}
+
+	@Override
+	public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public RecoverableFsDataOutputStream.Committer recoverForCommit(RecoverableWriter.CommitRecoverable resumable) throws IOException {
+		return null;
+	}
+
+	@Override
+	public SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitRecoverableSerializer() {
+		return null;
+	}
+
+	@Override
+	public SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumeRecoverableSerializer() {
+		return null;
+	}
+
+	@Override
+	public boolean supportsResume() {
+		return false;
+	}
+}


[flink] 02/02: [hotfix] Fixing the broken code examples

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2e1cbf2e3fbfc58051d8a5c3c2b39349bee39b31
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Tue Dec 4 16:45:54 2018 +0100

    [hotfix] Fixing the broken code examples
    
    The code examples for Scala and Java are both broken,
    and set a bad example in terms of efficiency.
    
    This closes #7232.
---
 docs/dev/connectors/streamfile_sink.md | 20 +++++++-------------
 1 file changed, 7 insertions(+), 13 deletions(-)
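
Editor's note: the fixed documentation below switches both examples to SimpleStringEncoder. Where a custom row format is still needed, a correct Encoder would write to the stream it is given instead of wrapping it in a new PrintStream per element, which is what made the old examples inefficient. The following is a sketch only, assuming Encoder's single encode(element, stream) method as used in the old example; the class name NewlineDelimitedEncoder is illustrative.

    import org.apache.flink.api.common.serialization.Encoder;

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    /** Sketch only: a row-format encoder that writes directly to the sink's stream. */
    public class NewlineDelimitedEncoder implements Encoder<String> {

        @Override
        public void encode(String element, OutputStream stream) throws IOException {
            // no per-element wrapper streams; just write the bytes and a newline
            stream.write(element.getBytes(StandardCharsets.UTF_8));
            stream.write('\n');
        }
    }

Such an encoder would be passed to StreamingFileSink.forRowFormat(new Path(outputPath), new NewlineDelimitedEncoder()) in the same way the corrected examples pass SimpleStringEncoder.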

diff --git a/docs/dev/connectors/streamfile_sink.md b/docs/dev/connectors/streamfile_sink.md
index 8f50675..82ab562 100644
--- a/docs/dev/connectors/streamfile_sink.md
+++ b/docs/dev/connectors/streamfile_sink.md
@@ -60,17 +60,14 @@ Basic usage thus looks like this:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 
 DataStream<String> input = ...;
 
 final StreamingFileSink<String> sink = StreamingFileSink
-	.forRowFormat(new Path(outputPath), (Encoder<String>) (element, stream) -> {
-		PrintStream out = new PrintStream(stream);
-		out.println(element.f1);
-	})
+	.forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("UTF-8"))
 	.build();
 
 input.addSink(sink);
@@ -79,19 +76,16 @@ input.addSink(sink);
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-import org.apache.flink.api.common.serialization.Encoder
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
 
 val input: DataStream[String] = ...
 
-final StreamingFileSink[String] sink = StreamingFileSink
-	.forRowFormat(new Path(outputPath), (element, stream) => {
-		val out = new PrintStream(stream)
-		out.println(element.f1)
-	})
-	.build()
-
+val sink: StreamingFileSink[String] = StreamingFileSink
+    .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
+    .build()
+    
 input.addSink(sink)
 
 {% endhighlight %}