You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/07/23 12:46:35 UTC
[5/5] flink git commit: [FLINK-9921][DataStream API] Update RollingPolicy interface
[FLINK-9921][DataStream API] Update RollingPolicy interface
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/702f7735
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/702f7735
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/702f7735
Branch: refs/heads/release-1.6
Commit: 702f77355bbb9ccf560885dd5c6a717a25cafa53
Parents: fdb11c5
Author: kkloudas <kk...@gmail.com>
Authored: Fri Jul 20 17:03:16 2018 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Mon Jul 23 14:23:32 2018 +0200
----------------------------------------------------------------------
.../api/functions/sink/filesystem/Buckets.java | 13 +++-------
.../sink/filesystem/RollingPolicy.java | 5 ++--
.../sink/filesystem/StreamingFileSink.java | 7 +++--
.../rolling/policies/DefaultRollingPolicy.java | 27 +++++---------------
.../policies/OnCheckpointRollingPolicy.java | 4 +--
.../filesystem/LocalStreamingFileSinkTest.java | 2 +-
.../sink/filesystem/RollingPolicyTest.java | 8 +++---
.../functions/sink/filesystem/TestUtils.java | 4 +--
8 files changed, 26 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/702f7735/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 6afba17..e6f8c00 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -61,7 +61,7 @@ public class Buckets<IN, BucketID> {
private final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory;
- private final RollingPolicy<BucketID> rollingPolicy;
+ private final RollingPolicy<IN, BucketID> rollingPolicy;
// --------------------------- runtime fields -----------------------------
@@ -95,7 +95,7 @@ public class Buckets<IN, BucketID> {
final Bucketer<IN, BucketID> bucketer,
final BucketFactory<IN, BucketID> bucketFactory,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
- final RollingPolicy<BucketID> rollingPolicy,
+ final RollingPolicy<IN, BucketID> rollingPolicy,
final int subtaskIndex) throws IOException {
this.basePath = Preconditions.checkNotNull(basePath);
@@ -189,7 +189,6 @@ public class Buckets<IN, BucketID> {
void snapshotState(
final long checkpointId,
- final long checkpointTimestamp,
final ListState<byte[]> bucketStates,
final ListState<Long> partCounterState) throws Exception {
@@ -201,11 +200,7 @@ public class Buckets<IN, BucketID> {
for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
- if (info != null &&
- (rollingPolicy.shouldRollOnCheckpoint(info) ||
- rollingPolicy.shouldRollOnEvent(info) ||
- rollingPolicy.shouldRollOnProcessingTime(info, checkpointTimestamp))
- ) {
+ if (info != null && rollingPolicy.shouldRollOnCheckpoint(info)) {
// we also check here so that we do not have to always
// wait for the "next" element to arrive.
bucket.closePartFile();
@@ -249,7 +244,7 @@ public class Buckets<IN, BucketID> {
}
final PartFileInfo<BucketID> info = bucket.getInProgressPartInfo();
- if (info == null || rollingPolicy.shouldRollOnEvent(info)) {
+ if (info == null || rollingPolicy.shouldRollOnEvent(info, value)) {
bucket.rollPartFile(currentProcessingTime);
}
bucket.write(value, currentProcessingTime);
http://git-wip-us.apache.org/repos/asf/flink/blob/702f7735/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
index 24c38aa..b1354a9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicy.java
@@ -28,7 +28,7 @@ import java.io.Serializable;
* rolls its currently open part file and opens a new one.
*/
@PublicEvolving
-public interface RollingPolicy<BucketID> extends Serializable {
+public interface RollingPolicy<IN, BucketID> extends Serializable {
/**
* Determines if the in-progress part file for a bucket should roll on every checkpoint.
@@ -39,10 +39,11 @@ public interface RollingPolicy<BucketID> extends Serializable {
/**
* Determines if the in-progress part file for a bucket should roll based on its current state, e.g. its size.
+ * @param element the element being processed.
* @param partFileState the state of the currently open part file of the bucket.
* @return {@code True} if the part file should roll, {@link false} otherwise.
*/
- boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState) throws IOException;
+ boolean shouldRollOnEvent(final PartFileInfo<BucketID> partFileState, IN element) throws IOException;
/**
* Determines if the in-progress part file for a bucket should roll based on a time condition.
http://git-wip-us.apache.org/repos/asf/flink/blob/702f7735/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index c208079..0ebcc4f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -194,7 +194,7 @@ public class StreamingFileSink<IN>
private Bucketer<IN, BucketID> bucketer;
- private RollingPolicy<BucketID> rollingPolicy;
+ private RollingPolicy<IN, BucketID> rollingPolicy;
private BucketFactory<IN, BucketID> bucketFactory = new DefaultBucketFactory<>();
@@ -215,12 +215,12 @@ public class StreamingFileSink<IN>
return this;
}
- public StreamingFileSink.RowFormatBuilder<IN, BucketID> withRollingPolicy(final RollingPolicy<BucketID> policy) {
+ public StreamingFileSink.RowFormatBuilder<IN, BucketID> withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
this.rollingPolicy = Preconditions.checkNotNull(policy);
return this;
}
- public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> withBucketerAndPolicy(final Bucketer<IN, ID> bucketer, final RollingPolicy<ID> policy) {
+ public <ID> StreamingFileSink.RowFormatBuilder<IN, ID> withBucketerAndPolicy(final Bucketer<IN, ID> bucketer, final RollingPolicy<IN, ID> policy) {
@SuppressWarnings("unchecked")
StreamingFileSink.RowFormatBuilder<IN, ID> reInterpreted = (StreamingFileSink.RowFormatBuilder<IN, ID>) this;
reInterpreted.bucketer = Preconditions.checkNotNull(bucketer);
@@ -340,7 +340,6 @@ public class StreamingFileSink<IN>
buckets.snapshotState(
context.getCheckpointId(),
- context.getCheckpointTimestamp(),
bucketStates,
maxPartCountersState);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/702f7735/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
index a9ff617..15c3b4d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/DefaultRollingPolicy.java
@@ -37,7 +37,7 @@ import java.io.IOException;
* </ol>
*/
@PublicEvolving
-public final class DefaultRollingPolicy<BucketID> implements RollingPolicy<BucketID> {
+public final class DefaultRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
private static final long serialVersionUID = 1L;
@@ -67,32 +67,19 @@ public final class DefaultRollingPolicy<BucketID> implements RollingPolicy<Bucke
}
@Override
- public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) {
- return false;
+ public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException {
+ return partFileState.getSize() > partSize;
}
@Override
- public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) throws IOException {
- if (partFileState == null) {
- // this means that there is no currently open part file.
- return true;
- }
-
+ public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
return partFileState.getSize() > partSize;
}
@Override
public boolean shouldRollOnProcessingTime(final PartFileInfo<BucketID> partFileState, final long currentTime) {
- if (partFileState == null) {
- // this means that there is no currently open part file.
- return true;
- }
-
- if (currentTime - partFileState.getCreationTime() > rolloverInterval) {
- return true;
- }
-
- return currentTime - partFileState.getLastUpdateTime() > inactivityInterval;
+ return currentTime - partFileState.getCreationTime() > rolloverInterval ||
+ currentTime - partFileState.getLastUpdateTime() > inactivityInterval;
}
/**
@@ -150,7 +137,7 @@ public final class DefaultRollingPolicy<BucketID> implements RollingPolicy<Bucke
/**
* Creates the actual policy.
*/
- public <BucketID> DefaultRollingPolicy<BucketID> build() {
+ public <IN, BucketID> DefaultRollingPolicy<IN, BucketID> build() {
return new DefaultRollingPolicy<>(partSize, rolloverInterval, inactivityInterval);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/702f7735/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
index 4361941..df15981 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/rolling/policies/OnCheckpointRollingPolicy.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
/**
* A {@link RollingPolicy} which rolls on every checkpoint.
*/
-public class OnCheckpointRollingPolicy<BucketID> implements RollingPolicy<BucketID> {
+public class OnCheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
private static final long serialVersionUID = 1L;
@@ -34,7 +34,7 @@ public class OnCheckpointRollingPolicy<BucketID> implements RollingPolicy<Bucket
}
@Override
- public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState) {
+ public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) {
return false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/702f7735/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
index 6e942e9..7c23918 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
@@ -494,7 +494,7 @@ public class LocalStreamingFileSinkTest extends TestLogger {
final TestBucketFactory first = new TestBucketFactory();
final TestBucketFactory second = new TestBucketFactory();
- final RollingPolicy<String> rollingPolicy = DefaultRollingPolicy
+ final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = DefaultRollingPolicy
.create()
.withMaxPartSize(2L)
.withRolloverInterval(100L)
http://git-wip-us.apache.org/repos/asf/flink/blob/702f7735/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index 61e1433..078a46b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -44,7 +44,7 @@ public class RollingPolicyTest {
public void testDefaultRollingPolicy() throws Exception {
final File outDir = TEMP_FOLDER.newFolder();
- final RollingPolicy<String> rollingPolicy = DefaultRollingPolicy
+ final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = DefaultRollingPolicy
.create()
.withMaxPartSize(10L)
.withInactivityInterval(4L)
@@ -104,7 +104,7 @@ public class RollingPolicyTest {
public void testRollOnCheckpointPolicy() throws Exception {
final File outDir = TEMP_FOLDER.newFolder();
- final RollingPolicy<String> rollingPolicy = new OnCheckpointRollingPolicy<>();
+ final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = new OnCheckpointRollingPolicy<>();
try (
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> testHarness = TestUtils.createCustomRescalingTestSink(
@@ -159,7 +159,7 @@ public class RollingPolicyTest {
public void testCustomRollingPolicy() throws Exception {
final File outDir = TEMP_FOLDER.newFolder();
- final RollingPolicy<String> rollingPolicy = new RollingPolicy<String>() {
+ final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy = new RollingPolicy<Tuple2<String, Integer>, String>() {
private static final long serialVersionUID = 1L;
@@ -169,7 +169,7 @@ public class RollingPolicyTest {
}
@Override
- public boolean shouldRollOnEvent(PartFileInfo<String> partFileState) throws IOException {
+ public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, Tuple2<String, Integer> element) throws IOException {
// this means that 2 elements will close the part file.
return partFileState.getSize() > 12L;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/702f7735/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index 184e23e..9589c5a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -51,7 +51,7 @@ public class TestUtils {
long inactivityInterval,
long partMaxSize) throws Exception {
- final RollingPolicy<String> rollingPolicy =
+ final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy =
DefaultRollingPolicy
.create()
.withMaxPartSize(partMaxSize)
@@ -84,7 +84,7 @@ public class TestUtils {
final long bucketCheckInterval,
final Bucketer<Tuple2<String, Integer>, String> bucketer,
final Encoder<Tuple2<String, Integer>> writer,
- final RollingPolicy<String> rollingPolicy,
+ final RollingPolicy<Tuple2<String, Integer>, String> rollingPolicy,
final BucketFactory<Tuple2<String, Integer>, String> bucketFactory) throws Exception {
StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink