Posted to commits@flink.apache.org by kk...@apache.org on 2018/12/18 08:47:15 UTC

[flink] 01/02: [FLINK-10457][fs-connector] Add SequenceFile support to StreamingFileSink.

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b078c8090203fe55215a7626968a40a985af25ce
Author: Jihyun Cho <ji...@linecorp.com>
AuthorDate: Fri Sep 28 10:00:10 2018 +0200

    [FLINK-10457][fs-connector] Add SequenceFile support to StreamingFileSink.
---
 .../avro/ParquetStreamingFileSinkITCase.java       |   2 +-
 flink-formats/flink-sequencefile/pom.xml           | 107 +++++++++++++
 .../formats/sequencefile/SequenceFileWriter.java   |  61 ++++++++
 .../sequencefile/SequenceFileWriterFactory.java    | 168 +++++++++++++++++++++
 .../sequencefile/SequenceFileSinkITCase.java       | 117 ++++++++++++++
 .../src/test/resources/log4j-test.properties       |  23 +++
 flink-formats/pom.xml                              |   1 +
 .../apache/flink/test/util}/FiniteTestSource.java  |   2 +-
 8 files changed, 479 insertions(+), 2 deletions(-)
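
In short: the new flink-sequencefile module contributes a SequenceFileWriterFactory
that plugs into StreamingFileSink.forBulkFormat(). A minimal usage sketch, assuming a
local output path and sample records (class and job names are illustrative; the API
calls mirror the code added below):

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.sequencefile.SequenceFileWriterFactory;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    public class SequenceFileSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(100); // bulk-encoded part files are rolled on checkpoints

            Path outputPath = new Path("file:///tmp/sequencefile-out"); // assumed output location

            env.fromElements(Tuple2.of(1L, "a"), Tuple2.of(2L, "b"))
                .map(t -> Tuple2.of(new LongWritable(t.f0), new Text(t.f1)))
                .returns(TypeInformation.of(new TypeHint<Tuple2<LongWritable, Text>>() {}))
                .addSink(StreamingFileSink.forBulkFormat(
                        outputPath,
                        new SequenceFileWriterFactory<>(
                            SequenceFileWriterFactory.getHadoopConfigFromPath(outputPath),
                            LongWritable.class, Text.class, "BZip2"))
                    .build());

            env.execute("SequenceFile sink example");
        }
    }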

diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
index d1f0a5f..540f762 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
@@ -22,11 +22,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.formats.parquet.generated.Address;
-import org.apache.flink.formats.parquet.testutils.FiniteTestSource;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.FiniteTestSource;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
diff --git a/flink-formats/flink-sequencefile/pom.xml b/flink-formats/flink-sequencefile/pom.xml
new file mode 100644
index 0000000..632cf2f
--- /dev/null
+++ b/flink-formats/flink-sequencefile/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-formats</artifactId>
+		<version>1.8-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-sequencefile</artifactId>
+	<name>flink-sequencefile</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- Hadoop is needed for SequenceFile -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-hadoop2</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-hadoop-fs</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+
+	<build>
+		<plugins>
+			<!-- skip dependency convergence due to Hadoop dependency -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-enforcer-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>dependency-convergence</id>
+						<goals>
+							<goal>enforce</goal>
+						</goals>
+						<configuration>
+							<skip>true</skip>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
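
A note on scoping: the Hadoop dependencies above (flink-shaded-hadoop2 and
flink-hadoop-fs) are declared as provided, following the convention of the other
Hadoop-dependent Flink modules. Applications that use flink-sequencefile therefore
have to bring Hadoop onto their runtime classpath themselves, for example through a
Hadoop-enabled Flink distribution or by bundling the dependencies in the user jar.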
diff --git a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java b/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
new file mode 100644
index 0000000..fa8fab2
--- /dev/null
+++ b/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link BulkWriter} implementation that wraps a {@link SequenceFile.Writer}.
+ *
+ * @param <K> The type of key written.
+ * @param <V> The type of value written.
+ */
+@PublicEvolving
+public class SequenceFileWriter<K extends Writable, V extends Writable> implements BulkWriter<Tuple2<K, V>> {
+	private final SequenceFile.Writer writer;
+
+	public SequenceFileWriter(SequenceFile.Writer writer) {
+		this.writer = checkNotNull(writer);
+	}
+
+	@Override
+	public void addElement(Tuple2<K, V> element) throws IOException {
+		writer.append(element.f0, element.f1);
+	}
+
+	@Override
+	public void flush() throws IOException {
+		writer.hsync();
+	}
+
+	@Override
+	public void finish() throws IOException {
+		writer.close();
+	}
+}
+
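The writer is deliberately a thin adapter. The StreamingFileSink calls addElement()
once per record and, when a part file is closed (for bulk formats, on checkpoints),
flush() followed by finish(). A hand-driven sketch against the local file system
(the path and records are assumptions; in a real job the sink performs these calls):

    // Inside some main() or test method, with the imports of the classes in this
    // commit plus org.apache.flink.core.fs.FileSystem and FSDataOutputStream.
    Path path = new Path("file:///tmp/manual.seq");                     // assumed target file
    FSDataOutputStream out = FileSystem.getLocalFileSystem()
            .create(path, FileSystem.WriteMode.OVERWRITE);

    SequenceFileWriterFactory<LongWritable, Text> factory = new SequenceFileWriterFactory<>(
            new org.apache.hadoop.conf.Configuration(), LongWritable.class, Text.class, "BZip2");

    SequenceFileWriter<LongWritable, Text> writer = factory.create(out);
    writer.addElement(Tuple2.of(new LongWritable(1L), new Text("a")));  // once per record
    writer.flush();                                                     // maps to SequenceFile.Writer#hsync()
    writer.finish();                                                    // closes the underlying Hadoop writer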
diff --git a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java b/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
new file mode 100644
index 0000000..90a87e7
--- /dev/null
+++ b/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A factory that creates a SequenceFile {@link BulkWriter}.
+ *
+ * @param <K> The type of the key to write. It must be a Hadoop {@link Writable}.
+ * @param <V> The type of the value to write. It must be a Hadoop {@link Writable}.
+ */
+@PublicEvolving
+public class SequenceFileWriterFactory<K extends Writable, V extends Writable> implements BulkWriter.Factory<Tuple2<K, V>> {
+	private static final long serialVersionUID = 1L;
+
+	private final SerializableHadoopConfiguration serdeHadoopConfig;
+	private final Class<K> keyClass;
+	private final Class<V> valueClass;
+	private final String compressionCodecName;
+	private final SequenceFile.CompressionType compressionType;
+
+	/**
+	 * Creates a new SequenceFileWriterFactory with the default compression settings
+	 * (codec name "None", block compression type).
+	 *
+	 * @param hadoopConf The Hadoop configuration for the SequenceFile writer.
+	 * @param keyClass   The class of the key to write.
+	 * @param valueClass The class of the value to write.
+	 */
+	public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass) {
+		this(hadoopConf, keyClass, valueClass, "None", SequenceFile.CompressionType.BLOCK);
+	}
+
+	/**
+	 * Creates a new SequenceFileWriterFactory that compresses data with the given
+	 * codec, using the block compression type.
+	 *
+	 * @param hadoopConf           The Hadoop configuration for the SequenceFile writer.
+	 * @param keyClass             The class of the key to write.
+	 * @param valueClass           The class of the value to write.
+	 * @param compressionCodecName The name of the compression codec.
+	 */
+	public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName) {
+		this(hadoopConf, keyClass, valueClass, compressionCodecName, SequenceFile.CompressionType.BLOCK);
+	}
+
+	/**
+	 * Creates a new SequenceFileWriterFactory that compresses data with the given
+	 * codec and compression type.
+	 *
+	 * @param hadoopConf           The Hadoop configuration for the SequenceFile writer.
+	 * @param keyClass             The class of the key to write.
+	 * @param valueClass           The class of the value to write.
+	 * @param compressionCodecName The name of the compression codec.
+	 * @param compressionType      The compression type ({@code NONE}, {@code RECORD}, or {@code BLOCK}).
+	 */
+	public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName, SequenceFile.CompressionType compressionType) {
+		this.serdeHadoopConfig = new SerializableHadoopConfiguration(checkNotNull(hadoopConf));
+		this.keyClass = checkNotNull(keyClass);
+		this.valueClass = checkNotNull(valueClass);
+		this.compressionCodecName = checkNotNull(compressionCodecName);
+		this.compressionType = checkNotNull(compressionType);
+	}
+
+	@Override
+	public SequenceFileWriter<K, V> create(FSDataOutputStream out) throws IOException {
+		org.apache.hadoop.fs.FSDataOutputStream stream = new org.apache.hadoop.fs.FSDataOutputStream(out, null);
+		CompressionCodec compressionCodec = getCompressionCodec(serdeHadoopConfig.get(), compressionCodecName);
+		SequenceFile.Writer writer = SequenceFile.createWriter(
+			serdeHadoopConfig.get(),
+			SequenceFile.Writer.stream(stream),
+			SequenceFile.Writer.keyClass(keyClass),
+			SequenceFile.Writer.valueClass(valueClass),
+			SequenceFile.Writer.compression(compressionType, compressionCodec));
+		return new SequenceFileWriter<>(writer);
+	}
+
+	private CompressionCodec getCompressionCodec(Configuration conf, String compressionCodecName) {
+		if (compressionCodecName.equals("None")) {
+			return null;
+		}
+
+		CompressionCodecFactory codecFactory = new CompressionCodecFactory(checkNotNull(conf));
+		CompressionCodec codec = codecFactory.getCodecByName(compressionCodecName);
+		if (codec == null) {
+			throw new RuntimeException("Codec " + compressionCodecName + " not found.");
+		}
+		return codec;
+	}
+
+	/**
+	 * Gets the Hadoop configuration of the file system behind the given path.
+	 * If the path does not point to a Hadoop file system, the default configuration is returned.
+	 *
+	 * @param path The path to derive the configuration from.
+	 * @return The Hadoop configuration of the path's file system.
+	 * @throws IOException Thrown if the file system for the path could not be loaded.
+	 */
+	public static Configuration getHadoopConfigFromPath(Path path) throws IOException {
+		FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
+		if (fs instanceof HadoopFileSystem) {
+			return ((HadoopFileSystem) fs).getHadoopFileSystem().getConf();
+		} else {
+			return new Configuration();
+		}
+	}
+
+	/**
+	 * A wrapper that makes a Hadoop {@link Configuration} serializable.
+	 */
+	private class SerializableHadoopConfiguration implements Serializable {
+		private transient Configuration hadoopConfig;
+
+		private SerializableHadoopConfiguration(Configuration hadoopConfig) {
+			this.hadoopConfig = hadoopConfig;
+		}
+
+		private Configuration get() {
+			return this.hadoopConfig;
+		}
+
+		private void writeObject(ObjectOutputStream out) throws IOException {
+			this.hadoopConfig.write(out);
+		}
+
+		private void readObject(ObjectInputStream in) throws IOException {
+			this.hadoopConfig = new Configuration();
+			this.hadoopConfig.readFields(in);
+		}
+	}
+}
+
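The compressionCodecName above is resolved through Hadoop's
CompressionCodecFactory.getCodecByName(), which matches codec aliases
case-insensitively ("Gzip", "BZip2", ...), while the literal "None" is special-cased
to disable the codec entirely. A few illustrative constructions (conf stands for a
Hadoop Configuration; key and value classes as in the test below):

    new SequenceFileWriterFactory<>(conf, LongWritable.class, Text.class);           // "None" codec, BLOCK type
    new SequenceFileWriterFactory<>(conf, LongWritable.class, Text.class, "Gzip");   // GzipCodec, BLOCK type
    new SequenceFileWriterFactory<>(conf, LongWritable.class, Text.class, "BZip2",
            SequenceFile.CompressionType.RECORD);                                    // per-record compression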
diff --git a/flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java b/flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java
new file mode 100644
index 0000000..b3d4b22
--- /dev/null
+++ b/flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.FiniteTestSource;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test case for writing bulk-encoded sequence files
+ * with the {@link StreamingFileSink}.
+ */
+public class SequenceFileSinkITCase extends AbstractTestBase {
+	@Test
+	public void testWriteSequenceFile() throws Exception {
+		final File folder = TEMPORARY_FOLDER.newFolder();
+
+		final List<Tuple2<Long, String>> data = Arrays.asList(
+			new Tuple2<>(1L, "a"),
+			new Tuple2<>(2L, "b"),
+			new Tuple2<>(3L, "c")
+		);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.enableCheckpointing(100);
+
+		DataStream<Tuple2<Long, String>> stream = env.addSource(
+			new FiniteTestSource<>(data), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {
+			}));
+
+		Path testPath = Path.fromLocalFile(folder);
+
+		stream.map(new MapFunction<Tuple2<Long, String>, Tuple2<LongWritable, Text>>() {
+			@Override
+			public Tuple2<LongWritable, Text> map(Tuple2<Long, String> value) throws Exception {
+				return new Tuple2<>(new LongWritable(value.f0), new Text(value.f1));
+			}
+		}).addSink(
+			StreamingFileSink.forBulkFormat(
+				testPath,
+				new SequenceFileWriterFactory<>(SequenceFileWriterFactory.getHadoopConfigFromPath(testPath), LongWritable.class, Text.class, "BZip2")
+			).build());
+
+		env.execute();
+
+		validateResults(folder, data);
+	}
+
+	private List<Tuple2<Long, String>> readSequenceFile(File file) throws IOException {
+		SequenceFile.Reader reader = new SequenceFile.Reader(
+			new Configuration(), SequenceFile.Reader.file(new org.apache.hadoop.fs.Path(file.toURI())));
+		LongWritable key = new LongWritable();
+		Text val = new Text();
+		ArrayList<Tuple2<Long, String>> results = new ArrayList<>();
+		while (reader.next(key, val)) {
+			results.add(new Tuple2<>(key.get(), val.toString()));
+		}
+		reader.close();
+		return results;
+	}
+
+	private void validateResults(File folder, List<Tuple2<Long, String>> expected) throws Exception {
+		File[] buckets = folder.listFiles();
+		assertNotNull(buckets);
+		assertEquals(1, buckets.length);
+
+		File[] partFiles = buckets[0].listFiles();
+		assertNotNull(partFiles);
+		assertEquals(1, partFiles.length);
+		assertTrue(partFiles[0].length() > 0);
+
+		List<Tuple2<Long, String>> results = readSequenceFile(partFiles[0]);
+		assertEquals(expected, results);
+	}
+}
+
+
diff --git a/flink-formats/flink-sequencefile/src/test/resources/log4j-test.properties b/flink-formats/flink-sequencefile/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..644b884
--- /dev/null
+++ b/flink-formats/flink-sequencefile/src/test/resources/log4j-test.properties
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, testlogger
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml
index c354402..380eadc 100644
--- a/flink-formats/pom.xml
+++ b/flink-formats/pom.xml
@@ -40,6 +40,7 @@ under the License.
 		<module>flink-json</module>
 		<module>flink-avro-confluent-registry</module>
 		<module>flink-parquet</module>
+		<module>flink-sequencefile</module>
 	</modules>
 
 	<!-- override these root dependencies as 'provided', so they don't end up
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java
similarity index 97%
rename from flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java
rename to flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java
index 03db7ff..1d75118 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.formats.parquet.testutils;
+package org.apache.flink.test.util;
 
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
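
The FiniteTestSource move above also explains the one-line change to
ParquetStreamingFileSinkITCase at the top of this commit: the source previously
lived in the Parquet module's test utilities and now sits in flink-test-utils, where
the Parquet and SequenceFile integration tests can share it. Only its package
declaration changes; the implementation is untouched.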