You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sh...@apache.org on 2022/02/19 01:26:54 UTC

[pulsar] branch master updated: Enable pulsar-functions-api checkstyle (#14347)

This is an automated email from the ASF dual-hosted git repository.

shoothzj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d972990  Enable pulsar-functions-api checkstyle (#14347)
d972990 is described below

commit d972990d251a2c8467e68a38768b9a2d516e6be2
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Sat Feb 19 09:24:13 2022 +0800

    Enable pulsar-functions-api checkstyle (#14347)
    
    ### Motivation
    
    Enable CheckStyle plugin in pulsar-functions-api module.
    
    ### Modifications
    
    Fix the wrong code style in pulsar-functions-api module.
    
    
    ### Verifying this change
    The maven-checkstyle-plugin is added to pom.xml so
    that the code style will be checked in mvn clean install.
    
    Co-authored-by: gavingaozhangmin <ga...@didiglobal.com>
---
 pulsar-functions/api-java/pom.xml                  | 13 +++++++++++
 .../apache/pulsar/functions/api/BaseContext.java   | 25 +++++++++++-----------
 .../org/apache/pulsar/functions/api/Context.java   | 17 +++++++--------
 .../org/apache/pulsar/functions/api/Function.java  |  6 +++---
 .../org/apache/pulsar/functions/api/SerDe.java     |  2 +-
 .../apache/pulsar/functions/api/WindowContext.java | 14 ++++++------
 .../pulsar/functions/api/WindowFunction.java       |  6 +++---
 .../apache/pulsar/functions/api/package-info.java  |  2 +-
 .../functions/api/state/ByteBufferStateStore.java  |  6 +++---
 .../functions/api/state/CounterStateStore.java     |  4 ++--
 .../pulsar/functions/api/state/package-info.java   |  2 +-
 .../pulsar/functions/api/utils/JavaSerDe.java      |  8 +++----
 .../api/{state => utils}/package-info.java         |  2 +-
 .../pulsar/functions/api/utils/JavaSerDeTest.java  |  1 -
 14 files changed, 59 insertions(+), 49 deletions(-)

diff --git a/pulsar-functions/api-java/pom.xml b/pulsar-functions/api-java/pom.xml
index 56cf952..f1f242d 100644
--- a/pulsar-functions/api-java/pom.xml
+++ b/pulsar-functions/api-java/pom.xml
@@ -77,6 +77,19 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <executions>
+            <execution>
+                <id>checkstyle</id>
+                <phase>verify</phase>
+                <goals>
+                    <goal>check</goal>
+                </goals>
+            </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
index 5bd7597..2b9a4e3 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/BaseContext.java
@@ -18,15 +18,14 @@
  */
 package org.apache.pulsar.functions.api;
 
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 import org.slf4j.Logger;
 
-import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
-
 /**
  * BaseContext provides base contextual information to the executing function/source/sink.
  * It allows to propagate information, such as pulsar environment, logs, metrics, states etc.
@@ -81,13 +80,13 @@ public interface BaseContext {
      * Get the state store with the provided store name in the tenant & namespace.
      *
      * @param name the state store name
-     * @param <S> the type of interface of the store to return
+     * @param <X> the type of interface of the store to return
      * @return the state store instance.
      *
      * @throws ClassCastException if the return type isn't a type
      * or interface of the actual returned store.
      */
-    default <S extends StateStore> S getStateStore(String name) {
+    default <X extends StateStore> X getStateStore(String name) {
         throw new UnsupportedOperationException("Component cannot get state store");
     }
 
@@ -97,13 +96,13 @@ public interface BaseContext {
      * @param tenant the state tenant name
      * @param ns the state namespace name
      * @param name the state store name
-     * @param <S> the type of interface of the store to return
+     * @param <X> the type of interface of the store to return
      * @return the state store instance.
      *
      * @throws ClassCastException if the return type isn't a type
      * or interface of the actual returned store.
      */
-    default <S extends StateStore> S getStateStore(String tenant, String ns, String name) {
+    default <X extends StateStore> X getStateStore(String tenant, String ns, String name) {
         throw new UnsupportedOperationException("Component cannot get state store");
     }
 
@@ -116,7 +115,7 @@ public interface BaseContext {
     void putState(String key, ByteBuffer value);
 
     /**
-     * Update the state value for the key, but don't wait for the operation to be completed
+     * Update the state value for the key, but don't wait for the operation to be completed.
      *
      * @param key   name of the key
      * @param value state value of the key
@@ -132,7 +131,7 @@ public interface BaseContext {
     ByteBuffer getState(String key);
 
     /**
-     * Retrieve the state value for the key, but don't wait for the operation to be completed
+     * Retrieve the state value for the key, but don't wait for the operation to be completed.
      *
      * @param key name of the key
      * @return the state value for the key.
@@ -147,7 +146,7 @@ public interface BaseContext {
     void deleteState(String key);
 
     /**
-     * Delete the state value for the key, but don't wait for the operation to be completed
+     * Delete the state value for the key, but don't wait for the operation to be completed.
      *
      * @param key   name of the key
      */
@@ -163,7 +162,7 @@ public interface BaseContext {
 
     /**
      * Increment the builtin distributed counter referred by key
-     * but dont wait for the completion of the increment operation
+     * but dont wait for the completion of the increment operation.
      *
      * @param key    The name of the key
      * @param amount The amount to be incremented
@@ -180,7 +179,7 @@ public interface BaseContext {
 
     /**
      * Retrieve the counter value for the key, but don't wait
-     * for the operation to be completed
+     * for the operation to be completed.
      *
      * @param key name of the key
      * @return the amount of the counter value for this key
@@ -188,7 +187,7 @@ public interface BaseContext {
     CompletableFuture<Long> getCounterAsync(String key);
 
     /**
-     * Record a user defined metric
+     * Record a user defined metric.
      * @param metricName The name of the metric
      * @param value The value of the metric
      */
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 1a2175e..e45c2de 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -76,7 +75,7 @@ public interface Context extends BaseContext {
     String getFunctionName();
 
     /**
-     * The id of the function that we are executing
+     * The id of the function that we are executing.
      *
      * @return The function id
      */
@@ -130,7 +129,7 @@ public interface Context extends BaseContext {
      * @return A future that completes when the framework is done publishing the message
      * @deprecated in favor of using {@link #newOutputMessage(String, Schema)}
      */
-    <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);
+    <X> CompletableFuture<Void> publish(String topicName, X object, String schemaOrSerdeClassName);
 
     /**
      * Publish an object to the topic using default schemas.
@@ -140,26 +139,26 @@ public interface Context extends BaseContext {
      * @return A future that completes when the framework is done publishing the message
      * @deprecated in favor of using {@link #newOutputMessage(String, Schema)}
      */
-    <O> CompletableFuture<Void> publish(String topicName, O object);
+    <X> CompletableFuture<Void> publish(String topicName, X object);
 
     /**
-     * New output message using schema for serializing to the topic
+     * New output message using schema for serializing to the topic.
      *
      * @param topicName The name of the topic for output message
      * @param schema provide a way to convert between serialized data and domain objects
-     * @param <O>
+     * @param <X>
      * @return the message builder instance
      * @throws PulsarClientException
      */
-    <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException;
+    <X> TypedMessageBuilder<X> newOutputMessage(String topicName, Schema<X> schema) throws PulsarClientException;
 
     /**
      * Create a ConsumerBuilder with the schema.
      *
      * @param schema provide a way to convert between serialized data and domain objects
-     * @param <O>
+     * @param <X>
      * @return the consumer builder instance
      * @throws PulsarClientException
      */
-    <O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException;
+    <X> ConsumerBuilder<X> newConsumerBuilder(Schema<X> schema) throws PulsarClientException;
 }
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java
index 467d29b..f9ba691 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java
@@ -31,13 +31,13 @@ import org.apache.pulsar.common.classification.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 @FunctionalInterface
-public interface Function<I, O> {
+public interface Function<X, T> {
     /**
      * Process the input.
      *
      * @return the output
      */
-    O process(I input, Context context) throws Exception;
+    T process(X input, Context context) throws Exception;
 
     /**
      * Called once to initialize resources when function instance is started.
@@ -54,4 +54,4 @@ public interface Function<I, O> {
      * @throws Exception
      */
     default void close() throws Exception {}
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
index eb12cf5..b0e3917 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/SerDe.java
@@ -29,4 +29,4 @@ import org.apache.pulsar.common.classification.InterfaceStability;
 public interface SerDe<T> {
     T deserialize(byte[] input);
     byte[] serialize(T input);
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
index 54526d4..635e010 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
@@ -18,15 +18,14 @@
  */
 package org.apache.pulsar.functions.api;
 
-import org.apache.pulsar.common.classification.InterfaceAudience;
-import org.apache.pulsar.common.classification.InterfaceStability;
-import org.slf4j.Logger;
-
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+import org.slf4j.Logger;
 
 @InterfaceAudience.Public
 @InterfaceStability.Stable
@@ -181,10 +180,11 @@ public interface WindowContext {
      * @param object
      *            The object that needs to be published
      * @param schemaOrSerdeClassName
-     *            Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name of the custom schema class
+     *            Either a builtin schema type (eg: "avro", "json", "protobuf")
+     *            or the class name of the custom schema class
      * @return A future that completes when the framework is done publishing the message
      */
-    <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);
+    <T> CompletableFuture<Void> publish(String topicName, T object, String schemaOrSerdeClassName);
 
     /**
      * Publish an object to the topic using default schemas.
@@ -193,5 +193,5 @@ public interface WindowContext {
      * @param object The object that needs to be published
      * @return A future that completes when the framework is done publishing the message
      */
-    <O> CompletableFuture<Void> publish(String topicName, O object);
+    <T> CompletableFuture<Void> publish(String topicName, T object);
 }
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java
index 85ad7de..7aa3e45 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java
@@ -29,11 +29,11 @@ import org.apache.pulsar.common.classification.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 @FunctionalInterface
-public interface WindowFunction<I, O> {
+public interface WindowFunction<X, T> {
     /**
      * Process the input.
      *
      * @return the output
      */
-    O process(Collection<Record<I>> input, WindowContext context) throws Exception;
-}
\ No newline at end of file
+    T process(Collection<Record<X>> input, WindowContext context) throws Exception;
+}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/package-info.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/package-info.java
index 45244df..311f80b 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/package-info.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/package-info.java
@@ -20,4 +20,4 @@
 /**
  * Provides a simple model for enabling lightweight computation on Apache Pulsar.
  */
-package org.apache.pulsar.functions.api;
\ No newline at end of file
+package org.apache.pulsar.functions.api;
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
index 7de1ef1..22c4a86 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
@@ -36,7 +36,7 @@ public interface ByteBufferStateStore extends StateStore {
     void put(String key, ByteBuffer value);
 
     /**
-     * Update the state value for the key, but don't wait for the operation to be completed
+     * Update the state value for the key, but don't wait for the operation to be completed.
      *
      * @param key   name of the key
      * @param value state value of the key
@@ -51,7 +51,7 @@ public interface ByteBufferStateStore extends StateStore {
     void delete(String key);
 
     /**
-     * Delete the state value for the key, but don't wait for the operation to be completed
+     * Delete the state value for the key, but don't wait for the operation to be completed.
      *
      * @param key   name of the key
      */
@@ -66,7 +66,7 @@ public interface ByteBufferStateStore extends StateStore {
     ByteBuffer get(String key);
 
     /**
-     * Retrieve the state value for the key, but don't wait for the operation to be completed
+     * Retrieve the state value for the key, but don't wait for the operation to be completed.
      *
      * @param key name of the key
      * @return the state value for the key.
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/CounterStateStore.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/CounterStateStore.java
index 548c60b0..c90b093 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/CounterStateStore.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/CounterStateStore.java
@@ -36,7 +36,7 @@ public interface CounterStateStore extends StateStore {
 
     /**
      * Increment the builtin distributed counter referred by key
-     * but dont wait for the completion of the increment operation
+     * but dont wait for the completion of the increment operation.
      *
      * @param key    The name of the key
      * @param amount The amount to be incremented
@@ -53,7 +53,7 @@ public interface CounterStateStore extends StateStore {
 
     /**
      * Retrieve the counter value for the key, but don't wait
-     * for the operation to be completed
+     * for the operation to be completed.
      *
      * @param key name of the key
      * @return the amount of the counter value for this key
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/package-info.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/package-info.java
index 1059832..ddcab42 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/package-info.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/package-info.java
@@ -19,4 +19,4 @@
 /**
  * State Store API.
  */
-package org.apache.pulsar.functions.api.state;
\ No newline at end of file
+package org.apache.pulsar.functions.api.state;
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/JavaSerDe.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/JavaSerDe.java
index 0bf7403..fec81fb 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/JavaSerDe.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/JavaSerDe.java
@@ -29,19 +29,19 @@ import org.apache.pulsar.common.classification.InterfaceStability;
 import org.apache.pulsar.functions.api.SerDe;
 
 /**
- * Java Serialization based SerDe
+ * Java Serialization based SerDe.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 @Slf4j
 public class JavaSerDe implements SerDe<Object> {
 
+    private static final JavaSerDe INSTANCE = new JavaSerDe();
+
     public static JavaSerDe of() {
         return INSTANCE;
     }
 
-    private static final JavaSerDe INSTANCE = new JavaSerDe();
-
     @Override
     public byte[] serialize(Object resultValue) {
         try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -66,4 +66,4 @@ public class JavaSerDe implements SerDe<Object> {
         }
         return obj;
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/package-info.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/package-info.java
similarity index 94%
copy from pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/package-info.java
copy to pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/package-info.java
index 1059832..7374796b 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/package-info.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/utils/package-info.java
@@ -19,4 +19,4 @@
 /**
  * State Store API.
  */
-package org.apache.pulsar.functions.api.state;
\ No newline at end of file
+package org.apache.pulsar.functions.api.utils;
diff --git a/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java b/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java
index 20821b2..29aacfe 100644
--- a/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java
+++ b/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.functions.api.utils;
 
 import static org.testng.Assert.assertEquals;
-
 import java.io.Serializable;
 import lombok.AllArgsConstructor;
 import lombok.Data;