You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/01/28 04:09:14 UTC
[pulsar] branch master updated: Exposing more methods in Sink/Source Context (#3397)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ee25415 Exposing more methods in Sink/Source Context (#3397)
ee25415 is described below
commit ee2541577e6a56d6592769a44686113d14670ca2
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Jan 27 22:09:09 2019 -0600
Exposing more methods in Sink/Source Context (#3397)
* Exposing more methods in Sink/Source Context
* remove test code
* update comments
* remove extra space
* addressing comments
* removing unused imports
* fix tests
---
.../pulsar/functions/instance/ContextImpl.java | 10 ++++++
.../org/apache/pulsar/io/core/SinkContext.java | 40 ++++++++++++++++++++--
.../org/apache/pulsar/io/core/SourceContext.java | 38 ++++++++++++++++++--
.../io/kafka/source/KafkaAbstractSourceTest.java | 26 ++++++++++++++
4 files changed, 108 insertions(+), 6 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 97956b8..85b8080 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -193,6 +193,16 @@ class ContextImpl implements Context, SinkContext, SourceContext {
}
@Override
+ public String getSinkName() {
+ return config.getFunctionDetails().getName();
+ }
+
+ @Override
+ public String getSourceName() {
+ return config.getFunctionDetails().getName();
+ }
+
+ @Override
public String getFunctionName() {
return config.getFunctionDetails().getName();
}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
index bf8678b..384302a 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
@@ -18,19 +18,23 @@
*/
package org.apache.pulsar.io.core;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+
public interface SinkContext {
/**
- * The id of the instance that invokes this function.
+ * The id of the instance that invokes this sink.
*
* @return the instance id
*/
int getInstanceId();
/**
- * Get the number of instances that invoke this function.
+ * Get the number of instances that invoke this sink.
*
- * @return the number of instances that invoke this function.
+ * @return the number of instances that invoke this sink.
*/
int getNumInstances();
@@ -40,4 +44,34 @@ public interface SinkContext {
* @param value The value of the metric
*/
void recordMetric(String metricName, double value);
+
+ /**
+ * Get a list of all input topics
+ * @return a list of all input topics
+ */
+ Collection<String> getInputTopics();
+
+ /**
+ * The tenant this sink belongs to
+ * @return the tenant this sink belongs to
+ */
+ String getTenant();
+
+ /**
+ * The namespace this sink belongs to
+ * @return the namespace this sink belongs to
+ */
+ String getNamespace();
+
+ /**
+ * The name of the sink that we are executing
+ * @return The Sink name
+ */
+ String getSinkName();
+
+ /**
+ * The logger object that can be used to log in a sink
+ * @return the logger object
+ */
+ Logger getLogger();
}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
index b557f53..201ff47 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
@@ -18,19 +18,21 @@
*/
package org.apache.pulsar.io.core;
+import org.slf4j.Logger;
+
public interface SourceContext {
/**
- * The id of the instance that invokes this function.
+ * The id of the instance that invokes this source.
*
* @return the instance id
*/
int getInstanceId();
/**
- * Get the number of instances that invoke this function.
+ * Get the number of instances that invoke this source.
*
- * @return the number of instances that invoke this function.
+ * @return the number of instances that invoke this source.
*/
int getNumInstances();
@@ -40,4 +42,34 @@ public interface SourceContext {
* @param value The value of the metric
*/
void recordMetric(String metricName, double value);
+
+ /**
+ * Get the output topic of the source
+ * @return output topic name
+ */
+ String getOutputTopic();
+
+ /**
+ * The tenant this source belongs to
+ * @return the tenant this source belongs to
+ */
+ String getTenant();
+
+ /**
+ * The namespace this source belongs to
+ * @return the namespace this source belongs to
+ */
+ String getNamespace();
+
+ /**
+ * The name of the source that we are executing
+ * @return The Source name
+ */
+ String getSourceName();
+
+ /**
+ * The logger object that can be used to log in a source
+ * @return the logger object
+ */
+ Logger getLogger();
}
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 4bd4b39..d3c5fac 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -23,6 +23,7 @@ package org.apache.pulsar.io.kafka.source;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.kafka.KafkaAbstractSource;
+import org.slf4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -81,6 +82,31 @@ public class KafkaAbstractSourceTest {
public void recordMetric(String metricName, double value) {
}
+
+ @Override
+ public String getOutputTopic() {
+ return null;
+ }
+
+ @Override
+ public String getTenant() {
+ return null;
+ }
+
+ @Override
+ public String getNamespace() {
+ return null;
+ }
+
+ @Override
+ public String getSourceName() {
+ return null;
+ }
+
+ @Override
+ public Logger getLogger() {
+ return null;
+ }
};
Map<String, Object> config = new HashMap<>();
ThrowingRunnable openAndClose = ()->{