You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/07/18 06:13:26 UTC

[incubator-pulsar] branch master updated: Moved Record interface as part of functions api (#2184)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b3845b2  Moved Record interface as part of functions api (#2184)
b3845b2 is described below

commit b3845b27a7f6edb4a6ac953e55593a4dd3d173a2
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jul 17 23:13:24 2018 -0700

    Moved Record interface as part of functions api (#2184)
    
    * Moved Record interface as part of functions api
    
    * Added org.apache.pulsar.functions.api.* to PowerMock ignore
---
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java  | 18 +++++++--------
 .../org/apache/pulsar/functions/api}/Record.java   | 13 +----------
 .../functions/instance/JavaInstanceRunnable.java   |  2 +-
 .../pulsar/functions/instance/SinkRecord.java      | 10 ++-------
 .../apache/pulsar/functions/sink/PulsarSink.java   |  2 +-
 .../pulsar/functions/source/PulsarRecord.java      |  9 ++------
 .../pulsar/functions/source/PulsarSource.java      |  2 +-
 .../source/RecordWithEncryptionContext.java        | 26 +++++++++++-----------
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |  2 +-
 .../pulsar/io/aerospike/AerospikeAbstractSink.java |  2 +-
 .../pulsar/io/aerospike/AerospikeStringSink.java   |  2 +-
 .../pulsar/io/cassandra/CassandraAbstractSink.java |  2 +-
 .../pulsar/io/cassandra/CassandraStringSink.java   |  2 +-
 pulsar-io/core/pom.xml                             | 10 ++-------
 .../java/org/apache/pulsar/io/core/PushSource.java |  2 ++
 .../main/java/org/apache/pulsar/io/core/Sink.java  |  2 ++
 .../java/org/apache/pulsar/io/core/Source.java     |  2 ++
 .../java/org/apache/pulsar/io/core/SinkTest.java   |  1 +
 .../java/org/apache/pulsar/io/core/SourceTest.java |  1 +
 .../apache/pulsar/io/kafka/KafkaAbstractSink.java  |  2 +-
 .../pulsar/io/kafka/KafkaAbstractSource.java       |  2 +-
 .../apache/pulsar/io/kafka/KafkaStringSink.java    |  2 +-
 pulsar-io/kinesis/pom.xml                          |  9 +++++++-
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  |  2 +-
 .../java/org/apache/pulsar/io/kinesis/Utils.java   | 14 ++++++++----
 .../org/apache/pulsar/io/kinesis/UtilsTest.java    |  5 +++--
 .../apache/pulsar/io/rabbitmq/RabbitMQSource.java  |  2 +-
 .../apache/pulsar/io/twitter/TwitterFireHose.java  |  2 +-
 28 files changed, 72 insertions(+), 78 deletions(-)

diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index d07c85e..9221978 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -56,7 +56,7 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
 @Slf4j
 @PrepareForTest({CmdFunctions.class})
-@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.core.*" })
+@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.core.*", "org.apache.pulsar.functions.api.*" })
 public class TestCmdSinks {
 
     @ObjectFactory
@@ -155,7 +155,7 @@ public class TestCmdSinks {
                 sinkConfig
         );
     }
-    
+
     @Test
     public void testMissingTenant() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
@@ -816,8 +816,8 @@ public class TestCmdSinks {
         verify(updateSink).validateSinkConfigs(eq(expectedSinkConfig));
         verify(localSinkRunner).validateSinkConfigs(eq(expectedSinkConfig));
     }
-    
-    
+
+
     @Test
     public void testCliOverwriteConfigFile() throws Exception {
 
@@ -837,7 +837,7 @@ public class TestCmdSinks {
         testSinkConfig.setArchive(JAR_FILE_PATH + "-prime");
         testSinkConfig.setResources(new Resources(CPU + 1, RAM + 1, DISK + 1));
         testSinkConfig.setConfigs(createSink.parseConfigs("{\"created_at-prime\":\"Mon Jul 02 00:33:15 +0000 2018\"}"));
-        
+
 
         SinkConfig expectedSinkConfig = getSinkConfig();
 
@@ -846,7 +846,7 @@ public class TestCmdSinks {
         new YAMLMapper().writeValue(file, testSinkConfig);
 
         Assert.assertEquals(testSinkConfig, CmdUtils.loadConfig(file.getAbsolutePath(), SinkConfig.class));
-        
+
         testMixCliAndConfigFile(
                 TENANT,
                 NAMESPACE,
@@ -866,7 +866,7 @@ public class TestCmdSinks {
                 expectedSinkConfig
         );
     }
-    
+
     public void testMixCliAndConfigFile(
             String tenant,
             String namespace,
@@ -885,8 +885,8 @@ public class TestCmdSinks {
             String sinkConfigFile,
             SinkConfig sinkConfig
     ) throws Exception {
-        
-        
+
+
         // test create sink
         createSink.tenant = tenant;
         createSink.namespace = namespace;
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
similarity index 86%
rename from pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
rename to pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index 504d5f4..38d6ed3 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -16,14 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.core;
+package org.apache.pulsar.functions.api;
 
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 
-import org.apache.pulsar.common.api.EncryptionContext;
-
 /**
  * Pulsar Connect's Record interface. Record encapsulates the information about a record being read from a Source.
  */
@@ -60,15 +58,6 @@ public interface Record<T> {
     }
 
     /**
-     * Retrieves encryption-context that is attached to record.
-     *
-     * @return {@link Optional}<{@link EncryptionContext}>
-     */
-    default public Optional<EncryptionContext> getEncryptionCtx() {
-        return Optional.empty();
-    }
-
-    /**
      * Retrieves user-defined properties attached to record.
      *
      * @return Map of user-properties
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 33006b2..b479852 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
@@ -67,7 +68,6 @@ import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
 import org.slf4j.Logger;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index 4e2edbe..05c2114 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -24,8 +24,7 @@ import java.util.Optional;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 
-import org.apache.pulsar.common.api.EncryptionContext;
-import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.functions.api.Record;
 
 @Data
 @AllArgsConstructor
@@ -58,12 +57,7 @@ public class SinkRecord<T> implements Record<T> {
         return sourceRecord.getRecordSequence();
     }
 
-    @Override
-    public Optional<EncryptionContext> getEncryptionCtx() {
-        return sourceRecord.getEncryptionCtx();
-    }
-
-    @Override
+     @Override
     public Map<String, String> getProperties() {
         return sourceRecord.getProperties();
     }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 2170879..8ae9ad0 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -45,7 +46,6 @@ import org.apache.pulsar.functions.instance.producers.Producers;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 7a6d11b..c4a1657 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -30,13 +30,12 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.io.core.Record;
 
 @Builder
 @Getter
 @ToString
 @EqualsAndHashCode
-public class PulsarRecord<T> implements Record<T> {
+public class PulsarRecord<T> implements RecordWithEncryptionContext<T> {
 
     private final String topicName;
     private final int partition;
@@ -67,11 +66,7 @@ public class PulsarRecord<T> implements Record<T> {
         return Optional.of(Utils.getSequenceId(message.getMessageId()));
     }
 
-    /**
-     * Retrieves encryption-context that is attached to record.
-     *
-     * @return {@link Optional}<{@link EncryptionContext}>
-     */
+    @Override
     public Optional<EncryptionContext> getEncryptionCtx() {
         return message.getEncryptionCtx();
     }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index d5a1df9..f70100d 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -30,13 +30,13 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
 
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/RecordWithEncryptionContext.java
similarity index 61%
copy from pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
copy to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/RecordWithEncryptionContext.java
index aab4474..5ca78e2 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/RecordWithEncryptionContext.java
@@ -16,19 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.pulsar.functions.source;
 
-package org.apache.pulsar.io.kafka;
+import java.util.Optional;
 
-import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.functions.api.Record;
 
-/**
- * Kafka sink that treats incoming messages on the input topic as Strings
- * and write identical key/value pairs.
- */
-public class KafkaStringSink extends KafkaAbstractSink<String, String> {
-    @Override
-    public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
-        return new KeyValue<>(record.getKey().orElse(null), new String(record.getValue()));
-    }
-}
\ No newline at end of file
+public interface RecordWithEncryptionContext<T> extends Record<T> {
+
+    /**
+     * Retrieves encryption-context that is attached to record.
+     *
+     * @return {@link Optional}<{@link EncryptionContext}>
+     */
+    Optional<EncryptionContext> getEncryptionCtx();
+}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 415912a..866b92e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
@@ -68,7 +69,6 @@ import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
index 6ea657e..fe3787a 100644
--- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
+++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
@@ -36,8 +36,8 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.slf4j.Logger;
diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
index 40ffd90..bac07a0 100644
--- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
+++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
@@ -19,8 +19,8 @@
 
 package org.apache.pulsar.io.aerospike;
 
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
 
 /**
  * Aerospike sink that treats incoming messages on the input topic as Strings
diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
index fd76c0c..d40a7ce 100644
--- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
+++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
@@ -30,8 +30,8 @@ import com.google.common.util.concurrent.Futures;
 
 import java.util.Map;
 
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 
diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
index c2e72b8..4e7feb5 100644
--- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
+++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
@@ -19,8 +19,8 @@
 
 package org.apache.pulsar.io.cassandra;
 
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
 
 /**
  * Cassandra sink that treats incoming messages on the input topic as Strings
diff --git a/pulsar-io/core/pom.xml b/pulsar-io/core/pom.xml
index 003e6d8..05307ce 100644
--- a/pulsar-io/core/pom.xml
+++ b/pulsar-io/core/pom.xml
@@ -38,16 +38,10 @@
 
     <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-common</artifactId>
+      <artifactId>pulsar-functions-api</artifactId>
       <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>*</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
-    
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>protobuf-shaded</artifactId>
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
index 44d8162..13680c9 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.io.core;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.pulsar.functions.api.Record;
+
 /**
  * Pulsar's Push Source interface. PushSource read data from
  * external sources(database changes, twitter firehose, etc)
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
index 51d635a..16ed3c4 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.io.core;
 
 import java.util.Map;
 
+import org.apache.pulsar.functions.api.Record;
+
 /**
  * Generic sink interface users can implement to run Sink on top of Pulsar Functions
  */
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
index e311761..a343844 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.io.core;
 
 import java.util.Map;
 
+import org.apache.pulsar.functions.api.Record;
+
 public interface Source<T> extends AutoCloseable {
     /**
      * Open connector with configuration
diff --git a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
index bb1100c..0980bd1 100644
--- a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
+++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.io.core;
 
+import org.apache.pulsar.functions.api.Record;
 import org.testng.annotations.Test;
 
 import java.util.HashMap;
diff --git a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java
index 18ad5c2..a281a54 100644
--- a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java
+++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.io.core;
 
+import org.apache.pulsar.functions.api.Record;
 import org.testng.annotations.Test;
 
 import java.util.HashMap;
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index d6acae8..a92a368 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -29,8 +29,8 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index bbafc8e..494d91b 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -25,8 +25,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.PushSource;
-import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
index aab4474..89e3e7f 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
@@ -19,8 +19,8 @@
 
 package org.apache.pulsar.io.kafka;
 
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
 
 /**
  * Kafka sink that treats incoming messages on the input topic as Strings
diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml
index f0c2776..1917e01 100644
--- a/pulsar-io/kinesis/pom.xml
+++ b/pulsar-io/kinesis/pom.xml
@@ -39,6 +39,13 @@
     </dependency>
 
     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-functions-instance</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
     </dependency>
@@ -81,7 +88,7 @@
       <version>0.12.8</version>
     </dependency>
 	<!-- /kinesis dependencies -->
-	
+
     <dependency>
       <groupId>com.google.flatbuffers</groupId>
       <artifactId>flatbuffers-java</artifactId>
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index c3c2c45..dc70b98 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -47,7 +47,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
index a7382b8..47f9222 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
@@ -30,7 +30,8 @@ import java.util.Map.Entry;
 import java.util.Optional;
 
 import org.apache.pulsar.common.api.EncryptionContext;
-import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.RecordWithEncryptionContext;
 import org.apache.pulsar.io.kinesis.fbs.EncryptionCtx;
 import org.apache.pulsar.io.kinesis.fbs.EncryptionKey;
 import org.apache.pulsar.io.kinesis.fbs.KeyValue;
@@ -67,7 +68,9 @@ public class Utils {
 
     public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder builder, Record<byte[]> record) {
         checkNotNull(record, "record-context can't be null");
-        Optional<EncryptionContext> encryptionCtx = record.getEncryptionCtx();
+        Optional<EncryptionContext> encryptionCtx = (record instanceof RecordWithEncryptionContext)
+                ? ((RecordWithEncryptionContext<byte[]>) record).getEncryptionCtx()
+                : Optional.empty();
         Map<String, String> properties = record.getProperties();
 
         int encryptionCtxOffset = -1;
@@ -180,8 +183,11 @@ public class Utils {
             result.add(PROPERTIES_FIELD, properties);
         }
 
-        if (record.getEncryptionCtx().isPresent()) {
-            EncryptionContext encryptionCtx = record.getEncryptionCtx().get();
+        Optional<EncryptionContext> optEncryptionCtx = (record instanceof RecordWithEncryptionContext)
+                ? ((RecordWithEncryptionContext<byte[]>) record).getEncryptionCtx()
+                : Optional.empty();
+        if (optEncryptionCtx.isPresent()) {
+            EncryptionContext encryptionCtx = optEncryptionCtx.get();
             JsonObject encryptionCtxJson = new JsonObject();
             JsonObject keyBase64Map = new JsonObject();
             JsonObject keyMetadataMap = new JsonObject();
diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
index 45aa0c7..e6f669f 100644
--- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
+++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
@@ -35,7 +35,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
-import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.RecordWithEncryptionContext;
 import org.apache.pulsar.io.kinesis.fbs.KeyValue;
 import org.apache.pulsar.io.kinesis.fbs.Message;
 import org.testng.Assert;
@@ -201,7 +202,7 @@ public class UtilsTest {
         return new RecordImpl(data, properties, Optional.ofNullable(ctx));
     }
 
-    class RecordImpl implements Record<byte[]> {
+    class RecordImpl implements RecordWithEncryptionContext<byte[]> {
         byte[] data;
         Map<String, String> properties;
         Optional<EncryptionContext> ectx;
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index c05828f..2277cbb 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -32,8 +32,8 @@ import java.util.Optional;
 
 import lombok.Data;
 
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.PushSource;
-import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index 494ae48..76ea4ac 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -35,8 +35,8 @@ import java.io.Serializable;
 import java.util.Map;
 import java.util.Optional;
 
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.PushSource;
-import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;