You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/07/18 06:13:26 UTC
[incubator-pulsar] branch master updated: Moved Record interface as
part of functions api (#2184)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b3845b2 Moved Record interface as part of functions api (#2184)
b3845b2 is described below
commit b3845b27a7f6edb4a6ac953e55593a4dd3d173a2
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Jul 17 23:13:24 2018 -0700
Moved Record interface as part of functions api (#2184)
* Moved Record interface as part of functions api
* Added org.apache.pulsar.functions.api.* to PowerMock ignore
---
.../org/apache/pulsar/admin/cli/TestCmdSinks.java | 18 +++++++--------
.../org/apache/pulsar/functions/api}/Record.java | 13 +----------
.../functions/instance/JavaInstanceRunnable.java | 2 +-
.../pulsar/functions/instance/SinkRecord.java | 10 ++-------
.../apache/pulsar/functions/sink/PulsarSink.java | 2 +-
.../pulsar/functions/source/PulsarRecord.java | 9 ++------
.../pulsar/functions/source/PulsarSource.java | 2 +-
.../source/RecordWithEncryptionContext.java | 26 +++++++++++-----------
.../rest/api/v2/FunctionApiV2ResourceTest.java | 2 +-
.../pulsar/io/aerospike/AerospikeAbstractSink.java | 2 +-
.../pulsar/io/aerospike/AerospikeStringSink.java | 2 +-
.../pulsar/io/cassandra/CassandraAbstractSink.java | 2 +-
.../pulsar/io/cassandra/CassandraStringSink.java | 2 +-
pulsar-io/core/pom.xml | 10 ++-------
.../java/org/apache/pulsar/io/core/PushSource.java | 2 ++
.../main/java/org/apache/pulsar/io/core/Sink.java | 2 ++
.../java/org/apache/pulsar/io/core/Source.java | 2 ++
.../java/org/apache/pulsar/io/core/SinkTest.java | 1 +
.../java/org/apache/pulsar/io/core/SourceTest.java | 1 +
.../apache/pulsar/io/kafka/KafkaAbstractSink.java | 2 +-
.../pulsar/io/kafka/KafkaAbstractSource.java | 2 +-
.../apache/pulsar/io/kafka/KafkaStringSink.java | 2 +-
pulsar-io/kinesis/pom.xml | 9 +++++++-
.../org/apache/pulsar/io/kinesis/KinesisSink.java | 2 +-
.../java/org/apache/pulsar/io/kinesis/Utils.java | 14 ++++++++----
.../org/apache/pulsar/io/kinesis/UtilsTest.java | 5 +++--
.../apache/pulsar/io/rabbitmq/RabbitMQSource.java | 2 +-
.../apache/pulsar/io/twitter/TwitterFireHose.java | 2 +-
28 files changed, 72 insertions(+), 78 deletions(-)
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index d07c85e..9221978 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -56,7 +56,7 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic;
@Slf4j
@PrepareForTest({CmdFunctions.class})
-@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.core.*" })
+@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.core.*", "org.apache.pulsar.functions.api.*" })
public class TestCmdSinks {
@ObjectFactory
@@ -155,7 +155,7 @@ public class TestCmdSinks {
sinkConfig
);
}
-
+
@Test
public void testMissingTenant() throws Exception {
SinkConfig sinkConfig = getSinkConfig();
@@ -816,8 +816,8 @@ public class TestCmdSinks {
verify(updateSink).validateSinkConfigs(eq(expectedSinkConfig));
verify(localSinkRunner).validateSinkConfigs(eq(expectedSinkConfig));
}
-
-
+
+
@Test
public void testCliOverwriteConfigFile() throws Exception {
@@ -837,7 +837,7 @@ public class TestCmdSinks {
testSinkConfig.setArchive(JAR_FILE_PATH + "-prime");
testSinkConfig.setResources(new Resources(CPU + 1, RAM + 1, DISK + 1));
testSinkConfig.setConfigs(createSink.parseConfigs("{\"created_at-prime\":\"Mon Jul 02 00:33:15 +0000 2018\"}"));
-
+
SinkConfig expectedSinkConfig = getSinkConfig();
@@ -846,7 +846,7 @@ public class TestCmdSinks {
new YAMLMapper().writeValue(file, testSinkConfig);
Assert.assertEquals(testSinkConfig, CmdUtils.loadConfig(file.getAbsolutePath(), SinkConfig.class));
-
+
testMixCliAndConfigFile(
TENANT,
NAMESPACE,
@@ -866,7 +866,7 @@ public class TestCmdSinks {
expectedSinkConfig
);
}
-
+
public void testMixCliAndConfigFile(
String tenant,
String namespace,
@@ -885,8 +885,8 @@ public class TestCmdSinks {
String sinkConfigFile,
SinkConfig sinkConfig
) throws Exception {
-
-
+
+
// test create sink
createSink.tenant = tenant;
createSink.namespace = namespace;
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
similarity index 86%
rename from pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
rename to pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index 504d5f4..38d6ed3 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -16,14 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.core;
+package org.apache.pulsar.functions.api;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
-import org.apache.pulsar.common.api.EncryptionContext;
-
/**
* Pulsar Connect's Record interface. Record encapsulates the information about a record being read from a Source.
*/
@@ -60,15 +58,6 @@ public interface Record<T> {
}
/**
- * Retrieves encryption-context that is attached to record.
- *
- * @return {@link Optional}<{@link EncryptionContext}>
- */
- default public Optional<EncryptionContext> getEncryptionCtx() {
- return Optional.empty();
- }
-
- /**
* Retrieves user-defined properties attached to record.
*
* @return Map of user-properties
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 33006b2..b479852 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
@@ -67,7 +68,6 @@ import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index 4e2edbe..05c2114 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -24,8 +24,7 @@ import java.util.Optional;
import lombok.AllArgsConstructor;
import lombok.Data;
-import org.apache.pulsar.common.api.EncryptionContext;
-import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.functions.api.Record;
@Data
@AllArgsConstructor
@@ -58,12 +57,7 @@ public class SinkRecord<T> implements Record<T> {
return sourceRecord.getRecordSequence();
}
- @Override
- public Optional<EncryptionContext> getEncryptionCtx() {
- return sourceRecord.getEncryptionCtx();
- }
-
- @Override
+ @Override
public Map<String, String> getProperties() {
return sourceRecord.getProperties();
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 2170879..8ae9ad0 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -45,7 +46,6 @@ import org.apache.pulsar.functions.instance.producers.Producers;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 7a6d11b..c4a1657 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -30,13 +30,12 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.io.core.Record;
@Builder
@Getter
@ToString
@EqualsAndHashCode
-public class PulsarRecord<T> implements Record<T> {
+public class PulsarRecord<T> implements RecordWithEncryptionContext<T> {
private final String topicName;
private final int partition;
@@ -67,11 +66,7 @@ public class PulsarRecord<T> implements Record<T> {
return Optional.of(Utils.getSequenceId(message.getMessageId()));
}
- /**
- * Retrieves encryption-context that is attached to record.
- *
- * @return {@link Optional}<{@link EncryptionContext}>
- */
+ @Override
public Optional<EncryptionContext> getEncryptionCtx() {
return message.getEncryptionCtx();
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index d5a1df9..f70100d 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -30,13 +30,13 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/RecordWithEncryptionContext.java
similarity index 61%
copy from pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
copy to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/RecordWithEncryptionContext.java
index aab4474..5ca78e2 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/RecordWithEncryptionContext.java
@@ -16,19 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.pulsar.functions.source;
-package org.apache.pulsar.io.kafka;
+import java.util.Optional;
-import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.functions.api.Record;
-/**
- * Kafka sink that treats incoming messages on the input topic as Strings
- * and write identical key/value pairs.
- */
-public class KafkaStringSink extends KafkaAbstractSink<String, String> {
- @Override
- public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
- return new KeyValue<>(record.getKey().orElse(null), new String(record.getValue()));
- }
-}
\ No newline at end of file
+public interface RecordWithEncryptionContext<T> extends Record<T> {
+
+ /**
+ * Retrieves encryption-context that is attached to record.
+ *
+ * @return {@link Optional}<{@link EncryptionContext}>
+ */
+ Optional<EncryptionContext> getEncryptionCtx();
+}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 415912a..866b92e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -54,6 +54,7 @@ import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
@@ -68,7 +69,6 @@ import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
index 6ea657e..fe3787a 100644
--- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
+++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
@@ -36,8 +36,8 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.slf4j.Logger;
diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
index 40ffd90..bac07a0 100644
--- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
+++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
@@ -19,8 +19,8 @@
package org.apache.pulsar.io.aerospike;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
/**
* Aerospike sink that treats incoming messages on the input topic as Strings
diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
index fd76c0c..d40a7ce 100644
--- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
+++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
@@ -30,8 +30,8 @@ import com.google.common.util.concurrent.Futures;
import java.util.Map;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
index c2e72b8..4e7feb5 100644
--- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
+++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
@@ -19,8 +19,8 @@
package org.apache.pulsar.io.cassandra;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
/**
* Cassandra sink that treats incoming messages on the input topic as Strings
diff --git a/pulsar-io/core/pom.xml b/pulsar-io/core/pom.xml
index 003e6d8..05307ce 100644
--- a/pulsar-io/core/pom.xml
+++ b/pulsar-io/core/pom.xml
@@ -38,16 +38,10 @@
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>pulsar-common</artifactId>
+ <artifactId>pulsar-functions-api</artifactId>
<version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
</dependency>
-
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>protobuf-shaded</artifactId>
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
index 44d8162..13680c9 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.io.core;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.pulsar.functions.api.Record;
+
/**
* Pulsar's Push Source interface. PushSource read data from
* external sources(database changes, twitter firehose, etc)
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
index 51d635a..16ed3c4 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.io.core;
import java.util.Map;
+import org.apache.pulsar.functions.api.Record;
+
/**
* Generic sink interface users can implement to run Sink on top of Pulsar Functions
*/
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
index e311761..a343844 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.io.core;
import java.util.Map;
+import org.apache.pulsar.functions.api.Record;
+
public interface Source<T> extends AutoCloseable {
/**
* Open connector with configuration
diff --git a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
index bb1100c..0980bd1 100644
--- a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
+++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.core;
+import org.apache.pulsar.functions.api.Record;
import org.testng.annotations.Test;
import java.util.HashMap;
diff --git a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java
index 18ad5c2..a281a54 100644
--- a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java
+++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.core;
+import org.apache.pulsar.functions.api.Record;
import org.testng.annotations.Test;
import java.util.HashMap;
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index d6acae8..a92a368 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -29,8 +29,8 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index bbafc8e..494d91b 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -25,8 +25,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
index aab4474..89e3e7f 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
@@ -19,8 +19,8 @@
package org.apache.pulsar.io.kafka;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.Record;
/**
* Kafka sink that treats incoming messages on the input topic as Strings
diff --git a/pulsar-io/kinesis/pom.xml b/pulsar-io/kinesis/pom.xml
index f0c2776..1917e01 100644
--- a/pulsar-io/kinesis/pom.xml
+++ b/pulsar-io/kinesis/pom.xml
@@ -39,6 +39,13 @@
</dependency>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-instance</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
@@ -81,7 +88,7 @@
<version>0.12.8</version>
</dependency>
<!-- /kinesis dependencies -->
-
+
<dependency>
<groupId>com.google.flatbuffers</groupId>
<artifactId>flatbuffers-java</artifactId>
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index c3c2c45..dc70b98 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -47,7 +47,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
index a7382b8..47f9222 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
@@ -30,7 +30,8 @@ import java.util.Map.Entry;
import java.util.Optional;
import org.apache.pulsar.common.api.EncryptionContext;
-import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.RecordWithEncryptionContext;
import org.apache.pulsar.io.kinesis.fbs.EncryptionCtx;
import org.apache.pulsar.io.kinesis.fbs.EncryptionKey;
import org.apache.pulsar.io.kinesis.fbs.KeyValue;
@@ -67,7 +68,9 @@ public class Utils {
public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder builder, Record<byte[]> record) {
checkNotNull(record, "record-context can't be null");
- Optional<EncryptionContext> encryptionCtx = record.getEncryptionCtx();
+ Optional<EncryptionContext> encryptionCtx = (record instanceof RecordWithEncryptionContext)
+ ? ((RecordWithEncryptionContext<byte[]>) record).getEncryptionCtx()
+ : Optional.empty();
Map<String, String> properties = record.getProperties();
int encryptionCtxOffset = -1;
@@ -180,8 +183,11 @@ public class Utils {
result.add(PROPERTIES_FIELD, properties);
}
- if (record.getEncryptionCtx().isPresent()) {
- EncryptionContext encryptionCtx = record.getEncryptionCtx().get();
+ Optional<EncryptionContext> optEncryptionCtx = (record instanceof RecordWithEncryptionContext)
+ ? ((RecordWithEncryptionContext<byte[]>) record).getEncryptionCtx()
+ : Optional.empty();
+ if (optEncryptionCtx.isPresent()) {
+ EncryptionContext encryptionCtx = optEncryptionCtx.get();
JsonObject encryptionCtxJson = new JsonObject();
JsonObject keyBase64Map = new JsonObject();
JsonObject keyMetadataMap = new JsonObject();
diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
index 45aa0c7..e6f669f 100644
--- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
+++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
@@ -35,7 +35,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
-import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.RecordWithEncryptionContext;
import org.apache.pulsar.io.kinesis.fbs.KeyValue;
import org.apache.pulsar.io.kinesis.fbs.Message;
import org.testng.Assert;
@@ -201,7 +202,7 @@ public class UtilsTest {
return new RecordImpl(data, properties, Optional.ofNullable(ctx));
}
- class RecordImpl implements Record<byte[]> {
+ class RecordImpl implements RecordWithEncryptionContext<byte[]> {
byte[] data;
Map<String, String> properties;
Optional<EncryptionContext> ectx;
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index c05828f..2277cbb 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -32,8 +32,8 @@ import java.util.Optional;
import lombok.Data;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index 494ae48..76ea4ac 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -35,8 +35,8 @@ import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
-import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;