You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/03/07 16:37:04 UTC
[incubator-streampipes] branch dev updated: Fix data set adapters
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 113f385 Fix data set adapters
113f385 is described below
commit 113f3852023ad033263f9efcf721b1c17baac963
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Sat Mar 7 17:36:47 2020 +0100
Fix data set adapters
---
.../container/worker/management/AdapterWorkerManagement.java | 2 +-
.../main/java/org/apache/streampipes/connect/adapter/Adapter.java | 7 +++++++
.../org/apache/streampipes/messaging/kafka/SpKafkaProducer.java | 1 +
3 files changed, 9 insertions(+), 1 deletion(-)
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
index 7a38d16..1cc47b8 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
@@ -91,7 +91,7 @@ public class AdapterWorkerManagement {
Protocol protocol = null;
if (adapterSetDescription instanceof GenericAdapterSetDescription) {
- protocol = AdapterDeclarerSingleton.getInstance().getProtocol(((GenericAdapterSetDescription) adapterSetDescription).getProtocolDescription().getElementId());
+ protocol = AdapterDeclarerSingleton.getInstance().getProtocol(((GenericAdapterSetDescription) adapterSetDescription).getProtocolDescription().getAppId());
((GenericAdapter) adapter).setProtocol(protocol);
}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
index bdb3492..d738322 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.connect.adapter;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.streampipes.connect.adapter.exception.AdapterException;
@@ -87,6 +88,12 @@ public abstract class Adapter<T extends AdapterDescription> implements Connector
public void changeEventGrounding(TransportProtocol transportProtocol) {
List<AdapterPipelineElement> pipelineElements = this.adapterPipeline.getPipelineElements();
SendToKafkaAdapterSink sink = (SendToKafkaAdapterSink) this.adapterPipeline.getPipelineSink();
+
+
+ if ("true".equals(System.getenv("SP_DEBUG"))) {
+ transportProtocol.setBrokerHostname("localhost");
+ ((KafkaTransportProtocol) transportProtocol).setKafkaPort(9094);
+ }
sink.changeTransportProtocol(transportProtocol);
}
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index c20a70d..9b311fd 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -98,6 +98,7 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
private void createKafaTopic(KafkaTransportProtocol settings) {
String zookeeperHost = settings.getZookeeperHost() + ":" + settings.getZookeeperPort();
+
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);