You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/10/02 18:31:18 UTC
[pulsar] branch master updated: [Issue-8162] [pulsar-io] Added org.apache.pulsar.io.core.Context interface (#8164)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 700131b [Issue-8162] [pulsar-io] Added org.apache.pulsar.io.core.Context interface (#8164)
700131b is described below
commit 700131bab6bfd7746f40ad362ea96f9f2ef339c4
Author: David Kjerrumgaard <35...@users.noreply.github.com>
AuthorDate: Fri Oct 2 11:30:37 2020 -0700
[Issue-8162] [pulsar-io] Added org.apache.pulsar.io.core.Context interface (#8164)
* Added org.apache.pulsar.io.core.Context interface
* Added the Context interface
* Renamed Context to ConnectorContext
Co-authored-by: David Kjerrumgaard <dk...@splunk.com>
---
.../{SinkContext.java => ConnectorContext.java} | 43 +++----
.../org/apache/pulsar/io/core/SinkContext.java | 118 +-------------------
.../org/apache/pulsar/io/core/SourceContext.java | 124 +--------------------
3 files changed, 18 insertions(+), 267 deletions(-)
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
similarity index 82%
copy from pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
copy to pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
index f7e3b4d..a06451b 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
@@ -18,63 +18,52 @@
*/
package org.apache.pulsar.io.core;
-import org.slf4j.Logger;
-
import java.nio.ByteBuffer;
-import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+
/**
- * Interface for a sink connector providing information about environment where it is running.
+ * Interface for a connector providing information about environment where it is running.
* It also allows to propagate information, such as logs, metrics, states, back to the Pulsar environment.
*/
-public interface SinkContext {
+public interface ConnectorContext {
/**
- * The id of the instance that invokes this sink.
+ * The id of the instance that invokes this connector.
*
* @return the instance id
*/
int getInstanceId();
/**
- * Get the number of instances that invoke this sink.
+ * Get the number of instances that invoke this connector.
*
- * @return the number of instances that invoke this sink.
+ * @return the number of instances that invoke this connector.
*/
int getNumInstances();
-
+
/**
* Record a user defined metric
* @param metricName The name of the metric
* @param value The value of the metric
*/
void recordMetric(String metricName, double value);
-
+
/**
- * Get a list of all input topics
- * @return a list of all input topics
- */
- Collection<String> getInputTopics();
-
- /**
- * The tenant this sink belongs to
- * @return the tenant this sink belongs to
+ * The tenant this connector belongs to.
+ *
+ * @return the tenant this connector belongs to
*/
String getTenant();
/**
- * The namespace this sink belongs to
- * @return the namespace this sink belongs to
+ * The namespace this connector belongs to.
+ *
+ * @return the namespace this connector belongs to
*/
String getNamespace();
-
- /**
- * The name of the sink that we are executing
- * @return The Sink name
- */
- String getSinkName();
-
+
/**
* The logger object that can be used to log in a sink
* @return the logger object
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
index f7e3b4d..f145784 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
@@ -18,38 +18,13 @@
*/
package org.apache.pulsar.io.core;
-import org.slf4j.Logger;
-
-import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
/**
* Interface for a sink connector providing information about environment where it is running.
* It also allows to propagate information, such as logs, metrics, states, back to the Pulsar environment.
*/
-public interface SinkContext {
-
- /**
- * The id of the instance that invokes this sink.
- *
- * @return the instance id
- */
- int getInstanceId();
-
- /**
- * Get the number of instances that invoke this sink.
- *
- * @return the number of instances that invoke this sink.
- */
- int getNumInstances();
-
- /**
- * Record a user defined metric
- * @param metricName The name of the metric
- * @param value The value of the metric
- */
- void recordMetric(String metricName, double value);
+public interface SinkContext extends ConnectorContext {
/**
* Get a list of all input topics
@@ -58,100 +33,9 @@ public interface SinkContext {
Collection<String> getInputTopics();
/**
- * The tenant this sink belongs to
- * @return the tenant this sink belongs to
- */
- String getTenant();
-
- /**
- * The namespace this sink belongs to
- * @return the namespace this sink belongs to
- */
- String getNamespace();
-
- /**
* The name of the sink that we are executing
* @return The Sink name
*/
String getSinkName();
- /**
- * The logger object that can be used to log in a sink
- * @return the logger object
- */
- Logger getLogger();
-
- /**
- * Get the secret associated with this key
- * @param secretName The name of the secret
- * @return The secret if anything was found or null
- */
- String getSecret(String secretName);
-
- /**
- * Increment the builtin distributed counter referred by key.
- *
- * @param key The name of the key
- * @param amount The amount to be incremented
- */
- void incrCounter(String key, long amount);
-
-
- /**
- * Increment the builtin distributed counter referred by key
- * but dont wait for the completion of the increment operation
- *
- * @param key The name of the key
- * @param amount The amount to be incremented
- */
- CompletableFuture<Void> incrCounterAsync(String key, long amount);
-
- /**
- * Retrieve the counter value for the key.
- *
- * @param key name of the key
- * @return the amount of the counter value for this key
- */
- long getCounter(String key);
-
- /**
- * Retrieve the counter value for the key, but don't wait
- * for the operation to be completed
- *
- * @param key name of the key
- * @return the amount of the counter value for this key
- */
- CompletableFuture<Long> getCounterAsync(String key);
-
- /**
- * Update the state value for the key.
- *
- * @param key name of the key
- * @param value state value of the key
- */
- void putState(String key, ByteBuffer value);
-
- /**
- * Update the state value for the key, but don't wait for the operation to be completed
- *
- * @param key name of the key
- * @param value state value of the key
- */
- CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
-
- /**
- * Retrieve the state value for the key.
- *
- * @param key name of the key
- * @return the state value for the key.
- */
- ByteBuffer getState(String key);
-
- /**
- * Retrieve the state value for the key, but don't wait for the operation to be completed
- *
- * @param key name of the key
- * @return the state value for the key.
- */
- CompletableFuture<ByteBuffer> getStateAsync(String key);
}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
index 512d1ba..fae3ea4 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
@@ -22,38 +22,12 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.slf4j.Logger;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
/**
* Interface for a source connector providing information about environment where it is running.
* It also allows to propagate information, such as logs, metrics, states, back to the Pulsar environment.
*/
-public interface SourceContext {
-
- /**
- * The id of the instance that invokes this source.
- *
- * @return the instance id
- */
- int getInstanceId();
-
- /**
- * Get the number of instances that invoke this source.
- *
- * @return the number of instances that invoke this source.
- */
- int getNumInstances();
-
- /**
- * Record a user defined metric.
- *
- * @param metricName The name of the metric
- * @param value The value of the metric
- */
- void recordMetric(String metricName, double value);
+public interface SourceContext extends ConnectorContext {
/**
* Get the output topic of the source.
@@ -63,20 +37,6 @@ public interface SourceContext {
String getOutputTopic();
/**
- * The tenant this source belongs to.
- *
- * @return the tenant this source belongs to
- */
- String getTenant();
-
- /**
- * The namespace this source belongs to.
- *
- * @return the namespace this source belongs to
- */
- String getNamespace();
-
- /**
* The name of the source that we are executing.
*
* @return The Source name
@@ -84,88 +44,6 @@ public interface SourceContext {
String getSourceName();
/**
- * The logger object that can be used to log in a source.
- *
- * @return the logger object
- */
- Logger getLogger();
-
- /**
- * Get the secret associated with this key.
- *
- * @param secretName The name of the secret
- * @return The secret if anything was found or null
- */
- String getSecret(String secretName);
-
- /**
- * Increment the builtin distributed counter referred by key.
- *
- * @param key The name of the key
- * @param amount The amount to be incremented
- */
- void incrCounter(String key, long amount);
-
-
- /**
- * Increment the builtin distributed counter referred by key
- * but don't wait for the completion of the increment operation.
- *
- * @param key The name of the key
- * @param amount The amount to be incremented
- */
- CompletableFuture<Void> incrCounterAsync(String key, long amount);
-
- /**
- * Retrieve the counter value for the key.
- *
- * @param key name of the key
- * @return the amount of the counter value for this key
- */
- long getCounter(String key);
-
- /**
- * Retrieve the counter value for the key, but don't wait
- * for the operation to be completed.
- *
- * @param key name of the key
- * @return the amount of the counter value for this key
- */
- CompletableFuture<Long> getCounterAsync(String key);
-
- /**
- * Update the state value for the key.
- *
- * @param key name of the key
- * @param value state value of the key
- */
- void putState(String key, ByteBuffer value);
-
- /**
- * Update the state value for the key, but don't wait for the operation to be completed.
- *
- * @param key name of the key
- * @param value state value of the key
- */
- CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
-
- /**
- * Retrieve the state value for the key.
- *
- * @param key name of the key
- * @return the state value for the key.
- */
- ByteBuffer getState(String key);
-
- /**
- * Retrieve the state value for the key, but don't wait for the operation to be completed.
- *
- * @param key name of the key
- * @return the state value for the key.
- */
- CompletableFuture<ByteBuffer> getStateAsync(String key);
-
- /**
* New output message using schema for serializing to the topic
*
* @param topicName The name of the topic for output message