You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/05/25 00:37:48 UTC
[pulsar] branch master updated: Expose state to sources and sinks
(#4364)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f8349e2 Expose state to sources and sinks (#4364)
f8349e2 is described below
commit f8349e2235a340a1ee9a416a07a4d2648dfe4f52
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri May 24 17:37:43 2019 -0700
Expose state to sources and sinks (#4364)
* Expose state to sources and sinks
* Fix unittest
* Fix unittest
---
.../apache/pulsar/io/common/IOConfigUtilsTest.java | 79 ++++++++++++++++++++++
.../org/apache/pulsar/io/core/SinkContext.java | 69 +++++++++++++++++++
.../org/apache/pulsar/io/core/SourceContext.java | 70 +++++++++++++++++++
.../io/kafka/sink/KafkaAbstractSinkTest.java | 42 ++++++++++++
.../io/kafka/source/KafkaAbstractSourceTest.java | 42 ++++++++++++
5 files changed, 302 insertions(+)
diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
index af62c7f..2296dae 100644
--- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
+++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -27,9 +27,11 @@ import org.slf4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
@Slf4j
public class IOConfigUtilsTest {
@@ -115,6 +117,44 @@ public class IOConfigUtilsTest {
public String getSecret(String secretName) {
return secretsMap.get(secretName);
}
+
+ @Override
+ public void incrCounter(String key, long amount) { }
+
+ @Override
+ public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
+ return null;
+ }
+
+ @Override
+ public long getCounter(String key) {
+ return 0;
+ }
+
+ @Override
+ public CompletableFuture<Long> getCounterAsync(String key) {
+ return null;
+ }
+
+ @Override
+ public void putState(String key, ByteBuffer value) {
+
+ }
+
+ @Override
+ public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
+ return null;
+ }
+
+ @Override
+ public ByteBuffer getState(String key) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<ByteBuffer> getStateAsync(String key) {
+ return null;
+ }
}
@Test
@@ -189,6 +229,45 @@ public class IOConfigUtilsTest {
public String getSecret(String secretName) {
return secretsMap.get(secretName);
}
+
+ @Override
+ public void incrCounter(String key, long amount) {
+ }
+
+ @Override
+ public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
+ return null;
+ }
+
+ @Override
+ public long getCounter(String key) {
+ return 0;
+ }
+
+ @Override
+ public CompletableFuture<Long> getCounterAsync(String key) {
+ return null;
+ }
+
+ @Override
+ public void putState(String key, ByteBuffer value) {
+
+ }
+
+ @Override
+ public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
+ return null;
+ }
+
+ @Override
+ public ByteBuffer getState(String key) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<ByteBuffer> getStateAsync(String key) {
+ return null;
+ }
}
@Test
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
index d30ff7b..1a8a859 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
@@ -20,7 +20,9 @@ package org.apache.pulsar.io.core;
import org.slf4j.Logger;
+import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
public interface SinkContext {
@@ -81,4 +83,71 @@ public interface SinkContext {
* @return The secret if anything was found or null
*/
String getSecret(String secretName);
+
+ /**
+ * Increment the builtin distributed counter referred to by key.
+ *
+ * @param key The name of the key
+ * @param amount The amount to be incremented
+ */
+ void incrCounter(String key, long amount);
+
+
+ /**
+ * Increment the builtin distributed counter referred to by key,
+ * but don't wait for the completion of the increment operation.
+ *
+ * @param key The name of the key
+ * @param amount The amount to be incremented
+ */
+ CompletableFuture<Void> incrCounterAsync(String key, long amount);
+
+ /**
+ * Retrieve the counter value for the key.
+ *
+ * @param key name of the key
+ * @return the amount of the counter value for this key
+ */
+ long getCounter(String key);
+
+ /**
+ * Retrieve the counter value for the key, but don't wait
+ * for the operation to be completed
+ *
+ * @param key name of the key
+ * @return a future that will hold the counter value for this key
+ */
+ CompletableFuture<Long> getCounterAsync(String key);
+
+ /**
+ * Update the state value for the key.
+ *
+ * @param key name of the key
+ * @param value state value of the key
+ */
+ void putState(String key, ByteBuffer value);
+
+ /**
+ * Update the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @param value state value of the key
+ */
+ CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+
+ /**
+ * Retrieve the state value for the key.
+ *
+ * @param key name of the key
+ * @return the state value for the key.
+ */
+ ByteBuffer getState(String key);
+
+ /**
+ * Retrieve the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @return a future that will hold the state value for the key.
+ */
+ CompletableFuture<ByteBuffer> getStateAsync(String key);
}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
index b758220..a27d05f 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.io.core;
import org.slf4j.Logger;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
public interface SourceContext {
/**
@@ -79,4 +82,71 @@ public interface SourceContext {
* @return The secret if anything was found or null
*/
String getSecret(String secretName);
+
+ /**
+ * Increment the builtin distributed counter referred to by key.
+ *
+ * @param key The name of the key
+ * @param amount The amount to be incremented
+ */
+ void incrCounter(String key, long amount);
+
+
+ /**
+ * Increment the builtin distributed counter referred to by key,
+ * but don't wait for the completion of the increment operation.
+ *
+ * @param key The name of the key
+ * @param amount The amount to be incremented
+ */
+ CompletableFuture<Void> incrCounterAsync(String key, long amount);
+
+ /**
+ * Retrieve the counter value for the key.
+ *
+ * @param key name of the key
+ * @return the amount of the counter value for this key
+ */
+ long getCounter(String key);
+
+ /**
+ * Retrieve the counter value for the key, but don't wait
+ * for the operation to be completed
+ *
+ * @param key name of the key
+ * @return a future that will hold the counter value for this key
+ */
+ CompletableFuture<Long> getCounterAsync(String key);
+
+ /**
+ * Update the state value for the key.
+ *
+ * @param key name of the key
+ * @param value state value of the key
+ */
+ void putState(String key, ByteBuffer value);
+
+ /**
+ * Update the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @param value state value of the key
+ */
+ CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+
+ /**
+ * Retrieve the state value for the key.
+ *
+ * @param key name of the key
+ * @return the state value for the key.
+ */
+ ByteBuffer getState(String key);
+
+ /**
+ * Retrieve the state value for the key, but don't wait for the operation to be completed
+ *
+ * @param key name of the key
+ * @return a future that will hold the state value for the key.
+ */
+ CompletableFuture<ByteBuffer> getStateAsync(String key);
}
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
index 9e74b89..c4522d5 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
@@ -32,10 +32,12 @@ import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
import static org.testng.Assert.*;
@@ -115,6 +117,46 @@ public class KafkaAbstractSinkTest {
@Override
public String getSecret(String key) { return null; }
+
+ @Override
+ public void incrCounter(String key, long amount) {
+
+ }
+
+ @Override
+ public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
+ return null;
+ }
+
+ @Override
+ public long getCounter(String key) {
+ return 0;
+ }
+
+ @Override
+ public CompletableFuture<Long> getCounterAsync(String key) {
+ return null;
+ }
+
+ @Override
+ public void putState(String key, ByteBuffer value) {
+
+ }
+
+ @Override
+ public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
+ return null;
+ }
+
+ @Override
+ public ByteBuffer getState(String key) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<ByteBuffer> getStateAsync(String key) {
+ return null;
+ }
};
ThrowingRunnable openAndClose = ()->{
try {
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 2cee062..3bfd358 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -31,9 +31,11 @@ import org.testng.annotations.Test;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -116,6 +118,46 @@ public class KafkaAbstractSourceTest {
@Override
public String getSecret(String key) { return null; }
+
+ @Override
+ public void incrCounter(String key, long amount) {
+
+ }
+
+ @Override
+ public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
+ return null;
+ }
+
+ @Override
+ public long getCounter(String key) {
+ return 0;
+ }
+
+ @Override
+ public CompletableFuture<Long> getCounterAsync(String key) {
+ return null;
+ }
+
+ @Override
+ public void putState(String key, ByteBuffer value) {
+
+ }
+
+ @Override
+ public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
+ return null;
+ }
+
+ @Override
+ public ByteBuffer getState(String key) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<ByteBuffer> getStateAsync(String key) {
+ return null;
+ }
};
Map<String, Object> config = new HashMap<>();
ThrowingRunnable openAndClose = ()->{