You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/12/18 08:47:15 UTC
[flink] 01/02: [FLINK-10457][fs-connector] Add SequenceFile support
to StreamingFileSink.
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b078c8090203fe55215a7626968a40a985af25ce
Author: Jihyun Cho <ji...@linecorp.com>
AuthorDate: Fri Sep 28 10:00:10 2018 +0200
[FLINK-10457][fs-connector] Add SequenceFile support to StreamingFileSink.
---
.../avro/ParquetStreamingFileSinkITCase.java | 2 +-
flink-formats/flink-sequencefile/pom.xml | 107 +++++++++++++
.../formats/sequencefile/SequenceFileWriter.java | 61 ++++++++
.../sequencefile/SequenceFileWriterFactory.java | 168 +++++++++++++++++++++
.../sequencefile/SequenceFileSinkITCase.java | 117 ++++++++++++++
.../src/test/resources/log4j-test.properties | 23 +++
flink-formats/pom.xml | 1 +
.../apache/flink/test/util}/FiniteTestSource.java | 2 +-
8 files changed, 479 insertions(+), 2 deletions(-)
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
index d1f0a5f..540f762 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
@@ -22,11 +22,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.formats.parquet.generated.Address;
-import org.apache.flink.formats.parquet.testutils.FiniteTestSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.FiniteTestSource;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
diff --git a/flink-formats/flink-sequencefile/pom.xml b/flink-formats/flink-sequencefile/pom.xml
new file mode 100644
index 0000000..632cf2f
--- /dev/null
+++ b/flink-formats/flink-sequencefile/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-formats</artifactId>
+ <version>1.8-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-sequencefile</artifactId>
+ <name>flink-sequencefile</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Hadoop is needed for SequenceFile -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop2</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-hadoop-fs</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+
+ <build>
+ <plugins>
+ <!-- skip dependency convergence due to Hadoop dependency -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>dependency-convergence</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java b/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
new file mode 100644
index 0000000..fa8fab2
--- /dev/null
+++ b/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link BulkWriter} implementation that wraps a {@link SequenceFile.Writer}.
+ *
+ * @param <K> The type of key written.
+ * @param <V> The type of value written.
+ */
+@PublicEvolving
+public class SequenceFileWriter<K extends Writable, V extends Writable> implements BulkWriter<Tuple2<K, V>> {
+ private final SequenceFile.Writer writer;
+
+ public SequenceFileWriter(SequenceFile.Writer writer) {
+ this.writer = checkNotNull(writer);
+ }
+
+ @Override
+ public void addElement(Tuple2<K, V> element) throws IOException {
+ writer.append(element.f0, element.f1);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ writer.hsync();
+ }
+
+ @Override
+ public void finish() throws IOException {
+ writer.close();
+ }
+}
+
diff --git a/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java b/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
new file mode 100644
index 0000000..90a87e7
--- /dev/null
+++ b/flink-formats/flink-sequencefile/src/main/java/org/apache/flink/formats/sequencefile/SequenceFileWriterFactory.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A factory that creates a SequenceFile {@link BulkWriter}.
+ *
+ * @param <K> The type of key to write. It should be writable.
+ * @param <V> The type of value to write. It should be writable.
+ */
+@PublicEvolving
+public class SequenceFileWriterFactory<K extends Writable, V extends Writable> implements BulkWriter.Factory<Tuple2<K, V>> {
+ private static final long serialVersionUID = 1L;
+
+ private final SerializableHadoopConfiguration serdeHadoopConfig;
+ private final Class<K> keyClass;
+ private final Class<V> valueClass;
+ private final String compressionCodecName;
+ private final SequenceFile.CompressionType compressionType;
+
+ /**
+ * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+ * SequenceFileWriter.
+ *
+ * @param hadoopConf The Hadoop configuration for Sequence File Writer.
+ * @param keyClass The class of key to write.
+ * @param valueClass The class of value to write.
+ */
+ public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass) {
+ this(hadoopConf, keyClass, valueClass, "None", SequenceFile.CompressionType.BLOCK);
+ }
+
+ /**
+ * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+ * SequenceFileWriter.
+ *
+ * @param hadoopConf The Hadoop configuration for Sequence File Writer.
+ * @param keyClass The class of key to write.
+ * @param valueClass The class of value to write.
+ * @param compressionCodecName The name of compression codec.
+ */
+ public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName) {
+ this(hadoopConf, keyClass, valueClass, compressionCodecName, SequenceFile.CompressionType.BLOCK);
+ }
+
+ /**
+ * Creates a new SequenceFileWriterFactory using the given builder to assemble the
+ * SequenceFileWriter.
+ *
+ * @param hadoopConf The Hadoop configuration for Sequence File Writer.
+ * @param keyClass The class of key to write.
+ * @param valueClass The class of value to write.
+ * @param compressionCodecName The name of compression codec.
+ * @param compressionType The type of compression level.
+ */
+ public SequenceFileWriterFactory(Configuration hadoopConf, Class<K> keyClass, Class<V> valueClass, String compressionCodecName, SequenceFile.CompressionType compressionType) {
+ this.serdeHadoopConfig = new SerializableHadoopConfiguration(checkNotNull(hadoopConf));
+ this.keyClass = checkNotNull(keyClass);
+ this.valueClass = checkNotNull(valueClass);
+ this.compressionCodecName = checkNotNull(compressionCodecName);
+ this.compressionType = checkNotNull(compressionType);
+ }
+
+ @Override
+ public SequenceFileWriter<K, V> create(FSDataOutputStream out) throws IOException {
+ org.apache.hadoop.fs.FSDataOutputStream stream = new org.apache.hadoop.fs.FSDataOutputStream(out, null);
+ CompressionCodec compressionCodec = getCompressionCodec(serdeHadoopConfig.get(), compressionCodecName);
+ SequenceFile.Writer writer = SequenceFile.createWriter(
+ serdeHadoopConfig.get(),
+ SequenceFile.Writer.stream(stream),
+ SequenceFile.Writer.keyClass(keyClass),
+ SequenceFile.Writer.valueClass(valueClass),
+ SequenceFile.Writer.compression(compressionType, compressionCodec));
+ return new SequenceFileWriter<>(writer);
+ }
+
+ private CompressionCodec getCompressionCodec(Configuration conf, String compressionCodecName) {
+ if (compressionCodecName.equals("None")) {
+ return null;
+ }
+
+ CompressionCodecFactory codecFactory = new CompressionCodecFactory(checkNotNull(conf));
+ CompressionCodec codec = codecFactory.getCodecByName(compressionCodecName);
+ if (codec == null) {
+ throw new RuntimeException("Codec " + compressionCodecName + " not found.");
+ }
+ return codec;
+ }
+
+ /**
+ * Get Hadoop configuration based by the path.
+ * If the path is not Hadoop URI, it will be return default configuration.
+ *
+ * @param path The path to get configuration.
+ * @return Hadoop configuration.
+ * @throws IOException
+ */
+ public static Configuration getHadoopConfigFromPath(Path path) throws IOException {
+ FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
+ if (fs != null && fs instanceof HadoopFileSystem) {
+ return ((HadoopFileSystem) fs).getHadoopFileSystem().getConf();
+ } else {
+ return new Configuration();
+ }
+ }
+
+ /**
+ * The wrapper class for serialization of {@link Configuration}.
+ */
+ private class SerializableHadoopConfiguration implements Serializable {
+ private transient Configuration hadoopConfig;
+
+ private SerializableHadoopConfiguration(Configuration hadoopConfig) {
+ this.hadoopConfig = hadoopConfig;
+ }
+
+ private Configuration get() {
+ return this.hadoopConfig;
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ this.hadoopConfig.write(out);
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException {
+ this.hadoopConfig = new Configuration();
+ this.hadoopConfig.readFields(in);
+ }
+ }
+}
+
diff --git a/flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java b/flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java
new file mode 100644
index 0000000..b3d4b22
--- /dev/null
+++ b/flink-formats/flink-sequencefile/src/test/java/org/apache/flink/formats/sequencefile/SequenceFileSinkITCase.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.sequencefile;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.FiniteTestSource;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test case for writing bulk encoded files with the
+ * {@link StreamingFileSink} with SequenceFile.
+ */
+public class SequenceFileSinkITCase extends AbstractTestBase {
+ @Test
+ public void testWriteSequenceFile() throws Exception {
+ final File folder = TEMPORARY_FOLDER.newFolder();
+
+ final List<Tuple2<Long, String>> data = Arrays.asList(
+ new Tuple2<>(1L, "a"),
+ new Tuple2<>(2L, "b"),
+ new Tuple2<>(3L, "c")
+ );
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(100);
+
+ DataStream<Tuple2<Long, String>> stream = env.addSource(
+ new FiniteTestSource<>(data), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {
+ }));
+
+ Path testPath = Path.fromLocalFile(folder);
+
+ stream.map(new MapFunction<Tuple2<Long, String>, Tuple2<LongWritable, Text>>() {
+ @Override
+ public Tuple2<LongWritable, Text> map(Tuple2<Long, String> value) throws Exception {
+ return new Tuple2<>(new LongWritable(value.f0), new Text(value.f1));
+ }
+ }).addSink(
+ StreamingFileSink.forBulkFormat(
+ testPath,
+ new SequenceFileWriterFactory<>(SequenceFileWriterFactory.getHadoopConfigFromPath(testPath), LongWritable.class, Text.class, "BZip2")
+ ).build());
+
+ env.execute();
+
+ validateResults(folder, data);
+ }
+
+ private List<Tuple2<Long, String>> readSequenceFile(File file) throws IOException {
+ SequenceFile.Reader reader = new SequenceFile.Reader(
+ new Configuration(), SequenceFile.Reader.file(new org.apache.hadoop.fs.Path(file.toURI())));
+ LongWritable key = new LongWritable();
+ Text val = new Text();
+ ArrayList<Tuple2<Long, String>> results = new ArrayList<>();
+ while (reader.next(key, val)) {
+ results.add(new Tuple2<>(key.get(), val.toString()));
+ }
+ reader.close();
+ return results;
+ }
+
+ private void validateResults(File folder, List<Tuple2<Long, String>> expected) throws Exception {
+ File[] buckets = folder.listFiles();
+ assertNotNull(buckets);
+ assertEquals(1, buckets.length);
+
+ File[] partFiles = buckets[0].listFiles();
+ assertNotNull(partFiles);
+ assertEquals(1, partFiles.length);
+ assertTrue(partFiles[0].length() > 0);
+
+ List<Tuple2<Long, String>> results = readSequenceFile(partFiles[0]);
+ assertEquals(expected, results);
+ }
+}
+
+
diff --git a/flink-formats/flink-sequencefile/src/test/resources/log4j-test.properties b/flink-formats/flink-sequencefile/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..644b884
--- /dev/null
+++ b/flink-formats/flink-sequencefile/src/test/resources/log4j-test.properties
@@ -0,0 +1,23 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF, testlogger
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml
index c354402..380eadc 100644
--- a/flink-formats/pom.xml
+++ b/flink-formats/pom.xml
@@ -40,6 +40,7 @@ under the License.
<module>flink-json</module>
<module>flink-avro-confluent-registry</module>
<module>flink-parquet</module>
+ <module>flink-sequencefile</module>
</modules>
<!-- override these root dependencies as 'provided', so they don't end up
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java
similarity index 97%
rename from flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java
rename to flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java
index 03db7ff..1d75118 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/testutils/FiniteTestSource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/FiniteTestSource.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.formats.parquet.testutils;
+package org.apache.flink.test.util;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.source.SourceFunction;