You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/05/25 00:37:48 UTC

[pulsar] branch master updated: Expose state to sources and sinks (#4364)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f8349e2  Expose state to sources and sinks (#4364)
f8349e2 is described below

commit f8349e2235a340a1ee9a416a07a4d2648dfe4f52
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri May 24 17:37:43 2019 -0700

    Expose state to sources and sinks (#4364)
    
    * Expose state to sources and sinks
    
    * Fix unittest
    
    * Fix unittest
---
 .../apache/pulsar/io/common/IOConfigUtilsTest.java | 79 ++++++++++++++++++++++
 .../org/apache/pulsar/io/core/SinkContext.java     | 69 +++++++++++++++++++
 .../org/apache/pulsar/io/core/SourceContext.java   | 70 +++++++++++++++++++
 .../io/kafka/sink/KafkaAbstractSinkTest.java       | 42 ++++++++++++
 .../io/kafka/source/KafkaAbstractSourceTest.java   | 42 ++++++++++++
 5 files changed, 302 insertions(+)

diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
index af62c7f..2296dae 100644
--- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
+++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java
@@ -27,9 +27,11 @@ import org.slf4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 @Slf4j
 public class IOConfigUtilsTest {
@@ -115,6 +117,44 @@ public class IOConfigUtilsTest {
         public String getSecret(String secretName) {
             return secretsMap.get(secretName);
         }
+
+        @Override
+        public void incrCounter(String key, long amount) { }
+
+        @Override
+        public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
+            return null;
+        }
+
+        @Override
+        public long getCounter(String key) {
+            return 0;
+        }
+
+        @Override
+        public CompletableFuture<Long> getCounterAsync(String key) {
+            return null;
+        }
+
+        @Override
+        public void putState(String key, ByteBuffer value) {
+
+        }
+
+        @Override
+        public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
+            return null;
+        }
+
+        @Override
+        public ByteBuffer getState(String key) {
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<ByteBuffer> getStateAsync(String key) {
+            return null;
+        }
     }
 
     @Test
@@ -189,6 +229,45 @@ public class IOConfigUtilsTest {
         public String getSecret(String secretName) {
             return secretsMap.get(secretName);
         }
+
+        @Override
+        public void incrCounter(String key, long amount) {
+        }
+
+        @Override
+        public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
+            return null;
+        }
+
+        @Override
+        public long getCounter(String key) {
+            return 0;
+        }
+
+        @Override
+        public CompletableFuture<Long> getCounterAsync(String key) {
+            return null;
+        }
+
+        @Override
+        public void putState(String key, ByteBuffer value) {
+
+        }
+
+        @Override
+        public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
+            return null;
+        }
+
+        @Override
+        public ByteBuffer getState(String key) {
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<ByteBuffer> getStateAsync(String key) {
+            return null;
+        }
     }
 
     @Test
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
index d30ff7b..1a8a859 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
@@ -20,7 +20,9 @@ package org.apache.pulsar.io.core;
 
 import org.slf4j.Logger;
 
+import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 
 public interface SinkContext {
 
@@ -81,4 +83,71 @@ public interface SinkContext {
      * @return The secret if anything was found or null
      */
     String getSecret(String secretName);
+
+    /**
+     * Increment the builtin distributed counter referred to by key.
+     *
+     * @param key    The name of the key
+     * @param amount The amount to be incremented
+     */
+    void incrCounter(String key, long amount);
+
+
+    /**
+     * Increment the builtin distributed counter referred to by key,
+     * but don't wait for the completion of the increment operation.
+     *
+     * @param key    The name of the key
+     * @param amount The amount to be incremented
+     */
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
+
+    /**
+     * Retrieve the counter value for the key.
+     *
+     * @param key name of the key
+     * @return the amount of the counter value for this key
+     */
+    long getCounter(String key);
+
+    /**
+     * Retrieve the counter value for the key, but don't wait
+     * for the operation to be completed
+     *
+     * @param key name of the key
+     * @return a future that completes with the counter value for this key
+     */
+    CompletableFuture<Long> getCounterAsync(String key);
+
+    /**
+     * Update the state value for the key.
+     *
+     * @param key   name of the key
+     * @param value state value of the key
+     */
+    void putState(String key, ByteBuffer value);
+
+    /**
+     * Update the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key   name of the key
+     * @param value state value of the key
+     */
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+
+    /**
+     * Retrieve the state value for the key.
+     *
+     * @param key name of the key
+     * @return the state value for the key.
+     */
+    ByteBuffer getState(String key);
+
+    /**
+     * Retrieve the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @return a future that completes with the state value for the key.
+     */
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
 }
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
index b758220..a27d05f 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.io.core;
 
 import org.slf4j.Logger;
 
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
 public interface SourceContext {
 
     /**
@@ -79,4 +82,71 @@ public interface SourceContext {
      * @return The secret if anything was found or null
      */
     String getSecret(String secretName);
+
+    /**
+     * Increment the builtin distributed counter referred to by key.
+     *
+     * @param key    The name of the key
+     * @param amount The amount to be incremented
+     */
+    void incrCounter(String key, long amount);
+
+
+    /**
+     * Increment the builtin distributed counter referred to by key,
+     * but don't wait for the completion of the increment operation.
+     *
+     * @param key    The name of the key
+     * @param amount The amount to be incremented
+     */
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
+
+    /**
+     * Retrieve the counter value for the key.
+     *
+     * @param key name of the key
+     * @return the amount of the counter value for this key
+     */
+    long getCounter(String key);
+
+    /**
+     * Retrieve the counter value for the key, but don't wait
+     * for the operation to be completed
+     *
+     * @param key name of the key
+     * @return a future that completes with the counter value for this key
+     */
+    CompletableFuture<Long> getCounterAsync(String key);
+
+    /**
+     * Update the state value for the key.
+     *
+     * @param key   name of the key
+     * @param value state value of the key
+     */
+    void putState(String key, ByteBuffer value);
+
+    /**
+     * Update the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key   name of the key
+     * @param value state value of the key
+     */
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+
+    /**
+     * Retrieve the state value for the key.
+     *
+     * @param key name of the key
+     * @return the state value for the key.
+     */
+    ByteBuffer getState(String key);
+
+    /**
+     * Retrieve the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @return a future that completes with the state value for the key.
+     */
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
 }
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
index 9e74b89..c4522d5 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java
@@ -32,10 +32,12 @@ import org.testng.annotations.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
 
 import static org.testng.Assert.*;
 
@@ -115,6 +117,46 @@ public class KafkaAbstractSinkTest {
 
             @Override
             public String getSecret(String key) { return null; }
+
+            @Override
+            public void incrCounter(String key, long amount) {
+
+            }
+
+            @Override
+            public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
+                return null;
+            }
+
+            @Override
+            public long getCounter(String key) {
+                return 0;
+            }
+
+            @Override
+            public CompletableFuture<Long> getCounterAsync(String key) {
+                return null;
+            }
+
+            @Override
+            public void putState(String key, ByteBuffer value) {
+
+            }
+
+            @Override
+            public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
+                return null;
+            }
+
+            @Override
+            public ByteBuffer getState(String key) {
+                return null;
+            }
+
+            @Override
+            public CompletableFuture<ByteBuffer> getStateAsync(String key) {
+                return null;
+            }
         };
         ThrowingRunnable openAndClose = ()->{
             try {
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 2cee062..3bfd358 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -31,9 +31,11 @@ import org.testng.annotations.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
@@ -116,6 +118,46 @@ public class KafkaAbstractSourceTest {
 
             @Override
             public String getSecret(String key) { return null; }
+
+            @Override
+            public void incrCounter(String key, long amount) {
+                
+            }
+
+            @Override
+            public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
+                return null;
+            }
+
+            @Override
+            public long getCounter(String key) {
+                return 0;
+            }
+
+            @Override
+            public CompletableFuture<Long> getCounterAsync(String key) {
+                return null;
+            }
+
+            @Override
+            public void putState(String key, ByteBuffer value) {
+
+            }
+
+            @Override
+            public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
+                return null;
+            }
+
+            @Override
+            public ByteBuffer getState(String key) {
+                return null;
+            }
+
+            @Override
+            public CompletableFuture<ByteBuffer> getStateAsync(String key) {
+                return null;
+            }
         };
         Map<String, Object> config = new HashMap<>();
         ThrowingRunnable openAndClose = ()->{