You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/29 09:06:05 UTC

[pulsar] branch master updated: Perform Checkstyle analysis in the pulsar-flink module (#4832)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b0793a1  Perform Checkstyle analysis in the pulsar-flink module (#4832)
b0793a1 is described below

commit b0793a178b8b5d2d1e82316b224c6dc3c27c6664
Author: vzhikserg <vz...@users.noreply.github.com>
AuthorDate: Mon Jul 29 11:05:57 2019 +0200

    Perform Checkstyle analysis in the pulsar-flink module (#4832)
    
    * Add maven plugin for style checking. Fix some style violations.
    
    * Fix issues shown by the style checker in the pulsar-flink module
---
 pulsar-flink/pom.xml                               | 19 +++++++++
 .../connectors/pulsar/BasePulsarOutputFormat.java  | 22 ++++++-----
 .../connectors/pulsar/PulsarAvroOutputFormat.java  |  4 +-
 .../connectors/pulsar/PulsarCsvOutputFormat.java   |  3 +-
 .../connectors/pulsar/PulsarJsonOutputFormat.java  |  3 +-
 .../connectors/pulsar/PulsarOutputFormat.java      |  3 +-
 .../connectors/pulsar/package-info.java}           | 20 +---------
 .../serialization/AvroSerializationSchema.java     |  7 ++--
 .../serialization/CsvSerializationSchema.java      |  7 ++--
 .../pulsar/serialization/package-info.java}        | 20 +---------
 .../connectors/pulsar/FlinkPulsarProducer.java     | 25 ++++++------
 .../connectors/pulsar/PulsarConsumerSource.java    | 21 +++++-----
 .../connectors/pulsar/PulsarSourceBuilder.java     | 46 ++++++++++++----------
 .../connectors/pulsar/PulsarTableSink.java         | 12 +++---
 .../PulsarKeyExtractor.java => package-info.java}  | 20 +---------
 .../pulsar/partitioner/PulsarKeyExtractor.java     |  4 +-
 .../{PulsarKeyExtractor.java => package-info.java} | 20 +---------
 17 files changed, 110 insertions(+), 146 deletions(-)

diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
index d04bd8c..f348efa 100644
--- a/pulsar-flink/pom.xml
+++ b/pulsar-flink/pom.xml
@@ -136,6 +136,25 @@
           </execution>
         </executions>
       </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>check-style</id>
+            <phase>verify</phase>
+            <configuration>
+              <configLocation>../buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
+              <suppressionsLocation>../buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
+              <encoding>UTF-8</encoding>
+            </configuration>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
index 438af59..d061559 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
@@ -18,25 +18,24 @@
  */
 package org.apache.flink.batch.connectors.pulsar;
 
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Function;
-
 /**
  * Base Pulsar Output Format to write Flink DataSets into a Pulsar topic.
  */
@@ -54,7 +53,8 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T>  {
     private ProducerConfigurationData producerConf;
 
 
-    protected BasePulsarOutputFormat(final String serviceUrl, final String topicName, final Authentication authentication) {
+    protected BasePulsarOutputFormat(final String serviceUrl, final String topicName,
+        final Authentication authentication) {
         Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank.");
         Preconditions.checkArgument(StringUtils.isNotBlank(topicName),  "topicName cannot be blank.");
 
@@ -65,7 +65,8 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T>  {
         this.clientConf.setAuthentication(authentication);
         this.producerConf.setTopicName(topicName);
 
-        LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.producerConf.getTopicName());
+        LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}",
+            this.producerConf.getTopicName());
     }
 
     protected BasePulsarOutputFormat(ClientConfigurationData clientConf, ProducerConfigurationData producerConf) {
@@ -75,7 +76,8 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T>  {
         Preconditions.checkArgument(StringUtils.isNotBlank(clientConf.getServiceUrl()), "serviceUrl cannot be blank.");
         Preconditions.checkArgument(StringUtils.isNotBlank(producerConf.getTopicName()),  "topicName cannot be blank.");
 
-        LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.producerConf.getTopicName());
+        LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}",
+            this.producerConf.getTopicName());
     }
 
     @Override
@@ -107,9 +109,9 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T>  {
 
     private Producer<byte[]> getProducerInstance()
             throws PulsarClientException {
-        if(producer == null){
+        if (producer == null){
             synchronized (PulsarOutputFormat.class) {
-                if(producer == null){
+                if (producer == null){
                     producer = Preconditions.checkNotNull(createPulsarProducer(),
                             "Pulsar producer cannot be null.");
                 }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
index 6e11e4c..cc2acab 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
@@ -36,9 +36,9 @@ public class PulsarAvroOutputFormat<T extends SpecificRecord> extends BasePulsar
         this.serializationSchema = new AvroSerializationSchema();
     }
 
-    public PulsarAvroOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) {
+    public PulsarAvroOutputFormat(ClientConfigurationData clientConfigurationData,
+        ProducerConfigurationData producerConfigurationData) {
         super(clientConfigurationData, producerConfigurationData);
         this.serializationSchema = new AvroSerializationSchema();
     }
-
 }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
index 8c3b441..74680d3 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
@@ -36,7 +36,8 @@ public class PulsarCsvOutputFormat<T extends Tuple> extends BasePulsarOutputForm
         this.serializationSchema = new CsvSerializationSchema<>();
     }
 
-    public PulsarCsvOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) {
+    public PulsarCsvOutputFormat(ClientConfigurationData clientConfigurationData,
+        ProducerConfigurationData producerConfigurationData) {
         super(clientConfigurationData, producerConfigurationData);
         this.serializationSchema = new CsvSerializationSchema<>();
     }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
index 093e5c9..837f743 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
@@ -35,7 +35,8 @@ public class PulsarJsonOutputFormat<T> extends BasePulsarOutputFormat<T> {
         this.serializationSchema = new JsonSerializationSchema();
     }
 
-    public PulsarJsonOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) {
+    public PulsarJsonOutputFormat(ClientConfigurationData clientConfigurationData,
+        ProducerConfigurationData producerConfigurationData) {
         super(clientConfigurationData, producerConfigurationData);
         this.serializationSchema = new JsonSerializationSchema();
     }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
index cb307f0..14e12e4 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -31,7 +31,8 @@ public class PulsarOutputFormat<T> extends BasePulsarOutputFormat<T> {
 
     private static final long serialVersionUID = 2997027580167793000L;
 
-    public PulsarOutputFormat(String serviceUrl, String topicName, Authentication authentication, final SerializationSchema<T> serializationSchema) {
+    public PulsarOutputFormat(String serviceUrl, String topicName, Authentication authentication,
+        final SerializationSchema<T> serializationSchema) {
         super(serviceUrl, topicName, authentication);
         Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
         this.serializationSchema = serializationSchema;
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/package-info.java
similarity index 67%
copy from pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
copy to pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/package-info.java
index 270892e..79a8213 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/package-info.java
@@ -16,23 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.flink.streaming.connectors.pulsar.partitioner;
-
-import java.io.Serializable;
-
 /**
- * Extract key from a value.
+ * Implementations of different output formats.
  */
-public interface PulsarKeyExtractor<IN> extends Serializable {
-
-    PulsarKeyExtractor NULL = in -> null;
-
-    /**
-     * Retrieve a key from the value.
-     *
-     * @param in the value to extract a key.
-     * @return key.
-     */
-    String getKey(IN in);
-
-}
+package org.apache.flink.batch.connectors.pulsar;
\ No newline at end of file
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java
index 693ef8d..ea71d4d 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java
@@ -18,6 +18,8 @@
  */
 package org.apache.flink.batch.connectors.pulsar.serialization;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
@@ -25,9 +27,6 @@ import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
 /**
  * Avro Serialization Schema to serialize Dataset records to Avro.
  */
@@ -49,7 +48,7 @@ public class AvroSerializationSchema<T extends SpecificRecord> implements Serial
         Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
         arrayOutputStream.reset();
         try {
-            writer.write(t,encoder);
+            writer.write(t, encoder);
             encoder.flush();
         } catch (IOException e) {
             throw new RuntimeException("Error while serializing the record to Avro", e);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
index c7b7131..4ee6f22 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
@@ -18,13 +18,12 @@
  */
 package org.apache.flink.batch.connectors.pulsar.serialization;
 
+import java.io.IOException;
+import java.io.StringWriter;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.java.tuple.Tuple;
 
-import java.io.IOException;
-import java.io.StringWriter;
-
 /**
  * Csv Serialization Schema to serialize Tuples to Csv.
  */
@@ -38,7 +37,7 @@ public class CsvSerializationSchema<T extends Tuple> implements SerializationSch
         StringWriter stringWriter;
         try {
             Object[] fieldsValues = new Object[t.getArity()];
-            for(int index = 0; index < t.getArity(); index++) {
+            for (int index = 0; index < t.getArity(); index++) {
                 fieldsValues[index] = (t.getField(index));
             }
 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/package-info.java
similarity index 67%
copy from pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
copy to pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/package-info.java
index 270892e..e63f03f 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/package-info.java
@@ -16,23 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.flink.streaming.connectors.pulsar.partitioner;
-
-import java.io.Serializable;
-
 /**
- * Extract key from a value.
+ * Implementations of the serialization schemas.
  */
-public interface PulsarKeyExtractor<IN> extends Serializable {
-
-    PulsarKeyExtractor NULL = in -> null;
-
-    /**
-     * Retrieve a key from the value.
-     *
-     * @param in the value to extract a key.
-     * @return key.
-     */
-    String getKey(IN in);
-
-}
+package org.apache.flink.batch.connectors.pulsar.serialization;
\ No newline at end of file
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 5afba67..215dcfd 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -22,7 +22,6 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 import java.util.function.Function;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -35,10 +34,10 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
 import org.apache.flink.util.SerializableObject;
+import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -48,8 +47,8 @@ import org.slf4j.LoggerFactory;
 /**
  * Flink Sink to produce data into a Pulsar topic.
  */
-public class FlinkPulsarProducer<IN>
-        extends RichSinkFunction<IN>
+public class FlinkPulsarProducer<T>
+        extends RichSinkFunction<T>
         implements CheckpointedFunction {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkPulsarProducer.class);
@@ -61,12 +60,12 @@ public class FlinkPulsarProducer<IN>
      * (Serializable) SerializationSchema for turning objects used with Flink into.
      * byte[] for Pulsar.
      */
-    protected final SerializationSchema<IN> schema;
+    protected final SerializationSchema<T> schema;
 
     /**
      * User-provided key extractor for assigning a key to a pulsar message.
      */
-    protected final PulsarKeyExtractor<IN> flinkPulsarKeyExtractor;
+    protected final PulsarKeyExtractor<T> flinkPulsarKeyExtractor;
 
     /**
      * Produce Mode.
@@ -110,8 +109,8 @@ public class FlinkPulsarProducer<IN>
     public FlinkPulsarProducer(String serviceUrl,
                                String defaultTopicName,
                                Authentication authentication,
-                               SerializationSchema<IN> serializationSchema,
-                               PulsarKeyExtractor<IN> keyExtractor) {
+                               SerializationSchema<T> serializationSchema,
+                               PulsarKeyExtractor<T> keyExtractor) {
         checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
         checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
         checkNotNull(authentication, "auth cannot be null, set disabled for no auth");
@@ -129,8 +128,8 @@ public class FlinkPulsarProducer<IN>
 
     public FlinkPulsarProducer(ClientConfigurationData clientConfigurationData,
                                ProducerConfigurationData producerConfigurationData,
-                               SerializationSchema<IN> serializationSchema,
-                               PulsarKeyExtractor<IN> keyExtractor) {
+                               SerializationSchema<T> serializationSchema,
+                               PulsarKeyExtractor<T> keyExtractor) {
         this.clientConf = checkNotNull(clientConfigurationData, "client conf can not be null");
         this.producerConf = checkNotNull(producerConfigurationData, "producer conf can not be null");
         this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
@@ -144,7 +143,7 @@ public class FlinkPulsarProducer<IN>
     /**
      * @return pulsar key extractor.
      */
-    public PulsarKeyExtractor<IN> getKeyExtractor() {
+    public PulsarKeyExtractor<T> getKeyExtractor() {
         return flinkPulsarKeyExtractor;
     }
 
@@ -178,7 +177,7 @@ public class FlinkPulsarProducer<IN>
     // ----------------------------------- Sink Methods --------------------------
 
     @SuppressWarnings("unchecked")
-    private static final <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyExtractor<T> extractor) {
+    private static <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyExtractor<T> extractor) {
         if (null == extractor) {
             return PulsarKeyExtractor.NULL;
         } else {
@@ -238,7 +237,7 @@ public class FlinkPulsarProducer<IN>
     }
 
     @Override
-    public void invoke(IN value, Context context) throws Exception {
+    public void invoke(T value, Context context) throws Exception {
         checkErroneous();
 
         byte[] serializedValue = schema.serialize(value);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 642d69d..58ccccf 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -18,6 +18,12 @@
  */
 package org.apache.flink.streaming.connectors.pulsar;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -25,20 +31,17 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.util.IOUtils;
-import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Pulsar source (consumer) which receives messages from a topic and acknowledges messages.
  * When checkpointing is enabled, it guarantees at least once processing semantics.
@@ -190,6 +193,6 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
     }
 
     Consumer<byte[]> createConsumer(PulsarClient client) throws PulsarClientException {
-        return ((PulsarClientImpl)client).subscribeAsync(consumerConfigurationData).join();
+        return ((PulsarClientImpl) client).subscribeAsync(consumerConfigurationData).join();
     }
 }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index d4a25ee..15dc6e4 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -18,6 +18,11 @@
  */
 package org.apache.flink.streaming.connectors.pulsar;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.regex.Pattern;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -27,16 +32,9 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 
-import java.util.Arrays;
-import java.util.List;
-import java.util.TreeSet;
-import java.util.regex.Pattern;
-import java.util.Map;
-
 /**
  * A class for building a pulsar source.
  */
@@ -117,7 +115,7 @@ public class PulsarSourceBuilder<T> {
     }
 
     /**
-     * Use topic pattern to config sets of topics to consumer
+     * Use topic pattern to config sets of topics to consumer.
      *
      * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
      * are in the following format:
@@ -128,13 +126,14 @@ public class PulsarSourceBuilder<T> {
      */
     public PulsarSourceBuilder<T> topicsPattern(Pattern topicsPattern) {
         Preconditions.checkArgument(topicsPattern != null, "Param topicsPattern cannot be null");
-        Preconditions.checkArgument(this.consumerConfigurationData.getTopicsPattern() == null, "Pattern has already been set.");
+        Preconditions.checkArgument(this.consumerConfigurationData.getTopicsPattern() == null,
+            "Pattern has already been set.");
         this.consumerConfigurationData.setTopicsPattern(topicsPattern);
         return this;
     }
 
     /**
-     * Use topic pattern to config sets of topics to consumer
+     * Use topic pattern to config sets of topics to consumer.
      *
      * <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
      * are in the following format:
@@ -145,7 +144,8 @@ public class PulsarSourceBuilder<T> {
      */
     public PulsarSourceBuilder<T> topicsPatternString(String topicsPattern) {
         Preconditions.checkArgument(StringUtils.isNotBlank(topicsPattern), "Topics pattern string cannot be blank");
-        Preconditions.checkArgument(this.consumerConfigurationData.getTopicsPattern() == null, "Pattern has already been set.");
+        Preconditions.checkArgument(this.consumerConfigurationData.getTopicsPattern() == null,
+            "Pattern has already been set.");
         this.consumerConfigurationData.setTopicsPattern(Pattern.compile(topicsPattern));
         return this;
     }
@@ -164,13 +164,14 @@ public class PulsarSourceBuilder<T> {
     }
 
     /**
-     * Sets the subscription initial position for the topic consumer. Default is {@link SubscriptionInitialPosition#Latest}
+     * Sets the subscription initial position for the topic consumer.
+     * Default is {@link SubscriptionInitialPosition#Latest}
      *
      * @param initialPosition the subscription initial position.
      * @return this builder
      */
     public PulsarSourceBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition initialPosition) {
-        Preconditions.checkNotNull(initialPosition,"subscription initial position cannot be null");
+        Preconditions.checkNotNull(initialPosition, "subscription initial position cannot be null");
         this.consumerConfigurationData.setSubscriptionInitialPosition(initialPosition);
         return this;
     }
@@ -187,7 +188,8 @@ public class PulsarSourceBuilder<T> {
             acknowledgementBatchSize = size;
             return this;
         }
-        throw new IllegalArgumentException("acknowledgementBatchSize can only take values > 0 and <= " + MAX_ACKNOWLEDGEMENT_BATCH_SIZE);
+        throw new IllegalArgumentException(
+            "acknowledgementBatchSize can only take values > 0 and <= " + MAX_ACKNOWLEDGEMENT_BATCH_SIZE);
     }
 
     /**
@@ -204,7 +206,7 @@ public class PulsarSourceBuilder<T> {
     }
 
     /**
-     * Configure the authentication provider to use in the Pulsar client instance
+     * Configure the authentication provider to use in the Pulsar client instance.
      *
      * @param authPluginClassName
      *            name of the Authentication-Plugin to use
@@ -220,7 +222,8 @@ public class PulsarSourceBuilder<T> {
                 "Authentication-Plugin class name can not be blank");
         Preconditions.checkArgument(StringUtils.isNotBlank(authParamsString),
                 "Authentication-Plugin parameters can not be blank");
-        this.clientConfigurationData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
+        this.clientConfigurationData
+            .setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
         return this;
     }
 
@@ -240,7 +243,7 @@ public class PulsarSourceBuilder<T> {
             throws PulsarClientException.UnsupportedAuthenticationException {
         Preconditions.checkArgument(StringUtils.isNotBlank(authPluginClassName),
                 "Authentication-Plugin class name can not be blank");
-        Preconditions.checkArgument((authParams != null && authParams.isEmpty() == false),
+        Preconditions.checkArgument((authParams != null && !authParams.isEmpty()),
                 "parameters to authentication plugin can not be null/empty");
         this.clientConfigurationData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams));
         return this;
@@ -272,9 +275,9 @@ public class PulsarSourceBuilder<T> {
     public SourceFunction<T> build() throws PulsarClientException{
         Preconditions.checkArgument(StringUtils.isNotBlank(this.clientConfigurationData.getServiceUrl()),
                 "a service url is required");
-        Preconditions.checkArgument((this.consumerConfigurationData.getTopicNames() != null &&
-                        !this.consumerConfigurationData.getTopicNames().isEmpty()) ||
-                        this.consumerConfigurationData.getTopicsPattern() != null,
+        Preconditions.checkArgument((this.consumerConfigurationData.getTopicNames() != null
+                && !this.consumerConfigurationData.getTopicNames().isEmpty())
+                || this.consumerConfigurationData.getTopicsPattern() != null,
                 "At least one topic or topics pattern is required");
         Preconditions.checkArgument(StringUtils.isNotBlank(this.consumerConfigurationData.getSubscriptionName()),
                 "a subscription name is required");
@@ -290,8 +293,9 @@ public class PulsarSourceBuilder<T> {
 
     private void setAuth() throws PulsarClientException{
         if (StringUtils.isBlank(this.clientConfigurationData.getAuthPluginClassName())
-                || StringUtils.isBlank(this.clientConfigurationData.getAuthParams()))
+                || StringUtils.isBlank(this.clientConfigurationData.getAuthParams())) {
             return;
+        }
 
         clientConfigurationData.setAuthentication(
                 AuthenticationFactory.create(
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
index 274084e..b89b60b 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
@@ -59,7 +59,7 @@ public abstract class PulsarTableSink implements AppendStreamTableSink<Row> {
         checkNotNull(topic, "Topic is null");
         this.clientConfigurationData.setServiceUrl(serviceUrl);
         this.clientConfigurationData.setAuthentication(authentication);
-        this.producerConfigurationData.setTopicName(topic);;
+        this.producerConfigurationData.setTopicName(topic);
         this.routingKeyFieldName = routingKeyFieldName;
     }
 
@@ -91,11 +91,11 @@ public abstract class PulsarTableSink implements AppendStreamTableSink<Row> {
      * Returns the low-level producer.
      */
     protected FlinkPulsarProducer<Row> createFlinkPulsarProducer() {
-        return new FlinkPulsarProducer<Row>(
-                clientConfigurationData,
-                producerConfigurationData,
-                serializationSchema,
-                keyExtractor);
+        return new FlinkPulsarProducer<>(
+            clientConfigurationData,
+            producerConfigurationData,
+            serializationSchema,
+            keyExtractor);
     }
 
     @Override
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/package-info.java
similarity index 67%
copy from pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
copy to pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/package-info.java
index 270892e..6ed4a01 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/package-info.java
@@ -16,23 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.flink.streaming.connectors.pulsar.partitioner;
-
-import java.io.Serializable;
-
 /**
- * Extract key from a value.
+ * Classes for implementing the Pulsar Flink connector.
  */
-public interface PulsarKeyExtractor<IN> extends Serializable {
-
-    PulsarKeyExtractor NULL = in -> null;
-
-    /**
-     * Retrieve a key from the value.
-     *
-     * @param in the value to extract a key.
-     * @return key.
-     */
-    String getKey(IN in);
-
-}
+package org.apache.flink.streaming.connectors.pulsar;
\ No newline at end of file
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
index 270892e..c8a858d 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 /**
  * Extract key from a value.
  */
-public interface PulsarKeyExtractor<IN> extends Serializable {
+public interface PulsarKeyExtractor<T> extends Serializable {
 
     PulsarKeyExtractor NULL = in -> null;
 
@@ -33,6 +33,6 @@ public interface PulsarKeyExtractor<IN> extends Serializable {
      * @param in the value to extract a key.
      * @return key.
      */
-    String getKey(IN in);
+    String getKey(T in);
 
 }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/package-info.java
similarity index 73%
copy from pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
copy to pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/package-info.java
index 270892e..7a25be1 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/package-info.java
@@ -16,23 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.flink.streaming.connectors.pulsar.partitioner;
-
-import java.io.Serializable;
-
 /**
- * Extract key from a value.
+ * Classes for implementing key extractors.
  */
-public interface PulsarKeyExtractor<IN> extends Serializable {
-
-    PulsarKeyExtractor NULL = in -> null;
-
-    /**
-     * Retrieve a key from the value.
-     *
-     * @param in the value to extract a key.
-     * @return key.
-     */
-    String getKey(IN in);
-
-}
+package org.apache.flink.streaming.connectors.pulsar.partitioner;
\ No newline at end of file