You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/05/24 21:46:56 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-272] Allow
client to subscribe to processors and sinks
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new f7945db [STREAMPIPES-272] Allow client to subscribe to processors and sinks
f7945db is described below
commit f7945dbff6095f193a13f86832456f51400f0a58
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon May 24 23:46:42 2021 +0200
[STREAMPIPES-272] Allow client to subscribe to processors and sinks
---
.../streampipes/client/api/DataProcessorApi.java | 41 ++++++++++++++++++++++
.../apache/streampipes/client/api/DataSinkApi.java | 28 +++++++++++++++
.../streampipes/client/model/InputStreamIndex.java | 33 +++++++++++++++++
3 files changed, 102 insertions(+)
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataProcessorApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataProcessorApi.java
index cf3fbf0..7170ea2 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataProcessorApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataProcessorApi.java
@@ -21,6 +21,7 @@ import org.apache.streampipes.client.annotation.NotYetImplemented;
import org.apache.streampipes.client.live.EventProcessor;
import org.apache.streampipes.client.live.KafkaConfig;
import org.apache.streampipes.client.live.SubscriptionManager;
+import org.apache.streampipes.client.model.InputStreamIndex;
import org.apache.streampipes.client.model.StreamPipesClientConfig;
import org.apache.streampipes.client.util.StreamPipesApiPath;
import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
@@ -69,15 +70,55 @@ public class DataProcessorApi extends AbstractClientApi<DataProcessorInvocation>
}
+ /**
+ * Subscribe to the output stream of the processor, without passing additional Kafka settings
+ * @param processor The data processor whose output stream is subscribed to
+ * @param callback The callback where events will be received
+ */
public SpKafkaConsumer subscribe(DataProcessorInvocation processor,
EventProcessor callback) {
return new SubscriptionManager(clientConfig, processor.getOutputStream().getEventGrounding(), callback).subscribe();
}
+ /**
+ * Subscribe to the output stream of the processor, passing additional Kafka settings
+ * @param processor The data processor whose output stream is subscribed to
+ * @param kafkaConfig Additional Kafka settings which will override the default values (see docs)
+ * @param callback The callback where events will be received
+ */
public SpKafkaConsumer subscribe(DataProcessorInvocation processor,
KafkaConfig kafkaConfig,
EventProcessor callback) {
return new SubscriptionManager(clientConfig, kafkaConfig, processor.getOutputStream().getEventGrounding(), callback)
.subscribe();
}
+
+ /**
+ * Subscribe to one of the input streams of the processor, selected by index
+ * @param processor The data processor whose input stream is subscribed to
+ * @param index Selects which entry of the processor's input stream list is used (FIRST or SECOND)
+ * @param callback The callback where events will be received
+ */
+ public SpKafkaConsumer subscribe(DataProcessorInvocation processor,
+ InputStreamIndex index,
+ EventProcessor callback) {
+ return new SubscriptionManager(clientConfig,
+ processor.getInputStreams().get(index.toIndex()).getEventGrounding(), callback).subscribe();
+ }
+
+ /**
+ * Subscribe to one of the input streams of the processor, selected by index
+ * @param processor The data processor whose input stream is subscribed to
+ * @param index Selects which entry of the processor's input stream list is used (FIRST or SECOND)
+ * @param kafkaConfig Additional Kafka settings which will override the default values (see docs)
+ * @param callback The callback where events will be received
+ */
+ public SpKafkaConsumer subscribe(DataProcessorInvocation processor,
+ InputStreamIndex index,
+ KafkaConfig kafkaConfig,
+ EventProcessor callback) {
+ return new SubscriptionManager(clientConfig, kafkaConfig,
+ processor.getInputStreams().get(index.toIndex()).getEventGrounding(), callback)
+ .subscribe();
+ }
}
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java
index 83fbdb3..6855ae5 100644
--- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/DataSinkApi.java
@@ -18,8 +18,12 @@
package org.apache.streampipes.client.api;
import org.apache.streampipes.client.annotation.NotYetImplemented;
+import org.apache.streampipes.client.live.EventProcessor;
+import org.apache.streampipes.client.live.KafkaConfig;
+import org.apache.streampipes.client.live.SubscriptionManager;
import org.apache.streampipes.client.model.StreamPipesClientConfig;
import org.apache.streampipes.client.util.StreamPipesApiPath;
+import org.apache.streampipes.messaging.kafka.SpKafkaConsumer;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import java.util.List;
@@ -57,6 +61,30 @@ public class DataSinkApi extends AbstractClientApi<DataSinkInvocation> implement
}
+ /**
+ * Subscribe to the first input stream of the sink (position 0 of its input stream list)
+ * @param sink The data sink whose input stream is subscribed to
+ * @param callback The callback where events will be received
+ */
+ public SpKafkaConsumer subscribe(DataSinkInvocation sink,
+ EventProcessor callback) {
+ return new SubscriptionManager(clientConfig,
+ sink.getInputStreams().get(0).getEventGrounding(), callback).subscribe();
+ }
+
+ /**
+ * Subscribe to the first input stream of the sink (position 0 of its input stream list)
+ * @param sink The data sink whose input stream is subscribed to
+ * @param kafkaConfig Additional Kafka settings which will override the default values (see docs)
+ * @param callback The callback where events will be received
+ */
+ public SpKafkaConsumer subscribe(DataSinkInvocation sink,
+ KafkaConfig kafkaConfig,
+ EventProcessor callback) {
+ return new SubscriptionManager(clientConfig, kafkaConfig,
+ sink.getInputStreams().get(0).getEventGrounding(), callback).subscribe();
+ }
+
public DataSinkInvocation getDataSinkForPipelineElement(String templateId, DataSinkInvocation pipelineElement) {
StreamPipesApiPath path = StreamPipesApiPath.fromUserApiPath(clientConfig.getCredentials())
.addToPath("pipeline-element-templates")
diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/model/InputStreamIndex.java b/streampipes-client/src/main/java/org/apache/streampipes/client/model/InputStreamIndex.java
new file mode 100644
index 0000000..488d456
--- /dev/null
+++ b/streampipes-client/src/main/java/org/apache/streampipes/client/model/InputStreamIndex.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.client.model;
+
+public enum InputStreamIndex { // names a zero-based position in a pipeline element's input stream list
+ FIRST(0), // position 0, i.e. the first input stream
+ SECOND(1); // position 1, i.e. the second input stream
+
+ private Integer index; // NOTE(review): assigned only in the constructor; could be a final primitive int
+
+ InputStreamIndex(Integer index) { // stores the list position this constant stands for
+ this.index = index;
+ }
+
+ public Integer toIndex() { // returns the stored position, usable as an argument to List.get
+ return index;
+ }
+}