You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/09 01:46:47 UTC

[pulsar] branch master updated: [pulsar-functions] fix words error and remove not use import java class (#3791)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f03548  [pulsar-functions] fix words error and remove not use import java class (#3791)
4f03548 is described below

commit 4f03548c5ec468ecb7f63a64990bf17639c33c70
Author: wpl <12...@qq.com>
AuthorDate: Sat Mar 9 09:46:43 2019 +0800

    [pulsar-functions] fix words error and remove not use import java class (#3791)
    
    ### Motivation
       when I study and read this module code , I fix some error like:
    1. words error, such as JaveInstance -> JavaInstance, Recieved -> Received,  Unknwon -> Unknown, alterted -> alterted, Updare -> Update, etc.
    2. remove not use import java class.
    3. duplicated code in this  java class I extract method.
    4. javadoc  styles are consistent (not required)
    
    ### Modifications
    
    minor fix up in pulsar-functions module
---
 .../org/apache/pulsar/functions/api/Context.java   | 52 +++++++++++++-------
 .../org/apache/pulsar/functions/api/Function.java  |  1 +
 .../org/apache/pulsar/functions/api/Record.java    | 12 ++---
 .../apache/pulsar/functions/api/WindowContext.java | 49 ++++++++++++-------
 .../pulsar/functions/api/WindowFunction.java       |  1 +
 .../pulsar/functions/api/utils/JavaSerDeTest.java  |  1 -
 .../pulsar/functions/instance/ContextImpl.java     |  3 +-
 .../functions/instance/FunctionResultRouter.java   |  2 +-
 .../pulsar/functions/instance/InstanceConfig.java  |  2 +-
 .../pulsar/functions/instance/JavaInstance.java    |  1 -
 .../functions/instance/JavaInstanceRunnable.java   |  3 +-
 .../pulsar/functions/instance/LogAppender.java     |  1 -
 .../instance/stats/ComponentStatsManager.java      |  1 -
 .../instance/stats/FunctionStatsManager.java       | 15 +++---
 .../functions/instance/stats/SinkStatsManager.java | 15 +++---
 .../instance/stats/SourceStatsManager.java         | 15 +++---
 .../pulsar/functions/source/PulsarSource.java      |  3 --
 .../pulsar/functions/source/TopicSchema.java       |  2 +-
 .../pulsar/functions/sink/PulsarSinkTest.java      | 57 +++++++++-------------
 .../pulsar/functions/source/PulsarSourceTest.java  | 20 +-------
 .../functions/api/examples/ByteBufferSerDe.java    |  2 +-
 .../api/examples/CommaWindowFunction.java          |  2 +-
 .../api/examples/ConfigBasedAppendFunction.java    |  2 +-
 .../functions/api/examples/CustomBaseSerde.java    |  2 +-
 .../api/examples/CustomBaseToBaseFunction.java     |  2 +-
 .../api/examples/CustomBaseToDerivedFunction.java  |  2 +-
 .../functions/api/examples/CustomDerivedSerde.java |  2 +-
 .../examples/CustomDerivedToDerivedFunction.java   |  2 +-
 .../api/examples/CustomObjectFunction.java         |  2 +-
 .../api/examples/ExclamationFunction.java          |  2 +-
 .../api/examples/HostAppenderFunction.java         |  2 +-
 .../api/examples/InstanceIdAppenderFunction.java   |  2 +-
 .../api/examples/IntegerAdditionFunction.java      |  2 +-
 .../functions/api/examples/LoggingFunction.java    |  2 +-
 .../functions/api/examples/PublishFunction.java    |  2 +-
 .../functions/api/examples/UserConfigFunction.java |  2 +-
 .../api/examples/UserPublishFunction.java          |  2 +-
 .../api/examples/WindowDurationFunction.java       |  2 +-
 .../pulsar/functions/api/examples/pojo/Tick.java   |  2 +-
 .../functions/api/examples/serde/CustomObject.java |  2 +-
 .../api/examples/serde/CustomObjectSerde.java      |  2 +-
 .../pulsar/functions/runtime/JavaInstanceMain.java |  4 +-
 .../functions/runtime/KubernetesRuntime.java       |  6 +--
 .../pulsar/functions/runtime/LocalRunner.java      |  1 -
 .../pulsar/functions/runtime/ProcessRuntime.java   |  2 +-
 .../functions/runtime/ProcessRuntimeFactory.java   |  2 +-
 .../pulsar/functions/runtime/RuntimeSpawner.java   |  6 +--
 .../pulsar/functions/runtime/RuntimeUtils.java     |  4 +-
 .../pulsar/functions/runtime/ThreadRuntime.java    |  2 +-
 .../secretsprovider/ClearTextSecretsProvider.java  |  5 +-
 .../EnvironmentBasedSecretsProvider.java           |  3 +-
 .../functions/secretsprovider/SecretsProvider.java |  8 +--
 .../DefaultSecretsProviderConfigurator.java        |  2 +-
 .../KubernetesSecretsProviderConfigurator.java     |  2 +-
 .../SecretsProviderConfigurator.java               | 16 +++---
 .../EnvironmentBasedSecretsProviderTest.java       |  5 --
 .../functions/utils/FunctionConfigUtils.java       |  2 +-
 .../apache/pulsar/functions/utils/Reflections.java | 14 ++++--
 .../pulsar/functions/utils/SinkConfigUtils.java    |  3 +-
 .../pulsar/functions/utils/SourceConfigUtils.java  |  2 +-
 .../org/apache/pulsar/functions/utils/Utils.java   |  3 +-
 .../functions/utils/FunctionConfigUtilsTest.java   |  2 +-
 .../pulsar/functions/utils/ReflectionsTest.java    |  2 +-
 .../functions/utils/SinkConfigUtilsTest.java       |  2 +-
 .../functions/utils/SourceConfigUtilsTest.java     |  2 +-
 .../apache/pulsar/functions/windowing/Event.java   |  4 +-
 .../functions/windowing/EvictionContext.java       |  6 +--
 .../pulsar/functions/windowing/EvictionPolicy.java |  9 ++--
 .../windowing/WaterMarkEventGenerator.java         |  3 +-
 .../pulsar/functions/windowing/WindowManager.java  |  5 +-
 .../windowing/evictors/TimeEvictionPolicy.java     |  2 +-
 .../functions/worker/FunctionAssignmentTailer.java |  1 -
 .../functions/worker/FunctionRuntimeManager.java   |  2 +-
 .../org/apache/pulsar/functions/worker/Worker.java |  2 -
 .../pulsar/functions/worker/WorkerConfig.java      |  4 +-
 .../pulsar/functions/worker/WorkerService.java     |  3 +-
 .../functions/worker/dlog/DLInputStream.java       |  5 +-
 .../functions/worker/rest/FunctionApiResource.java |  1 -
 .../pulsar/functions/worker/rest/WorkerServer.java |  1 -
 .../functions/worker/rest/api/ComponentImpl.java   |  2 -
 .../functions/worker/rest/api/FunctionsImpl.java   | 28 +++++------
 .../pulsar/functions/worker/rest/api/SinkImpl.java | 28 +++++------
 .../worker/rest/api/v3/SourceApiV3Resource.java    |  1 -
 .../worker/scheduler/RoundRobinScheduler.java      |  6 +--
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |  1 -
 .../rest/api/v3/FunctionApiV3ResourceTest.java     |  1 -
 86 files changed, 253 insertions(+), 260 deletions(-)

diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index e9b79c3..63cbc9e 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -34,43 +34,50 @@ import java.util.concurrent.CompletableFuture;
  */
 public interface Context {
     /**
-     * Access the record associated with the current input value
+     * Access the record associated with the current input value.
+     *
      * @return
      */
     Record<?> getCurrentRecord();
 
     /**
-     * Get a list of all input topics
+     * Get a list of all input topics.
+     *
      * @return a list of all input topics
      */
     Collection<String> getInputTopics();
 
     /**
-     * Get the output topic of the function
+     * Get the output topic of the function.
+     *
      * @return output topic name
      */
     String getOutputTopic();
 
     /**
-     * Get output schema builtin type or custom class name
+     * Get output schema builtin type or custom class name.
+     *
      * @return output schema builtin type or custom class name
      */
     String getOutputSchemaType();
 
     /**
-     * The tenant this function belongs to
+     * The tenant this function belongs to.
+     *
      * @return the tenant this function belongs to
      */
     String getTenant();
 
     /**
-     * The namespace this function belongs to
+     * The namespace this function belongs to.
+     *
      * @return the namespace this function belongs to
      */
     String getNamespace();
 
     /**
-     * The name of the function that we are executing
+     * The name of the function that we are executing.
+     *
      * @return The Function name
      */
     String getFunctionName();
@@ -96,19 +103,22 @@ public interface Context {
     int getNumInstances();
 
     /**
-     * The version of the function that we are executing
+     * The version of the function that we are executing.
+     *
      * @return The version id
      */
     String getFunctionVersion();
 
     /**
-     * The logger object that can be used to log in a function
+     * The logger object that can be used to log in a function.
+     *
      * @return the logger object
      */
     Logger getLogger();
 
     /**
-     * Increment the builtin distributed counter refered by key
+     * Increment the builtin distributed counter referred by key.
+     *
      * @param key The name of the key
      * @param amount The amount to be incremented
      */
@@ -123,7 +133,7 @@ public interface Context {
     long getCounter(String key);
 
     /**
-     * Updare the state value for the key.
+     * Update the state value for the key.
      *
      * @param key name of the key
      * @param value state value of the key
@@ -139,20 +149,23 @@ public interface Context {
     ByteBuffer getState(String key);
 
     /**
-     * Get a map of all user-defined key/value configs for the function
+     * Get a map of all user-defined key/value configs for the function.
+     *
      * @return The full map of user-defined config values
      */
     Map<String, Object> getUserConfigMap();
 
     /**
-     * Get any user-defined key/value
+     * Get any user-defined key/value.
+     *
      * @param key The key
      * @return The Optional value specified by the user for that key.
      */
     Optional<Object> getUserConfigValue(String key);
 
     /**
-     * Get any user-defined key/value or a default value if none is present
+     * Get any user-defined key/value or a default value if none is present.
+     *
      * @param key
      * @param defaultValue
      * @return Either the user config value associated with a given key or a supplied default value
@@ -160,21 +173,23 @@ public interface Context {
     Object getUserConfigValueOrDefault(String key, Object defaultValue);
 
     /**
-     * Get the secret associated with this key
+     * Get the secret associated with this key.
+     *
      * @param secretName The name of the secret
      * @return The secret if anything was found or null
      */
     String getSecret(String secretName);
 
     /**
-     * Record a user defined metric
+     * Record a user defined metric.
+     *
      * @param metricName The name of the metric
      * @param value The value of the metric
      */
     void recordMetric(String metricName, double value);
 
     /**
-     * Publish an object using serDe for serializing to the topic
+     * Publish an object using serDe for serializing to the topic.
      *
      * @param topicName
      *            The name of the topic for publishing
@@ -187,7 +202,8 @@ public interface Context {
     <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);
 
     /**
-     * Publish an object to the topic using default schemas
+     * Publish an object to the topic using default schemas.
+     *
      * @param topicName The name of the topic for publishing
      * @param object The object that needs to be published
      * @return A future that completes when the framework is done publishing the message
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java
index ca292eb..08b81ba 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java
@@ -29,6 +29,7 @@ package org.apache.pulsar.functions.api;
 public interface Function<I, O> {
     /**
      * Process the input.
+     *
      * @return the output
      */
     O process(I input, Context context) throws Exception;
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index cb03cb2..b99c4dc 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -28,21 +28,21 @@ import java.util.Optional;
 public interface Record<T> {
 
     /**
-     * If the record originated from a topic, report the topic name
+     * If the record originated from a topic, report the topic name.
      */
     default Optional<String> getTopicName() {
         return Optional.empty();
     }
 
     /**
-     * Return a key if the key has one associated
+     * Return a key if the key has one associated.
      */
     default Optional<String> getKey() {
         return Optional.empty();
     }
 
     /**
-     * Retrieves the actual data of the record
+     * Retrieves the actual data of the record.
      *
      * @return The record data
      */
@@ -85,19 +85,19 @@ public interface Record<T> {
     }
 
     /**
-     * Acknowledge that this record is fully processed
+     * Acknowledge that this record is fully processed.
      */
     default void ack() {
     }
 
     /**
-     * To indicate that this record has failed to be processed
+     * To indicate that this record has failed to be processed.
      */
     default void fail() {
     }
 
     /**
-     * To support message routing on a per message basis
+     * To support message routing on a per message basis.
      *
      * @return The topic this message should be written to
      */
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
index 15ef591..9effdb9 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
@@ -29,25 +29,29 @@ import java.util.concurrent.CompletableFuture;
 public interface WindowContext {
 
     /**
-     * The tenant this function belongs to
+     * The tenant this function belongs to.
+     *
      * @return the tenant this function belongs to
      */
     String getTenant();
 
     /**
-     * The namespace this function belongs to
+     * The namespace this function belongs to.
+     *
      * @return the namespace this function belongs to
      */
     String getNamespace();
 
     /**
-     * The name of the function that we are executing
+     * The name of the function that we are executing.
+     *
      * @return The Function name
      */
     String getFunctionName();
 
     /**
-     * The id of the function that we are executing
+     * The id of the function that we are executing.
+     *
      * @return The function id
      */
     String getFunctionId();
@@ -67,37 +71,43 @@ public interface WindowContext {
     int getNumInstances();
 
     /**
-     * The version of the function that we are executing
+     * The version of the function that we are executing.
+     *
      * @return The version id
      */
     String getFunctionVersion();
 
     /**
-     * Get a list of all input topics
+     * Get a list of all input topics.
+     *
      * @return a list of all input topics
      */
     Collection<String> getInputTopics();
 
     /**
-     * Get the output topic of the function
+     * Get the output topic of the function.
+     *
      * @return output topic name
      */
     String getOutputTopic();
 
     /**
-     * Get output schema builtin type or custom class name
+     * Get output schema builtin type or custom class name.
+     *
      * @return output schema builtin type or custom class name
      */
     String getOutputSchemaType();
 
     /**
-     * The logger object that can be used to log in a function
+     * The logger object that can be used to log in a function.
+     *
      * @return the logger object
      */
     Logger getLogger();
 
     /**
-     * Increment the builtin distributed counter refered by key
+     * Increment the builtin distributed counter referred by key.
+     *
      * @param key The name of the key
      * @param amount The amount to be incremented
      */
@@ -112,7 +122,7 @@ public interface WindowContext {
     long getCounter(String key);
 
     /**
-     * Updare the state value for the key.
+     * Update the state value for the key.
      *
      * @param key name of the key
      * @param value state value of the key
@@ -128,20 +138,23 @@ public interface WindowContext {
     ByteBuffer getState(String key);
 
     /**
-     * Get a map of all user-defined key/value configs for the function
+     * Get a map of all user-defined key/value configs for the function.
+     *
      * @return The full map of user-defined config values
      */
     Map<String, Object> getUserConfigMap();
 
     /**
-     * Get any user-defined key/value
+     * Get any user-defined key/value.
+     *
      * @param key The key
      * @return The Optional value specified by the user for that key.
      */
     Optional<Object> getUserConfigValue(String key);
 
     /**
-     * Get any user-defined key/value or a default value if none is present
+     * Get any user-defined key/value or a default value if none is present.
+     *
      * @param key
      * @param defaultValue
      * @return Either the user config value associated with a given key or a supplied default value
@@ -149,14 +162,15 @@ public interface WindowContext {
     Object getUserConfigValueOrDefault(String key, Object defaultValue);
 
     /**
-     * Record a user defined metric
+     * Record a user defined metric.
+     *
      * @param metricName The name of the metric
      * @param value The value of the metric
      */
     void recordMetric(String metricName, double value);
 
     /**
-     * Publish an object using serDe for serializing to the topic
+     * Publish an object using serDe for serializing to the topic.
      *
      * @param topicName
      *            The name of the topic for publishing
@@ -169,7 +183,8 @@ public interface WindowContext {
     <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);
 
     /**
-     * Publish an object to the topic using default schemas
+     * Publish an object to the topic using default schemas.
+     *
      * @param topicName The name of the topic for publishing
      * @param object The object that needs to be published
      * @return A future that completes when the framework is done publishing the message
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java
index 6f2c421..111c570 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java
@@ -28,6 +28,7 @@ import java.util.Collection;
 public interface WindowFunction<I, O> {
     /**
      * Process the input.
+     *
      * @return the output
      */
     O process(Collection<Record<I>> input, WindowContext context) throws Exception;
diff --git a/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java b/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java
index 36e6bfc..cb067e8 100644
--- a/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java
+++ b/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java
@@ -25,7 +25,6 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import org.apache.pulsar.functions.api.utils.JavaSerDe;
 import org.testng.annotations.Test;
 
 /**
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 60b8ec0..2cb0537 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -66,7 +66,6 @@ import static org.apache.pulsar.functions.instance.stats.FunctionStatsManager.US
 /**
  * This class implements the Context interface exposed to the user.
  */
-
 class ContextImpl implements Context, SinkContext, SourceContext {
     private InstanceConfig config;
     private Logger logger;
@@ -215,7 +214,7 @@ class ContextImpl implements Context, SinkContext, SourceContext {
 
     @Override
     public String getFunctionId() {
-        return config.getFunctionId().toString();
+        return config.getFunctionId();
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java
index be06ee3..aeeaf6c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java
@@ -28,7 +28,7 @@ import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl;
 
 /**
- * Router for routing function results
+ * Router for routing function results.
  */
 public class FunctionResultRouter extends RoundRobinPartitionMessageRouterImpl {
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 51ec0d4..023f6a2 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -27,7 +27,7 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 
 /**
  * This is the config passed to the Java Instance. Contains all the information
- * passed to run functions
+ * passed to run functions.
  */
 @Data
 @Getter
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index acd0781..8aee702 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -24,7 +24,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
 
 import java.util.Map;
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index c9bf644..8076e0f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -56,7 +56,6 @@ import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
-import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -219,7 +218,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     }
 
     /**
-     * The core logic that initialize the instance thread and executes the function
+     * The core logic that initialize the instance thread and executes the function.
      */
     @Override
     public void run() {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
index afdc6f9..107b224 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.functions.instance;
 import org.apache.logging.log4j.core.*;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 
 import java.io.Serializable;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
index 38a5de6..0c9b9af 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.functions.instance.stats;
 import com.google.common.collect.EvictingQueue;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.exporter.common.TextFormat;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.Utils;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index e4e03f8..b35c32a 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -243,9 +243,7 @@ public class FunctionStatsManager extends ComponentStatsManager{
 
         // report exception throw prometheus
         if (userExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
             userExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
@@ -257,13 +255,18 @@ public class FunctionStatsManager extends ComponentStatsManager{
 
         // report exception throw prometheus
         if (sysExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
             sysExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
 
+    private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
+        String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
+        exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
+        exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+        return exceptionMetricsLabels;
+    }
+
     @Override
     public void incrTotalReceived() {
         _statTotalRecordsReceived.inc();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
index 1a23957..1b2d8b0 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -216,9 +216,7 @@ public class SinkStatsManager extends ComponentStatsManager {
 
         // report exception throw prometheus
         if (sysExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
             sysExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
@@ -241,13 +239,18 @@ public class SinkStatsManager extends ComponentStatsManager {
 
         // report exception throw prometheus
         if (sinkExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
             sinkExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
 
+    private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
+        String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
+        exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
+        exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+        return exceptionMetricsLabels;
+    }
+
     @Override
     public void setLastInvocation(long ts) {
         _statlastInvocation.set(ts);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
index 87a60a3..aac6720 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -215,9 +215,7 @@ public class SourceStatsManager extends ComponentStatsManager {
 
         // report exception throw prometheus
         if (sysExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
             sysExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
@@ -235,13 +233,18 @@ public class SourceStatsManager extends ComponentStatsManager {
 
         // report exception throw prometheus
         if (sourceExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
-            exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
             sourceExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
 
+    private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
+        String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
+        exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
+        exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+        return exceptionMetricsLabels;
+    }
+
     @Override
     public void incrSinkExceptions(Throwable ex) {
         incrSysExceptions(ex);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index bad83b0..83e7566 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -33,12 +33,9 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
-import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 21f8b00..d8abcc6 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -47,7 +47,7 @@ public class TopicSchema {
     }
 
     /**
-     * If there is no other information available, use JSON as default schema type
+     * If there is no other information available, use JSON as default schema type.
      */
     private static final SchemaType DEFAULT_SCHEMA_TYPE = SchemaType.JSON;
 
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index caf5dec..1ef72c7 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -82,9 +82,8 @@ public class PulsarSinkTest {
     }
 
     /**
-     * Verify that JavaInstance does not support functions that take Void type as input
+     * Verify that JavaInstance does not support functions that take Void type as input.
      */
-
     private static PulsarClientImpl getPulsarClient() throws PulsarClientException {
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
         ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
@@ -160,7 +159,7 @@ public class PulsarSinkTest {
     }
 
     /**
-     * Verify that JavaInstance does support functions that output Void type
+     * Verify that JavaInstance does support functions that output Void type.
      */
     @Test
     public void testVoidOutputClasses() throws Exception {
@@ -283,11 +282,7 @@ public class PulsarSinkTest {
 
                 @Override
                 public Optional<String> getDestinationTopic() {
-                    if (topic != null) {
-                        return Optional.of(topic);
-                    } else {
-                        return Optional.empty();
-                    }
+                    return getTopicOptional(topic);
                 }
             }, "out1");
 
@@ -307,11 +302,7 @@ public class PulsarSinkTest {
                 @Override
                 public boolean matches(Object o) {
                     if (o instanceof String) {
-                        if (topic != null) {
-                            return topic.equals(o);
-                        } else {
-                            return defaultTopic.equals(o);
-                        }
+                        return getTopicEquals(o, topic, defaultTopic);
                     }
                     return false;
                 }
@@ -340,11 +331,7 @@ public class PulsarSinkTest {
 
                 @Override
                 public Optional<String> getDestinationTopic() {
-                    if (topic != null) {
-                        return Optional.of(topic);
-                    } else {
-                        return Optional.empty();
-                    }
+                    return getTopicOptional(topic);
                 }
             }, "out1");
 
@@ -364,11 +351,7 @@ public class PulsarSinkTest {
                 @Override
                 public boolean matches(Object o) {
                     if (o instanceof String) {
-                        if (topic != null) {
-                            return topic.equals(o);
-                        } else {
-                            return defaultTopic.equals(o);
-                        }
+                        return getTopicEquals(o, topic, defaultTopic);
                     }
                     return false;
                 }
@@ -397,11 +380,7 @@ public class PulsarSinkTest {
 
                 @Override
                 public Optional<String> getDestinationTopic() {
-                    if (topic != null) {
-                        return Optional.of(topic);
-                    } else {
-                        return Optional.empty();
-                    }
+                    return getTopicOptional(topic);
                 }
                 @Override
                 public Optional<String> getPartitionId() {
@@ -434,11 +413,7 @@ public class PulsarSinkTest {
                 @Override
                 public boolean matches(Object o) {
                     if (o instanceof String) {
-                        if (topic != null) {
-                            return topic.equals(o);
-                        } else {
-                            return defaultTopic.equals(o);
-                        }
+                        return getTopicEquals(o, topic, defaultTopic);
                     }
                     return false;
                 }
@@ -460,4 +435,20 @@ public class PulsarSinkTest {
         }
     }
 
+    private Optional<String> getTopicOptional(String topic) {
+        if (topic != null) {
+            return Optional.of(topic);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    private boolean getTopicEquals(Object o, String topic, String defaultTopic) {
+        if (topic != null) {
+            return topic.equals(o);
+        } else {
+            return defaultTopic.equals(o);
+        }
+    }
+
 }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 88c9637..43b4bcb 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -46,7 +46,6 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.SourceContext;
 import org.testng.annotations.Test;
 
@@ -73,9 +72,8 @@ public class PulsarSourceTest {
     }
 
     /**
-     * Verify that JavaInstance does not support functions that take Void type as input
+     * Verify that JavaInstance does not support functions that take Void type as input.
      */
-
     private static PulsarClientImpl getPulsarClient() throws PulsarClientException {
         PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
         ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
@@ -182,22 +180,6 @@ public class PulsarSourceTest {
         pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
     }
 
-    /**
-     * Verify that Explicit setting of Default Serializer works fine.
-     */
-    @Test
-    public void testExplicitDefaultSerDe() throws Exception {
-        PulsarSourceConfig pulsarConfig = getPulsarConfigs();
-        // set type to void
-        pulsarConfig.setTypeClassName(String.class.getName());
-        consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
-                ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build());
-        pulsarConfig.setTopicSchema(consumerConfigs);
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
-
-        pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
-    }
-
     @Test
     public void testComplexOuputType() throws Exception {
         PulsarSourceConfig pulsarConfig = getPulsarConfigs();
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ByteBufferSerDe.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ByteBufferSerDe.java
index f433372..0a1eb2a 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ByteBufferSerDe.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ByteBufferSerDe.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.functions.api.SerDe;
 import java.nio.ByteBuffer;
 
 /**
- * Simple ByteBuffer Serializer and Deserializer
+ * Simple ByteBuffer Serializer and Deserializer.
  */
 public class ByteBufferSerDe implements SerDe<Integer> {
     @Override
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CommaWindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CommaWindowFunction.java
index da696fd..c966fae 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CommaWindowFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CommaWindowFunction.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.functions.api.examples;
 import java.util.Collection;
 
 /**
- * Comma based aggregation window function example
+ * Comma based aggregation window function example.
  */
 public class CommaWindowFunction implements java.util.function.Function<Collection<String>, String> {
     @Override
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConfigBasedAppendFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConfigBasedAppendFunction.java
index 1edd943..8e9a475 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConfigBasedAppendFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConfigBasedAppendFunction.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.functions.api.Function;
 import java.util.Optional;
 
 /**
- * Function that appends something to incoming input based on config supplied
+ * Function that appends something to incoming input based on config supplied.
  */
 public class ConfigBasedAppendFunction implements Function<String, String> {
     @Override
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseSerde.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseSerde.java
index b8a6a57..da84a40 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseSerde.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseSerde.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.functions.api.SerDe;
 import java.nio.ByteBuffer;
 
 /**
- * Example of using a byte buffer serialization for Custom object
+ * Example of using a byte buffer serialization for Custom object.
  */
 public class CustomBaseSerde implements SerDe<CustomBaseObject> {
     @Override
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToBaseFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToBaseFunction.java
index d3b9225..f482fd6 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToBaseFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToBaseFunction.java
@@ -22,7 +22,7 @@ import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 
 /**
- * Function example of processing on a custom object type
+ * Function example of processing on a custom object type.
  */
 public class CustomBaseToBaseFunction implements Function<CustomBaseObject, CustomBaseObject> {
 
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToDerivedFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToDerivedFunction.java
index 9e8cab5..41aa3e4 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToDerivedFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToDerivedFunction.java
@@ -22,7 +22,7 @@ import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 
 /**
- * Examplf of fucntion doing a type object conversion between input ann output type
+ * Example of function doing a type object conversion between input ann output type.
  */
 public class CustomBaseToDerivedFunction implements Function<CustomBaseObject, CustomDerivedObject> {
 
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedSerde.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedSerde.java
index 219cdad..6305730 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedSerde.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedSerde.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.functions.api.SerDe;
 import java.nio.ByteBuffer;
 
 /**
- *  Example to derived object serialization
+ *  Example to derived object serialization.
  */
 public class CustomDerivedSerde implements SerDe<CustomDerivedObject> {
     @Override
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToDerivedFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToDerivedFunction.java
index a86c82d..3937910 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToDerivedFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToDerivedFunction.java
@@ -22,7 +22,7 @@ import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 
 /**
- * Example of 2nd order conversion from a base object for composition pipelines
+ * Example of 2nd order conversion from a base object for composition pipelines.
  */
 public class CustomDerivedToDerivedFunction implements Function<CustomDerivedObject, CustomDerivedObject> {
 
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomObjectFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomObjectFunction.java
index d2db0a6..d1df6ba 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomObjectFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomObjectFunction.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.examples.serde.CustomObject;
 
 /**
- * Function that deals with custom objects
+ * Function that deals with custom objects.
  */
 public class CustomObjectFunction implements Function<CustomObject, CustomObject> {
 
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java
index 7d4d2ec..c2af6e5 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.functions.api.Function;
 
 /**
  * The classic Exclamation Function that appends an exclamation at the end
- * of the input
+ * of the input.
  */
 public class ExclamationFunction implements Function<String, String> {
     @Override
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/HostAppenderFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/HostAppenderFunction.java
index 2358e91..0a2527d 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/HostAppenderFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/HostAppenderFunction.java
@@ -25,7 +25,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 
 /**
- * Function that appends the host name to the payload message
+ * Function that appends the host name to the payload message.
  */
 public class HostAppenderFunction implements Function<String, String> {
 
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/InstanceIdAppenderFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/InstanceIdAppenderFunction.java
index 0a56386..e7127ed 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/InstanceIdAppenderFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/InstanceIdAppenderFunction.java
@@ -22,7 +22,7 @@ import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 
 /**
- * Function that appends the instance id to the payload message
+ * Function that appends the instance id to the payload message.
  */
 public class InstanceIdAppenderFunction implements Function<String, String> {
 
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/IntegerAdditionFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/IntegerAdditionFunction.java
index d6e8276..78c8bc7 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/IntegerAdditionFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/IntegerAdditionFunction.java
@@ -22,7 +22,7 @@ import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 
 /**
- * Function that appends to an integer input value and outputs the new value
+ * Function that appends to an integer input value and outputs the new value.
  */
 public class IntegerAdditionFunction implements Function<Integer, Integer> {
 
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
index 4b73678..9f6e775 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
@@ -25,7 +25,7 @@ import org.apache.pulsar.functions.api.Function;
  * A function that demonstrates how to redirect logging to a topic.
  * In this particular example, for every input string, the function
  * does some logging. If --logTopic topic is specified, these log statements
- * end up in that specified pulsar topic
+ * end up in that specified pulsar topic.
  */
 public class LoggingFunction implements Function<String, String> {
 
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
index 45cbd72..cda8331 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.functions.api.Function;
 
 /**
  * Example function that uses the built in publish function in the context
- * to publish to a desired topic based on config
+ * to publish to a desired topic based on config.
  */
 public class PublishFunction implements Function<String, Void> {
     @Override
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
index add14d4..5f29b1d 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.functions.api.Function;
 import java.util.Optional;
 
 /**
- * An example demonstrate retrieving user config value from Context
+ * An example demonstrate retrieving user config value from Context.
  */
 public class UserConfigFunction implements Function<String, String> {
 
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
index 5bf04be..ec3fd95 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 
 /**
- * An example demonstrate publishing messages through Context
+ * An example demonstrate publishing messages through Context.
  */
 public class UserPublishFunction implements Function<String, Void> {
 
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
index cb0aa9f..8c0da3b 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.functions.api.examples;
 import java.util.Collection;
 
 /**
- * This functions collects the timestamp during the window operation
+ * This functions collects the timestamp during the window operation.
  */
 public class WindowDurationFunction implements java.util.function.Function<Collection<String>, String> {
     @Override
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java
index 67d056e..efff849 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java
@@ -23,7 +23,7 @@ import lombok.EqualsAndHashCode;
 import lombok.ToString;
 
 /**
- * Pojo to represent a stock tick
+ * Pojo to represent a stock tick.
  */
 @Data
 @ToString
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObject.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObject.java
index f50d10c..bd7fa10 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObject.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObject.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.functions.api.examples.serde;
 import lombok.*;
 
 /**
- * This class simulates a user defined POJO
+ * This class simulates a user defined POJO.
  */
 @Getter
 @Setter
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObjectSerde.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObjectSerde.java
index fd5bcdc..f7f76f9 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObjectSerde.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObjectSerde.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.functions.api.SerDe;
 import java.nio.ByteBuffer;
 
 /**
- * This class takes care of serializing/deserializing CustomObject
+ * This class takes care of serializing/deserializing CustomObject.
  */
 public class CustomObjectSerde implements SerDe<CustomObject> {
     @Override
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index f480b2e..15af660 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -192,7 +192,7 @@ public class JavaInstanceMain implements AutoCloseable {
                 .addService(new InstanceControlImpl(runtimeSpawner))
                 .build()
                 .start();
-        log.info("JaveInstance Server started, listening on " + port);
+        log.info("JavaInstance Server started, listening on " + port);
         java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
             public void run() {
@@ -341,7 +341,7 @@ public class JavaInstanceMain implements AutoCloseable {
         @Override
         public void healthCheck(com.google.protobuf.Empty request,
                                 io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.HealthCheckResult> responseObserver) {
-            log.debug("Recieved health check request...");
+            log.debug("Received health check request...");
             InstanceCommunication.HealthCheckResult healthCheckResult
                     = InstanceCommunication.HealthCheckResult.newBuilder().setSuccess(true).build();
             responseObserver.onNext(healthCheckResult);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 3c0468d..140de1c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -217,7 +217,7 @@ public class KubernetesRuntime implements Runtime {
     }
 
     /**
-     * The core logic that creates a service first followed by statefulset
+     * The core logic that creates a service first followed by statefulset.
      */
     @Override
     public void start() throws Exception {
@@ -304,14 +304,14 @@ public class KubernetesRuntime implements Runtime {
     @Override
     public CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics() {
         CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>();
-        retval.completeExceptionally(new RuntimeException("Kubernetes Runtime doesnt support getAndReset metrics via rest"));
+        retval.completeExceptionally(new RuntimeException("Kubernetes Runtime doesn't support getAndReset metrics via rest"));
         return retval;
     }
 
     @Override
     public CompletableFuture<Void> resetMetrics() {
         CompletableFuture<Void> retval = new CompletableFuture<>();
-        retval.completeExceptionally(new RuntimeException("Kubernetes Runtime doesnt support resetting metrics via rest"));
+        retval.completeExceptionally(new RuntimeException("Kubernetes Runtime doesn't support resetting metrics via rest"));
         return retval;
     }
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
index c3bc756..717659e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
@@ -38,7 +38,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
-import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 7cc6efb..734925a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -129,7 +129,7 @@ class ProcessRuntime implements Runtime {
     }
 
     /**
-     * The core logic that initialize the thread container and executes the function
+     * The core logic that initialize the thread container and executes the function.
      */
     @Override
     public void start() {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index 2b7de14..2b85815 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -112,7 +112,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
 
     @Override
     public ProcessRuntime createContainer(InstanceConfig instanceConfig, String codeFile,
-                                          String originalcodeFileName,
+                                          String originalCodeFileName,
                                           Long expectedHealthCheckInterval) throws Exception {
         String instanceFile;
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index aff3404..5bb3ab1 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -24,8 +24,6 @@
 package org.apache.pulsar.functions.runtime;
 
 import java.io.IOException;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -33,14 +31,12 @@ import java.util.concurrent.TimeUnit;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.utils.Utils;
-import static org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime.PYTHON;
 
 @Slf4j
 public class RuntimeSpawner implements AutoCloseable {
@@ -134,7 +130,7 @@ public class RuntimeSpawner implements AutoCloseable {
                 return Utils.printJson(msg);
             } catch (IOException e) {
                 throw new RuntimeException(
-                        instanceConfig.getFunctionDetails().getName() + " Exception parsing getstatus", e);
+                        instanceConfig.getFunctionDetails().getName() + " Exception parsing getStatus", e);
             }
         });
     }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 95f10ae..37d2b04 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -47,7 +47,7 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
 /**
- * Util class for common runtime functionality
+ * Util class for common runtime functionality.
  */
 @Slf4j
 public class RuntimeUtils {
@@ -257,5 +257,5 @@ public class RuntimeUtils {
         rd.close();
         return result.toString();
     }
-
+  
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 93246f2..8aff81a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -87,7 +87,7 @@ class ThreadRuntime implements Runtime {
     }
 
     /**
-     * The core logic that initialize the thread container and executes the function
+     * The core logic that initialize the thread container and executes the function.
      */
     @Override
     public void start() {
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/ClearTextSecretsProvider.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/ClearTextSecretsProvider.java
index adb2852..79aa58a 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/ClearTextSecretsProvider.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/ClearTextSecretsProvider.java
@@ -23,8 +23,9 @@ package org.apache.pulsar.functions.secretsprovider;
  * the secrets as being passed in cleartext.
  */
 public class ClearTextSecretsProvider implements SecretsProvider {
-        /**
-     * Fetches a secret
+    /**
+     * Fetches a secret.
+     *
      * @return The actual secret
      */
     @Override
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProvider.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProvider.java
index b058709..9793b7e 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProvider.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProvider.java
@@ -25,7 +25,8 @@ package org.apache.pulsar.functions.secretsprovider;
 public class EnvironmentBasedSecretsProvider implements SecretsProvider {
 
     /**
-     * Fetches a secret
+     * Fetches a secret.
+     *
      * @return The actual secret
      */
     @Override
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/SecretsProvider.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/SecretsProvider.java
index 7d5330d..2ee0a3f 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/SecretsProvider.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/SecretsProvider.java
@@ -23,17 +23,19 @@ import java.util.Map;
 /**
  * This file defines the SecretsProvider interface. This interface is used by the function
  * instances/containers to actually fetch the secrets. What SecretsProvider to use is
- * decided by the SecretsProviderConfigurator
+ * decided by the SecretsProviderConfigurator.
  */
 public interface SecretsProvider {
     /**
-     * Initialize the SecretsProvider
+     * Initialize the SecretsProvider.
+     *
      * @return
      */
     default void init(Map<String, String> config) {}
 
     /**
-     * Fetches a secret
+     * Fetches a secret.
+     *
      * @return The actual secret
      */
     String provideSecret(String secretName, Object pathToSecret);
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
index c9b54f5..32c13b7 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
@@ -41,7 +41,7 @@ public class DefaultSecretsProviderConfigurator implements SecretsProviderConfig
             case PYTHON:
                 return "secretsprovider.ClearTextSecretsProvider";
             default:
-                throw new RuntimeException("Unknwon runtime " + functionDetails.getRuntime());
+                throw new RuntimeException("Unknown runtime " + functionDetails.getRuntime());
         }
     }
 
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
index bcccfe4..6dc31c7 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
@@ -35,7 +35,7 @@ import java.util.Map;
  * As such this implementation is strictly when workers are configured to use kubernetes runtime.
  * We use kubernetes in built secrets and bind them as environment variables within the function container
  * to ensure that the secrets are available to the function at runtime. Then we plug in the
- * EnvironmentBasedSecretsConfig as the secrets provider who knows how to read these environment variables
+ * EnvironmentBasedSecretsConfig as the secrets provider who knows how to read these environment variables.
  */
 public class KubernetesSecretsProviderConfigurator implements SecretsProviderConfigurator {
     private static String ID_KEY = "path";
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java
index c31686f..349559e 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java
@@ -33,41 +33,41 @@ import java.util.Map;
  */
 public interface SecretsProviderConfigurator {
     /**
-     * Initialize the SecretsProviderConfigurator
-     * @return
+     * Initialize the SecretsProviderConfigurator.
      */
     default void init(Map<String, String> config) {}
 
     /**
      * Return the Secrets Provider Classname. This will be passed to the cmdline
      * of the instance and should contain the logic of connecting with the secrets
-     * provider and obtaining secrets
+     * provider and obtaining secrets.
      */
     String getSecretsProviderClassName(Function.FunctionDetails functionDetails);
 
     /**
-     * Return the secrets provider config
+     * Return the secrets provider config.
      */
     Map<String, String> getSecretsProviderConfig(Function.FunctionDetails functionDetails);
 
     /**
-     * Attaches any secrets specific stuff to the k8 container for kubernetes runtime
+     * Attaches any secrets specific stuff to the k8 container for kubernetes runtime.
      */
     void configureKubernetesRuntimeSecretsProvider(V1PodSpec podSpec, String functionsContainerName, Function.FunctionDetails functionDetails);
 
     /**
-     * Attaches any secrets specific stuff to the ProcessBuilder for process runtime
+     * Attaches any secrets specific stuff to the ProcessBuilder for process runtime.
      */
     void configureProcessRuntimeSecretsProvider(ProcessBuilder processBuilder, Function.FunctionDetails functionDetails);
 
     /**
-     * What is the type of the object that should be in the user secret config
+     * What is the type of the object that should be in the user secret config.
+     *
      * @return
      */
     Type getSecretObjectType();
 
     /**
-     * Do config checks to see whether the secrets provided are conforming
+     * Do config checks to see whether the secrets provided are conforming.
      */
     default void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, Function.FunctionDetails functionDetails) {}
 
diff --git a/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProviderTest.java b/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProviderTest.java
index fa80d70..3392e9a 100644
--- a/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProviderTest.java
+++ b/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProviderTest.java
@@ -19,17 +19,12 @@
 
 package org.apache.pulsar.functions.secretsprovider;
 
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import java.lang.reflect.Field;
 import java.util.Map;
 
-import static org.mockito.Matchers.anyString;
-
 /**
  * Unit test of {@link Exceptions}.
  */
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 7932f58..d627700 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -657,7 +657,7 @@ public class FunctionConfigUtils {
             mergedConfig.setLogTopic(newConfig.getLogTopic());
         }
         if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
-            throw new IllegalArgumentException("Processing Guarantess cannot be alterted");
+            throw new IllegalArgumentException("Processing Guarantess cannot be altered");
         }
         if (newConfig.getRetainOrdering() != null && !newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) {
             throw new IllegalArgumentException("Retain Orderning cannot be altered");
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
index 67dde1c..7ba22aa 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
@@ -133,7 +133,7 @@ public class Reflections {
                                         ClassLoader classLoader, Object[] params, Class[] paramTypes) {
         if (params.length != paramTypes.length) {
             throw new RuntimeException(
-                    "Unequal number of params and paramTypes. Each param must have a correspoinding param type");
+                    "Unequal number of params and paramTypes. Each param must have a corresponding param type");
         }
         Class<?> theCls;
         try {
@@ -172,7 +172,8 @@ public class Reflections {
     }
 
     /**
-     * Check if a class is in a jar
+     * Check if a class is in a jar.
+     *
      * @param jar location of the jar
      * @param fqcn fully qualified class name to search for in jar
      * @return true if class can be loaded from jar and false if otherwise
@@ -189,7 +190,8 @@ public class Reflections {
     }
 
     /**
-     * Check if class exists
+     * Check if class exists.
+     *
      * @param fqcn fully qualified class name to search for
      * @return true if class can be loaded from jar and false if otherwise
      */
@@ -203,7 +205,8 @@ public class Reflections {
     }
 
     /**
-     * check if a class implements an interface
+     * check if a class implements an interface.
+     *
      * @param fqcn fully qualified class name to search for in jar
      * @param xface interface to check if implement
      * @return true if class from jar implements interface xface and false if otherwise
@@ -223,7 +226,8 @@ public class Reflections {
     }
 
     /**
-     * check if class implements interface
+     * check if class implements interface.
+     *
      * @param fqcn fully qualified class name
      * @param xface the interface the fqcn should implement
      * @return true if class implements interface xface and false if otherwise
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index ecbe487..efb4133 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -30,7 +30,6 @@ import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -423,7 +422,7 @@ public class SinkConfigUtils {
             });
         }
         if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
-            throw new IllegalArgumentException("Processing Guarantess cannot be alterted");
+            throw new IllegalArgumentException("Processing Guarantess cannot be altered");
         }
         if (newConfig.getConfigs() != null) {
             mergedConfig.setConfigs(newConfig.getConfigs());
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 59b05bd..0f7ff51 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -270,7 +270,7 @@ public class SourceConfigUtils {
             mergedConfig.setSecrets(newConfig.getSecrets());
         }
         if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
-            throw new IllegalArgumentException("Processing Guarantess cannot be alterted");
+            throw new IllegalArgumentException("Processing Guarantess cannot be altered");
         }
         if (newConfig.getParallelism() != null) {
             mergedConfig.setParallelism(newConfig.getParallelism());
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index d2920ca..2318f5a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -236,7 +236,8 @@ public class Utils {
     }
 
     /**
-     * Load a jar
+     * Load a jar.
+     *
      * @param jar file of jar
      * @return classloader
      * @throws MalformedURLException
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 68dddc2..da2a728 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -221,7 +221,7 @@ public class FunctionConfigUtilsTest {
         assertTrue(mergedConfig.getCleanupSubscription());
     }
 
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be altered")
     public void testMergeDifferentProcessingGuarantees() {
         FunctionConfig functionConfig = createFunctionConfig();
         FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("processingGuarantees", EFFECTIVELY_ONCE);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java
index 41f943e..b0fa14c 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java
@@ -154,7 +154,7 @@ public class ReflectionsTest {
     }
 
     @Test
-    public void testclassInJarImplementsIface() {
+    public void testClassInJarImplementsIface() {
         assertTrue(Reflections.classImplementsIface(aImplementation.class.getName(), aInterface.class));
         assertTrue(!Reflections.classImplementsIface(aImplementation.class.getName(), bInterface.class));
     }
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index efde8ab..f147cfb 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -139,7 +139,7 @@ public class SinkConfigUtilsTest {
         assertEquals(mergedConfig.getInputSpecs().get("test-input"), newSinkConfig.getInputSpecs().get("test-input"));
     }
 
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be altered")
     public void testMergeDifferentProcessingGuarantees() {
         SinkConfig sinkConfig = createSinkConfig();
         SinkConfig newSinkConfig = createUpdatedSinkConfig("processingGuarantees", EFFECTIVELY_ONCE);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 0f4b1cb..cb141e8 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -117,7 +117,7 @@ public class SourceConfigUtilsTest {
         SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
     }
 
-    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be altered")
     public void testMergeDifferentProcessingGuarantees() {
         SourceConfig sourceConfig = createSourceConfig();
         SourceConfig newSourceConfig = createUpdatedSourceConfig("processingGuarantees", EFFECTIVELY_ONCE);
diff --git a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/Event.java b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/Event.java
index 1386f86..c9a12a1 100644
--- a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/Event.java
+++ b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/Event.java
@@ -33,14 +33,14 @@ public interface Event<T> {
     Record<?> getRecord();
 
     /**
-     * The event timestamp in millis
+     * The event timestamp in millis.
      *
      * @return the event timestamp in milliseconds.
      */
     long getTimestamp();
 
     /**
-     * Returns the wrapped object
+     * Returns the wrapped object.
      *
      * @return the wrapped object.
      */
diff --git a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/EvictionContext.java b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/EvictionContext.java
index d5700a8..4f1ef79 100644
--- a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/EvictionContext.java
+++ b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/EvictionContext.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.functions.windowing;
 
 /**
- * Context information that can be used by the eviction policy
+ * Context information that can be used by the eviction policy.
  */
 public interface EvictionContext {
     /**
@@ -32,7 +32,7 @@ public interface EvictionContext {
     Long getReferenceTime();
 
     /**
-     * Returns the sliding count for count based windows
+     * Returns the sliding count for count based windows.
      *
      * @return the sliding count
      */
@@ -40,7 +40,7 @@ public interface EvictionContext {
 
 
     /**
-     * Returns the sliding interval for time based windows
+     * Returns the sliding interval for time based windows.
      *
      * @return the sliding interval
      */
diff --git a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/EvictionPolicy.java b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/EvictionPolicy.java
index 2144fc1..25f4c99 100644
--- a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/EvictionPolicy.java
+++ b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/EvictionPolicy.java
@@ -33,14 +33,17 @@ public interface EvictionPolicy<T, S> {
         /**
          * expire the event and remove it from the queue.
          */
-        EXPIRE, /**
+        EXPIRE,
+        /**
          * process the event in the current window of events.
          */
-        PROCESS, /**
+        PROCESS,
+        /**
          * don't include in the current window but keep the event
          * in the queue for evaluating as a part of future windows.
          */
-        KEEP, /**
+        KEEP,
+        /**
          * stop processing the queue, there cannot be anymore events
          * satisfying the eviction policy.
          */
diff --git a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WaterMarkEventGenerator.java b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WaterMarkEventGenerator.java
index 046d584..d904202 100644
--- a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WaterMarkEventGenerator.java
+++ b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WaterMarkEventGenerator.java
@@ -53,6 +53,7 @@ public class WaterMarkEventGenerator<T> implements Runnable {
 
     /**
      * Creates a new WatermarkEventGenerator.
+     *
      * @param windowManager The window manager this generator will submit watermark events to
      * @param intervalMs The generator will check if it should generate a watermark event with this intervalMs
      * @param eventTsLagMs The max allowed lag behind the last watermark event before an event is considered late
@@ -107,7 +108,7 @@ public class WaterMarkEventGenerator<T> implements Runnable {
     }
 
     /**
-     * Computes the min ts across all intput topics.
+     * Computes the min ts across all input topics.
      */
     private long computeWaterMarkTs() {
         long ts = 0;
diff --git a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
index 94c2362..5f46365 100644
--- a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
+++ b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
@@ -89,7 +89,8 @@ public class WindowManager<T> implements TriggerHandler {
 
     /**
      * Add an event into the window, with the given ts as the tracking ts.
-     *  @param event the event to track
+     *
+     * @param event the event to track
      * @param ts the timestamp
      */
     public void add(T event, long ts, Record<?> record) {
@@ -225,7 +226,7 @@ public class WindowManager<T> implements TriggerHandler {
 
     /**
      * Scans the event queue and returns the next earliest event ts
-     * between the startTs and endTs
+     * between the startTs and endTs.
      *
      * @param startTs the start ts (exclusive)
      * @param endTs the end ts (inclusive)
diff --git a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/evictors/TimeEvictionPolicy.java b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/evictors/TimeEvictionPolicy.java
index 147ca5d..853a679 100644
--- a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/evictors/TimeEvictionPolicy.java
+++ b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/evictors/TimeEvictionPolicy.java
@@ -35,7 +35,7 @@ public class TimeEvictionPolicy<T> implements EvictionPolicy<T, EvictionContext>
 
     /**
      * Constructs a TimeEvictionPolicy that evicts events older
-     * than the given window length in millis
+     * than the given window length in millis.
      *
      * @param windowLength the duration in milliseconds
      */
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 3f8bec3..b40c6cd 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.function.Function;
 
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.functions.proto.Function.Assignment;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 325f3e7..8c79f83 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -452,7 +452,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
     }
 
     /**
-     * Get stats of a function instance.  If this worker is not running the function instance,
+     * Get stats of a function instance.  If this worker is not running the function instance.
      * @param tenant the tenant the function belongs to
      * @param namespace the namespace the function belongs to
      * @param functionName the function name
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 9163def..65dbc40 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import com.google.common.util.concurrent.AbstractService;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 6efcbd2..df7f14d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -265,7 +265,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     private boolean authenticationEnabled = false;
     @FieldContext(
         category = CATEGORY_WORKER_SECURITY,
-        doc = "Autentication provider name list, which is a list of class names"
+        doc = "Authentication provider name list, which is a list of class names"
     )
     private Set<String> authenticationProviders = Sets.newTreeSet();
     @FieldContext(
@@ -439,7 +439,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     @FieldContext(
         category = CATEGORY_FUNC_RUNTIME_MNG,
         doc = "Any config the secret provider configurator might need. \n\nThis is passed on"
-            + " to the init method of the secretproviderconfigurator"
+            + " to the init method of the SecretsProviderConfigurator"
     )
     private Map<String, String> secretsProviderConfiguratorConfig;
     @FieldContext(
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 3c83c18..d06053b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -27,7 +27,6 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.net.URI;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -194,7 +193,7 @@ public class WorkerService {
             // Start function runtime manager
             this.functionRuntimeManager.start();
 
-            // indicate function worker service is done intializing
+            // indicate function worker service is done initializing
             this.isInitialized = true;
 
             this.connectorsManager = new ConnectorsManager(workerConfig);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/dlog/DLInputStream.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/dlog/DLInputStream.java
index fd78eab..10acb54 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/dlog/DLInputStream.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/dlog/DLInputStream.java
@@ -27,6 +27,9 @@ import org.apache.distributedlog.exceptions.EndOfStreamException;
 import java.io.IOException;
 import java.io.InputStream;
 
+/**
+ * DistributedLog Input Stream.
+ */
 public class DLInputStream extends InputStream {
 
   private LogRecordWithInputStream currentLogRecord = null;
@@ -60,7 +63,7 @@ public class DLInputStream extends InputStream {
   }
 
   /**
-   * Construct distributedlog input stream
+   * Construct DistributedLog input stream.
    *
    * @param dlm the Distributed Log Manager to access the stream
    */
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
index 4e6b34a..532d852 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
@@ -26,7 +26,6 @@ import javax.ws.rs.core.UriInfo;
 
 import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 
 public class FunctionApiResource implements Supplier<WorkerService> {
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index a6c4974..2c9a12e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 
 import java.net.BindException;
 import java.net.URI;
-import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 2f4856e..2bae728 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -81,7 +81,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.functions.FunctionConfig;
@@ -91,7 +90,6 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index be8fd49..1c5a090 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -65,10 +65,7 @@ public class FunctionsImpl extends ComponentImpl {
 
             List<ExceptionInformation> userExceptionInformationList = new LinkedList<>();
             for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
-                ExceptionInformation exceptionInformation
-                        = new ExceptionInformation();
-                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
-                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                ExceptionInformation exceptionInformation = getExceptionInformation(exceptionEntry);
                 userExceptionInformationList.add(exceptionInformation);
             }
             functionInstanceStatusData.setLatestUserExceptions(userExceptionInformationList);
@@ -78,24 +75,15 @@ public class FunctionsImpl extends ComponentImpl {
                     + status.getNumSourceExceptions() + status.getNumSinkExceptions());
             List<ExceptionInformation> systemExceptionInformationList = new LinkedList<>();
             for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSystemExceptionsList()) {
-                ExceptionInformation exceptionInformation
-                        = new ExceptionInformation();
-                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
-                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                ExceptionInformation exceptionInformation = getExceptionInformation(exceptionEntry);
                 systemExceptionInformationList.add(exceptionInformation);
             }
             for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSourceExceptionsList()) {
-                ExceptionInformation exceptionInformation
-                        = new ExceptionInformation();
-                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
-                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                ExceptionInformation exceptionInformation = getExceptionInformation(exceptionEntry);
                 systemExceptionInformationList.add(exceptionInformation);
             }
             for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSinkExceptionsList()) {
-                ExceptionInformation exceptionInformation
-                        = new ExceptionInformation();
-                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
-                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                ExceptionInformation exceptionInformation = getExceptionInformation(exceptionEntry);
                 systemExceptionInformationList.add(exceptionInformation);
             }
             functionInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
@@ -199,6 +187,14 @@ public class FunctionsImpl extends ComponentImpl {
         }
     }
 
+    private ExceptionInformation getExceptionInformation(InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry) {
+        ExceptionInformation exceptionInformation
+                = new ExceptionInformation();
+        exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+        exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+        return exceptionInformation;
+    }
+
     public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
         super(workerServiceSupplier, Utils.ComponentType.FUNCTION);
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 4a801b2..39828a0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -69,26 +69,17 @@ public class SinkImpl extends ComponentImpl {
                     + status.getNumUserExceptions() + status.getNumSourceExceptions());
             List<ExceptionInformation> systemExceptionInformationList = new LinkedList<>();
             for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
-                ExceptionInformation exceptionInformation
-                        = new ExceptionInformation();
-                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
-                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                ExceptionInformation exceptionInformation = getExceptionInformation(exceptionEntry);
                 systemExceptionInformationList.add(exceptionInformation);
             }
 
             for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSystemExceptionsList()) {
-                ExceptionInformation exceptionInformation
-                        = new ExceptionInformation();
-                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
-                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                ExceptionInformation exceptionInformation = getExceptionInformation(exceptionEntry);
                 systemExceptionInformationList.add(exceptionInformation);
             }
 
             for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSourceExceptionsList()) {
-                ExceptionInformation exceptionInformation
-                        = new ExceptionInformation();
-                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
-                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                ExceptionInformation exceptionInformation = getExceptionInformation(exceptionEntry);
                 systemExceptionInformationList.add(exceptionInformation);
             }
             sinkInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
@@ -96,10 +87,7 @@ public class SinkImpl extends ComponentImpl {
             sinkInstanceStatusData.setNumSinkExceptions(status.getNumSinkExceptions());
             List<ExceptionInformation> sinkExceptionInformationList = new LinkedList<>();
             for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSinkExceptionsList()) {
-                ExceptionInformation exceptionInformation
-                        = new ExceptionInformation();
-                exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
-                exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+                ExceptionInformation exceptionInformation = getExceptionInformation(exceptionEntry);
                 sinkExceptionInformationList.add(exceptionInformation);
             }
             sinkInstanceStatusData.setLatestSinkExceptions(sinkExceptionInformationList);
@@ -206,6 +194,14 @@ public class SinkImpl extends ComponentImpl {
         }
     }
 
+    private ExceptionInformation getExceptionInformation(InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry) {
+        ExceptionInformation exceptionInformation
+                = new ExceptionInformation();
+        exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+        exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+        return exceptionInformation;
+    }
+
     public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
         super(workerServiceSupplier, Utils.ComponentType.SINK);
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
index c532e3a..f2b1936 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
@@ -33,7 +33,6 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
index 9347ed9..70e1d90 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
@@ -62,10 +62,10 @@ public class RoundRobinScheduler implements IScheduler {
         int least = Integer.MAX_VALUE;
         for (Map.Entry<String, List<Assignment>> entry : workerIdToAssignment.entrySet()) {
             String workerId = entry.getKey();
-            List<Assignment> workerAssigments = entry.getValue();
-            if (workerAssigments.size() < least) {
+            List<Assignment> workerAssignments = entry.getValue();
+            if (workerAssignments.size() < least) {
                 targetWorkerId = workerId;
-                least = workerAssigments.size();
+                least = workerAssignments.size();
             }
         }
         return targetWorkerId;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index d8d54c9..2926c93 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -50,7 +50,6 @@ import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.RestException;
-import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 7695993..bdfb50e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -50,7 +50,6 @@ import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.RestException;
-import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;