You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/12/18 08:47:16 UTC
[flink] 02/02: [FLINK-10457][fs-connector] Add SequenceFile support to StreamingFileSink.
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cf88590e992b974284e01a3a35e8ad01fd47174e
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Dec 11 10:50:33 2018 +0100
[FLINK-10457][fs-connector] Add SequenceFile support to StreamingFileSink.
This closes #6774.
---
.../avro/ParquetStreamingFileSinkITCase.java | 14 ++--
.../pom.xml | 11 +--
.../formats/sequencefile/SequenceFileWriter.java | 4 +-
.../sequencefile/SequenceFileWriterFactory.java | 68 ++++-----------
.../SerializableHadoopConfiguration.java | 58 +++++++++++++
.../SequenceStreamingFileSinkITCase.java} | 46 +++++-----
.../SerializableHadoopConfigurationTest.java | 98 ++++++++++++++++++++++
.../src/test/resources/log4j-test.properties | 0
flink-formats/pom.xml | 2 +-
.../{test => streaming}/util/FiniteTestSource.java | 25 ++++--
10 files changed, 230 insertions(+), 96 deletions(-)
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
index 540f762..484ef86 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
@@ -19,14 +19,15 @@
package org.apache.flink.formats.parquet.avro;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.formats.parquet.generated.Address;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.test.util.FiniteTestSource;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
@@ -156,11 +157,14 @@ public class ParquetStreamingFileSinkITCase extends AbstractTestBase {
File[] partFiles = buckets[0].listFiles();
assertNotNull(partFiles);
- assertEquals(1, partFiles.length);
- assertTrue(partFiles[0].length() > 0);
+ assertEquals(2, partFiles.length);
- List<Address> results = readParquetFile(partFiles[0], dataModel);
- assertEquals(expected, results);
+ for (File partFile : partFiles) {
+ assertTrue(partFile.length() > 0);
+
+ final List<Tuple2<Long, String>> fileContent = readParquetFile(partFile, dataModel);
+ assertEquals(expected, fileContent);
+ }
}
private static <T> List<T> readParquetFile(File file, GenericData dataModel) throws IOException {
diff --git a/flink-formats/flink-sequencefile/pom.xml b/flink-formats/flink-sequence-file/pom.xml
similarity index 91%
rename from flink-formats/flink-sequencefile/pom.xml
rename to flink-formats/flink-sequence-file/pom.xml
index 632cf2f..1a2f84e 100644
--- a/flink-formats/flink-sequencefile/pom.xml
+++ b/flink-formats/flink-sequence-file/pom.xml
@@ -29,8 +29,8 @@ under the License.
<relativePath>..</relativePath>
</parent>
- <artifactId>flink-sequencefile</artifactId>
- <name>flink-sequencefile</name>
+ <artifactId>flink-sequence-file</artifactId>
+ <name>flink-sequence-file</name>
<packaging>jar</packaging>
@@ -50,13 +50,6 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-hadoop-fs</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
<!-- test dependencies -->
diff --git a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
similarity index 97%
rename from flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
rename to flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
index fa8fab2..169aa4b 100644
--- a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
+++ b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
@@ -37,9 +37,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
@PublicEvolving
public class SequenceFileWriter<K extends Writable, V extends Writable> implements BulkWriter<Tuple2<K, V>> {
+
private final SequenceFile.Writer writer;
- public SequenceFileWriter(SequenceFile.Writer writer) {
+ SequenceFileWriter(SequenceFile.Writer writer) {
this.writer = checkNotNull(writer);
}
@@ -58,4 +59,3 @@ public class SequenceFileWriter<K extends Writable, V extends Writable> implemen
writer.close();
}
}
-
diff --git a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
similarity index 71%
rename from flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
rename to flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
index 90a87e7..d7b96f6 100644
--- a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
+++ b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
@@ -22,9 +22,6 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
@@ -33,9 +30,6 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -47,9 +41,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
@PublicEvolving
public class SequenceFileWriterFactory<K extends Writable, V extends Writable> implements BulkWriter.Factory<Tuple2<K, V>> {
+
private static final long serialVersionUID = 1L;
- private final SerializableHadoopConfiguration serdeHadoopConfig;
+ /** A constant specifying that no compression is requested. */
+ public static final String NO_COMPRESSION = "NO_COMPRESSION";
+
+ private final SerializableHadoopConfiguration serializableHadoopConfig;
private final Class<K> keyClass;
private final Class<V> valueClass;
private final String compressionCodecName;
@@ -64,7 +62,7 @@ public class SequenceFileWriterFactory<K extends Writable, V extends Writable> i
* @param valueClass The class of value to write.
*/
public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass) {
- this(hadoopConf, keyClass, valueClass, "None", SequenceFile.CompressionType.BLOCK);
+ this(hadoopConf, keyClass, valueClass, NO_COMPRESSION, SequenceFile.CompressionType.BLOCK);
}
/**
@@ -91,7 +89,7 @@ public class SequenceFileWriterFactory<K extends Writable, V extends Writable> i
* @param compressionType The type of compression level.
*/
public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName, SequenceFile.CompressionType compressionType) {
- this.serdeHadoopConfig = new SerializableHadoopConfiguration(checkNotNull(hadoopConf));
+ this.serializableHadoopConfig = new SerializableHadoopConfiguration(checkNotNull(hadoopConf));
this.keyClass = checkNotNull(keyClass);
this.valueClass = checkNotNull(valueClass);
this.compressionCodecName = checkNotNull(compressionCodecName);
@@ -101,9 +99,9 @@ public class SequenceFileWriterFactory<K extends Writable, V extends Writable> i
@Override
public SequenceFileWriter<K, V> create(FSDataOutputStream out) throws IOException {
org.apache.hadoop.fs.FSDataOutputStream stream = new org.apache.hadoop.fs.FSDataOutputStream(out, null);
- CompressionCodec compressionCodec = getCompressionCodec(serdeHadoopConfig.get(), compressionCodecName);
+ CompressionCodec compressionCodec = getCompressionCodec(serializableHadoopConfig.get(), compressionCodecName);
SequenceFile.Writer writer = SequenceFile.createWriter(
- serdeHadoopConfig.get(),
+ serializableHadoopConfig.get(),
SequenceFile.Writer.stream(stream),
SequenceFile.Writer.keyClass(keyClass),
SequenceFile.Writer.valueClass(valueClass),
@@ -112,57 +110,19 @@ public class SequenceFileWriterFactory<K extends Writable, V extends Writable> i
}
private CompressionCodec getCompressionCodec(Configuration conf, String compressionCodecName) {
- if (compressionCodecName.equals("None")) {
+ checkNotNull(conf);
+ checkNotNull(compressionCodecName);
+
+ if (compressionCodecName.equals(NO_COMPRESSION)) {
return null;
}
- CompressionCodecFactory codecFactory = new CompressionCodecFactory(checkNotNull(conf));
+ CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
CompressionCodec codec = codecFactory.getCodecByName(compressionCodecName);
if (codec == null) {
throw new RuntimeException("Codec " + compressionCodecName + " not found.");
}
return codec;
}
-
- /**
- * Get Hadoop configuration based by the path.
- * If the path is not Hadoop URI, it will be return default configuration.
- *
- * @param path The path to get configuration.
- * @return Hadoop configuration.
- * @throws IOException
- */
- public static Configuration getHadoopConfigFromPath(Path path) throws IOException {
- FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
- if (fs != null && fs instanceof HadoopFileSystem) {
- return ((HadoopFileSystem) fs).getHadoopFileSystem().getConf();
- } else {
- return new Configuration();
- }
- }
-
- /**
- * The wrapper class for serialization of {@link Configuration}.
- */
- private class SerializableHadoopConfiguration implements Serializable {
- private transient Configuration hadoopConfig;
-
- private SerializableHadoopConfiguration(Configuration hadoopConfig) {
- this.hadoopConfig = hadoopConfig;
- }
-
- private Configuration get() {
- return this.hadoopConfig;
- }
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- this.hadoopConfig.write(out);
- }
-
- private void readObject(ObjectInputStream in) throws IOException {
- this.hadoopConfig = new Configuration();
- this.hadoopConfig.readFields(in);
- }
- }
}
diff --git a/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java
new file mode 100644
index 0000000..8e00e07
--- /dev/null
+++ b/flink-formats/flink-sequence-file/src/main/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfiguration.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Wrapper class for serialization of {@link Configuration}.
+ */
+class SerializableHadoopConfiguration implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient Configuration hadoopConfig;
+
+ SerializableHadoopConfiguration(Configuration hadoopConfig) {
+ this.hadoopConfig = hadoopConfig;
+ }
+
+ Configuration get() {
+ return this.hadoopConfig;
+ }
+
+ // --------------------
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ this.hadoopConfig.write(out);
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException {
+ final Configuration config = new Configuration();
+ config.readFields(in);
+
+ if (this.hadoopConfig == null) {
+ this.hadoopConfig = config;
+ }
+ }
+}
diff --git a/flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java b/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java
similarity index 78%
rename from flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java
rename to flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java
index b3d4b22..2b3f325 100644
--- a/flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java
+++ b/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SequenceStreamingFileSinkITCase.java
@@ -26,8 +26,8 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.test.util.FiniteTestSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
@@ -49,26 +49,31 @@ import static org.junit.Assert.assertTrue;
* Integration test case for writing bulk encoded files with the
* {@link StreamingFileSink} with SequenceFile.
*/
-public class SequenceFileSinkITCase extends AbstractTestBase {
- @Test
- public void testWriteSequenceFile() throws Exception {
- final File folder = TEMPORARY_FOLDER.newFolder();
+public class SequenceStreamingFileSinkITCase extends AbstractTestBase {
+
+ private final Configuration configuration = new Configuration();
- final List<Tuple2<Long, String>> data = Arrays.asList(
+ private final List<Tuple2<Long, String>> testData = Arrays.asList(
new Tuple2<>(1L, "a"),
new Tuple2<>(2L, "b"),
new Tuple2<>(3L, "c")
- );
+ );
+
+ @Test
+ public void testWriteSequenceFile() throws Exception {
+ final File folder = TEMPORARY_FOLDER.newFolder();
+ final Path testPath = Path.fromLocalFile(folder);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(100);
DataStream<Tuple2<Long, String>> stream = env.addSource(
- new FiniteTestSource<>(data), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {
- }));
+ new FiniteTestSource<>(testData),
+ TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {
- Path testPath = Path.fromLocalFile(folder);
+ })
+ );
stream.map(new MapFunction<Tuple2<Long, String>, Tuple2<LongWritable, Text>>() {
@Override
@@ -78,17 +83,17 @@ public class SequenceFileSinkITCase extends AbstractTestBase {
}).addSink(
StreamingFileSink.forBulkFormat(
testPath,
- new SequenceFileWriterFactory<>(SequenceFileWriterFactory.getHadoopConfigFromPath(testPath), LongWritable.class, Text.class, "BZip2")
+ new SequenceFileWriterFactory<>(configuration, LongWritable.class, Text.class, "BZip2")
).build());
env.execute();
- validateResults(folder, data);
+ validateResults(folder, testData);
}
private List<Tuple2<Long, String>> readSequenceFile(File file) throws IOException {
SequenceFile.Reader reader = new SequenceFile.Reader(
- new Configuration(), SequenceFile.Reader.file(new org.apache.hadoop.fs.Path(file.toURI())));
+ configuration, SequenceFile.Reader.file(new org.apache.hadoop.fs.Path(file.toURI())));
LongWritable key = new LongWritable();
Text val = new Text();
ArrayList<Tuple2<Long, String>> results = new ArrayList<>();
@@ -104,14 +109,15 @@ public class SequenceFileSinkITCase extends AbstractTestBase {
assertNotNull(buckets);
assertEquals(1, buckets.length);
- File[] partFiles = buckets[0].listFiles();
+ final File[] partFiles = buckets[0].listFiles();
assertNotNull(partFiles);
- assertEquals(1, partFiles.length);
- assertTrue(partFiles[0].length() > 0);
+ assertEquals(2, partFiles.length);
+
+ for (File partFile : partFiles) {
+ assertTrue(partFile.length() > 0);
- List<Tuple2<Long, String>> results = readSequenceFile(partFiles[0]);
- assertEquals(expected, results);
+ final List<Tuple2<Long, String>> fileContent = readSequenceFile(partFile);
+ assertEquals(expected, fileContent);
+ }
}
}
-
-
diff --git a/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfigurationTest.java b/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfigurationTest.java
new file mode 100644
index 0000000..ea0fb95
--- /dev/null
+++ b/flink-formats/flink-sequence-file/src/test/java/org/apache/flink/formats/sequencefile/SerializableHadoopConfigurationTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.hadoop.conf.Configuration;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * Tests for the {@link SerializableHadoopConfiguration}.
+ */
+public class SerializableHadoopConfigurationTest {
+
+ private static final String TEST_KEY = "test-key";
+
+ private static final String TEST_VALUE = "test-value";
+
+ private Configuration configuration;
+
+ @Before
+ public void createConfigWithCustomProperty() {
+ this.configuration = new Configuration();
+ configuration.set(TEST_KEY, TEST_VALUE);
+ }
+
+ @Test
+ public void customPropertiesSurviveSerializationDeserialization() throws IOException, ClassNotFoundException {
+ final SerializableHadoopConfiguration serializableConfigUnderTest = new SerializableHadoopConfiguration(configuration);
+ final byte[] serializedConfigUnderTest = serializeAndGetBytes(serializableConfigUnderTest);
+ final SerializableHadoopConfiguration deserializableConfigUnderTest = deserializeAndGetConfiguration(serializedConfigUnderTest);
+
+ Assert.assertThat(deserializableConfigUnderTest.get(), hasTheSamePropertiesAs(configuration));
+ }
+
+ // ---------------------------------------- Matchers ---------------------------------------- //
+
+ private static TypeSafeMatcher<Configuration> hasTheSamePropertiesAs(final Configuration expectedConfig) {
+ return new TypeSafeMatcher<Configuration>() {
+ @Override
+ protected boolean matchesSafely(Configuration actualConfig) {
+ final String value = actualConfig.get(TEST_KEY);
+ return actualConfig != expectedConfig && value != null && expectedConfig.get(TEST_KEY).equals(value);
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("a Hadoop Configuration with property: key=")
+ .appendValue(TEST_KEY)
+ .appendText(" and value=")
+ .appendValue(TEST_VALUE);
+ }
+ };
+ }
+
+ // ---------------------------------------- Helper Methods ---------------------------------------- //
+
+ private byte[] serializeAndGetBytes(SerializableHadoopConfiguration serializableConfigUnderTest) throws IOException {
+ try (
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(byteStream)
+ ) {
+ out.writeObject(serializableConfigUnderTest);
+ out.flush();
+ return byteStream.toByteArray();
+ }
+ }
+
+ private SerializableHadoopConfiguration deserializeAndGetConfiguration(byte[] serializedConfig) throws IOException, ClassNotFoundException {
+ try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serializedConfig))) {
+ return (SerializableHadoopConfiguration) in.readObject();
+ }
+ }
+}
diff --git a/flink-formats/flink-sequencefile/src/test/resources/log4j-test.properties b/flink-formats/flink-sequence-file/src/test/resources/log4j-test.properties
similarity index 100%
rename from flink-formats/flink-sequencefile/src/test/resources/log4j-test.properties
rename to flink-formats/flink-sequence-file/src/test/resources/log4j-test.properties
diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml
index 380eadc..51c3466 100644
--- a/flink-formats/pom.xml
+++ b/flink-formats/pom.xml
@@ -40,7 +40,7 @@ under the License.
<module>flink-json</module>
<module>flink-avro-confluent-registry</module>
<module>flink-parquet</module>
- <module>flink-sequencefile</module>
+ <module>flink-sequence-file</module>
</modules>
<!-- override these root dependencies as 'provided', so they don't end up
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
similarity index 72%
rename from flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java
rename to flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
index 1d75118..b3a9546 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/FiniteTestSource.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.test.util;
+package org.apache.flink.streaming.util;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -24,8 +24,15 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Arrays;
/**
- * A stream source that emits elements without allowing checkpoints and waits
- * for two more checkpoints to complete before exiting.
+ * A stream source that:
+ * 1) emits a list of elements without allowing checkpoints,
+ * 2) then waits for two more checkpoints to complete,
+ * 3) then re-emits the same elements before
+ * 4) waiting for another two checkpoints and
+ * 5) exiting.
+ *
+ * <p>This class was written to test the Bulk Writers used by
+ * the StreamingFileSink.
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListener {
@@ -50,11 +57,19 @@ public class FiniteTestSource<T> implements SourceFunction<T>, CheckpointListene
@Override
public void run(SourceContext<T> ctx) throws Exception {
+ // first round of sending the elements and waiting for the checkpoints
+ emitElementsAndWaitForCheckpoints(ctx, 2);
+
+ // second round of the same
+ emitElementsAndWaitForCheckpoints(ctx, 2);
+ }
+
+ private void emitElementsAndWaitForCheckpoints(SourceContext<T> ctx, int checkpointsToWaitFor) throws InterruptedException {
final Object lock = ctx.getCheckpointLock();
- final int checkpointToAwait;
+ final int checkpointToAwait;
synchronized (lock) {
- checkpointToAwait = numCheckpointsComplete + 2;
+ checkpointToAwait = numCheckpointsComplete + checkpointsToWaitFor;
for (T t : elements) {
ctx.collect(t);
}