You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/05/24 21:46:56 UTC

[incubator-streampipes] branch dev updated: [STREAMPIPES-272] Allow client to subscribe to processors and sinks

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new f7945db  [STREAMPIPES-272] Allow client to subscribe to processors and sinks
f7945db is described below

commit f7945dbff6095f193a13f86832456f51400f0a58
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon May 24 23:46:42 2021 +0200

    [STREAMPIPES-272] Allow client to subscribe to processors and sinks
---
 .../streampipes/client/api/DataProcessorApi.java   | 41 ++++++++++++++++++++++
 .../apache/streampipes/client/api/DataSinkApi.java | 28 +++++++++++++++
 .../streampipes/client/model/InputStreamIndex.java | 33 +++++++++++++++++
 3 files changed, 102 insertions(+)

diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataProcessorApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataProcessorApi.java
index cf3fbf0..7170ea2 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataProcessorApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataProcessorApi.java
@@ -21,6 +21,7 @@ import org.apache.streampipes.client.annotation.NotYetImplemented;
 import org.apache.streampipes.client.live.EventProcessor;
 import org.apache.streampipes.client.live.KafkaConfig;
 import org.apache.streampipes.client.live.SubscriptionManager;
+import org.apache.streampipes.client.model.InputStreamIndex;
 import org.apache.streampipes.client.model.StreamPipesClientConfig;
 import org.apache.streampipes.client.util.StreamPipesApiPath;
 import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
@@ -69,15 +70,55 @@ public class DataProcessorApi extends AbstractClientApi<DataProcessorInvocation>
 
   }
 
+  /**
+   * Subscribe to the output stream of the processor
+   * @param processor The data processor to subscribe to
+   * @param callback The callback where events will be received
+   */
   public SpKafkaConsumer subscribe(DataProcessorInvocation processor,
                                    EventProcessor callback) {
     return new SubscriptionManager(clientConfig, processor.getOutputStream().getEventGrounding(), callback).subscribe();
   }
 
+  /**
+   * Subscribe to the output stream of the processor
+   * @param processor The data processor to subscribe to
+   * @param kafkaConfig Additional kafka settings which will override the default value (see docs)
+   * @param callback The callback where events will be received
+   */
   public SpKafkaConsumer subscribe(DataProcessorInvocation processor,
                                    KafkaConfig kafkaConfig,
                                    EventProcessor callback) {
     return new SubscriptionManager(clientConfig, kafkaConfig, processor.getOutputStream().getEventGrounding(), callback)
             .subscribe();
   }
+
+  /**
+   * Subscribe to the input stream of the processor
+   * @param processor The data processor to subscribe to
+   * @param index The index of the input stream
+   * @param callback The callback where events will be received
+   */
+  public SpKafkaConsumer subscribe(DataProcessorInvocation processor,
+                                   InputStreamIndex index,
+                                   EventProcessor callback) {
+    return new SubscriptionManager(clientConfig,
+            processor.getInputStreams().get(index.toIndex()).getEventGrounding(), callback).subscribe();
+  }
+
+  /**
+   * Subscribe to the input stream of the sink
+   * @param processor The data processor to subscribe to
+   * @param index The index of the input stream
+   * @param kafkaConfig Additional kafka settings which will override the default value (see docs)
+   * @param callback The callback where events will be received
+   */
+  public SpKafkaConsumer subscribe(DataProcessorInvocation processor,
+                                   InputStreamIndex index,
+                                   KafkaConfig kafkaConfig,
+                                   EventProcessor callback) {
+    return new SubscriptionManager(clientConfig, kafkaConfig,
+           processor.getInputStreams().get(index.toIndex()).getEventGrounding(), callback)
+            .subscribe();
+  }
 }
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java
index 83fbdb3..6855ae5 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java
@@ -18,8 +18,12 @@
 package org.apache.streampipes.client.api;
 
 import org.apache.streampipes.client.annotation.NotYetImplemented;
+import org.apache.streampipes.client.live.EventProcessor;
+import org.apache.streampipes.client.live.KafkaConfig;
+import org.apache.streampipes.client.live.SubscriptionManager;
 import org.apache.streampipes.client.model.StreamPipesClientConfig;
 import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
 
 import java.util.List;
@@ -57,6 +61,30 @@ public class DataSinkApi extends AbstractClientApi<DataSinkInvocation> implement
 
   }
 
+  /**
+   * Subscribe to the input stream of the sink
+   * @param sink The data sink to subscribe to
+   * @param callback The callback where events will be received
+   */
+  public SpKafkaConsumer subscribe(DataSinkInvocation sink,
+                                   EventProcessor callback) {
+    return new SubscriptionManager(clientConfig,
+            sink.getInputStreams().get(0).getEventGrounding(), callback).subscribe();
+  }
+
+  /**
+   * Subscribe to the input stream of the sink
+   * @param sink The data sink to subscribe to
+   * @param kafkaConfig Additional kafka settings which will override the default value (see docs)
+   * @param callback The callback where events will be received
+   */
+  public SpKafkaConsumer subscribe(DataSinkInvocation sink,
+                                   KafkaConfig kafkaConfig,
+                                   EventProcessor callback) {
+    return new SubscriptionManager(clientConfig, kafkaConfig,
+            sink.getInputStreams().get(0).getEventGrounding(), callback).subscribe();
+  }
+
   public DataSinkInvocation getDataSinkForPipelineElement(String templateId, DataSinkInvocation pipelineElement) {
     StreamPipesApiPath path = StreamPipesApiPath.fromUserApiPath(clientConfig.getCredentials())
             .addToPath("pipeline-element-templates")
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/model/InputStreamIndex.java b/streampipes-client/src/main/java/org/apache/streampipes/client/model/InputStreamIndex.java
new file mode 100644
index 0000000..488d456
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/model/InputStreamIndex.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.model;
+
+public enum InputStreamIndex {
+  FIRST(0),
+  SECOND(1);
+
+  private Integer index;
+
+  InputStreamIndex(Integer index) {
+    this.index = index;
+  }
+
+  public Integer toIndex() {
+    return index;
+  }
+}