You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/05/11 22:39:24 UTC

[1/2] flink git commit: [FLINK-9332] [table] Fix handling of null return values in CallGenerator.

Repository: flink
Updated Branches:
  refs/heads/master 1b1122fce -> b6a1b6e9d


[FLINK-9332] [table] Fix handling of null return values in CallGenerator.

This closes #5988.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c1ffb4f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c1ffb4f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c1ffb4f2

Branch: refs/heads/master
Commit: c1ffb4f22da18dd5626805f5d1600429fa62e07c
Parents: 1b1122f
Author: Xpray <le...@gmail.com>
Authored: Fri May 11 22:47:32 2018 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri May 11 21:42:23 2018 +0200

----------------------------------------------------------------------
 .../table/codegen/calls/CallGenerator.scala     | 21 +++++++++++++++-----
 .../table/expressions/ScalarFunctionsTest.scala |  2 ++
 2 files changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c1ffb4f2/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala
index 48f5a95..4fbabd6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.codegen.calls
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+import org.apache.flink.table.typeutils.TypeCheckUtils
 
 trait CallGenerator {
 
@@ -64,17 +65,26 @@ object CallGenerator {
 
     val (auxiliaryStmt, result) = call(operands.map(_.resultTerm))
 
+    val nullTermCode = if (
+      nullCheck &&
+      isReference(returnType) &&
+      !TypeCheckUtils.isTemporal(returnType)) {
+      s"""
+         |$nullTerm = ($resultTerm == null);
+       """.stripMargin
+    } else {
+      ""
+    }
+
     val resultCode = if (nullCheck && operands.nonEmpty) {
       s"""
         |${operands.map(_.code).mkString("\n")}
         |boolean $nullTerm = ${operands.map(_.nullTerm).mkString(" || ")};
-        |$resultTypeTerm $resultTerm;
-        |if ($nullTerm) {
-        |  $resultTerm = $defaultValue;
-        |}
-        |else {
+        |$resultTypeTerm $resultTerm = $defaultValue;
+        |if (!$nullTerm) {
         |  ${auxiliaryStmt.getOrElse("")}
         |  $resultTerm = $result;
+        |  $nullTermCode
         |}
         |""".stripMargin
     } else if (nullCheck && operands.isEmpty) {
@@ -83,6 +93,7 @@ object CallGenerator {
         |boolean $nullTerm = false;
         |${auxiliaryStmt.getOrElse("")}
         |$resultTypeTerm $resultTerm = $result;
+        |$nullTermCode
         |""".stripMargin
     } else{
       s"""

http://git-wip-us.apache.org/repos/asf/flink/blob/c1ffb4f2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index f3c48ac..a9d2733 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -364,6 +364,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
     testSqlApi("LPAD('⎨⎨',1,'??')", "⎨")
     testSqlApi("LPAD('äääääääää',2,'??')", "ää")
     testSqlApi("LPAD('äääääääää',10,'??')", "?äääääääää")
+    testSqlApi("LPAD('Hello', -1, 'x') IS NULL", "true")
+    testSqlApi("LPAD('Hello', -1, 'x') IS NOT NULL", "false")
 
     testAllApis(
       "äää".lpad(13, "12345"),


[2/2] flink git commit: [FLINK-9138] [bucketSink] Support time-based rollover of part files in BucketingSink.

Posted by fh...@apache.org.
[FLINK-9138] [bucketSink] Support time-based rollover of part files in BucketingSink.

This closes #5860.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6a1b6e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6a1b6e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6a1b6e9

Branch: refs/heads/master
Commit: b6a1b6e9dd3a278c9c644682e377deaf105e7ac4
Parents: c1ffb4f
Author: Lakshmi Gururaja Rao <gl...@gmail.com>
Authored: Mon Apr 16 16:31:49 2018 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri May 11 21:53:16 2018 +0200

----------------------------------------------------------------------
 docs/dev/connectors/filesystem_sink.md          | 15 ++--
 .../connectors/fs/bucketing/BucketingSink.java  | 72 +++++++++++++++++---
 .../fs/bucketing/BucketingSinkTest.java         | 61 +++++++++++++++++
 3 files changed, 134 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b6a1b6e9/docs/dev/connectors/filesystem_sink.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/filesystem_sink.md b/docs/dev/connectors/filesystem_sink.md
index 4a00322..af1349d 100644
--- a/docs/dev/connectors/filesystem_sink.md
+++ b/docs/dev/connectors/filesystem_sink.md
@@ -89,8 +89,13 @@ and write them to part files, separated by newline. To specify a custom writer u
 on a `BucketingSink`. If you want to write Hadoop SequenceFiles you can use the provided
 `SequenceFileWriter` which can also be configured to use compression.
 
-The last configuration option is the batch size. This specifies when a part file should be closed
-and a new one started. (The default part file size is 384 MB).
+There are two configuration options that specify when a part file should be closed
+and a new one started:
+ 
+* By setting a batch size (the default part file size is 384 MB)
+* By setting a batch roll over time interval (the default roll over interval is `Long.MAX_VALUE` ms, i.e. time-based roll over is effectively disabled)
+
+A new part file is started when either of these two conditions is satisfied.
 
 Example:
 
@@ -103,6 +108,7 @@ BucketingSink<String> sink = new BucketingSink<String>("/base/path");
 sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
 sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
 sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
+sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
 
 input.addSink(sink);
 
@@ -116,6 +122,7 @@ val sink = new BucketingSink[String]("/base/path")
 sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
 sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
 sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
+sink.setBatchRolloverInterval(20 * 60 * 1000) // this is 20 mins
 
 input.addSink(sink)
 
@@ -130,8 +137,8 @@ This will create a sink that writes to bucket files that follow this schema:
 {% endhighlight %}
 
 Where `date-time` is the string that we get from the date/time format, `parallel-task` is the index
-of the parallel sink instance and `count` is the running number of part files that where created
-because of the batch size.
+of the parallel sink instance and `count` is the running number of part files that were created
+because of the batch size or batch roll over interval.
 
 For in-depth information, please refer to the JavaDoc for
 [BucketingSink](http://flink.apache.org/docs/latest/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html).

http://git-wip-us.apache.org/repos/asf/flink/blob/b6a1b6e9/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index fe712ae..23e4e0c 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -88,9 +88,11 @@ import java.util.UUID;
  * and a rolling counter. For example the file {@code "part-1-17"} contains the data from
  * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask. Per default
  * the part prefix is {@code "part"} but this can be configured using {@link #setPartPrefix(String)}.
- * When a part file becomes bigger than the user-specified batch size the current part file is closed,
- * the part counter is increased and a new part file is created. The batch size defaults to {@code 384MB},
- * this can be configured using {@link #setBatchSize(long)}.
+ * When a part file becomes bigger than the user-specified batch size or when the part file becomes older
+ * than the user-specified roll over interval the current part file is closed, the part counter is increased
+ * and a new part file is created. The batch size defaults to {@code 384MB}, this can be configured
+ * using {@link #setBatchSize(long)}. The roll over interval defaults to {@code Long.MAX_VALUE} and
+ * this can be configured using {@link #setBatchRolloverInterval(long)}.
  *
  *
  * <p>In some scenarios, the open buckets are required to change based on time. In these cases, the sink
@@ -137,6 +139,10 @@ import java.util.UUID;
  *         {@link #setWriter(Writer)}. For example, {@link SequenceFileWriter}
  *         can be used to write Hadoop {@code SequenceFiles}.
  *     </li>
+ *     <li>
+ *       	{@link #closePartFilesByTime(long)} closes buckets that have not been written to for
+ *       	{@code inactiveBucketThreshold} or if they are older than {@code batchRolloverInterval}.
+ *     </li>
  * </ol>
  *
  *
@@ -240,6 +246,11 @@ public class BucketingSink<T>
 	private static final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;
 
 	/**
+	 * The default time interval after which a part file is closed and a new one is started.
+	 */
+	private static final long DEFAULT_BATCH_ROLLOVER_INTERVAL = Long.MAX_VALUE;
+
+	/**
 	 * The base {@code Path} that stores all bucket directories.
 	 */
 	private final String basePath;
@@ -258,6 +269,7 @@ public class BucketingSink<T>
 	private long batchSize = DEFAULT_BATCH_SIZE;
 	private long inactiveBucketCheckInterval = DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS;
 	private long inactiveBucketThreshold = DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS;
+	private long batchRolloverInterval = DEFAULT_BATCH_ROLLOVER_INTERVAL;
 
 	// These are the actually configured prefixes/suffixes
 	private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
@@ -442,7 +454,7 @@ public class BucketingSink<T>
 			state.addBucketState(bucketPath, bucketState);
 		}
 
-		if (shouldRoll(bucketState)) {
+		if (shouldRoll(bucketState, currentProcessingTime)) {
 			openNewPartFile(bucketPath, bucketState);
 		}
 
@@ -456,9 +468,10 @@ public class BucketingSink<T>
 	 * <ol>
 	 *     <li>no file is created yet for the task to write to, or</li>
 	 *     <li>the current file has reached the maximum bucket size.</li>
+	 *     <li>the current file is older than the roll over interval.</li>
 	 * </ol>
 	 */
-	private boolean shouldRoll(BucketState<T> bucketState) throws IOException {
+	private boolean shouldRoll(BucketState<T> bucketState, long currentProcessingTime) throws IOException {
 		boolean shouldRoll = false;
 		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
 		if (!bucketState.isWriterOpen) {
@@ -473,6 +486,14 @@ public class BucketingSink<T>
 					subtaskIndex,
 					writePosition,
 					batchSize);
+			} else {
+				if (currentProcessingTime - bucketState.creationTime > batchRolloverInterval) {
+					shouldRoll = true;
+					LOG.debug(
+						"BucketingSink {} starting new bucket because file is older than roll over interval {}.",
+						subtaskIndex,
+						batchRolloverInterval);
+				}
 			}
 		}
 		return shouldRoll;
@@ -482,21 +503,23 @@ public class BucketingSink<T>
 	public void onProcessingTime(long timestamp) throws Exception {
 		long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
 
-		checkForInactiveBuckets(currentProcessingTime);
+		closePartFilesByTime(currentProcessingTime);
 
 		processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
 	}
 
 	/**
 	 * Checks for inactive buckets, and closes them. Buckets are considered inactive if they have not been
-	 * written to for a period greater than {@code inactiveBucketThreshold} ms. This enables in-progress
-	 * files to be moved to the pending state and be finalised on the next checkpoint.
+	 * written to for a period greater than {@code inactiveBucketThreshold} ms. Buckets are also closed if they are
+	 * older than {@code batchRolloverInterval} ms. This enables in-progress files to be moved to the pending state
+	 * and be finalised on the next checkpoint.
 	 */
-	private void checkForInactiveBuckets(long currentProcessingTime) throws Exception {
+	private void closePartFilesByTime(long currentProcessingTime) throws Exception {
 
 		synchronized (state.bucketStates) {
 			for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
-				if (entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold) {
+				if ((entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold)
+						|| (entry.getValue().creationTime < currentProcessingTime - batchRolloverInterval)) {
 					LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.",
 						getRuntimeContext().getIndexOfThisSubtask(), inactiveBucketThreshold);
 					closeCurrentPartFile(entry.getValue());
@@ -537,6 +560,9 @@ public class BucketingSink<T>
 			partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
 		}
 
+		// Record the creation time of the bucket
+		bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
+
 		if (partSuffix != null) {
 			partPath = partPath.suffix(partSuffix);
 		}
@@ -915,6 +941,25 @@ public class BucketingSink<T>
 	}
 
 	/**
+	 * Sets the roll over interval in milliseconds.
+	 *
+	 *
+	 * <p>When a bucket part file is older than the roll over interval, a new bucket part file is
+	 * started and the old one is closed. The name of the bucket file depends on the {@link Bucketer}.
+	 * Additionally, the old part file is also closed if the bucket is not written to for a minimum of
+	 * {@code inactiveBucketThreshold} ms.
+	 *
+	 * @param batchRolloverInterval The roll over interval in milliseconds
+	 */
+	public BucketingSink<T> setBatchRolloverInterval(long batchRolloverInterval) {
+		if (batchRolloverInterval > 0) {
+			this.batchRolloverInterval = batchRolloverInterval;
+		}
+
+		return this;
+	}
+
+	/**
 	 * Sets the default time between checks for inactive buckets.
 	 *
 	 * @param interval The timeout, in milliseconds.
@@ -927,6 +972,8 @@ public class BucketingSink<T>
 	/**
 	 * Sets the default threshold for marking a bucket as inactive and closing its part files.
 	 * Buckets which haven't been written to for at least this period of time become inactive.
+	 * Additionally, part files for the bucket are also closed if the bucket is older than
+	 * {@code batchRolloverInterval} ms.
 	 *
 	 * @param threshold The timeout, in milliseconds.
 	 */
@@ -1117,6 +1164,11 @@ public class BucketingSink<T>
 		long lastWrittenToTime;
 
 		/**
+		 * The time at which the bucket's current part file was created.
+		 */
+		long creationTime;
+
+		/**
 		 * Pending files that accumulated since the last checkpoint.
 		 */
 		List<String> pendingFiles = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/b6a1b6e9/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index 56415fe..362c078 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -137,6 +137,33 @@ public class BucketingSinkTest extends TestLogger {
 		return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx);
 	}
 
+	private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSinkWithRollover(
+		File outDir, int totalParallelism, int taskIdx, long inactivityInterval, long rolloverInterval) throws Exception {
+
+		BucketingSink<String> sink = new BucketingSink<String>(outDir.getAbsolutePath())
+			.setBucketer(new Bucketer<String>() {
+				private static final long serialVersionUID = 1L;
+
+				@Override
+				public Path getBucketPath(Clock clock, Path basePath, String element) {
+					return new Path(basePath, element);
+				}
+			})
+			.setWriter(new StringWriter<String>())
+			.setInactiveBucketCheckInterval(inactivityInterval)
+			.setInactiveBucketThreshold(inactivityInterval)
+			.setPartPrefix(PART_PREFIX)
+			.setInProgressPrefix("")
+			.setPendingPrefix("")
+			.setValidLengthPrefix("")
+			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
+			.setPendingSuffix(PENDING_SUFFIX)
+			.setValidLengthSuffix(VALID_LENGTH_SUFFIX)
+			.setBatchRolloverInterval(rolloverInterval);
+
+		return createTestSink(sink, totalParallelism, taskIdx);
+	}
+
 	@BeforeClass
 	public static void createHDFS() throws IOException {
 		Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
@@ -437,6 +464,40 @@ public class BucketingSinkTest extends TestLogger {
 		checkLocalFs(outDir, 0, 3, 5, 5);
 	}
 
+	@Test
+	public void testRolloverInterval() throws Exception {
+		final File outDir = tempFolder.newFolder();
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSinkWithRollover(outDir, 1, 0, 1000L, 100L);
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(0L);
+
+		testHarness.processElement(new StreamRecord<>("test1", 1L));
+		checkLocalFs(outDir, 1, 0, 0, 0);
+
+		// invoke rollover based on rollover interval
+		testHarness.setProcessingTime(101L);
+		testHarness.processElement(new StreamRecord<>("test1", 2L));
+		checkLocalFs(outDir, 1, 1, 0, 0);
+
+		testHarness.snapshot(0, 0);
+		testHarness.notifyOfCompletedCheckpoint(0);
+		checkLocalFs(outDir, 1, 0, 1, 0);
+
+		// move the in-progress file to pending
+		testHarness.setProcessingTime(3000L);
+		testHarness.snapshot(1, 1);
+		checkLocalFs(outDir, 0, 1, 1, 0);
+
+		// move the pending file to "committed"
+		testHarness.notifyOfCompletedCheckpoint(1);
+		testHarness.close();
+
+		checkLocalFs(outDir, 0, 0, 2, 0);
+	}
+
 	/**
 	 * This tests {@link StringWriter} with
 	 * non-bucketing output.