You are viewing a plain text version of this content. The canonical version is available from the original mailing-list archive (the hyperlink was not preserved in this plain-text copy).
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/29 09:06:05 UTC
[pulsar] branch master updated: Perform Checkstyle analysis in the
pulsar-flink module (#4832)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b0793a1 Perform Checkstyle analysis in the pulsar-flink module (#4832)
b0793a1 is described below
commit b0793a178b8b5d2d1e82316b224c6dc3c27c6664
Author: vzhikserg <vz...@users.noreply.github.com>
AuthorDate: Mon Jul 29 11:05:57 2019 +0200
Perform Checkstyle analysis in the pulsar-flink module (#4832)
* Add maven plugin for style checking. Fix some style violations.
* Fix issues shown by the style checker in the pulsar-flink module
---
pulsar-flink/pom.xml | 19 +++++++++
.../connectors/pulsar/BasePulsarOutputFormat.java | 22 ++++++-----
.../connectors/pulsar/PulsarAvroOutputFormat.java | 4 +-
.../connectors/pulsar/PulsarCsvOutputFormat.java | 3 +-
.../connectors/pulsar/PulsarJsonOutputFormat.java | 3 +-
.../connectors/pulsar/PulsarOutputFormat.java | 3 +-
.../connectors/pulsar/package-info.java} | 20 +---------
.../serialization/AvroSerializationSchema.java | 7 ++--
.../serialization/CsvSerializationSchema.java | 7 ++--
.../pulsar/serialization/package-info.java} | 20 +---------
.../connectors/pulsar/FlinkPulsarProducer.java | 25 ++++++------
.../connectors/pulsar/PulsarConsumerSource.java | 21 +++++-----
.../connectors/pulsar/PulsarSourceBuilder.java | 46 ++++++++++++----------
.../connectors/pulsar/PulsarTableSink.java | 12 +++---
.../PulsarKeyExtractor.java => package-info.java} | 20 +---------
.../pulsar/partitioner/PulsarKeyExtractor.java | 4 +-
.../{PulsarKeyExtractor.java => package-info.java} | 20 +---------
17 files changed, 110 insertions(+), 146 deletions(-)
diff --git a/pulsar-flink/pom.xml b/pulsar-flink/pom.xml
index d04bd8c..f348efa 100644
--- a/pulsar-flink/pom.xml
+++ b/pulsar-flink/pom.xml
@@ -136,6 +136,25 @@
</execution>
</executions>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>check-style</id>
+ <phase>verify</phase>
+ <configuration>
+ <configLocation>../buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
+ <suppressionsLocation>../buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
+ <encoding>UTF-8</encoding>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
index 438af59..d061559 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
@@ -18,25 +18,24 @@
*/
package org.apache.flink.batch.connectors.pulsar;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Function;
-
/**
* Base Pulsar Output Format to write Flink DataSets into a Pulsar topic.
*/
@@ -54,7 +53,8 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T> {
private ProducerConfigurationData producerConf;
- protected BasePulsarOutputFormat(final String serviceUrl, final String topicName, final Authentication authentication) {
+ protected BasePulsarOutputFormat(final String serviceUrl, final String topicName,
+ final Authentication authentication) {
Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank.");
Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicName cannot be blank.");
@@ -65,7 +65,8 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T> {
this.clientConf.setAuthentication(authentication);
this.producerConf.setTopicName(topicName);
- LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.producerConf.getTopicName());
+ LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}",
+ this.producerConf.getTopicName());
}
protected BasePulsarOutputFormat(ClientConfigurationData clientConf, ProducerConfigurationData producerConf) {
@@ -75,7 +76,8 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T> {
Preconditions.checkArgument(StringUtils.isNotBlank(clientConf.getServiceUrl()), "serviceUrl cannot be blank.");
Preconditions.checkArgument(StringUtils.isNotBlank(producerConf.getTopicName()), "topicName cannot be blank.");
- LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.producerConf.getTopicName());
+ LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}",
+ this.producerConf.getTopicName());
}
@Override
@@ -107,9 +109,9 @@ public abstract class BasePulsarOutputFormat<T> extends RichOutputFormat<T> {
private Producer<byte[]> getProducerInstance()
throws PulsarClientException {
- if(producer == null){
+ if (producer == null){
synchronized (PulsarOutputFormat.class) {
- if(producer == null){
+ if (producer == null){
producer = Preconditions.checkNotNull(createPulsarProducer(),
"Pulsar producer cannot be null.");
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
index 6e11e4c..cc2acab 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
@@ -36,9 +36,9 @@ public class PulsarAvroOutputFormat<T extends SpecificRecord> extends BasePulsar
this.serializationSchema = new AvroSerializationSchema();
}
- public PulsarAvroOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) {
+ public PulsarAvroOutputFormat(ClientConfigurationData clientConfigurationData,
+ ProducerConfigurationData producerConfigurationData) {
super(clientConfigurationData, producerConfigurationData);
this.serializationSchema = new AvroSerializationSchema();
}
-
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
index 8c3b441..74680d3 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
@@ -36,7 +36,8 @@ public class PulsarCsvOutputFormat<T extends Tuple> extends BasePulsarOutputForm
this.serializationSchema = new CsvSerializationSchema<>();
}
- public PulsarCsvOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) {
+ public PulsarCsvOutputFormat(ClientConfigurationData clientConfigurationData,
+ ProducerConfigurationData producerConfigurationData) {
super(clientConfigurationData, producerConfigurationData);
this.serializationSchema = new CsvSerializationSchema<>();
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
index 093e5c9..837f743 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
@@ -35,7 +35,8 @@ public class PulsarJsonOutputFormat<T> extends BasePulsarOutputFormat<T> {
this.serializationSchema = new JsonSerializationSchema();
}
- public PulsarJsonOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) {
+ public PulsarJsonOutputFormat(ClientConfigurationData clientConfigurationData,
+ ProducerConfigurationData producerConfigurationData) {
super(clientConfigurationData, producerConfigurationData);
this.serializationSchema = new JsonSerializationSchema();
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
index cb307f0..14e12e4 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -31,7 +31,8 @@ public class PulsarOutputFormat<T> extends BasePulsarOutputFormat<T> {
private static final long serialVersionUID = 2997027580167793000L;
- public PulsarOutputFormat(String serviceUrl, String topicName, Authentication authentication, final SerializationSchema<T> serializationSchema) {
+ public PulsarOutputFormat(String serviceUrl, String topicName, Authentication authentication,
+ final SerializationSchema<T> serializationSchema) {
super(serviceUrl, topicName, authentication);
Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
this.serializationSchema = serializationSchema;
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/package-info.java
similarity index 67%
copy from pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
copy to pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/package-info.java
index 270892e..79a8213 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/package-info.java
@@ -16,23 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.flink.streaming.connectors.pulsar.partitioner;
-
-import java.io.Serializable;
-
/**
- * Extract key from a value.
+ * Implementations of different output formats.
*/
-public interface PulsarKeyExtractor<IN> extends Serializable {
-
- PulsarKeyExtractor NULL = in -> null;
-
- /**
- * Retrieve a key from the value.
- *
- * @param in the value to extract a key.
- * @return key.
- */
- String getKey(IN in);
-
-}
+package org.apache.flink.batch.connectors.pulsar;
\ No newline at end of file
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java
index 693ef8d..ea71d4d 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/AvroSerializationSchema.java
@@ -18,6 +18,8 @@
*/
package org.apache.flink.batch.connectors.pulsar.serialization;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
@@ -25,9 +27,6 @@ import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
/**
* Avro Serialization Schema to serialize Dataset records to Avro.
*/
@@ -49,7 +48,7 @@ public class AvroSerializationSchema<T extends SpecificRecord> implements Serial
Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
arrayOutputStream.reset();
try {
- writer.write(t,encoder);
+ writer.write(t, encoder);
encoder.flush();
} catch (IOException e) {
throw new RuntimeException("Error while serializing the record to Avro", e);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
index c7b7131..4ee6f22 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/CsvSerializationSchema.java
@@ -18,13 +18,12 @@
*/
package org.apache.flink.batch.connectors.pulsar.serialization;
+import java.io.IOException;
+import java.io.StringWriter;
import org.apache.commons.csv.CSVFormat;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple;
-import java.io.IOException;
-import java.io.StringWriter;
-
/**
* Csv Serialization Schema to serialize Tuples to Csv.
*/
@@ -38,7 +37,7 @@ public class CsvSerializationSchema<T extends Tuple> implements SerializationSch
StringWriter stringWriter;
try {
Object[] fieldsValues = new Object[t.getArity()];
- for(int index = 0; index < t.getArity(); index++) {
+ for (int index = 0; index < t.getArity(); index++) {
fieldsValues[index] = (t.getField(index));
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/package-info.java
similarity index 67%
copy from pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
copy to pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/package-info.java
index 270892e..e63f03f 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/serialization/package-info.java
@@ -16,23 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.flink.streaming.connectors.pulsar.partitioner;
-
-import java.io.Serializable;
-
/**
- * Extract key from a value.
+ * Implementations of the serialization schemas.
*/
-public interface PulsarKeyExtractor<IN> extends Serializable {
-
- PulsarKeyExtractor NULL = in -> null;
-
- /**
- * Retrieve a key from the value.
- *
- * @param in the value to extract a key.
- * @return key.
- */
- String getKey(IN in);
-
-}
+package org.apache.flink.batch.connectors.pulsar.serialization;
\ No newline at end of file
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 5afba67..215dcfd 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -22,7 +22,6 @@ import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import java.util.function.Function;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -35,10 +34,10 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
import org.apache.flink.util.SerializableObject;
+import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -48,8 +47,8 @@ import org.slf4j.LoggerFactory;
/**
* Flink Sink to produce data into a Pulsar topic.
*/
-public class FlinkPulsarProducer<IN>
- extends RichSinkFunction<IN>
+public class FlinkPulsarProducer<T>
+ extends RichSinkFunction<T>
implements CheckpointedFunction {
private static final Logger LOG = LoggerFactory.getLogger(FlinkPulsarProducer.class);
@@ -61,12 +60,12 @@ public class FlinkPulsarProducer<IN>
* (Serializable) SerializationSchema for turning objects used with Flink into.
* byte[] for Pulsar.
*/
- protected final SerializationSchema<IN> schema;
+ protected final SerializationSchema<T> schema;
/**
* User-provided key extractor for assigning a key to a pulsar message.
*/
- protected final PulsarKeyExtractor<IN> flinkPulsarKeyExtractor;
+ protected final PulsarKeyExtractor<T> flinkPulsarKeyExtractor;
/**
* Produce Mode.
@@ -110,8 +109,8 @@ public class FlinkPulsarProducer<IN>
public FlinkPulsarProducer(String serviceUrl,
String defaultTopicName,
Authentication authentication,
- SerializationSchema<IN> serializationSchema,
- PulsarKeyExtractor<IN> keyExtractor) {
+ SerializationSchema<T> serializationSchema,
+ PulsarKeyExtractor<T> keyExtractor) {
checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
checkNotNull(authentication, "auth cannot be null, set disabled for no auth");
@@ -129,8 +128,8 @@ public class FlinkPulsarProducer<IN>
public FlinkPulsarProducer(ClientConfigurationData clientConfigurationData,
ProducerConfigurationData producerConfigurationData,
- SerializationSchema<IN> serializationSchema,
- PulsarKeyExtractor<IN> keyExtractor) {
+ SerializationSchema<T> serializationSchema,
+ PulsarKeyExtractor<T> keyExtractor) {
this.clientConf = checkNotNull(clientConfigurationData, "client conf can not be null");
this.producerConf = checkNotNull(producerConfigurationData, "producer conf can not be null");
this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
@@ -144,7 +143,7 @@ public class FlinkPulsarProducer<IN>
/**
* @return pulsar key extractor.
*/
- public PulsarKeyExtractor<IN> getKeyExtractor() {
+ public PulsarKeyExtractor<T> getKeyExtractor() {
return flinkPulsarKeyExtractor;
}
@@ -178,7 +177,7 @@ public class FlinkPulsarProducer<IN>
// ----------------------------------- Sink Methods --------------------------
@SuppressWarnings("unchecked")
- private static final <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyExtractor<T> extractor) {
+ private static <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyExtractor<T> extractor) {
if (null == extractor) {
return PulsarKeyExtractor.NULL;
} else {
@@ -238,7 +237,7 @@ public class FlinkPulsarProducer<IN>
}
@Override
- public void invoke(IN value, Context context) throws Exception {
+ public void invoke(T value, Context context) throws Exception {
checkErroneous();
byte[] serializedValue = schema.serialize(value);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 642d69d..58ccccf 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -18,6 +18,12 @@
*/
package org.apache.flink.streaming.connectors.pulsar;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -25,20 +31,17 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.IOUtils;
-import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
/**
* Pulsar source (consumer) which receives messages from a topic and acknowledges messages.
* When checkpointing is enabled, it guarantees at least once processing semantics.
@@ -190,6 +193,6 @@ class PulsarConsumerSource<T> extends MessageAcknowledgingSourceBase<T, MessageI
}
Consumer<byte[]> createConsumer(PulsarClient client) throws PulsarClientException {
- return ((PulsarClientImpl)client).subscribeAsync(consumerConfigurationData).join();
+ return ((PulsarClientImpl) client).subscribeAsync(consumerConfigurationData).join();
}
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index d4a25ee..15dc6e4 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -18,6 +18,11 @@
*/
package org.apache.flink.streaming.connectors.pulsar;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -27,16 +32,9 @@ import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import java.util.Arrays;
-import java.util.List;
-import java.util.TreeSet;
-import java.util.regex.Pattern;
-import java.util.Map;
-
/**
* A class for building a pulsar source.
*/
@@ -117,7 +115,7 @@ public class PulsarSourceBuilder<T> {
}
/**
- * Use topic pattern to config sets of topics to consumer
+ * Use topic pattern to config sets of topics to consumer.
*
* <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
* are in the following format:
@@ -128,13 +126,14 @@ public class PulsarSourceBuilder<T> {
*/
public PulsarSourceBuilder<T> topicsPattern(Pattern topicsPattern) {
Preconditions.checkArgument(topicsPattern != null, "Param topicsPattern cannot be null");
- Preconditions.checkArgument(this.consumerConfigurationData.getTopicsPattern() == null, "Pattern has already been set.");
+ Preconditions.checkArgument(this.consumerConfigurationData.getTopicsPattern() == null,
+ "Pattern has already been set.");
this.consumerConfigurationData.setTopicsPattern(topicsPattern);
return this;
}
/**
- * Use topic pattern to config sets of topics to consumer
+ * Use topic pattern to config sets of topics to consumer.
*
* <p>Topic names (https://pulsar.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Topics)
* are in the following format:
@@ -145,7 +144,8 @@ public class PulsarSourceBuilder<T> {
*/
public PulsarSourceBuilder<T> topicsPatternString(String topicsPattern) {
Preconditions.checkArgument(StringUtils.isNotBlank(topicsPattern), "Topics pattern string cannot be blank");
- Preconditions.checkArgument(this.consumerConfigurationData.getTopicsPattern() == null, "Pattern has already been set.");
+ Preconditions.checkArgument(this.consumerConfigurationData.getTopicsPattern() == null,
+ "Pattern has already been set.");
this.consumerConfigurationData.setTopicsPattern(Pattern.compile(topicsPattern));
return this;
}
@@ -164,13 +164,14 @@ public class PulsarSourceBuilder<T> {
}
/**
- * Sets the subscription initial position for the topic consumer. Default is {@link SubscriptionInitialPosition#Latest}
+ * Sets the subscription initial position for the topic consumer.
+ * Default is {@link SubscriptionInitialPosition#Latest}
*
* @param initialPosition the subscription initial position.
* @return this builder
*/
public PulsarSourceBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition initialPosition) {
- Preconditions.checkNotNull(initialPosition,"subscription initial position cannot be null");
+ Preconditions.checkNotNull(initialPosition, "subscription initial position cannot be null");
this.consumerConfigurationData.setSubscriptionInitialPosition(initialPosition);
return this;
}
@@ -187,7 +188,8 @@ public class PulsarSourceBuilder<T> {
acknowledgementBatchSize = size;
return this;
}
- throw new IllegalArgumentException("acknowledgementBatchSize can only take values > 0 and <= " + MAX_ACKNOWLEDGEMENT_BATCH_SIZE);
+ throw new IllegalArgumentException(
+ "acknowledgementBatchSize can only take values > 0 and <= " + MAX_ACKNOWLEDGEMENT_BATCH_SIZE);
}
/**
@@ -204,7 +206,7 @@ public class PulsarSourceBuilder<T> {
}
/**
- * Configure the authentication provider to use in the Pulsar client instance
+ * Configure the authentication provider to use in the Pulsar client instance.
*
* @param authPluginClassName
* name of the Authentication-Plugin to use
@@ -220,7 +222,8 @@ public class PulsarSourceBuilder<T> {
"Authentication-Plugin class name can not be blank");
Preconditions.checkArgument(StringUtils.isNotBlank(authParamsString),
"Authentication-Plugin parameters can not be blank");
- this.clientConfigurationData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
+ this.clientConfigurationData
+ .setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
return this;
}
@@ -240,7 +243,7 @@ public class PulsarSourceBuilder<T> {
throws PulsarClientException.UnsupportedAuthenticationException {
Preconditions.checkArgument(StringUtils.isNotBlank(authPluginClassName),
"Authentication-Plugin class name can not be blank");
- Preconditions.checkArgument((authParams != null && authParams.isEmpty() == false),
+ Preconditions.checkArgument((authParams != null && !authParams.isEmpty()),
"parameters to authentication plugin can not be null/empty");
this.clientConfigurationData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams));
return this;
@@ -272,9 +275,9 @@ public class PulsarSourceBuilder<T> {
public SourceFunction<T> build() throws PulsarClientException{
Preconditions.checkArgument(StringUtils.isNotBlank(this.clientConfigurationData.getServiceUrl()),
"a service url is required");
- Preconditions.checkArgument((this.consumerConfigurationData.getTopicNames() != null &&
- !this.consumerConfigurationData.getTopicNames().isEmpty()) ||
- this.consumerConfigurationData.getTopicsPattern() != null,
+ Preconditions.checkArgument((this.consumerConfigurationData.getTopicNames() != null
+ && !this.consumerConfigurationData.getTopicNames().isEmpty())
+ || this.consumerConfigurationData.getTopicsPattern() != null,
"At least one topic or topics pattern is required");
Preconditions.checkArgument(StringUtils.isNotBlank(this.consumerConfigurationData.getSubscriptionName()),
"a subscription name is required");
@@ -290,8 +293,9 @@ public class PulsarSourceBuilder<T> {
private void setAuth() throws PulsarClientException{
if (StringUtils.isBlank(this.clientConfigurationData.getAuthPluginClassName())
- || StringUtils.isBlank(this.clientConfigurationData.getAuthParams()))
+ || StringUtils.isBlank(this.clientConfigurationData.getAuthParams())) {
return;
+ }
clientConfigurationData.setAuthentication(
AuthenticationFactory.create(
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
index 274084e..b89b60b 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
@@ -59,7 +59,7 @@ public abstract class PulsarTableSink implements AppendStreamTableSink<Row> {
checkNotNull(topic, "Topic is null");
this.clientConfigurationData.setServiceUrl(serviceUrl);
this.clientConfigurationData.setAuthentication(authentication);
- this.producerConfigurationData.setTopicName(topic);;
+ this.producerConfigurationData.setTopicName(topic);
this.routingKeyFieldName = routingKeyFieldName;
}
@@ -91,11 +91,11 @@ public abstract class PulsarTableSink implements AppendStreamTableSink<Row> {
* Returns the low-level producer.
*/
protected FlinkPulsarProducer<Row> createFlinkPulsarProducer() {
- return new FlinkPulsarProducer<Row>(
- clientConfigurationData,
- producerConfigurationData,
- serializationSchema,
- keyExtractor);
+ return new FlinkPulsarProducer<>(
+ clientConfigurationData,
+ producerConfigurationData,
+ serializationSchema,
+ keyExtractor);
}
@Override
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/package-info.java
similarity index 67%
copy from pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
copy to pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/package-info.java
index 270892e..6ed4a01 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/package-info.java
@@ -16,23 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.flink.streaming.connectors.pulsar.partitioner;
-
-import java.io.Serializable;
-
/**
- * Extract key from a value.
+ * Classes for implementing pulsar flink connector.
*/
-public interface PulsarKeyExtractor<IN> extends Serializable {
-
- PulsarKeyExtractor NULL = in -> null;
-
- /**
- * Retrieve a key from the value.
- *
- * @param in the value to extract a key.
- * @return key.
- */
- String getKey(IN in);
-
-}
+package org.apache.flink.streaming.connectors.pulsar;
\ No newline at end of file
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
index 270892e..c8a858d 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
/**
* Extract key from a value.
*/
-public interface PulsarKeyExtractor<IN> extends Serializable {
+public interface PulsarKeyExtractor<T> extends Serializable {
PulsarKeyExtractor NULL = in -> null;
@@ -33,6 +33,6 @@ public interface PulsarKeyExtractor<IN> extends Serializable {
* @param in the value to extract a key.
* @return key.
*/
- String getKey(IN in);
+ String getKey(T in);
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/package-info.java
similarity index 73%
copy from pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
copy to pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/package-info.java
index 270892e..7a25be1 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/package-info.java
@@ -16,23 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.flink.streaming.connectors.pulsar.partitioner;
-
-import java.io.Serializable;
-
/**
- * Extract key from a value.
+ * Classes for implementing key extractors.
*/
-public interface PulsarKeyExtractor<IN> extends Serializable {
-
- PulsarKeyExtractor NULL = in -> null;
-
- /**
- * Retrieve a key from the value.
- *
- * @param in the value to extract a key.
- * @return key.
- */
- String getKey(IN in);
-
-}
+package org.apache.flink.streaming.connectors.pulsar.partitioner;
\ No newline at end of file