You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/01/28 04:09:14 UTC

[pulsar] branch master updated: Exposing more methods in Sink/Source Context (#3397)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ee25415  Exposing more methods in Sink/Source Context (#3397)
ee25415 is described below

commit ee2541577e6a56d6592769a44686113d14670ca2
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Jan 27 22:09:09 2019 -0600

    Exposing more methods in Sink/Source Context (#3397)
    
    * Exposing more methods in Sink/Source Context
    
    * remove test code
    
    * update comments
    
    * remove extra space
    
    * addressing comments
    
    * removing unused imports
    
    * fix tests
---
 .../pulsar/functions/instance/ContextImpl.java     | 10 ++++++
 .../org/apache/pulsar/io/core/SinkContext.java     | 40 ++++++++++++++++++++--
 .../org/apache/pulsar/io/core/SourceContext.java   | 38 ++++++++++++++++++--
 .../io/kafka/source/KafkaAbstractSourceTest.java   | 26 ++++++++++++++
 4 files changed, 108 insertions(+), 6 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 97956b8..85b8080 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -193,6 +193,16 @@ class ContextImpl implements Context, SinkContext, SourceContext {
     }
 
     @Override
+    public String getSinkName() {
+        return config.getFunctionDetails().getName();
+    }
+
+    @Override
+    public String getSourceName() {
+        return config.getFunctionDetails().getName();
+    }
+
+    @Override
     public String getFunctionName() {
         return config.getFunctionDetails().getName();
     }
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
index bf8678b..384302a 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
@@ -18,19 +18,23 @@
  */
 package org.apache.pulsar.io.core;
 
+import org.slf4j.Logger;
+
+import java.util.Collection;
+
 public interface SinkContext {
 
     /**
-     * The id of the instance that invokes this function.
+     * The id of the instance that invokes this sink.
      *
      * @return the instance id
      */
     int getInstanceId();
 
     /**
-     * Get the number of instances that invoke this function.
+     * Get the number of instances that invoke this sink.
      *
-     * @return the number of instances that invoke this function.
+     * @return the number of instances that invoke this sink.
      */
     int getNumInstances();
 
@@ -40,4 +44,34 @@ public interface SinkContext {
      * @param value The value of the metric
      */
     void recordMetric(String metricName, double value);
+
+    /**
+     * Get a list of all input topics
+     * @return a list of all input topics
+     */
+    Collection<String> getInputTopics();
+
+    /**
+     * The tenant this sink belongs to
+     * @return the tenant this sink belongs to
+     */
+    String getTenant();
+
+    /**
+     * The namespace this sink belongs to
+     * @return the namespace this sink belongs to
+     */
+    String getNamespace();
+
+    /**
+     * The name of the sink that we are executing
+     * @return The Sink name
+     */
+    String getSinkName();
+
+    /**
+     * The logger object that can be used to log in a sink
+     * @return the logger object
+     */
+    Logger getLogger();
 }
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
index b557f53..201ff47 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
@@ -18,19 +18,21 @@
  */
 package org.apache.pulsar.io.core;
 
+import org.slf4j.Logger;
+
 public interface SourceContext {
 
     /**
-     * The id of the instance that invokes this function.
+     * The id of the instance that invokes this source.
      *
      * @return the instance id
      */
     int getInstanceId();
 
     /**
-     * Get the number of instances that invoke this function.
+     * Get the number of instances that invoke this source.
      *
-     * @return the number of instances that invoke this function.
+     * @return the number of instances that invoke this source.
      */
     int getNumInstances();
 
@@ -40,4 +42,34 @@ public interface SourceContext {
      * @param value The value of the metric
      */
     void recordMetric(String metricName, double value);
+
+    /**
+     * Get the output topic of the source
+     * @return output topic name
+     */
+    String getOutputTopic();
+
+    /**
+     * The tenant this source belongs to
+     * @return the tenant this source belongs to
+     */
+    String getTenant();
+
+    /**
+     * The namespace this source belongs to
+     * @return the namespace this source belongs to
+     */
+    String getNamespace();
+
+    /**
+     * The name of the source that we are executing
+     * @return The Source name
+     */
+    String getSourceName();
+
+    /**
+     * The logger object that can be used to log in a source
+     * @return the logger object
+     */
+    Logger getLogger();
 }
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 4bd4b39..d3c5fac 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -23,6 +23,7 @@ package org.apache.pulsar.io.kafka.source;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.kafka.KafkaAbstractSource;
+import org.slf4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -81,6 +82,31 @@ public class KafkaAbstractSourceTest {
             public void recordMetric(String metricName, double value) {
 
             }
+
+            @Override
+            public String getOutputTopic() {
+                return null;
+            }
+
+            @Override
+            public String getTenant() {
+                return null;
+            }
+
+            @Override
+            public String getNamespace() {
+                return null;
+            }
+
+            @Override
+            public String getSourceName() {
+                return null;
+            }
+
+            @Override
+            public Logger getLogger() {
+                return null;
+            }
         };
         Map<String, Object> config = new HashMap<>();
         ThrowingRunnable openAndClose = ()->{