You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2021/05/08 00:18:21 UTC

[pulsar] branch master updated: [Issue 10445][pulsar-io] Exposed SubscriptionType in the SinkContext (#10446)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b5fd8ef  [Issue 10445][pulsar-io] Exposed SubscriptionType in the SinkContext (#10446)
b5fd8ef is described below

commit b5fd8efb4248765dc8906f6ae1f18dc35d2da7a8
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Fri May 7 17:17:34 2021 -0700

    [Issue 10445][pulsar-io] Exposed SubscriptionType in the SinkContext (#10446)
    
    Fixes #10445
    
    ### Motivation
    
    SinkContext should expose Subscription type to the Sink
    More context: https://github.com/apache/pulsar/pull/9927#discussion_r621441678
    
    Needed for https://github.com/apache/pulsar/pull/9927
    
    ### Modifications
    
    Added `getSubscriptionType()` to the `SinkContext` interface and `ContextImpl`
---
 .../pulsar/functions/instance/ContextImpl.java      | 21 +++++++++++++++++++++
 .../functions/instance/JavaInstanceRunnable.java    | 15 +++------------
 .../pulsar/functions/instance/ContextImplTest.java  | 11 +++++++++++
 .../org/apache/pulsar/io/core/ConnectorContext.java |  2 +-
 .../java/org/apache/pulsar/io/core/SinkContext.java |  9 +++++++++
 5 files changed, 45 insertions(+), 13 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index aaeb3ce..ad36877 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.common.functions.ExternalPulsarConfig;
@@ -106,6 +107,8 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
     private final String[] metricsLabels;
     private final Summary userMetricsSummary;
 
+    private final SubscriptionType subscriptionType;
+
     private final static String[] userMetricsLabelNames;
 
     private boolean exposePulsarAdminClientEnabled;
@@ -195,6 +198,19 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
             config.getFunctionDetails().getName()
         );
         this.exposePulsarAdminClientEnabled = config.isExposePulsarAdminClientEnabled();
+
+        Function.SourceSpec sourceSpec = config.getFunctionDetails().getSource();
+        switch (sourceSpec.getSubscriptionType()) {
+            case FAILOVER:
+                subscriptionType = SubscriptionType.Failover;
+                break;
+            case KEY_SHARED:
+                subscriptionType = SubscriptionType.Key_Shared;
+                break;
+            default:
+                subscriptionType = SubscriptionType.Shared;
+                break;
+        }
     }
 
     public void setCurrentMessageContext(Record<?> record) {
@@ -442,6 +458,11 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
         return this.externalPulsarClusters.get(defaultPulsarCluster).getClient().newConsumer(schema);
     }
 
+    @Override
+    public SubscriptionType getSubscriptionType() {
+        return subscriptionType;
+    }
+
     public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O> schema) {
         return publish(defaultPulsarCluster, topicName, object, schema);
     }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index c8ba4d7..8798e7b 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.functions.instance;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Preconditions;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -39,7 +40,6 @@ import org.apache.logging.log4j.core.config.LoggerConfig;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
@@ -668,17 +668,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                     break;
             }
 
-            switch (sourceSpec.getSubscriptionType()) {
-                case FAILOVER:
-                    pulsarSourceConfig.setSubscriptionType(SubscriptionType.Failover);
-                    break;
-                case KEY_SHARED:
-                    pulsarSourceConfig.setSubscriptionType(SubscriptionType.Key_Shared);
-                    break;
-                default:
-                    pulsarSourceConfig.setSubscriptionType(SubscriptionType.Shared);
-                    break;
-            }
+            Preconditions.checkNotNull(contextImpl.getSubscriptionType());
+            pulsarSourceConfig.setSubscriptionType(contextImpl.getSubscriptionType());
 
             pulsarSourceConfig.setTypeClassName(sourceSpec.getTypeClassName());
 
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 8bc8a54..28d7699 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.ProducerBase;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
@@ -53,7 +54,9 @@ import org.apache.pulsar.functions.instance.state.InstanceStateManager;
 import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
+import org.apache.pulsar.io.core.SinkContext;
 import org.slf4j.Logger;
+import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -139,6 +142,14 @@ public class ContextImplTest {
     }
 
     @Test
+    public void testGetSubscriptionType()  {
+        SinkContext ctx = context;
+        // make sure SinkContext can get SubscriptionType.
+        Assert.assertEquals(ctx.getSubscriptionType(), SubscriptionType.Shared);
+    }
+
+
+    @Test
     public void testPutStateStateEnabled() throws Exception {
         context.defaultStateStore = mock(BKStateStoreImpl.class);
         ByteBuffer buffer = ByteBuffer.wrap("test-value".getBytes(UTF_8));
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
index 6c0006d..0c76cbd 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/ConnectorContext.java
@@ -68,7 +68,7 @@ public interface ConnectorContext {
      * @return the namespace this source belongs to
      */
     String getNamespace();
-    
+
     /**
      * The logger object that can be used to log in a sink
      * @return the logger object
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
index 4c2f6b3..af5499e 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.io.core;
 
 import java.util.Collection;
+
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 
@@ -42,4 +44,11 @@ public interface SinkContext extends ConnectorContext {
      */
     String getSinkName();
 
+    /**
+     * Get subscription type used by the source providing data for the sink
+     * @return subscription type
+     */
+    default SubscriptionType getSubscriptionType() {
+        throw new UnsupportedOperationException("Context does not provide SubscriptionType");
+    }
 }