You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/19 19:07:13 UTC

[incubator-streampipes] branch STREAMPIPES-445 created (now 4c98aad)

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a change to branch STREAMPIPES-445
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


      at 4c98aad  Remove unused constructors in adapters using a debug parameter

This branch includes the following new commits:

     new 4c98aad  Remove unused constructors in adapters using a debug parameter

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[incubator-streampipes] 01/01: Remove unused constructors in adapters using a debug parameter

Posted by ze...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-445
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 4c98aad62eba02196cc1d0e0c821963fa8643200
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Tue Oct 19 21:06:28 2021 +0200

    Remove unused constructors in adapters using a debug parameter
---
 .../apache/streampipes/connect/api/IAdapter.java   |  1 -
 .../init/ConnectWorkerRegistrationService.java     |  3 ++
 .../connect/RunningAdapterInstances.java           |  3 ++
 .../streampipes/connect/adapter/Adapter.java       | 33 +++++-----------------
 .../adapter/model/generic/GenericAdapter.java      |  4 ---
 .../model/generic/GenericDataSetAdapter.java       |  5 ----
 .../model/generic/GenericDataStreamAdapter.java    |  4 ---
 .../adapter/model/pipeline/AdapterPipeline.java    | 15 ++++------
 .../adapter/model/specific/SpecificAdapter.java    |  3 --
 .../model/specific/SpecificDataSetAdapter.java     |  3 --
 .../model/specific/SpecificDataStreamAdapter.java  |  5 ----
 .../AdapterMonitoring.java}                        | 21 ++++++--------
 .../AdapterStatus.java}                            | 29 ++++++++++++-------
 13 files changed, 46 insertions(+), 83 deletions(-)

diff --git a/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IAdapter.java b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IAdapter.java
index 4eea9c4..bce9b1d 100644
--- a/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IAdapter.java
+++ b/streampipes-connect-api/src/main/java/org/apache/streampipes/connect/api/IAdapter.java
@@ -38,5 +38,4 @@ public interface IAdapter<T extends AdapterDescription> extends Connector {
 
   void changeEventGrounding(TransportProtocol transportProtocol);
 
-  boolean isDebug();
 }
diff --git a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
index 9de2137..6d8260f 100644
--- a/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
+++ b/streampipes-connect-container-worker/src/main/java/org/apache/streampipes/connect/container/worker/init/ConnectWorkerRegistrationService.java
@@ -56,6 +56,9 @@ public class ConnectWorkerRegistrationService {
         }
       }
     }
+
+
+
   }
 
   private List<String> getConnectMasterUrl() {
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/RunningAdapterInstances.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/RunningAdapterInstances.java
index d4cf08d..374880d 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/RunningAdapterInstances.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/RunningAdapterInstances.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.connect;
 
+import org.apache.streampipes.connect.adapter.monitoring.AdapterMonitoring;
 import org.apache.streampipes.connect.api.IAdapter;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 
@@ -31,6 +32,8 @@ public enum RunningAdapterInstances {
     private final Map<String, IAdapter<?>> runningAdapterInstances = new HashMap<>();
     private final Map<String, AdapterDescription> runningAdapterDescriptionInstances = new HashMap<>();
 
+    private AdapterMonitoring adapterMonitoring = new AdapterMonitoring();
+
     public void addAdapter(String elementId, IAdapter<?> adapter, AdapterDescription adapterDescription) {
         runningAdapterInstances.put(elementId, adapter);
         runningAdapterDescriptionInstances.put(elementId, adapterDescription);
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
index e6336ca..5fa4474 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/Adapter.java
@@ -44,28 +44,21 @@ import java.util.List;
 public abstract class Adapter<T extends AdapterDescription> implements IAdapter<T> {
     Logger logger = LoggerFactory.getLogger(Adapter.class);
 
-    private boolean debug;
-
     protected AdapterPipeline adapterPipeline;
 
     protected T adapterDescription;
 
-    public Adapter(T adapterDescription, boolean debug) {
-        this.adapterDescription = adapterDescription;
-        this.debug = debug;
-        this.adapterPipeline = getAdapterPipeline(adapterDescription);
+    public Adapter() {
     }
 
     public Adapter(T adapterDescription) {
-        this(adapterDescription, false);
-    }
+        this.adapterDescription = adapterDescription;
 
-    public Adapter(boolean debug) {
-        this.debug = debug;
-    }
+        if (adapterDescription.getEventGrounding() != null && adapterDescription.getEventGrounding().getTransportProtocol() != null
+                && adapterDescription.getEventGrounding().getTransportProtocol().getBrokerHostname() != null) {
 
-    public Adapter() {
-        this(false);
+            this.adapterPipeline = getAdapterPipeline(adapterDescription);
+        }
     }
 
     @Override
@@ -134,13 +127,7 @@ public abstract class Adapter<T extends AdapterDescription> implements IAdapter<
         }
         pipelineElements.add(transformStreamAdapterElement);
 
-        // Needed when adapter is (
-        if (adapterDescription.getEventGrounding() != null && adapterDescription.getEventGrounding().getTransportProtocol() != null
-                && adapterDescription.getEventGrounding().getTransportProtocol().getBrokerHostname() != null) {
-            return new AdapterPipeline(pipelineElements, getAdapterSink(adapterDescription));
-        }
-
-        return new AdapterPipeline(pipelineElements);
+        return new AdapterPipeline(pipelineElements, getAdapterSink(adapterDescription));
     }
 
     private SendToBrokerAdapterSink<?> getAdapterSink(AdapterDescription adapterDescription) {
@@ -190,10 +177,4 @@ public abstract class Adapter<T extends AdapterDescription> implements IAdapter<
 
         return null;
     }
-
-    @Override
-    public boolean isDebug() {
-        return debug;
-    }
-
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericAdapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericAdapter.java
index 36d6284..0f2203c 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericAdapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericAdapter.java
@@ -42,10 +42,6 @@ public abstract class GenericAdapter<T extends AdapterDescription> extends Adapt
         super(adapterDescription);
     }
 
-    public GenericAdapter(T adapterDescription, boolean debug) {
-        super(adapterDescription, debug);
-    }
-
     public GenericAdapter() {
         super();
     }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericDataSetAdapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericDataSetAdapter.java
index 70acd19..bb50a17 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericDataSetAdapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericDataSetAdapter.java
@@ -35,11 +35,6 @@ public class GenericDataSetAdapter extends GenericAdapter<GenericAdapterSetDescr
         super();
     }
 
-
-    public GenericDataSetAdapter(GenericAdapterSetDescription adapterDescription, boolean debug) {
-        super(adapterDescription, debug);
-    }
-
     public GenericDataSetAdapter(GenericAdapterSetDescription adapterDescription) {
         super(adapterDescription);
     }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericDataStreamAdapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericDataStreamAdapter.java
index 87588e6..82c0ea4 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericDataStreamAdapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/generic/GenericDataStreamAdapter.java
@@ -35,10 +35,6 @@ public class GenericDataStreamAdapter extends GenericAdapter<GenericAdapterStrea
         super();
     }
 
-    public GenericDataStreamAdapter(GenericAdapterStreamDescription adapterDescription, boolean debug) {
-        super(adapterDescription, debug);
-    }
-
     public GenericDataStreamAdapter(GenericAdapterStreamDescription adapterDescription) {
         super(adapterDescription);
     }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java
index 98613c8..4bf7af4 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/pipeline/AdapterPipeline.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.connect.adapter.model.pipeline;
 
+import org.apache.streampipes.connect.adapter.monitoring.AdapterMonitoring;
 import org.apache.streampipes.connect.api.IAdapterPipeline;
 import org.apache.streampipes.connect.api.IAdapterPipelineElement;
 
@@ -29,10 +30,7 @@ public class AdapterPipeline implements IAdapterPipeline {
     private List<IAdapterPipelineElement> pipelineElements;
     private IAdapterPipelineElement pipelineSink;
 
-
-    public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements) {
-        this.pipelineElements = pipelineElements;
-    }
+    private AdapterMonitoring adapterMonitoring;
 
     public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, IAdapterPipelineElement pipelineSink) {
         this.pipelineElements = pipelineElements;
@@ -42,16 +40,13 @@ public class AdapterPipeline implements IAdapterPipeline {
     @Override
     public void process(Map<String, Object> event) {
 
-        // TODO remove, just for performance tests
-        if ("true".equals(System.getenv("SP_DEBUG_CONNECT"))) {
-            event.put("internal_t1", System.currentTimeMillis());
-        }
-
-
         for (IAdapterPipelineElement pipelineElement : pipelineElements) {
             event = pipelineElement.process(event);
         }
+
         if (pipelineSink != null) {
+            // Write to statistics here
+
             pipelineSink.process(event);
         }
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificAdapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificAdapter.java
index f2ee27d..f2ef1eb 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificAdapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificAdapter.java
@@ -31,7 +31,4 @@ public abstract class SpecificAdapter <T extends AdapterDescription> extends Ada
         super(adapterDescription);
     }
 
-    public SpecificAdapter(T adapterDescription, boolean debug) {
-        super(adapterDescription, debug);
-    }
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificDataSetAdapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificDataSetAdapter.java
index 1cdac75..b81f76e 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificDataSetAdapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificDataSetAdapter.java
@@ -30,7 +30,4 @@ public abstract class SpecificDataSetAdapter extends SpecificAdapter<SpecificAda
         super(adapterDescription);
     }
 
-    public SpecificDataSetAdapter(SpecificAdapterSetDescription adapterDescription, boolean debug) {
-        super(adapterDescription, debug);
-    }
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificDataStreamAdapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificDataStreamAdapter.java
index 7b85884..cfca442 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificDataStreamAdapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificDataStreamAdapter.java
@@ -29,9 +29,4 @@ public abstract class SpecificDataStreamAdapter extends SpecificAdapter<Specific
     public SpecificDataStreamAdapter(SpecificAdapterStreamDescription adapterDescription) {
         super(adapterDescription);
     }
-
-    public SpecificDataStreamAdapter(SpecificAdapterStreamDescription adapterDescription, boolean debug) {
-        super(adapterDescription, debug);
-    }
-
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificAdapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/monitoring/AdapterMonitoring.java
similarity index 60%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificAdapter.java
copy to streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/monitoring/AdapterMonitoring.java
index f2ee27d..f732dcd 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificAdapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/monitoring/AdapterMonitoring.java
@@ -16,22 +16,19 @@
  *
  */
 
-package org.apache.streampipes.connect.adapter.model.specific;
+package org.apache.streampipes.connect.adapter.monitoring;
 
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import java.util.concurrent.ConcurrentHashMap;
 
-public abstract class SpecificAdapter <T extends AdapterDescription> extends Adapter<T> {
+public class AdapterMonitoring {
+    private ConcurrentHashMap<String, AdapterStatus> adapterStatus;
 
-    public SpecificAdapter() {
-        super();
+    public AdapterMonitoring() {
+        // Should I start the thread here?
+        this.adapterStatus = new ConcurrentHashMap<>();
     }
 
-    public SpecificAdapter(T adapterDescription) {
-        super(adapterDescription);
-    }
-
-    public SpecificAdapter(T adapterDescription, boolean debug) {
-        super(adapterDescription, debug);
+    public ConcurrentHashMap<String, AdapterStatus> getAdapterStatus() {
+        return adapterStatus;
     }
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificAdapter.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/monitoring/AdapterStatus.java
similarity index 61%
copy from streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificAdapter.java
copy to streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/monitoring/AdapterStatus.java
index f2ee27d..a32dc9d 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/model/specific/SpecificAdapter.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/monitoring/AdapterStatus.java
@@ -16,22 +16,31 @@
  *
  */
 
-package org.apache.streampipes.connect.adapter.model.specific;
+package org.apache.streampipes.connect.adapter.monitoring;
 
-import org.apache.streampipes.connect.adapter.Adapter;
-import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+public class AdapterStatus {
+    private long timestamp;
+    private int count;
 
-public abstract class SpecificAdapter <T extends AdapterDescription> extends Adapter<T> {
+    public AdapterStatus() {
+        reset();
+    }
+
+    public void reset() {
+        timestamp = -1;
+        count = 0;
+    }
 
-    public SpecificAdapter() {
-        super();
+    public void increaseCount(long newTimestamp) {
+        this.timestamp = newTimestamp;
+        this.count++;
     }
 
-    public SpecificAdapter(T adapterDescription) {
-        super(adapterDescription);
+    public long getTimestamp() {
+        return timestamp;
     }
 
-    public SpecificAdapter(T adapterDescription, boolean debug) {
-        super(adapterDescription, debug);
+    public int getCount() {
+        return count;
     }
 }