You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/09 01:46:47 UTC
[pulsar] branch master updated: [pulsar-functions] fix words error
and remove not use import java class (#3791)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4f03548 [pulsar-functions] fix words error and remove not use import java class (#3791)
4f03548 is described below
commit 4f03548c5ec468ecb7f63a64990bf17639c33c70
Author: wpl <12...@qq.com>
AuthorDate: Sat Mar 9 09:46:43 2019 +0800
[pulsar-functions] fix words error and remove not use import java class (#3791)
### Motivation
While studying and reading this module's code, I fixed some errors, such as:
1. Spelling errors, such as JaveInstance -> JavaInstance, Recieved -> Received, Unknwon -> Unknown, alterted -> altered, Updare -> Update, etc.
2. Removed unused Java imports.
3. Extracted duplicated code in several Java classes into shared private methods.
4. Made the Javadoc style consistent (optional).
### Modifications
Minor fix-ups in the pulsar-functions module.
---
.../org/apache/pulsar/functions/api/Context.java | 52 +++++++++++++-------
.../org/apache/pulsar/functions/api/Function.java | 1 +
.../org/apache/pulsar/functions/api/Record.java | 12 ++---
.../apache/pulsar/functions/api/WindowContext.java | 49 ++++++++++++-------
.../pulsar/functions/api/WindowFunction.java | 1 +
.../pulsar/functions/api/utils/JavaSerDeTest.java | 1 -
.../pulsar/functions/instance/ContextImpl.java | 3 +-
.../functions/instance/FunctionResultRouter.java | 2 +-
.../pulsar/functions/instance/InstanceConfig.java | 2 +-
.../pulsar/functions/instance/JavaInstance.java | 1 -
.../functions/instance/JavaInstanceRunnable.java | 3 +-
.../pulsar/functions/instance/LogAppender.java | 1 -
.../instance/stats/ComponentStatsManager.java | 1 -
.../instance/stats/FunctionStatsManager.java | 15 +++---
.../functions/instance/stats/SinkStatsManager.java | 15 +++---
.../instance/stats/SourceStatsManager.java | 15 +++---
.../pulsar/functions/source/PulsarSource.java | 3 --
.../pulsar/functions/source/TopicSchema.java | 2 +-
.../pulsar/functions/sink/PulsarSinkTest.java | 57 +++++++++-------------
.../pulsar/functions/source/PulsarSourceTest.java | 20 +-------
.../functions/api/examples/ByteBufferSerDe.java | 2 +-
.../api/examples/CommaWindowFunction.java | 2 +-
.../api/examples/ConfigBasedAppendFunction.java | 2 +-
.../functions/api/examples/CustomBaseSerde.java | 2 +-
.../api/examples/CustomBaseToBaseFunction.java | 2 +-
.../api/examples/CustomBaseToDerivedFunction.java | 2 +-
.../functions/api/examples/CustomDerivedSerde.java | 2 +-
.../examples/CustomDerivedToDerivedFunction.java | 2 +-
.../api/examples/CustomObjectFunction.java | 2 +-
.../api/examples/ExclamationFunction.java | 2 +-
.../api/examples/HostAppenderFunction.java | 2 +-
.../api/examples/InstanceIdAppenderFunction.java | 2 +-
.../api/examples/IntegerAdditionFunction.java | 2 +-
.../functions/api/examples/LoggingFunction.java | 2 +-
.../functions/api/examples/PublishFunction.java | 2 +-
.../functions/api/examples/UserConfigFunction.java | 2 +-
.../api/examples/UserPublishFunction.java | 2 +-
.../api/examples/WindowDurationFunction.java | 2 +-
.../pulsar/functions/api/examples/pojo/Tick.java | 2 +-
.../functions/api/examples/serde/CustomObject.java | 2 +-
.../api/examples/serde/CustomObjectSerde.java | 2 +-
.../pulsar/functions/runtime/JavaInstanceMain.java | 4 +-
.../functions/runtime/KubernetesRuntime.java | 6 +--
.../pulsar/functions/runtime/LocalRunner.java | 1 -
.../pulsar/functions/runtime/ProcessRuntime.java | 2 +-
.../functions/runtime/ProcessRuntimeFactory.java | 2 +-
.../pulsar/functions/runtime/RuntimeSpawner.java | 6 +--
.../pulsar/functions/runtime/RuntimeUtils.java | 4 +-
.../pulsar/functions/runtime/ThreadRuntime.java | 2 +-
.../secretsprovider/ClearTextSecretsProvider.java | 5 +-
.../EnvironmentBasedSecretsProvider.java | 3 +-
.../functions/secretsprovider/SecretsProvider.java | 8 +--
.../DefaultSecretsProviderConfigurator.java | 2 +-
.../KubernetesSecretsProviderConfigurator.java | 2 +-
.../SecretsProviderConfigurator.java | 16 +++---
.../EnvironmentBasedSecretsProviderTest.java | 5 --
.../functions/utils/FunctionConfigUtils.java | 2 +-
.../apache/pulsar/functions/utils/Reflections.java | 14 ++++--
.../pulsar/functions/utils/SinkConfigUtils.java | 3 +-
.../pulsar/functions/utils/SourceConfigUtils.java | 2 +-
.../org/apache/pulsar/functions/utils/Utils.java | 3 +-
.../functions/utils/FunctionConfigUtilsTest.java | 2 +-
.../pulsar/functions/utils/ReflectionsTest.java | 2 +-
.../functions/utils/SinkConfigUtilsTest.java | 2 +-
.../functions/utils/SourceConfigUtilsTest.java | 2 +-
.../apache/pulsar/functions/windowing/Event.java | 4 +-
.../functions/windowing/EvictionContext.java | 6 +--
.../pulsar/functions/windowing/EvictionPolicy.java | 9 ++--
.../windowing/WaterMarkEventGenerator.java | 3 +-
.../pulsar/functions/windowing/WindowManager.java | 5 +-
.../windowing/evictors/TimeEvictionPolicy.java | 2 +-
.../functions/worker/FunctionAssignmentTailer.java | 1 -
.../functions/worker/FunctionRuntimeManager.java | 2 +-
.../org/apache/pulsar/functions/worker/Worker.java | 2 -
.../pulsar/functions/worker/WorkerConfig.java | 4 +-
.../pulsar/functions/worker/WorkerService.java | 3 +-
.../functions/worker/dlog/DLInputStream.java | 5 +-
.../functions/worker/rest/FunctionApiResource.java | 1 -
.../pulsar/functions/worker/rest/WorkerServer.java | 1 -
.../functions/worker/rest/api/ComponentImpl.java | 2 -
.../functions/worker/rest/api/FunctionsImpl.java | 28 +++++------
.../pulsar/functions/worker/rest/api/SinkImpl.java | 28 +++++------
.../worker/rest/api/v3/SourceApiV3Resource.java | 1 -
.../worker/scheduler/RoundRobinScheduler.java | 6 +--
.../rest/api/v2/FunctionApiV2ResourceTest.java | 1 -
.../rest/api/v3/FunctionApiV3ResourceTest.java | 1 -
86 files changed, 253 insertions(+), 260 deletions(-)
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index e9b79c3..63cbc9e 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -34,43 +34,50 @@ import java.util.concurrent.CompletableFuture;
*/
public interface Context {
/**
- * Access the record associated with the current input value
+ * Access the record associated with the current input value.
+ *
* @return
*/
Record<?> getCurrentRecord();
/**
- * Get a list of all input topics
+ * Get a list of all input topics.
+ *
* @return a list of all input topics
*/
Collection<String> getInputTopics();
/**
- * Get the output topic of the function
+ * Get the output topic of the function.
+ *
* @return output topic name
*/
String getOutputTopic();
/**
- * Get output schema builtin type or custom class name
+ * Get output schema builtin type or custom class name.
+ *
* @return output schema builtin type or custom class name
*/
String getOutputSchemaType();
/**
- * The tenant this function belongs to
+ * The tenant this function belongs to.
+ *
* @return the tenant this function belongs to
*/
String getTenant();
/**
- * The namespace this function belongs to
+ * The namespace this function belongs to.
+ *
* @return the namespace this function belongs to
*/
String getNamespace();
/**
- * The name of the function that we are executing
+ * The name of the function that we are executing.
+ *
* @return The Function name
*/
String getFunctionName();
@@ -96,19 +103,22 @@ public interface Context {
int getNumInstances();
/**
- * The version of the function that we are executing
+ * The version of the function that we are executing.
+ *
* @return The version id
*/
String getFunctionVersion();
/**
- * The logger object that can be used to log in a function
+ * The logger object that can be used to log in a function.
+ *
* @return the logger object
*/
Logger getLogger();
/**
- * Increment the builtin distributed counter refered by key
+ * Increment the builtin distributed counter referred by key.
+ *
* @param key The name of the key
* @param amount The amount to be incremented
*/
@@ -123,7 +133,7 @@ public interface Context {
long getCounter(String key);
/**
- * Updare the state value for the key.
+ * Update the state value for the key.
*
* @param key name of the key
* @param value state value of the key
@@ -139,20 +149,23 @@ public interface Context {
ByteBuffer getState(String key);
/**
- * Get a map of all user-defined key/value configs for the function
+ * Get a map of all user-defined key/value configs for the function.
+ *
* @return The full map of user-defined config values
*/
Map<String, Object> getUserConfigMap();
/**
- * Get any user-defined key/value
+ * Get any user-defined key/value.
+ *
* @param key The key
* @return The Optional value specified by the user for that key.
*/
Optional<Object> getUserConfigValue(String key);
/**
- * Get any user-defined key/value or a default value if none is present
+ * Get any user-defined key/value or a default value if none is present.
+ *
* @param key
* @param defaultValue
* @return Either the user config value associated with a given key or a supplied default value
@@ -160,21 +173,23 @@ public interface Context {
Object getUserConfigValueOrDefault(String key, Object defaultValue);
/**
- * Get the secret associated with this key
+ * Get the secret associated with this key.
+ *
* @param secretName The name of the secret
* @return The secret if anything was found or null
*/
String getSecret(String secretName);
/**
- * Record a user defined metric
+ * Record a user defined metric.
+ *
* @param metricName The name of the metric
* @param value The value of the metric
*/
void recordMetric(String metricName, double value);
/**
- * Publish an object using serDe for serializing to the topic
+ * Publish an object using serDe for serializing to the topic.
*
* @param topicName
* The name of the topic for publishing
@@ -187,7 +202,8 @@ public interface Context {
<O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);
/**
- * Publish an object to the topic using default schemas
+ * Publish an object to the topic using default schemas.
+ *
* @param topicName The name of the topic for publishing
* @param object The object that needs to be published
* @return A future that completes when the framework is done publishing the message
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java
index ca292eb..08b81ba 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java
@@ -29,6 +29,7 @@ package org.apache.pulsar.functions.api;
public interface Function<I, O> {
/**
* Process the input.
+ *
* @return the output
*/
O process(I input, Context context) throws Exception;
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index cb03cb2..b99c4dc 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -28,21 +28,21 @@ import java.util.Optional;
public interface Record<T> {
/**
- * If the record originated from a topic, report the topic name
+ * If the record originated from a topic, report the topic name.
*/
default Optional<String> getTopicName() {
return Optional.empty();
}
/**
- * Return a key if the key has one associated
+ * Return a key if the key has one associated.
*/
default Optional<String> getKey() {
return Optional.empty();
}
/**
- * Retrieves the actual data of the record
+ * Retrieves the actual data of the record.
*
* @return The record data
*/
@@ -85,19 +85,19 @@ public interface Record<T> {
}
/**
- * Acknowledge that this record is fully processed
+ * Acknowledge that this record is fully processed.
*/
default void ack() {
}
/**
- * To indicate that this record has failed to be processed
+ * To indicate that this record has failed to be processed.
*/
default void fail() {
}
/**
- * To support message routing on a per message basis
+ * To support message routing on a per message basis.
*
* @return The topic this message should be written to
*/
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
index 15ef591..9effdb9 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
@@ -29,25 +29,29 @@ import java.util.concurrent.CompletableFuture;
public interface WindowContext {
/**
- * The tenant this function belongs to
+ * The tenant this function belongs to.
+ *
* @return the tenant this function belongs to
*/
String getTenant();
/**
- * The namespace this function belongs to
+ * The namespace this function belongs to.
+ *
* @return the namespace this function belongs to
*/
String getNamespace();
/**
- * The name of the function that we are executing
+ * The name of the function that we are executing.
+ *
* @return The Function name
*/
String getFunctionName();
/**
- * The id of the function that we are executing
+ * The id of the function that we are executing.
+ *
* @return The function id
*/
String getFunctionId();
@@ -67,37 +71,43 @@ public interface WindowContext {
int getNumInstances();
/**
- * The version of the function that we are executing
+ * The version of the function that we are executing.
+ *
* @return The version id
*/
String getFunctionVersion();
/**
- * Get a list of all input topics
+ * Get a list of all input topics.
+ *
* @return a list of all input topics
*/
Collection<String> getInputTopics();
/**
- * Get the output topic of the function
+ * Get the output topic of the function.
+ *
* @return output topic name
*/
String getOutputTopic();
/**
- * Get output schema builtin type or custom class name
+ * Get output schema builtin type or custom class name.
+ *
* @return output schema builtin type or custom class name
*/
String getOutputSchemaType();
/**
- * The logger object that can be used to log in a function
+ * The logger object that can be used to log in a function.
+ *
* @return the logger object
*/
Logger getLogger();
/**
- * Increment the builtin distributed counter refered by key
+ * Increment the builtin distributed counter referred by key.
+ *
* @param key The name of the key
* @param amount The amount to be incremented
*/
@@ -112,7 +122,7 @@ public interface WindowContext {
long getCounter(String key);
/**
- * Updare the state value for the key.
+ * Update the state value for the key.
*
* @param key name of the key
* @param value state value of the key
@@ -128,20 +138,23 @@ public interface WindowContext {
ByteBuffer getState(String key);
/**
- * Get a map of all user-defined key/value configs for the function
+ * Get a map of all user-defined key/value configs for the function.
+ *
* @return The full map of user-defined config values
*/
Map<String, Object> getUserConfigMap();
/**
- * Get any user-defined key/value
+ * Get any user-defined key/value.
+ *
* @param key The key
* @return The Optional value specified by the user for that key.
*/
Optional<Object> getUserConfigValue(String key);
/**
- * Get any user-defined key/value or a default value if none is present
+ * Get any user-defined key/value or a default value if none is present.
+ *
* @param key
* @param defaultValue
* @return Either the user config value associated with a given key or a supplied default value
@@ -149,14 +162,15 @@ public interface WindowContext {
Object getUserConfigValueOrDefault(String key, Object defaultValue);
/**
- * Record a user defined metric
+ * Record a user defined metric.
+ *
* @param metricName The name of the metric
* @param value The value of the metric
*/
void recordMetric(String metricName, double value);
/**
- * Publish an object using serDe for serializing to the topic
+ * Publish an object using serDe for serializing to the topic.
*
* @param topicName
* The name of the topic for publishing
@@ -169,7 +183,8 @@ public interface WindowContext {
<O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);
/**
- * Publish an object to the topic using default schemas
+ * Publish an object to the topic using default schemas.
+ *
* @param topicName The name of the topic for publishing
* @param object The object that needs to be published
* @return A future that completes when the framework is done publishing the message
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java
index 6f2c421..111c570 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java
@@ -28,6 +28,7 @@ import java.util.Collection;
public interface WindowFunction<I, O> {
/**
* Process the input.
+ *
* @return the output
*/
O process(Collection<Record<I>> input, WindowContext context) throws Exception;
diff --git a/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java b/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java
index 36e6bfc..cb067e8 100644
--- a/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java
+++ b/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/JavaSerDeTest.java
@@ -25,7 +25,6 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import org.apache.pulsar.functions.api.utils.JavaSerDe;
import org.testng.annotations.Test;
/**
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 60b8ec0..2cb0537 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -66,7 +66,6 @@ import static org.apache.pulsar.functions.instance.stats.FunctionStatsManager.US
/**
* This class implements the Context interface exposed to the user.
*/
-
class ContextImpl implements Context, SinkContext, SourceContext {
private InstanceConfig config;
private Logger logger;
@@ -215,7 +214,7 @@ class ContextImpl implements Context, SinkContext, SourceContext {
@Override
public String getFunctionId() {
- return config.getFunctionId().toString();
+ return config.getFunctionId();
}
@Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java
index be06ee3..aeeaf6c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/FunctionResultRouter.java
@@ -28,7 +28,7 @@ import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl;
/**
- * Router for routing function results
+ * Router for routing function results.
*/
public class FunctionResultRouter extends RoundRobinPartitionMessageRouterImpl {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 51ec0d4..023f6a2 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -27,7 +27,7 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails;
/**
* This is the config passed to the Java Instance. Contains all the information
- * passed to run functions
+ * passed to run functions.
*/
@Data
@Getter
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index acd0781..8aee702 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -24,7 +24,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
import java.util.Map;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index c9bf644..8076e0f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -56,7 +56,6 @@ import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
-import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -219,7 +218,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
/**
- * The core logic that initialize the instance thread and executes the function
+ * The core logic that initialize the instance thread and executes the function.
*/
@Override
public void run() {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
index afdc6f9..107b224 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.functions.instance;
import org.apache.logging.log4j.core.*;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import java.io.Serializable;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
index 38a5de6..0c9b9af 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.functions.instance.stats;
import com.google.common.collect.EvictingQueue;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.common.TextFormat;
-import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.Utils;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index e4e03f8..b35c32a 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -243,9 +243,7 @@ public class FunctionStatsManager extends ComponentStatsManager{
// report exception throw prometheus
if (userExceptionRateLimiter.tryAcquire()) {
- String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
- exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
- exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+ String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
userExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}
@@ -257,13 +255,18 @@ public class FunctionStatsManager extends ComponentStatsManager{
// report exception throw prometheus
if (sysExceptionRateLimiter.tryAcquire()) {
- String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
- exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
- exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+ String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
sysExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}
+ private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
+ String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+ return exceptionMetricsLabels;
+ }
+
@Override
public void incrTotalReceived() {
_statTotalRecordsReceived.inc();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
index 1a23957..1b2d8b0 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -216,9 +216,7 @@ public class SinkStatsManager extends ComponentStatsManager {
// report exception throw prometheus
if (sysExceptionRateLimiter.tryAcquire()) {
- String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
- exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
- exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+ String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
sysExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}
@@ -241,13 +239,18 @@ public class SinkStatsManager extends ComponentStatsManager {
// report exception throw prometheus
if (sinkExceptionRateLimiter.tryAcquire()) {
- String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
- exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
- exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+ String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
sinkExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}
+ private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
+ String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+ return exceptionMetricsLabels;
+ }
+
@Override
public void setLastInvocation(long ts) {
_statlastInvocation.set(ts);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
index 87a60a3..aac6720 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -215,9 +215,7 @@ public class SourceStatsManager extends ComponentStatsManager {
// report exception throw prometheus
if (sysExceptionRateLimiter.tryAcquire()) {
- String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
- exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
- exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+ String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
sysExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}
@@ -235,13 +233,18 @@ public class SourceStatsManager extends ComponentStatsManager {
// report exception throw prometheus
if (sourceExceptionRateLimiter.tryAcquire()) {
- String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
- exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
- exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+ String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
sourceExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}
+ private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
+ String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+ return exceptionMetricsLabels;
+ }
+
@Override
public void incrSinkExceptions(Throwable ex) {
incrSysExceptions(ex);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index bad83b0..83e7566 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -33,12 +33,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
-import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 21f8b00..d8abcc6 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -47,7 +47,7 @@ public class TopicSchema {
}
/**
- * If there is no other information available, use JSON as default schema type
+ * If there is no other information available, use JSON as default schema type.
*/
private static final SchemaType DEFAULT_SCHEMA_TYPE = SchemaType.JSON;
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index caf5dec..1ef72c7 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -82,9 +82,8 @@ public class PulsarSinkTest {
}
/**
- * Verify that JavaInstance does not support functions that take Void type as input
+ * Verify that JavaInstance does not support functions that take Void type as input.
*/
-
private static PulsarClientImpl getPulsarClient() throws PulsarClientException {
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
@@ -160,7 +159,7 @@ public class PulsarSinkTest {
}
/**
- * Verify that JavaInstance does support functions that output Void type
+ * Verify that JavaInstance does support functions that output Void type.
*/
@Test
public void testVoidOutputClasses() throws Exception {
@@ -283,11 +282,7 @@ public class PulsarSinkTest {
@Override
public Optional<String> getDestinationTopic() {
- if (topic != null) {
- return Optional.of(topic);
- } else {
- return Optional.empty();
- }
+ return getTopicOptional(topic);
}
}, "out1");
@@ -307,11 +302,7 @@ public class PulsarSinkTest {
@Override
public boolean matches(Object o) {
if (o instanceof String) {
- if (topic != null) {
- return topic.equals(o);
- } else {
- return defaultTopic.equals(o);
- }
+ return getTopicEquals(o, topic, defaultTopic);
}
return false;
}
@@ -340,11 +331,7 @@ public class PulsarSinkTest {
@Override
public Optional<String> getDestinationTopic() {
- if (topic != null) {
- return Optional.of(topic);
- } else {
- return Optional.empty();
- }
+ return getTopicOptional(topic);
}
}, "out1");
@@ -364,11 +351,7 @@ public class PulsarSinkTest {
@Override
public boolean matches(Object o) {
if (o instanceof String) {
- if (topic != null) {
- return topic.equals(o);
- } else {
- return defaultTopic.equals(o);
- }
+ return getTopicEquals(o, topic, defaultTopic);
}
return false;
}
@@ -397,11 +380,7 @@ public class PulsarSinkTest {
@Override
public Optional<String> getDestinationTopic() {
- if (topic != null) {
- return Optional.of(topic);
- } else {
- return Optional.empty();
- }
+ return getTopicOptional(topic);
}
@Override
public Optional<String> getPartitionId() {
@@ -434,11 +413,7 @@ public class PulsarSinkTest {
@Override
public boolean matches(Object o) {
if (o instanceof String) {
- if (topic != null) {
- return topic.equals(o);
- } else {
- return defaultTopic.equals(o);
- }
+ return getTopicEquals(o, topic, defaultTopic);
}
return false;
}
@@ -460,4 +435,20 @@ public class PulsarSinkTest {
}
}
+ private Optional<String> getTopicOptional(String topic) {
+ if (topic != null) {
+ return Optional.of(topic);
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ private boolean getTopicEquals(Object o, String topic, String defaultTopic) {
+ if (topic != null) {
+ return topic.equals(o);
+ } else {
+ return defaultTopic.equals(o);
+ }
+ }
+
}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 88c9637..43b4bcb 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -46,7 +46,6 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.SourceContext;
import org.testng.annotations.Test;
@@ -73,9 +72,8 @@ public class PulsarSourceTest {
}
/**
- * Verify that JavaInstance does not support functions that take Void type as input
+ * Verify that JavaInstance does not support functions that take Void type as input.
*/
-
private static PulsarClientImpl getPulsarClient() throws PulsarClientException {
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
@@ -182,22 +180,6 @@ public class PulsarSourceTest {
pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
}
- /**
- * Verify that Explicit setting of Default Serializer works fine.
- */
- @Test
- public void testExplicitDefaultSerDe() throws Exception {
- PulsarSourceConfig pulsarConfig = getPulsarConfigs();
- // set type to void
- pulsarConfig.setTypeClassName(String.class.getName());
- consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
- ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build());
- pulsarConfig.setTopicSchema(consumerConfigs);
- PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
-
- pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
- }
-
@Test
public void testComplexOuputType() throws Exception {
PulsarSourceConfig pulsarConfig = getPulsarConfigs();
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ByteBufferSerDe.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ByteBufferSerDe.java
index f433372..0a1eb2a 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ByteBufferSerDe.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ByteBufferSerDe.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.functions.api.SerDe;
import java.nio.ByteBuffer;
/**
- * Simple ByteBuffer Serializer and Deserializer
+ * Simple ByteBuffer Serializer and Deserializer.
*/
public class ByteBufferSerDe implements SerDe<Integer> {
@Override
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CommaWindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CommaWindowFunction.java
index da696fd..c966fae 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CommaWindowFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CommaWindowFunction.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.functions.api.examples;
import java.util.Collection;
/**
- * Comma based aggregation window function example
+ * Comma based aggregation window function example.
*/
public class CommaWindowFunction implements java.util.function.Function<Collection<String>, String> {
@Override
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConfigBasedAppendFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConfigBasedAppendFunction.java
index 1edd943..8e9a475 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConfigBasedAppendFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConfigBasedAppendFunction.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.functions.api.Function;
import java.util.Optional;
/**
- * Function that appends something to incoming input based on config supplied
+ * Function that appends something to incoming input based on config supplied.
*/
public class ConfigBasedAppendFunction implements Function<String, String> {
@Override
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseSerde.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseSerde.java
index b8a6a57..da84a40 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseSerde.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseSerde.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.functions.api.SerDe;
import java.nio.ByteBuffer;
/**
- * Example of using a byte buffer serialization for Custom object
+ * Example of using a byte buffer serialization for Custom object.
*/
public class CustomBaseSerde implements SerDe<CustomBaseObject> {
@Override
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToBaseFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToBaseFunction.java
index d3b9225..f482fd6 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToBaseFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToBaseFunction.java
@@ -22,7 +22,7 @@ import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
/**
- * Function example of processing on a custom object type
+ * Function example of processing on a custom object type.
*/
public class CustomBaseToBaseFunction implements Function<CustomBaseObject, CustomBaseObject> {
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToDerivedFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToDerivedFunction.java
index 9e8cab5..41aa3e4 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToDerivedFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToDerivedFunction.java
@@ -22,7 +22,7 @@ import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
/**
- * Examplf of fucntion doing a type object conversion between input ann output type
+ * Example of function doing a type object conversion between input and output type.
*/
public class CustomBaseToDerivedFunction implements Function<CustomBaseObject, CustomDerivedObject> {
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedSerde.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedSerde.java
index 219cdad..6305730 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedSerde.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedSerde.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.functions.api.SerDe;
import java.nio.ByteBuffer;
/**
- * Example to derived object serialization
+ * Example of derived object serialization.
*/
public class CustomDerivedSerde implements SerDe<CustomDerivedObject> {
@Override
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToDerivedFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToDerivedFunction.java
index a86c82d..3937910 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToDerivedFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToDerivedFunction.java
@@ -22,7 +22,7 @@ import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
/**
- * Example of 2nd order conversion from a base object for composition pipelines
+ * Example of 2nd order conversion from a base object for composition pipelines.
*/
public class CustomDerivedToDerivedFunction implements Function<CustomDerivedObject, CustomDerivedObject> {
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomObjectFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomObjectFunction.java
index d2db0a6..d1df6ba 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomObjectFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomObjectFunction.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.examples.serde.CustomObject;
/**
- * Function that deals with custom objects
+ * Function that deals with custom objects.
*/
public class CustomObjectFunction implements Function<CustomObject, CustomObject> {
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java
index 7d4d2ec..c2af6e5 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.functions.api.Function;
/**
* The classic Exclamation Function that appends an exclamation at the end
- * of the input
+ * of the input.
*/
public class ExclamationFunction implements Function<String, String> {
@Override
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/HostAppenderFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/HostAppenderFunction.java
index 2358e91..0a2527d 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/HostAppenderFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/HostAppenderFunction.java
@@ -25,7 +25,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
/**
- * Function that appends the host name to the payload message
+ * Function that appends the host name to the payload message.
*/
public class HostAppenderFunction implements Function<String, String> {
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/InstanceIdAppenderFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/InstanceIdAppenderFunction.java
index 0a56386..e7127ed 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/InstanceIdAppenderFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/InstanceIdAppenderFunction.java
@@ -22,7 +22,7 @@ import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
/**
- * Function that appends the instance id to the payload message
+ * Function that appends the instance id to the payload message.
*/
public class InstanceIdAppenderFunction implements Function<String, String> {
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/IntegerAdditionFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/IntegerAdditionFunction.java
index d6e8276..78c8bc7 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/IntegerAdditionFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/IntegerAdditionFunction.java
@@ -22,7 +22,7 @@ import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
/**
- * Function that appends to an integer input value and outputs the new value
+ * Function that appends to an integer input value and outputs the new value.
*/
public class IntegerAdditionFunction implements Function<Integer, Integer> {
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
index 4b73678..9f6e775 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
@@ -25,7 +25,7 @@ import org.apache.pulsar.functions.api.Function;
* A function that demonstrates how to redirect logging to a topic.
* In this particular example, for every input string, the function
* does some logging. If --logTopic topic is specified, these log statements
- * end up in that specified pulsar topic
+ * end up in that specified pulsar topic.
*/
public class LoggingFunction implements Function<String, String> {
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
index 45cbd72..cda8331 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.functions.api.Function;
/**
* Example function that uses the built in publish function in the context
- * to publish to a desired topic based on config
+ * to publish to a desired topic based on config.
*/
public class PublishFunction implements Function<String, Void> {
@Override
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
index add14d4..5f29b1d 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.functions.api.Function;
import java.util.Optional;
/**
- * An example demonstrate retrieving user config value from Context
+ * An example demonstrating retrieving user config value from Context.
*/
public class UserConfigFunction implements Function<String, String> {
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
index 5bf04be..ec3fd95 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
/**
- * An example demonstrate publishing messages through Context
+ * An example demonstrating publishing messages through Context.
*/
public class UserPublishFunction implements Function<String, Void> {
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
index cb0aa9f..8c0da3b 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.functions.api.examples;
import java.util.Collection;
/**
- * This functions collects the timestamp during the window operation
+ * This function collects the timestamp during the window operation.
*/
public class WindowDurationFunction implements java.util.function.Function<Collection<String>, String> {
@Override
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java
index 67d056e..efff849 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Tick.java
@@ -23,7 +23,7 @@ import lombok.EqualsAndHashCode;
import lombok.ToString;
/**
- * Pojo to represent a stock tick
+ * Pojo to represent a stock tick.
*/
@Data
@ToString
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObject.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObject.java
index f50d10c..bd7fa10 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObject.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObject.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.functions.api.examples.serde;
import lombok.*;
/**
- * This class simulates a user defined POJO
+ * This class simulates a user defined POJO.
*/
@Getter
@Setter
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObjectSerde.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObjectSerde.java
index fd5bcdc..f7f76f9 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObjectSerde.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/serde/CustomObjectSerde.java
@@ -23,7 +23,7 @@ import org.apache.pulsar.functions.api.SerDe;
import java.nio.ByteBuffer;
/**
- * This class takes care of serializing/deserializing CustomObject
+ * This class takes care of serializing/deserializing CustomObject.
*/
public class CustomObjectSerde implements SerDe<CustomObject> {
@Override
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index f480b2e..15af660 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -192,7 +192,7 @@ public class JavaInstanceMain implements AutoCloseable {
.addService(new InstanceControlImpl(runtimeSpawner))
.build()
.start();
- log.info("JaveInstance Server started, listening on " + port);
+ log.info("JavaInstance Server started, listening on " + port);
java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
@@ -341,7 +341,7 @@ public class JavaInstanceMain implements AutoCloseable {
@Override
public void healthCheck(com.google.protobuf.Empty request,
io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.HealthCheckResult> responseObserver) {
- log.debug("Recieved health check request...");
+ log.debug("Received health check request...");
InstanceCommunication.HealthCheckResult healthCheckResult
= InstanceCommunication.HealthCheckResult.newBuilder().setSuccess(true).build();
responseObserver.onNext(healthCheckResult);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 3c0468d..140de1c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -217,7 +217,7 @@ public class KubernetesRuntime implements Runtime {
}
/**
- * The core logic that creates a service first followed by statefulset
+ * The core logic that creates a service first followed by statefulset.
*/
@Override
public void start() throws Exception {
@@ -304,14 +304,14 @@ public class KubernetesRuntime implements Runtime {
@Override
public CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics() {
CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>();
- retval.completeExceptionally(new RuntimeException("Kubernetes Runtime doesnt support getAndReset metrics via rest"));
+ retval.completeExceptionally(new RuntimeException("Kubernetes Runtime doesn't support getAndReset metrics via rest"));
return retval;
}
@Override
public CompletableFuture<Void> resetMetrics() {
CompletableFuture<Void> retval = new CompletableFuture<>();
- retval.completeExceptionally(new RuntimeException("Kubernetes Runtime doesnt support resetting metrics via rest"));
+ retval.completeExceptionally(new RuntimeException("Kubernetes Runtime doesn't support resetting metrics via rest"));
return retval;
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
index c3bc756..717659e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
@@ -38,7 +38,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
-import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 7cc6efb..734925a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -129,7 +129,7 @@ class ProcessRuntime implements Runtime {
}
/**
- * The core logic that initialize the thread container and executes the function
+ * The core logic that initializes the thread container and executes the function.
*/
@Override
public void start() {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index 2b7de14..2b85815 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -112,7 +112,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
@Override
public ProcessRuntime createContainer(InstanceConfig instanceConfig, String codeFile,
- String originalcodeFileName,
+ String originalCodeFileName,
Long expectedHealthCheckInterval) throws Exception {
String instanceFile;
switch (instanceConfig.getFunctionDetails().getRuntime()) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index aff3404..5bb3ab1 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -24,8 +24,6 @@
package org.apache.pulsar.functions.runtime;
import java.io.IOException;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -33,14 +31,12 @@ import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.utils.Utils;
-import static org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime.PYTHON;
@Slf4j
public class RuntimeSpawner implements AutoCloseable {
@@ -134,7 +130,7 @@ public class RuntimeSpawner implements AutoCloseable {
return Utils.printJson(msg);
} catch (IOException e) {
throw new RuntimeException(
- instanceConfig.getFunctionDetails().getName() + " Exception parsing getstatus", e);
+ instanceConfig.getFunctionDetails().getName() + " Exception parsing getStatus", e);
}
});
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 95f10ae..37d2b04 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -47,7 +47,7 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
/**
- * Util class for common runtime functionality
+ * Util class for common runtime functionality.
*/
@Slf4j
public class RuntimeUtils {
@@ -257,5 +257,5 @@ public class RuntimeUtils {
rd.close();
return result.toString();
}
-
+
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 93246f2..8aff81a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -87,7 +87,7 @@ class ThreadRuntime implements Runtime {
}
/**
- * The core logic that initialize the thread container and executes the function
+ * The core logic that initializes the thread container and executes the function.
*/
@Override
public void start() {
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/ClearTextSecretsProvider.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/ClearTextSecretsProvider.java
index adb2852..79aa58a 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/ClearTextSecretsProvider.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/ClearTextSecretsProvider.java
@@ -23,8 +23,9 @@ package org.apache.pulsar.functions.secretsprovider;
* the secrets as being passed in cleartext.
*/
public class ClearTextSecretsProvider implements SecretsProvider {
- /**
- * Fetches a secret
+ /**
+ * Fetches a secret.
+ *
* @return The actual secret
*/
@Override
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProvider.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProvider.java
index b058709..9793b7e 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProvider.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProvider.java
@@ -25,7 +25,8 @@ package org.apache.pulsar.functions.secretsprovider;
public class EnvironmentBasedSecretsProvider implements SecretsProvider {
/**
- * Fetches a secret
+ * Fetches a secret.
+ *
* @return The actual secret
*/
@Override
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/SecretsProvider.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/SecretsProvider.java
index 7d5330d..2ee0a3f 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/SecretsProvider.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsprovider/SecretsProvider.java
@@ -23,17 +23,19 @@ import java.util.Map;
/**
* This file defines the SecretsProvider interface. This interface is used by the function
* instances/containers to actually fetch the secrets. What SecretsProvider to use is
- * decided by the SecretsProviderConfigurator
+ * decided by the SecretsProviderConfigurator.
*/
public interface SecretsProvider {
/**
- * Initialize the SecretsProvider
+ * Initialize the SecretsProvider.
+ *
* @return
*/
default void init(Map<String, String> config) {}
/**
- * Fetches a secret
+ * Fetches a secret.
+ *
* @return The actual secret
*/
String provideSecret(String secretName, Object pathToSecret);
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
index c9b54f5..32c13b7 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/DefaultSecretsProviderConfigurator.java
@@ -41,7 +41,7 @@ public class DefaultSecretsProviderConfigurator implements SecretsProviderConfig
case PYTHON:
return "secretsprovider.ClearTextSecretsProvider";
default:
- throw new RuntimeException("Unknwon runtime " + functionDetails.getRuntime());
+ throw new RuntimeException("Unknown runtime " + functionDetails.getRuntime());
}
}
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
index bcccfe4..6dc31c7 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
@@ -35,7 +35,7 @@ import java.util.Map;
* As such this implementation is strictly when workers are configured to use kubernetes runtime.
* We use kubernetes in built secrets and bind them as environment variables within the function container
* to ensure that the secrets are available to the function at runtime. Then we plug in the
- * EnvironmentBasedSecretsConfig as the secrets provider who knows how to read these environment variables
+ * EnvironmentBasedSecretsProvider as the secrets provider, which knows how to read these environment variables.
*/
public class KubernetesSecretsProviderConfigurator implements SecretsProviderConfigurator {
private static String ID_KEY = "path";
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java
index c31686f..349559e 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java
@@ -33,41 +33,41 @@ import java.util.Map;
*/
public interface SecretsProviderConfigurator {
/**
- * Initialize the SecretsProviderConfigurator
- * @return
+ * Initialize the SecretsProviderConfigurator.
*/
default void init(Map<String, String> config) {}
/**
* Return the Secrets Provider Classname. This will be passed to the cmdline
* of the instance and should contain the logic of connecting with the secrets
- * provider and obtaining secrets
+ * provider and obtaining secrets.
*/
String getSecretsProviderClassName(Function.FunctionDetails functionDetails);
/**
- * Return the secrets provider config
+ * Return the secrets provider config.
*/
Map<String, String> getSecretsProviderConfig(Function.FunctionDetails functionDetails);
/**
- * Attaches any secrets specific stuff to the k8 container for kubernetes runtime
+ * Attaches any secrets specific stuff to the k8 container for kubernetes runtime.
*/
void configureKubernetesRuntimeSecretsProvider(V1PodSpec podSpec, String functionsContainerName, Function.FunctionDetails functionDetails);
/**
- * Attaches any secrets specific stuff to the ProcessBuilder for process runtime
+ * Attaches any secrets specific stuff to the ProcessBuilder for process runtime.
*/
void configureProcessRuntimeSecretsProvider(ProcessBuilder processBuilder, Function.FunctionDetails functionDetails);
/**
- * What is the type of the object that should be in the user secret config
+ * What is the type of the object that should be in the user secret config.
+ *
* @return
*/
Type getSecretObjectType();
/**
- * Do config checks to see whether the secrets provided are conforming
+ * Do config checks to see whether the secrets provided are conforming.
*/
default void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, Function.FunctionDetails functionDetails) {}
diff --git a/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProviderTest.java b/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProviderTest.java
index fa80d70..3392e9a 100644
--- a/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProviderTest.java
+++ b/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsprovider/EnvironmentBasedSecretsProviderTest.java
@@ -19,17 +19,12 @@
package org.apache.pulsar.functions.secretsprovider;
-import org.mockito.Mockito;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.lang.reflect.Field;
import java.util.Map;
-import static org.mockito.Matchers.anyString;
-
/**
* Unit test of {@link EnvironmentBasedSecretsProvider}.
*/
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 7932f58..d627700 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -657,7 +657,7 @@ public class FunctionConfigUtils {
mergedConfig.setLogTopic(newConfig.getLogTopic());
}
if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
- throw new IllegalArgumentException("Processing Guarantess cannot be alterted");
+ throw new IllegalArgumentException("Processing Guarantess cannot be altered");
}
if (newConfig.getRetainOrdering() != null && !newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) {
throw new IllegalArgumentException("Retain Orderning cannot be altered");
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
index 67dde1c..7ba22aa 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
@@ -133,7 +133,7 @@ public class Reflections {
ClassLoader classLoader, Object[] params, Class[] paramTypes) {
if (params.length != paramTypes.length) {
throw new RuntimeException(
- "Unequal number of params and paramTypes. Each param must have a correspoinding param type");
+ "Unequal number of params and paramTypes. Each param must have a corresponding param type");
}
Class<?> theCls;
try {
@@ -172,7 +172,8 @@ public class Reflections {
}
/**
- * Check if a class is in a jar
+ * Check if a class is in a jar.
+ *
* @param jar location of the jar
* @param fqcn fully qualified class name to search for in jar
* @return true if class can be loaded from jar and false if otherwise
@@ -189,7 +190,8 @@ public class Reflections {
}
/**
- * Check if class exists
+ * Check if class exists.
+ *
* @param fqcn fully qualified class name to search for
* @return true if class can be loaded from jar and false if otherwise
*/
@@ -203,7 +205,8 @@ public class Reflections {
}
/**
- * check if a class implements an interface
+ * Check if a class implements an interface.
+ *
* @param fqcn fully qualified class name to search for in jar
* @param xface interface to check if implement
* @return true if class from jar implements interface xface and false if otherwise
@@ -223,7 +226,8 @@ public class Reflections {
}
/**
- * check if class implements interface
+ * Check if class implements interface.
+ *
* @param fqcn fully qualified class name
* @param xface the interface the fqcn should implement
* @return true if class implements interface xface and false if otherwise
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index ecbe487..efb4133 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -30,7 +30,6 @@ import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -423,7 +422,7 @@ public class SinkConfigUtils {
});
}
if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
- throw new IllegalArgumentException("Processing Guarantess cannot be alterted");
+ throw new IllegalArgumentException("Processing Guarantess cannot be altered");
}
if (newConfig.getConfigs() != null) {
mergedConfig.setConfigs(newConfig.getConfigs());
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 59b05bd..0f7ff51 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -270,7 +270,7 @@ public class SourceConfigUtils {
mergedConfig.setSecrets(newConfig.getSecrets());
}
if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
- throw new IllegalArgumentException("Processing Guarantess cannot be alterted");
+ throw new IllegalArgumentException("Processing Guarantess cannot be altered");
}
if (newConfig.getParallelism() != null) {
mergedConfig.setParallelism(newConfig.getParallelism());
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index d2920ca..2318f5a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -236,7 +236,8 @@ public class Utils {
}
/**
- * Load a jar
+ * Load a jar.
+ *
* @param jar file of jar
* @return classloader
* @throws MalformedURLException
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 68dddc2..da2a728 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -221,7 +221,7 @@ public class FunctionConfigUtilsTest {
assertTrue(mergedConfig.getCleanupSubscription());
}
- @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be altered")
public void testMergeDifferentProcessingGuarantees() {
FunctionConfig functionConfig = createFunctionConfig();
FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("processingGuarantees", EFFECTIVELY_ONCE);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java
index 41f943e..b0fa14c 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java
@@ -154,7 +154,7 @@ public class ReflectionsTest {
}
@Test
- public void testclassInJarImplementsIface() {
+ public void testClassInJarImplementsIface() {
assertTrue(Reflections.classImplementsIface(aImplementation.class.getName(), aInterface.class));
assertTrue(!Reflections.classImplementsIface(aImplementation.class.getName(), bInterface.class));
}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index efde8ab..f147cfb 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -139,7 +139,7 @@ public class SinkConfigUtilsTest {
assertEquals(mergedConfig.getInputSpecs().get("test-input"), newSinkConfig.getInputSpecs().get("test-input"));
}
- @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be altered")
public void testMergeDifferentProcessingGuarantees() {
SinkConfig sinkConfig = createSinkConfig();
SinkConfig newSinkConfig = createUpdatedSinkConfig("processingGuarantees", EFFECTIVELY_ONCE);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 0f4b1cb..cb141e8 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -117,7 +117,7 @@ public class SourceConfigUtilsTest {
SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
}
- @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be alterted")
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Processing Guarantess cannot be altered")
public void testMergeDifferentProcessingGuarantees() {
SourceConfig sourceConfig = createSourceConfig();
SourceConfig newSourceConfig = createUpdatedSourceConfig("processingGuarantees", EFFECTIVELY_ONCE);
diff --git a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/Event.java b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/Event.java
index 1386f86..c9a12a1 100644
--- a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/Event.java
+++ b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/Event.java
@@ -33,14 +33,14 @@ public interface Event<T> {
Record<?> getRecord();
/**
- * The event timestamp in millis
+ * The event timestamp in millis.
*
* @return the event timestamp in milliseconds.
*/
long getTimestamp();
/**
- * Returns the wrapped object
+ * Returns the wrapped object.
*
* @return the wrapped object.
*/
diff --git a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/EvictionContext.java b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/EvictionContext.java
index d5700a8..4f1ef79 100644
--- a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/EvictionContext.java
+++ b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/EvictionContext.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.functions.windowing;
/**
- * Context information that can be used by the eviction policy
+ * Context information that can be used by the eviction policy.
*/
public interface EvictionContext {
/**
@@ -32,7 +32,7 @@ public interface EvictionContext {
Long getReferenceTime();
/**
- * Returns the sliding count for count based windows
+ * Returns the sliding count for count based windows.
*
* @return the sliding count
*/
@@ -40,7 +40,7 @@ public interface EvictionContext {
/**
- * Returns the sliding interval for time based windows
+ * Returns the sliding interval for time based windows.
*
* @return the sliding interval
*/
diff --git a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/EvictionPolicy.java b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/EvictionPolicy.java
index 2144fc1..25f4c99 100644
--- a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/EvictionPolicy.java
+++ b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/EvictionPolicy.java
@@ -33,14 +33,17 @@ public interface EvictionPolicy<T, S> {
/**
* expire the event and remove it from the queue.
*/
- EXPIRE, /**
+ EXPIRE,
+ /**
* process the event in the current window of events.
*/
- PROCESS, /**
+ PROCESS,
+ /**
* don't include in the current window but keep the event
* in the queue for evaluating as a part of future windows.
*/
- KEEP, /**
+ KEEP,
+ /**
* stop processing the queue, there cannot be anymore events
* satisfying the eviction policy.
*/
diff --git a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WaterMarkEventGenerator.java b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WaterMarkEventGenerator.java
index 046d584..d904202 100644
--- a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WaterMarkEventGenerator.java
+++ b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WaterMarkEventGenerator.java
@@ -53,6 +53,7 @@ public class WaterMarkEventGenerator<T> implements Runnable {
/**
* Creates a new WatermarkEventGenerator.
+ *
* @param windowManager The window manager this generator will submit watermark events to
* @param intervalMs The generator will check if it should generate a watermark event with this intervalMs
* @param eventTsLagMs The max allowed lag behind the last watermark event before an event is considered late
@@ -107,7 +108,7 @@ public class WaterMarkEventGenerator<T> implements Runnable {
}
/**
- * Computes the min ts across all intput topics.
+ * Computes the min ts across all input topics.
*/
private long computeWaterMarkTs() {
long ts = 0;
diff --git a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
index 94c2362..5f46365 100644
--- a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
+++ b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
@@ -89,7 +89,8 @@ public class WindowManager<T> implements TriggerHandler {
/**
* Add an event into the window, with the given ts as the tracking ts.
- * @param event the event to track
+ *
+ * @param event the event to track
* @param ts the timestamp
*/
public void add(T event, long ts, Record<?> record) {
@@ -225,7 +226,7 @@ public class WindowManager<T> implements TriggerHandler {
/**
* Scans the event queue and returns the next earliest event ts
- * between the startTs and endTs
+ * between the startTs and endTs.
*
* @param startTs the start ts (exclusive)
* @param endTs the end ts (inclusive)
diff --git a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/evictors/TimeEvictionPolicy.java b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/evictors/TimeEvictionPolicy.java
index 147ca5d..853a679 100644
--- a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/evictors/TimeEvictionPolicy.java
+++ b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/evictors/TimeEvictionPolicy.java
@@ -35,7 +35,7 @@ public class TimeEvictionPolicy<T> implements EvictionPolicy<T, EvictionContext>
/**
* Constructs a TimeEvictionPolicy that evicts events older
- * than the given window length in millis
+ * than the given window length in millis.
*
* @param windowLength the duration in milliseconds
*/
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
index 3f8bec3..b40c6cd 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.functions.proto.Function.Assignment;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 325f3e7..8c79f83 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -452,7 +452,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
}
/**
- * Get stats of a function instance. If this worker is not running the function instance,
+ * Get stats of a function instance. If this worker is not running the function instance.
* @param tenant the tenant the function belongs to
* @param namespace the namespace the function belongs to
* @param functionName the function name
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 9163def..65dbc40 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.functions.worker;
-import com.google.common.util.concurrent.AbstractService;
-
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 6efcbd2..df7f14d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -265,7 +265,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
private boolean authenticationEnabled = false;
@FieldContext(
category = CATEGORY_WORKER_SECURITY,
- doc = "Autentication provider name list, which is a list of class names"
+ doc = "Authentication provider name list, which is a list of class names"
)
private Set<String> authenticationProviders = Sets.newTreeSet();
@FieldContext(
@@ -439,7 +439,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "Any config the secret provider configurator might need. \n\nThis is passed on"
- + " to the init method of the secretproviderconfigurator"
+ + " to the init method of the SecretsProviderConfigurator"
)
private Map<String, String> secretsProviderConfiguratorConfig;
@FieldContext(
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 3c83c18..d06053b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -27,7 +27,6 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -194,7 +193,7 @@ public class WorkerService {
// Start function runtime manager
this.functionRuntimeManager.start();
- // indicate function worker service is done intializing
+ // indicate function worker service is done initializing
this.isInitialized = true;
this.connectorsManager = new ConnectorsManager(workerConfig);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/dlog/DLInputStream.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/dlog/DLInputStream.java
index fd78eab..10acb54 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/dlog/DLInputStream.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/dlog/DLInputStream.java
@@ -27,6 +27,9 @@ import org.apache.distributedlog.exceptions.EndOfStreamException;
import java.io.IOException;
import java.io.InputStream;
+/**
+ * DistributedLog Input Stream.
+ */
public class DLInputStream extends InputStream {
private LogRecordWithInputStream currentLogRecord = null;
@@ -60,7 +63,7 @@ public class DLInputStream extends InputStream {
}
/**
- * Construct distributedlog input stream
+ * Construct DistributedLog input stream.
*
* @param dlm the Distributed Log Manager to access the stream
*/
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
index 4e6b34a..532d852 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
@@ -26,7 +26,6 @@ import javax.ws.rs.core.UriInfo;
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.functions.worker.WorkerService;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
public class FunctionApiResource implements Supplier<WorkerService> {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index a6c4974..2c9a12e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
import java.net.BindException;
import java.net.URI;
-import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 2f4856e..2bae728 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -81,7 +81,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.FunctionConfig;
@@ -91,7 +90,6 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.Codec;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index be8fd49..1c5a090 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -65,10 +65,7 @@ public class FunctionsImpl extends ComponentImpl {
List<ExceptionInformation> userExceptionInformationList = new LinkedList<>();
for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
- ExceptionInformation exceptionInformation
- = new ExceptionInformation();
- exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
- exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ ExceptionInformation exceptionInformation = getExceptionInformation(exceptionEntry);
userExceptionInformationList.add(exceptionInformation);
}
functionInstanceStatusData.setLatestUserExceptions(userExceptionInformationList);
@@ -78,24 +75,15 @@ public class FunctionsImpl extends ComponentImpl {
+ status.getNumSourceExceptions() + status.getNumSinkExceptions());
List<ExceptionInformation> systemExceptionInformationList = new LinkedList<>();
for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSystemExceptionsList()) {
- ExceptionInformation exceptionInformation
- = new ExceptionInformation();
- exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
- exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ ExceptionInformation exceptionInformation = getExceptionInformation(exceptionEntry);
systemExceptionInformationList.add(exceptionInformation);
}
for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSourceExceptionsList()) {
- ExceptionInformation exceptionInformation
- = new ExceptionInformation();
- exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
- exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ ExceptionInformation exceptionInformation = getExceptionInformation(exceptionEntry);
systemExceptionInformationList.add(exceptionInformation);
}
for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSinkExceptionsList()) {
- ExceptionInformation exceptionInformation
- = new ExceptionInformation();
- exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
- exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ ExceptionInformation exceptionInformation = getExceptionInformation(exceptionEntry);
systemExceptionInformationList.add(exceptionInformation);
}
functionInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
@@ -199,6 +187,14 @@ public class FunctionsImpl extends ComponentImpl {
}
}
+ private ExceptionInformation getExceptionInformation(InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry) {
+ ExceptionInformation exceptionInformation
+ = new ExceptionInformation();
+ exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+ exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ return exceptionInformation;
+ }
+
public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
super(workerServiceSupplier, Utils.ComponentType.FUNCTION);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 4a801b2..39828a0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -69,26 +69,17 @@ public class SinkImpl extends ComponentImpl {
+ status.getNumUserExceptions() + status.getNumSourceExceptions());
List<ExceptionInformation> systemExceptionInformationList = new LinkedList<>();
for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
- ExceptionInformation exceptionInformation
- = new ExceptionInformation();
- exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
- exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ ExceptionInformation exceptionInformation = getExceptionInformation(exceptionEntry);
systemExceptionInformationList.add(exceptionInformation);
}
for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSystemExceptionsList()) {
- ExceptionInformation exceptionInformation
- = new ExceptionInformation();
- exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
- exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ ExceptionInformation exceptionInformation = getExceptionInformation(exceptionEntry);
systemExceptionInformationList.add(exceptionInformation);
}
for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSourceExceptionsList()) {
- ExceptionInformation exceptionInformation
- = new ExceptionInformation();
- exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
- exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ ExceptionInformation exceptionInformation = getExceptionInformation(exceptionEntry);
systemExceptionInformationList.add(exceptionInformation);
}
sinkInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);
@@ -96,10 +87,7 @@ public class SinkImpl extends ComponentImpl {
sinkInstanceStatusData.setNumSinkExceptions(status.getNumSinkExceptions());
List<ExceptionInformation> sinkExceptionInformationList = new LinkedList<>();
for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSinkExceptionsList()) {
- ExceptionInformation exceptionInformation
- = new ExceptionInformation();
- exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
- exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ ExceptionInformation exceptionInformation = getExceptionInformation(exceptionEntry);
sinkExceptionInformationList.add(exceptionInformation);
}
sinkInstanceStatusData.setLatestSinkExceptions(sinkExceptionInformationList);
@@ -206,6 +194,14 @@ public class SinkImpl extends ComponentImpl {
}
}
+ private ExceptionInformation getExceptionInformation(InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry) {
+ ExceptionInformation exceptionInformation
+ = new ExceptionInformation();
+ exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
+ exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
+ return exceptionInformation;
+ }
+
public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
super(workerServiceSupplier, Utils.ComponentType.SINK);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
index c532e3a..f2b1936 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
@@ -33,7 +33,6 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
index 9347ed9..70e1d90 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/scheduler/RoundRobinScheduler.java
@@ -62,10 +62,10 @@ public class RoundRobinScheduler implements IScheduler {
int least = Integer.MAX_VALUE;
for (Map.Entry<String, List<Assignment>> entry : workerIdToAssignment.entrySet()) {
String workerId = entry.getKey();
- List<Assignment> workerAssigments = entry.getValue();
- if (workerAssigments.size() < least) {
+ List<Assignment> workerAssignments = entry.getValue();
+ if (workerAssignments.size() < least) {
targetWorkerId = workerId;
- least = workerAssigments.size();
+ least = workerAssignments.size();
}
}
return targetWorkerId;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index d8d54c9..2926c93 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -50,7 +50,6 @@ import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.RestException;
-import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 7695993..bdfb50e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -50,7 +50,6 @@ import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.RestException;
-import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;