You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/03/07 16:37:04 UTC

[incubator-streampipes] branch dev updated: Fix data set adapters

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 113f385  Fix data set adapters
113f385 is described below

commit 113f3852023ad033263f9efcf721b1c17baac963
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Sat Mar 7 17:36:47 2020 +0100

    Fix data set adapters
---
 .../container/worker/management/AdapterWorkerManagement.java       | 2 +-
 .../main/java/org/apache/streampipes/connect/adapter/Adapter.java  | 7 +++++++
 .../org/apache/streampipes/messaging/kafka/SpKafkaProducer.java    | 1 +
 3 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
index 7a38d16..1cc47b8 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/management/AdapterWorkerManagement.java
@@ -91,7 +91,7 @@ public class AdapterWorkerManagement {
 
         Protocol protocol = null;
         if (adapterSetDescription instanceof GenericAdapterSetDescription) {
-            protocol = AdapterDeclarerSingleton.getInstance().getProtocol(((GenericAdapterSetDescription) adapterSetDescription).getProtocolDescription().getElementId());
+            protocol = AdapterDeclarerSingleton.getInstance().getProtocol(((GenericAdapterSetDescription) adapterSetDescription).getProtocolDescription().getAppId());
             ((GenericAdapter) adapter).setProtocol(protocol);
         }
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
index bdb3492..d738322 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.connect.adapter;
 
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.streampipes.connect.adapter.exception.AdapterException;
@@ -87,6 +88,12 @@ public abstract class Adapter<T extends AdapterDescription> implements Connector
     public void changeEventGrounding(TransportProtocol transportProtocol) {
         List<AdapterPipelineElement> pipelineElements =  this.adapterPipeline.getPipelineElements();
         SendToKafkaAdapterSink sink = (SendToKafkaAdapterSink) this.adapterPipeline.getPipelineSink();
+
+
+        if ("true".equals(System.getenv("SP_DEBUG"))) {
+            transportProtocol.setBrokerHostname("localhost");
+            ((KafkaTransportProtocol) transportProtocol).setKafkaPort(9094);
+        }
         sink.changeTransportProtocol(transportProtocol);
     }
 
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
index c20a70d..9b311fd 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java
@@ -98,6 +98,7 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S
   private void createKafaTopic(KafkaTransportProtocol settings) {
     String zookeeperHost = settings.getZookeeperHost() + ":" + settings.getZookeeperPort();
 
+
     Properties props = new Properties();
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);