You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/05/08 00:18:21 UTC
[pulsar] branch master updated: [Issue 10445][pulsar-io] Exposed
SubscriptionType in the SinkContext (#10446)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b5fd8ef [Issue 10445][pulsar-io] Exposed SubscriptionType in the SinkContext (#10446)
b5fd8ef is described below
commit b5fd8efb4248765dc8906f6ae1f18dc35d2da7a8
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Fri May 7 17:17:34 2021 -0700
[Issue 10445][pulsar-io] Exposed SubscriptionType in the SinkContext (#10446)
Fixes #10445
### Motivation
SinkContext should expose Subscription type to the Sink
More context: https://github.com/apache/pulsar/pull/9927#discussion_r621441678
Needed for https://github.com/apache/pulsar/pull/9927
### Modifications
Added `getSubscriptionType()` to the `SinkContext` interface and `ContextImpl`
---
.../pulsar/functions/instance/ContextImpl.java | 21 +++++++++++++++++++++
.../functions/instance/JavaInstanceRunnable.java | 15 +++------------
.../pulsar/functions/instance/ContextImplTest.java | 11 +++++++++++
.../org/apache/pulsar/io/core/ConnectorContext.java | 2 +-
.../java/org/apache/pulsar/io/core/SinkContext.java | 9 +++++++++
5 files changed, 45 insertions(+), 13 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index aaeb3ce..ad36877 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.common.functions.ExternalPulsarConfig;
@@ -106,6 +107,8 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
private final String[] metricsLabels;
private final Summary userMetricsSummary;
+ private final SubscriptionType subscriptionType;
+
private final static String[] userMetricsLabelNames;
private boolean exposePulsarAdminClientEnabled;
@@ -195,6 +198,19 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
config.getFunctionDetails().getName()
);
this.exposePulsarAdminClientEnabled = config.isExposePulsarAdminClientEnabled();
+
+ Function.SourceSpec sourceSpec = config.getFunctionDetails().getSource();
+ switch (sourceSpec.getSubscriptionType()) {
+ case FAILOVER:
+ subscriptionType = SubscriptionType.Failover;
+ break;
+ case KEY_SHARED:
+ subscriptionType = SubscriptionType.Key_Shared;
+ break;
+ default:
+ subscriptionType = SubscriptionType.Shared;
+ break;
+ }
}
public void setCurrentMessageContext(Record<?> record) {
@@ -442,6 +458,11 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
return this.externalPulsarClusters.get(defaultPulsarCluster).getClient().newConsumer(schema);
}
+ @Override
+ public SubscriptionType getSubscriptionType() {
+ return subscriptionType;
+ }
+
public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O> schema) {
return publish(defaultPulsarCluster, topicName, object, schema);
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index c8ba4d7..8798e7b 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.functions.instance;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.HashMap;
@@ -39,7 +40,6 @@ import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
@@ -668,17 +668,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
break;
}
- switch (sourceSpec.getSubscriptionType()) {
- case FAILOVER:
- pulsarSourceConfig.setSubscriptionType(SubscriptionType.Failover);
- break;
- case KEY_SHARED:
- pulsarSourceConfig.setSubscriptionType(SubscriptionType.Key_Shared);
- break;
- default:
- pulsarSourceConfig.setSubscriptionType(SubscriptionType.Shared);
- break;
- }
+ Preconditions.checkNotNull(contextImpl.getSubscriptionType());
+ pulsarSourceConfig.setSubscriptionType(contextImpl.getSubscriptionType());
pulsarSourceConfig.setTypeClassName(sourceSpec.getTypeClassName());
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 8bc8a54..28d7699 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
@@ -53,7 +54,9 @@ import org.apache.pulsar.functions.instance.state.InstanceStateManager;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
+import org.apache.pulsar.io.core.SinkContext;
import org.slf4j.Logger;
+import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -139,6 +142,14 @@ public class ContextImplTest {
}
@Test
+ public void testGetSubscriptionType() {
+ SinkContext ctx = context;
+ // make sure SinkContext can get SubscriptionType.
+ Assert.assertEquals(ctx.getSubscriptionType(), SubscriptionType.Shared);
+ }
+
+
+ @Test
public void testPutStateStateEnabled() throws Exception {
context.defaultStateStore = mock(BKStateStoreImpl.class);
ByteBuffer buffer = ByteBuffer.wrap("test-value".getBytes(UTF_8));
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
index 6c0006d..0c76cbd 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
@@ -68,7 +68,7 @@ public interface ConnectorContext {
* @return the namespace this source belongs to
*/
String getNamespace();
-
+
/**
* The logger object that can be used to log in a sink
* @return the logger object
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
index 4c2f6b3..af5499e 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.io.core;
import java.util.Collection;
+
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
@@ -42,4 +44,11 @@ public interface SinkContext extends ConnectorContext {
*/
String getSinkName();
+ /**
+ * Get subscription type used by the source providing data for the sink
+ * @return subscription type
+ */
+ default SubscriptionType getSubscriptionType() {
+ throw new UnsupportedOperationException("Context does not provide SubscriptionType");
+ }
}