You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2020/03/19 16:39:59 UTC

[flink] branch release-1.10 updated (aa4d627 -> 06859cc)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from aa4d627  [FLINK-15852][cli] Prioritize ExecutorCLI over YarnSessionCLI for active CLI
     new c861ee4  [hotfix] Make TestBulkWriterFactory publicly available for other test modules
     new 06859cc  [FLINK-16684] Fix StreamingFileSink builder compilation for Scala

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../sink/filesystem/StreamingFileSink.java         | 33 ++++++++++++--
 .../functions/sink/filesystem/BulkWriterTest.java  |  2 +-
 .../sink/filesystem/StreamingFileSinkTest.scala    | 53 ++++++++++++++++++++++
 3 files changed, 83 insertions(+), 5 deletions(-)
 create mode 100644 flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/functions/sink/filesystem/StreamingFileSinkTest.scala


[flink] 01/02: [hotfix] Make TestBulkWriterFactory publicly available for other test modules

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c861ee40ad71d49b1b572da33dfbc520d34f858a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Mar 19 14:59:54 2020 +0100

    [hotfix] Make TestBulkWriterFactory publicly available for other test modules
---
 .../flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
index 0449e1d..e32d462 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
@@ -236,7 +236,7 @@ public class BulkWriterTest extends TestLogger {
 	/**
 	 * A {@link BulkWriter.Factory} used for the tests.
 	 */
-	private static class TestBulkWriterFactory implements BulkWriter.Factory<Tuple2<String, Integer>> {
+	public static final class TestBulkWriterFactory implements BulkWriter.Factory<Tuple2<String, Integer>> {
 
 		private static final long serialVersionUID = 1L;
 


[flink] 02/02: [FLINK-16684] Fix StreamingFileSink builder compilation for Scala

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 06859cc2579ee4b0b0a12ee5529d4d12ea6e20aa
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Mar 19 15:05:40 2020 +0100

    [FLINK-16684] Fix StreamingFileSink builder compilation for Scala
    
    This commit introduces a new type name for the row and bulk format
    StreamingFileSink builders in order to solve the compilation problem
    of Scala when using generic types with the self-type idiom.
    
    This closes #11454.
---
 .../sink/filesystem/StreamingFileSink.java         | 33 ++++++++++++--
 .../sink/filesystem/StreamingFileSinkTest.scala    | 53 ++++++++++++++++++++++
 2 files changed, 82 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index d8d9d6d..b4a3684 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -165,9 +165,9 @@ public class StreamingFileSink<IN>
 	 * @return The builder where the remaining of the configuration parameters for the sink can be configured.
 	 * In order to instantiate the sink, call {@link RowFormatBuilder#build()} after specifying the desired parameters.
 	 */
-	public static <IN> StreamingFileSink.RowFormatBuilder<IN, String, ? extends RowFormatBuilder<IN, String, ?>> forRowFormat(
+	public static <IN> StreamingFileSink.DefaultRowFormatBuilder<IN> forRowFormat(
 			final Path basePath, final Encoder<IN> encoder) {
-		return new StreamingFileSink.RowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
+		return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
 	}
 
 	/**
@@ -178,9 +178,9 @@ public class StreamingFileSink<IN>
 	 * @return The builder where the remaining of the configuration parameters for the sink can be configured.
 	 * In order to instantiate the sink, call {@link RowFormatBuilder#build()} after specifying the desired parameters.
 	 */
-	public static <IN> StreamingFileSink.BulkFormatBuilder<IN, String, ? extends BulkFormatBuilder<IN, String, ?>> forBulkFormat(
+	public static <IN> StreamingFileSink.DefaultBulkFormatBuilder<IN> forBulkFormat(
 			final Path basePath, final BulkWriter.Factory<IN> writerFactory) {
-		return new StreamingFileSink.BulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketAssigner<>());
+		return new StreamingFileSink.DefaultBulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketAssigner<>());
 	}
 
 	/**
@@ -297,6 +297,18 @@ public class StreamingFileSink<IN>
 	}
 
 	/**
+	 * Builder for the vanilla {@link StreamingFileSink} using a row format.
+	 * @param <IN> record type
+	 */
+	public static final class DefaultRowFormatBuilder<IN> extends RowFormatBuilder<IN, String, DefaultRowFormatBuilder<IN>> {
+		private static final long serialVersionUID = -8503344257202146718L;
+
+		private DefaultRowFormatBuilder(Path basePath, Encoder<IN> encoder, BucketAssigner<IN, String> bucketAssigner) {
+			super(basePath, encoder, bucketAssigner);
+		}
+	}
+
+	/**
 	 * A builder for configuring the sink for bulk-encoding formats, e.g. Parquet/ORC.
 	 */
 	@PublicEvolving
@@ -394,6 +406,19 @@ public class StreamingFileSink<IN>
 		}
 	}
 
+	/**
+	 * Builder for the vanilla {@link StreamingFileSink} using a bulk format.
+	 * @param <IN> record type
+	 */
+	public static final class DefaultBulkFormatBuilder<IN> extends BulkFormatBuilder<IN, String, DefaultBulkFormatBuilder<IN>> {
+
+		private static final long serialVersionUID = 7493169281036370228L;
+
+		private DefaultBulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, BucketAssigner<IN, String> assigner) {
+			super(basePath, writerFactory, assigner);
+		}
+	}
+
 	// --------------------------- Sink Methods -----------------------------
 
 	@Override
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/functions/sink/filesystem/StreamingFileSinkTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/functions/sink/filesystem/StreamingFileSinkTest.scala
new file mode 100644
index 0000000..758c46c
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/functions/sink/filesystem/StreamingFileSinkTest.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.functions.sink.filesystem
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.functions.sink.filesystem.BulkWriterTest.TestBulkWriterFactory
+import org.apache.flink.streaming.api.functions.sink.filesystem.{OutputFileConfig, StreamingFileSink}
+import org.junit.Test
+
+/**
+ * Tests for the [[org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink]]
+ */
+class StreamingFileSinkTest {
+
+  /**
+   * Tests that the StreamingFileSink builder works with the Scala APIs.
+   */
+  @Test
+  def testStreamingFileSinkRowFormatBuilderCompiles(): Unit = {
+    StreamingFileSink.forRowFormat(new Path("foobar"), new SimpleStringEncoder[String]())
+      .withBucketCheckInterval(10L)
+      .withOutputFileConfig(OutputFileConfig.builder().build())
+      .build()
+  }
+
+  /**
+   * Tests that the StreamingFileSink builder works with the Scala APIs.
+   */
+  @Test
+  def testStreamingFileSinkBulkFormatBuilderCompiles(): Unit = {
+    StreamingFileSink.forBulkFormat(new Path("foobar"), new TestBulkWriterFactory())
+      .withBucketCheckInterval(10L)
+      .withOutputFileConfig(OutputFileConfig.builder().build())
+      .build()
+  }
+}