You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/05/11 22:39:25 UTC
[2/2] flink git commit: [FLINK-9138] [bucketSink] Support time-based
rollover of part files in BucketingSink.
[FLINK-9138] [bucketSink] Support time-based rollover of part files in BucketingSink.
This closes #5860.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6a1b6e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6a1b6e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6a1b6e9
Branch: refs/heads/master
Commit: b6a1b6e9dd3a278c9c644682e377deaf105e7ac4
Parents: c1ffb4f
Author: Lakshmi Gururaja Rao <gl...@gmail.com>
Authored: Mon Apr 16 16:31:49 2018 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri May 11 21:53:16 2018 +0200
----------------------------------------------------------------------
docs/dev/connectors/filesystem_sink.md | 15 ++--
.../connectors/fs/bucketing/BucketingSink.java | 72 +++++++++++++++++---
.../fs/bucketing/BucketingSinkTest.java | 61 +++++++++++++++++
3 files changed, 134 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b6a1b6e9/docs/dev/connectors/filesystem_sink.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md
index 4a00322..af1349d 100644
--- a/docs/dev/connectors/filesystem_sink.md
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -89,8 +89,13 @@ and write them to part files, separated by newline. To specify a custom writer u
on a `BucketingSink`. If you want to write Hadoop SequenceFiles you can use the provided
`SequenceFileWriter` which can also be configured to use compression.
-The last configuration option is the batch size. This specifies when a part file should be closed
-and a new one started. (The default part file size is 384 MB).
+There are two configuration options that specify when a part file should be closed
+and a new one started:
+
+* By setting a batch size (the default part file size is 384 MB)
+* By setting a batch rollover interval (the default rollover interval is `Long.MAX_VALUE`)
+
+A new part file is started when either of these two conditions is satisfied.
Example:
@@ -103,6 +108,7 @@ BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
+sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
input.addSink(sink);
@@ -116,6 +122,7 @@ val sink = new BucketingSink[String]("/base/path")
sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
+sink.setBatchRolloverInterval(20 * 60 * 1000) // this is 20 mins
input.addSink(sink)
@@ -130,8 +137,8 @@ This will create a sink that writes to bucket files that follow this schema:
{% endhighlight %}
Where `date-time` is the string that we get from the date/time format, `parallel-task` is the index
-of the parallel sink instance and `count` is the running number of part files that where created
-because of the batch size.
+of the parallel sink instance and `count` is the running number of part files that were created
+because of the batch size or batch roll over interval.
For in-depth information, please refer to the JavaDoc for
[BucketingSink](http://flink.apache.org/docs/latest/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html).
http://git-wip-us.apache.org/repos/asf/flink/blob/b6a1b6e9/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index fe712ae..23e4e0c 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -88,9 +88,11 @@ import java.util.UUID;
* and a rolling counter. For example the file {@code "part-1-17"} contains the data from
* {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask. Per default
* the part prefix is {@code "part"} but this can be configured using {@link #setPartPrefix(String)}.
- * When a part file becomes bigger than the user-specified batch size the current part file is closed,
- * the part counter is increased and a new part file is created. The batch size defaults to {@code 384MB},
- * this can be configured using {@link #setBatchSize(long)}.
+ * When a part file becomes bigger than the user-specified batch size or when the part file becomes older
+ * than the user-specified roll over interval the current part file is closed, the part counter is increased
+ * and a new part file is created. The batch size defaults to {@code 384MB}, this can be configured
+ * using {@link #setBatchSize(long)}. The roll over interval defaults to {@code Long.MAX_VALUE} and
+ * this can be configured using {@link #setBatchRolloverInterval(long)}.
*
*
* <p>In some scenarios, the open buckets are required to change based on time. In these cases, the sink
@@ -137,6 +139,10 @@ import java.util.UUID;
* {@link #setWriter(Writer)}. For example, {@link SequenceFileWriter}
* can be used to write Hadoop {@code SequenceFiles}.
* </li>
+ * <li>
+ * {@link #closePartFilesByTime(long)} closes the current part file of buckets that have not been written to
+ * for {@code inactiveBucketThreshold} ms, or whose part file is older than {@code batchRolloverInterval} ms.
+ * </li>
* </ol>
*
*
@@ -240,6 +246,11 @@ public class BucketingSink<T>
private static final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
/**
+ * The default maximum age (in ms) of a part file before it is rolled over; effectively no time-based rollover.
+ */
+ private static final long DEFAULT_BATCH_ROLLOVER_INTERVAL = Long.MAX_VALUE;
+
+ /**
* The base {@code Path} that stores all bucket directories.
*/
private final String basePath;
@@ -258,6 +269,7 @@ public class BucketingSink<T>
private long batchSize = DEFAULT_BATCH_SIZE;
private long inactiveBucketCheckInterval = DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS;
private long inactiveBucketThreshold = DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS;
+ private long batchRolloverInterval = DEFAULT_BATCH_ROLLOVER_INTERVAL;
// These are the actually configured prefixes/suffixes
private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
@@ -442,7 +454,7 @@ public class BucketingSink<T>
state.addBucketState(bucketPath, bucketState);
}
- if (shouldRoll(bucketState)) {
+ if (shouldRoll(bucketState, currentProcessingTime)) {
openNewPartFile(bucketPath, bucketState);
}
@@ -456,9 +468,10 @@ public class BucketingSink<T>
* <ol>
* <li>no file is created yet for the task to write to, or</li>
* <li>the current file has reached the maximum bucket size.</li>
+ * <li>the current file is older than the roll over interval.</li>
* </ol>
*/
- private boolean shouldRoll(BucketState<T> bucketState) throws IOException {
+ private boolean shouldRoll(BucketState<T> bucketState, long currentProcessingTime) throws IOException {
boolean shouldRoll = false;
int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
if (!bucketState.isWriterOpen) {
@@ -473,6 +486,14 @@ public class BucketingSink<T>
subtaskIndex,
writePosition,
batchSize);
+ } else {
+ if (currentProcessingTime - bucketState.creationTime > batchRolloverInterval) {
+ shouldRoll = true;
+ LOG.debug(
+ "BucketingSink {} starting new bucket because file is older than roll over interval {}.",
+ subtaskIndex,
+ batchRolloverInterval);
+ }
}
}
return shouldRoll;
@@ -482,21 +503,23 @@ public class BucketingSink<T>
public void onProcessingTime(long timestamp) throws Exception {
long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
- checkForInactiveBuckets(currentProcessingTime);
+ closePartFilesByTime(currentProcessingTime);
processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
}
/**
* Checks for inactive buckets, and closes them. Buckets are considered inactive if they have not been
- * written to for a period greater than {@code inactiveBucketThreshold} ms. This enables in-progress
- * files to be moved to the pending state and be finalised on the next checkpoint.
+ * written to for a period greater than {@code inactiveBucketThreshold} ms. A bucket's current part file is also closed
+ * if it is older than {@code batchRolloverInterval} ms. This enables in-progress files to be moved to the pending
+ * state and be finalised on the next checkpoint.
*/
- private void checkForInactiveBuckets(long currentProcessingTime) throws Exception {
+ private void closePartFilesByTime(long currentProcessingTime) throws Exception {
synchronized (state.bucketStates) {
for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
- if (entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold) {
+ if ((entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold)
+ || (entry.getValue().creationTime < currentProcessingTime - batchRolloverInterval)) {
LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.",
getRuntimeContext().getIndexOfThisSubtask(), inactiveBucketThreshold);
closeCurrentPartFile(entry.getValue());
@@ -537,6 +560,9 @@ public class BucketingSink<T>
partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
}
+ // Record when this part file was created; used for the time-based (rollover interval) check
+ bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
+
if (partSuffix != null) {
partPath = partPath.suffix(partSuffix);
}
@@ -915,6 +941,25 @@ public class BucketingSink<T>
}
/**
+ * Sets the roll over interval in milliseconds. Non-positive values are ignored.
+ *
+ * <p>When the current part file of a bucket has been open for longer than the roll over interval,
+ * it is closed and a new part file is started. The bucket the part file belongs to is determined by
+ * the {@link Bucketer}. Independently of this setting, the part file is also closed if the bucket is
+ * not written to for at least {@code inactiveBucketThreshold} ms.
+ *
+ * @param batchRolloverInterval The roll over interval in milliseconds
+ * @return this sink, to allow method chaining
+ */
+ public BucketingSink<T> setBatchRolloverInterval(long batchRolloverInterval) {
+ if (batchRolloverInterval <= 0) {
+ return this;
+ }
+ this.batchRolloverInterval = batchRolloverInterval;
+ return this;
+ }
+
+ /**
* Sets the default time between checks for inactive buckets.
*
* @param interval The timeout, in milliseconds.
@@ -927,6 +972,8 @@ public class BucketingSink<T>
/**
* Sets the default threshold for marking a bucket as inactive and closing its part files.
* Buckets which haven't been written to for at least this period of time become inactive.
+ * Additionally, a bucket's current part file is also closed once it is older than
+ * {@code batchRolloverInterval} ms, regardless of activity.
*
* @param threshold The timeout, in milliseconds.
*/
@@ -1117,6 +1164,11 @@ public class BucketingSink<T>
long lastWrittenToTime;
/**
+ * The processing time at which the current part file of this bucket was created.
+ */
+ long creationTime;
+
+ /**
* Pending files that accumulated since the last checkpoint.
*/
List<String> pendingFiles = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/b6a1b6e9/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index 56415fe..362c078 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -137,6 +137,33 @@ public class BucketingSinkTest extends TestLogger {
return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx);
}
+ private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSinkWithRollover(
+ File outDir, int totalParallelism, int taskIdx, long inactivityInterval, long rolloverInterval) throws Exception {
+ // Each element is routed to a bucket directory named after the element value itself.
+ Bucketer<String> elementBucketer = new Bucketer<String>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public Path getBucketPath(Clock clock, Path basePath, String element) {
+ return new Path(basePath, element);
+ }
+ };
+
+ BucketingSink<String> sink = new BucketingSink<String>(outDir.getAbsolutePath())
+ .setBucketer(elementBucketer)
+ .setWriter(new StringWriter<String>())
+ .setInactiveBucketCheckInterval(inactivityInterval)
+ .setInactiveBucketThreshold(inactivityInterval)
+ .setPartPrefix(PART_PREFIX)
+ .setInProgressPrefix("")
+ .setPendingPrefix("")
+ .setValidLengthPrefix("")
+ .setInProgressSuffix(IN_PROGRESS_SUFFIX)
+ .setPendingSuffix(PENDING_SUFFIX)
+ .setValidLengthSuffix(VALID_LENGTH_SUFFIX)
+ .setBatchRolloverInterval(rolloverInterval);
+ return createTestSink(sink, totalParallelism, taskIdx);
+ }
+
@BeforeClass
public static void createHDFS() throws IOException {
Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
@@ -437,6 +464,40 @@ public class BucketingSinkTest extends TestLogger {
checkLocalFs(outDir, 0, 3, 5, 5);
}
+ @Test
+ public void testRolloverInterval() throws Exception {
+ final File outDir = tempFolder.newFolder();
+ final long inactivityInterval = 1000L;
+ final long rolloverInterval = 100L;
+ OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSinkWithRollover(outDir, 1, 0, inactivityInterval, rolloverInterval);
+ testHarness.setup();
+ testHarness.open();
+ // the first element opens the first in-progress part file at processing time 0
+ testHarness.setProcessingTime(0L);
+ testHarness.processElement(new StreamRecord<>("test1", 1L));
+ checkLocalFs(outDir, 1, 0, 0, 0);
+
+ // the part file is now older than the rollover interval, so the next write rolls it over
+ testHarness.setProcessingTime(rolloverInterval + 1);
+ testHarness.processElement(new StreamRecord<>("test1", 2L));
+ checkLocalFs(outDir, 1, 1, 0, 0);
+
+ // completing checkpoint 0 commits the rolled-over (pending) part file
+ testHarness.snapshot(0, 0);
+ testHarness.notifyOfCompletedCheckpoint(0);
+ checkLocalFs(outDir, 1, 0, 1, 0);
+
+ // advancing processing time lets the periodic check close the open part file (-> pending)
+ testHarness.setProcessingTime(3 * inactivityInterval);
+ testHarness.snapshot(1, 1);
+ checkLocalFs(outDir, 0, 1, 1, 0);
+
+ // completing checkpoint 1 commits the remaining pending part file
+ testHarness.notifyOfCompletedCheckpoint(1);
+ testHarness.close();
+ checkLocalFs(outDir, 0, 0, 2, 0);
+ }
+
/**
* This tests {@link StringWriter} with
* non-bucketing output.