You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/10/02 18:31:18 UTC

[pulsar] branch master updated: [Issue-8162] [pulsar-io] Added org.apache.pulsar.io.core.Context interface (#8164)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 700131b  [Issue-8162] [pulsar-io] Added org.apache.pulsar.io.core.Context interface (#8164)
700131b is described below

commit 700131bab6bfd7746f40ad362ea96f9f2ef339c4
Author: David Kjerrumgaard <35...@users.noreply.github.com>
AuthorDate: Fri Oct 2 11:30:37 2020 -0700

    [Issue-8162] [pulsar-io] Added org.apache.pulsar.io.core.Context interface (#8164)
    
    * Added org.apache.pulsar.io.core.Context interface
    
    * Added the Context interface
    
    * Renamed Context to ConnectorContext
    
    Co-authored-by: David Kjerrumgaard <dk...@splunk.com>
---
 .../{SinkContext.java => ConnectorContext.java}    |  43 +++----
 .../org/apache/pulsar/io/core/SinkContext.java     | 118 +-------------------
 .../org/apache/pulsar/io/core/SourceContext.java   | 124 +--------------------
 3 files changed, 18 insertions(+), 267 deletions(-)

diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
similarity index 82%
copy from pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
copy to pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
index f7e3b4d..a06451b 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
@@ -18,63 +18,52 @@
  */
 package org.apache.pulsar.io.core;
 
-import org.slf4j.Logger;
-
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
+import org.slf4j.Logger;
+
 /**
- * Interface for a sink connector providing information about environment where it is running.
+ * Interface for a connector providing information about environment where it is running.
  * It also allows to propagate information, such as logs, metrics, states, back to the Pulsar environment.
  */
-public interface SinkContext {
+public interface ConnectorContext {
 
     /**
-     * The id of the instance that invokes this sink.
+     * The id of the instance that invokes this source.
      *
      * @return the instance id
      */
     int getInstanceId();
 
     /**
-     * Get the number of instances that invoke this sink.
+     * Get the number of instances that invoke this source.
      *
-     * @return the number of instances that invoke this sink.
+     * @return the number of instances that invoke this source.
      */
     int getNumInstances();
-
+    
     /**
      * Record a user defined metric
      * @param metricName The name of the metric
      * @param value The value of the metric
      */
     void recordMetric(String metricName, double value);
-
+    
     /**
-     * Get a list of all input topics
-     * @return a list of all input topics
-     */
-    Collection<String> getInputTopics();
-
-    /**
-     * The tenant this sink belongs to
-     * @return the tenant this sink belongs to
+     * The tenant this source belongs to.
+     *
+     * @return the tenant this source belongs to
      */
     String getTenant();
 
     /**
-     * The namespace this sink belongs to
-     * @return the namespace this sink belongs to
+     * The namespace this source belongs to.
+     *
+     * @return the namespace this source belongs to
      */
     String getNamespace();
-
-    /**
-     * The name of the sink that we are executing
-     * @return The Sink name
-     */
-    String getSinkName();
-
+    
     /**
      * The logger object that can be used to log in a sink
      * @return the logger object
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
index f7e3b4d..f145784 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
@@ -18,38 +18,13 @@
  */
 package org.apache.pulsar.io.core;
 
-import org.slf4j.Logger;
-
-import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * Interface for a sink connector providing information about environment where it is running.
  * It also allows to propagate information, such as logs, metrics, states, back to the Pulsar environment.
  */
-public interface SinkContext {
-
-    /**
-     * The id of the instance that invokes this sink.
-     *
-     * @return the instance id
-     */
-    int getInstanceId();
-
-    /**
-     * Get the number of instances that invoke this sink.
-     *
-     * @return the number of instances that invoke this sink.
-     */
-    int getNumInstances();
-
-    /**
-     * Record a user defined metric
-     * @param metricName The name of the metric
-     * @param value The value of the metric
-     */
-    void recordMetric(String metricName, double value);
+public interface SinkContext extends ConnectorContext {
 
     /**
      * Get a list of all input topics
@@ -58,100 +33,9 @@ public interface SinkContext {
     Collection<String> getInputTopics();
 
     /**
-     * The tenant this sink belongs to
-     * @return the tenant this sink belongs to
-     */
-    String getTenant();
-
-    /**
-     * The namespace this sink belongs to
-     * @return the namespace this sink belongs to
-     */
-    String getNamespace();
-
-    /**
      * The name of the sink that we are executing
      * @return The Sink name
      */
     String getSinkName();
 
-    /**
-     * The logger object that can be used to log in a sink
-     * @return the logger object
-     */
-    Logger getLogger();
-
-    /**
-     * Get the secret associated with this key
-     * @param secretName The name of the secret
-     * @return The secret if anything was found or null
-     */
-    String getSecret(String secretName);
-
-    /**
-     * Increment the builtin distributed counter referred by key.
-     *
-     * @param key    The name of the key
-     * @param amount The amount to be incremented
-     */
-    void incrCounter(String key, long amount);
-
-
-    /**
-     * Increment the builtin distributed counter referred by key
-     * but dont wait for the completion of the increment operation
-     *
-     * @param key    The name of the key
-     * @param amount The amount to be incremented
-     */
-    CompletableFuture<Void> incrCounterAsync(String key, long amount);
-
-    /**
-     * Retrieve the counter value for the key.
-     *
-     * @param key name of the key
-     * @return the amount of the counter value for this key
-     */
-    long getCounter(String key);
-
-    /**
-     * Retrieve the counter value for the key, but don't wait
-     * for the operation to be completed
-     *
-     * @param key name of the key
-     * @return the amount of the counter value for this key
-     */
-    CompletableFuture<Long> getCounterAsync(String key);
-
-    /**
-     * Update the state value for the key.
-     *
-     * @param key   name of the key
-     * @param value state value of the key
-     */
-    void putState(String key, ByteBuffer value);
-
-    /**
-     * Update the state value for the key, but don't wait for the operation to be completed
-     *
-     * @param key   name of the key
-     * @param value state value of the key
-     */
-    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
-
-    /**
-     * Retrieve the state value for the key.
-     *
-     * @param key name of the key
-     * @return the state value for the key.
-     */
-    ByteBuffer getState(String key);
-
-    /**
-     * Retrieve the state value for the key, but don't wait for the operation to be completed
-     *
-     * @param key name of the key
-     * @return the state value for the key.
-     */
-    CompletableFuture<ByteBuffer> getStateAsync(String key);
 }
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
index 512d1ba..fae3ea4 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
@@ -22,38 +22,12 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.slf4j.Logger;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
 
 /**
  * Interface for a source connector providing information about environment where it is running.
  * It also allows to propagate information, such as logs, metrics, states, back to the Pulsar environment.
  */
-public interface SourceContext {
-
-    /**
-     * The id of the instance that invokes this source.
-     *
-     * @return the instance id
-     */
-    int getInstanceId();
-
-    /**
-     * Get the number of instances that invoke this source.
-     *
-     * @return the number of instances that invoke this source.
-     */
-    int getNumInstances();
-
-    /**
-     * Record a user defined metric.
-     *
-     * @param metricName The name of the metric
-     * @param value The value of the metric
-     */
-    void recordMetric(String metricName, double value);
+public interface SourceContext extends ConnectorContext {
 
     /**
      * Get the output topic of the source.
@@ -63,20 +37,6 @@ public interface SourceContext {
     String getOutputTopic();
 
     /**
-     * The tenant this source belongs to.
-     *
-     * @return the tenant this source belongs to
-     */
-    String getTenant();
-
-    /**
-     * The namespace this source belongs to.
-     *
-     * @return the namespace this source belongs to
-     */
-    String getNamespace();
-
-    /**
      * The name of the source that we are executing.
      *
      * @return The Source name
@@ -84,88 +44,6 @@ public interface SourceContext {
     String getSourceName();
 
     /**
-     * The logger object that can be used to log in a source.
-     *
-     * @return the logger object
-     */
-    Logger getLogger();
-
-    /**
-     * Get the secret associated with this key.
-     *
-     * @param secretName The name of the secret
-     * @return The secret if anything was found or null
-     */
-    String getSecret(String secretName);
-
-    /**
-     * Increment the builtin distributed counter referred by key.
-     *
-     * @param key    The name of the key
-     * @param amount The amount to be incremented
-     */
-    void incrCounter(String key, long amount);
-
-
-    /**
-     * Increment the builtin distributed counter referred by key
-     * but don't wait for the completion of the increment operation.
-     *
-     * @param key    The name of the key
-     * @param amount The amount to be incremented
-     */
-    CompletableFuture<Void> incrCounterAsync(String key, long amount);
-
-    /**
-     * Retrieve the counter value for the key.
-     *
-     * @param key name of the key
-     * @return the amount of the counter value for this key
-     */
-    long getCounter(String key);
-
-    /**
-     * Retrieve the counter value for the key, but don't wait
-     * for the operation to be completed.
-     *
-     * @param key name of the key
-     * @return the amount of the counter value for this key
-     */
-    CompletableFuture<Long> getCounterAsync(String key);
-
-    /**
-     * Update the state value for the key.
-     *
-     * @param key   name of the key
-     * @param value state value of the key
-     */
-    void putState(String key, ByteBuffer value);
-
-    /**
-     * Update the state value for the key, but don't wait for the operation to be completed.
-     *
-     * @param key   name of the key
-     * @param value state value of the key
-     */
-    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
-
-    /**
-     * Retrieve the state value for the key.
-     *
-     * @param key name of the key
-     * @return the state value for the key.
-     */
-    ByteBuffer getState(String key);
-
-    /**
-     * Retrieve the state value for the key, but don't wait for the operation to be completed.
-     *
-     * @param key name of the key
-     * @return the state value for the key.
-     */
-    CompletableFuture<ByteBuffer> getStateAsync(String key);
-
-    /**
      * New output message using schema for serializing to the topic
      *
      * @param topicName The name of the topic for output message