You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/09/24 08:03:37 UTC

[incubator-streampipes] branch STREAMPIPES-245 created (now 6a82830)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch STREAMPIPES-245
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


      at 6a82830  [STREAMPIPES-245] Add initial version of pipeline monitoring feature

This branch includes the following new commits:

     new 6a82830  [STREAMPIPES-245] Add initial version of pipeline monitoring feature

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-streampipes] 01/01: [STREAMPIPES-245] Add initial version of pipeline monitoring feature

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-245
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 6a828302501d045a053884c39f8786611afe327c
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Thu Sep 24 10:03:20 2020 +0200

    [STREAMPIPES-245] Add initial version of pipeline monitoring feature
---
 pom.xml                                            |   2 +-
 .../backend/StreamPipesResourceConfig.java         |   1 +
 .../kafka/config/ConsumerConfigFactory.java        |   2 +-
 .../monitoring/PipelineElementMonitoringInfo.java  |  85 ++++++++
 .../model/monitoring/PipelineElementTopicInfo.java |  63 ++++++
 .../model/monitoring/PipelineMonitoringInfo.java   |  67 ++++++
 .../manager/execution/http/PipelineExecutor.java   |  31 ++-
 .../pipeline/PipelineExecutionStatusCollector.java |  37 ++++
 .../monitoring/pipeline/TopicInfoCollector.java    | 228 +++++++++++++++++++++
 .../streampipes/rest/api/IPipelineMonitoring.java  |   9 +-
 .../streampipes/rest/impl/PipelineMonitoring.java  |  42 ++++
 .../app-transport-monitoring.module.ts             |   4 +-
 .../outgoing/outgoing-view.component.html          |  12 +-
 .../transport-summary.component.html               |  14 +-
 ui/src/app/core-model/gen/streampipes-model.ts     |  71 ++++++-
 ui/src/app/core-ui/core-ui.module.ts               | 139 +++++++------
 .../barchart/barchart-widget.component.html}       |  20 +-
 .../widget/barchart/barchart-widget.component.scss |   9 +-
 .../widget/barchart/barchart-widget.component.ts   |  34 ++-
 .../widget/status/status-widget.component.css}     |   0
 .../widget/status/status-widget.component.html}    |   7 +-
 .../widget/status/status-widget.component.ts}      |  13 +-
 .../elements/pipeline-elements-row.component.html  |   4 +-
 .../elements/pipeline-elements-row.component.ts    |   3 +
 .../components/model/pipeline-details.model.ts     |  10 +-
 .../monitoring/pipeline-monitoring.component.html  |  41 ++++
 .../monitoring/pipeline-monitoring.component.scss} |  30 +--
 .../monitoring/pipeline-monitoring.component.ts    |  89 ++++++++
 .../pipeline-element-statistics.component.html     |  45 ++++
 .../pipeline-element-statistics.component.scss     |  11 +-
 .../pipeline-element-statistics.component.ts       | 144 +++++++++++++
 .../pipeline-details.component.html                |   5 +-
 .../pipeline-details/pipeline-details.module.ts    |   6 +
 .../apis/pipeline-monitoring.service.ts            |  24 ++-
 ui/src/app/platform-services/platform.module.ts    |   2 +
 ui/src/scss/sp/colors.scss                         |   4 +
 36 files changed, 1152 insertions(+), 156 deletions(-)

diff --git a/pom.xml b/pom.xml
index 5853729..f40d05c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,7 +78,7 @@
         <jgrapht.version>1.3.1</jgrapht.version>
         <json-path.version>3.1.0</json-path.version>
         <jsr305.version>3.0.2</jsr305.version>
-        <kafka.version>2.2.0</kafka.version>
+        <kafka.version>2.6.0</kafka.version>
         <lightcouch.version>0.2.0</lightcouch.version>
         <log4j.version>2.12.1</log4j.version>
         <logback-classic.version>1.2.3</logback-classic.version>
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
index 9ed13f6..1b8998d 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
@@ -70,6 +70,7 @@ public class StreamPipesResourceConfig extends ResourceConfig {
     register(PipelineElementImportNoUser.class);
     register(PipelineElementImport.class);
     register(PipelineElementRuntimeInfo.class);
+    register(PipelineMonitoring.class);
     register(PipelineNoUserResource.class);
     register(PipelineTemplate.class);
     register(PipelineWithUserResource.class);
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
index 1c7cbf4..72faf42 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
@@ -26,7 +26,7 @@ import java.util.UUID;
 public class ConsumerConfigFactory extends AbstractConfigFactory {
 
   private static final String ENABLE_AUTO_COMMIT_CONFIG_DEFAULT = "true";
-  private static final String AUTO_COMMIT_INTERVAL_MS_CONFIG_DEFAULT = "10000";
+  private static final String AUTO_COMMIT_INTERVAL_MS_CONFIG_DEFAULT = "5000";
   private static final String SESSION_TIMEOUT_MS_CONFIG_DEFAULT = "30000";
   private static final Integer FETCH_MAX_BYTES_CONFIG_DEFAULT = 5000012;
   private static final String KEY_DESERIALIZER_CLASS_CONFIG_DEFAULT = "org.apache.kafka.common" +
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementMonitoringInfo.java b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementMonitoringInfo.java
new file mode 100644
index 0000000..4812ba2
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementMonitoringInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.model.monitoring;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class PipelineElementMonitoringInfo {
+
+  private String pipelineElementId;
+  private String pipelineElementName;
+
+  private boolean inputTopicInfoExists;
+  private boolean outputTopicInfoExists;
+
+  private List<PipelineElementTopicInfo> inputTopicInfo;
+  private PipelineElementTopicInfo outputTopicInfo;
+
+  public PipelineElementMonitoringInfo() {
+    this.inputTopicInfo = new ArrayList<>();
+  }
+
+  public String getPipelineElementId() {
+    return pipelineElementId;
+  }
+
+  public void setPipelineElementId(String pipelineElementId) {
+    this.pipelineElementId = pipelineElementId;
+  }
+
+  public String getPipelineElementName() {
+    return pipelineElementName;
+  }
+
+  public void setPipelineElementName(String pipelineElementName) {
+    this.pipelineElementName = pipelineElementName;
+  }
+
+  public List<PipelineElementTopicInfo> getInputTopicInfo() {
+    return inputTopicInfo;
+  }
+
+  public void setInputTopicInfo(List<PipelineElementTopicInfo> inputTopicInfo) {
+    this.inputTopicInfo = inputTopicInfo;
+  }
+
+  public PipelineElementTopicInfo getOutputTopicInfo() {
+    return outputTopicInfo;
+  }
+
+  public void setOutputTopicInfo(PipelineElementTopicInfo outputTopicInfo) {
+    this.outputTopicInfo = outputTopicInfo;
+  }
+
+  public boolean isInputTopicInfoExists() {
+    return inputTopicInfoExists;
+  }
+
+  public void setInputTopicInfoExists(boolean inputTopicInfoExists) {
+    this.inputTopicInfoExists = inputTopicInfoExists;
+  }
+
+  public boolean isOutputTopicInfoExists() {
+    return outputTopicInfoExists;
+  }
+
+  public void setOutputTopicInfoExists(boolean outputTopicInfoExists) {
+    this.outputTopicInfoExists = outputTopicInfoExists;
+  }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementTopicInfo.java b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementTopicInfo.java
new file mode 100644
index 0000000..5270668
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementTopicInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.model.monitoring;
+
+public class PipelineElementTopicInfo {
+
+  private String topicName;
+
+  private Long currentOffset;
+  private Long latestOffset;
+  private Long offsetAtPipelineStart;
+
+  public PipelineElementTopicInfo() {
+
+  }
+
+  public String getTopicName() {
+    return topicName;
+  }
+
+  public void setTopicName(String topicName) {
+    this.topicName = topicName;
+  }
+
+  public Long getCurrentOffset() {
+    return currentOffset;
+  }
+
+  public void setCurrentOffset(Long currentOffset) {
+    this.currentOffset = currentOffset;
+  }
+
+  public Long getLatestOffset() {
+    return latestOffset;
+  }
+
+  public void setLatestOffset(Long latestOffset) {
+    this.latestOffset = latestOffset;
+  }
+
+  public Long getOffsetAtPipelineStart() {
+    return offsetAtPipelineStart;
+  }
+
+  public void setOffsetAtPipelineStart(Long offsetAtPipelineStart) {
+    this.offsetAtPipelineStart = offsetAtPipelineStart;
+  }
+}
\ No newline at end of file
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineMonitoringInfo.java b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineMonitoringInfo.java
new file mode 100644
index 0000000..43b6a07
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineMonitoringInfo.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.model.monitoring;
+
+import org.apache.streampipes.model.shared.annotation.TsModel;
+
+import java.util.List;
+
+@TsModel
+public class PipelineMonitoringInfo {
+
+  private String pipelineId;
+  private long createdAt;
+  private long startedAt;
+
+  private List<PipelineElementMonitoringInfo> pipelineElementMonitoringInfo;
+
+  public PipelineMonitoringInfo() {
+  }
+
+  public String getPipelineId() {
+    return pipelineId;
+  }
+
+  public void setPipelineId(String pipelineId) {
+    this.pipelineId = pipelineId;
+  }
+
+  public long getCreatedAt() {
+    return createdAt;
+  }
+
+  public void setCreatedAt(long createdAt) {
+    this.createdAt = createdAt;
+  }
+
+  public List<PipelineElementMonitoringInfo> getPipelineElementMonitoringInfo() {
+    return pipelineElementMonitoringInfo;
+  }
+
+  public void setPipelineElementMonitoringInfo(List<PipelineElementMonitoringInfo> pipelineElementMonitoringInfo) {
+    this.pipelineElementMonitoringInfo = pipelineElementMonitoringInfo;
+  }
+
+  public long getStartedAt() {
+    return startedAt;
+  }
+
+  public void setStartedAt(long startedAt) {
+    this.startedAt = startedAt;
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index ce16327..d2f66ab 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -18,28 +18,26 @@
 
 package org.apache.streampipes.manager.execution.http;
 
-import org.lightcouch.DocumentConflictException;
 import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
 import org.apache.streampipes.manager.execution.status.SepMonitoringManager;
 import org.apache.streampipes.manager.util.TemporaryGraphStorage;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.model.message.PipelineStatusMessage;
-import org.apache.streampipes.model.message.PipelineStatusMessageType;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.message.PipelineStatusMessage;
+import org.apache.streampipes.model.message.PipelineStatusMessageType;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
 import org.apache.streampipes.storage.api.IPipelineStorage;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.apache.streampipes.user.management.encryption.CredentialsManager;
+import org.lightcouch.DocumentConflictException;
 
 import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
 import java.util.stream.Collectors;
 
 public class PipelineExecutor {
@@ -59,8 +57,12 @@ public class PipelineExecutor {
 
   public PipelineOperationStatus startPipeline() {
 
+    pipeline.getSepas().forEach(this::updateGroupIds);
+    pipeline.getActions().forEach(this::updateGroupIds);
+
     List<DataProcessorInvocation> sepas = pipeline.getSepas();
     List<DataSinkInvocation> secs = pipeline.getActions();
+
     List<SpDataSet> dataSets = pipeline.getStreams().stream().filter(s -> s instanceof SpDataSet).map(s -> new
             SpDataSet((SpDataSet) s)).collect(Collectors.toList());
 
@@ -74,7 +76,7 @@ public class PipelineExecutor {
 
     List<InvocableStreamPipesEntity> decryptedGraphs = decryptSecrets(graphs);
 
-    graphs.forEach(g -> g.setStreamRequirements(Arrays.asList()));
+    graphs.forEach(g -> g.setStreamRequirements(Collections.emptyList()));
 
     PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(),
             pipeline.getName(), decryptedGraphs, dataSets)
@@ -97,6 +99,15 @@ public class PipelineExecutor {
     return status;
   }
 
+  private void updateGroupIds(InvocableStreamPipesEntity entity) {
+    entity.getInputStreams()
+            .stream()
+            .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol)
+            .map(is -> is.getEventGrounding().getTransportProtocol())
+            .map(KafkaTransportProtocol.class::cast)
+            .forEach(tp -> tp.setGroupId(UUID.randomUUID().toString()));
+  }
+
   private List<InvocableStreamPipesEntity> decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
     List<InvocableStreamPipesEntity> decryptedGraphs = new ArrayList<>();
     graphs.stream().map(g -> {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
index c6e991f..6a8ad30 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
@@ -18,7 +18,44 @@
 
 package org.apache.streampipes.manager.monitoring.pipeline;
 
+import org.apache.streampipes.model.monitoring.PipelineElementMonitoringInfo;
+import org.apache.streampipes.model.monitoring.PipelineMonitoringInfo;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import java.util.List;
+
 public class PipelineExecutionStatusCollector {
 
+  private String pipelineId;
+
+  public PipelineExecutionStatusCollector(String pipelineId) {
+    this.pipelineId = pipelineId;
+  }
+
+  public PipelineMonitoringInfo makePipelineMonitoringInfo() {
+    Pipeline pipeline = getPipeline();
+
+    PipelineMonitoringInfo monitoringInfo = new PipelineMonitoringInfo();
+    monitoringInfo.setCreatedAt(pipeline.getCreatedAt());
+    monitoringInfo.setStartedAt(pipeline.getStartedAt());
+    monitoringInfo.setPipelineId(pipelineId);
+
+    monitoringInfo.setPipelineElementMonitoringInfo(makePipelineElementMonitoringInfo(pipeline));
+
+    return monitoringInfo;
+  }
+
+  private List<PipelineElementMonitoringInfo> makePipelineElementMonitoringInfo(Pipeline pipeline) {
+    return new TopicInfoCollector(pipeline).makeMonitoringInfo();
+
+  }
 
+  private Pipeline getPipeline() {
+    return StorageDispatcher
+            .INSTANCE
+            .getNoSqlStore()
+            .getPipelineStorageAPI()
+            .getPipeline(this.pipelineId);
+  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java
new file mode 100644
index 0000000..0b035a4
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.monitoring.pipeline;
+
+import org.apache.kafka.clients.admin.*;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.model.monitoring.PipelineElementMonitoringInfo;
+import org.apache.streampipes.model.monitoring.PipelineElementTopicInfo;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.sdk.helpers.Tuple2;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class TopicInfoCollector {
+
+
+  private Pipeline pipeline;
+  private AdminClient kafkaAdminClient;
+
+  private Map<String, Long> latestTopicOffsets;
+  private Map<String, Long> topicOffsetAtPipelineStart;
+  private Map<String, Long> currentConsumerGroupOffsets;
+
+  private List<PipelineElementMonitoringInfo> monitoringInfo;
+
+  public TopicInfoCollector(Pipeline pipeline) {
+    this.pipeline = pipeline;
+    this.kafkaAdminClient = kafkaAdminClient();
+    this.latestTopicOffsets = new HashMap<>();
+    this.topicOffsetAtPipelineStart = new HashMap<>();
+    this.currentConsumerGroupOffsets = new HashMap<>();
+    this.monitoringInfo = new ArrayList<>();
+  }
+
+  private void makeTopicInfo() {
+    long currentTime = Instant.now().minus (1, ChronoUnit.SECONDS).toEpochMilli();
+    List<TransportProtocol> affectedProtocols = new ArrayList<>();
+
+    pipeline.getSepas().forEach(processor -> affectedProtocols.addAll(extractProtocols(processor)));
+    pipeline.getActions().forEach(sink -> affectedProtocols.addAll(extractProtocols(sink)));
+
+    affectedProtocols.forEach(protocol -> {
+      if (protocol instanceof KafkaTransportProtocol) {
+        try {
+          Tuple2<String, Long> latestTopicOffsets = makeTopicOffsetInfo((KafkaTransportProtocol) protocol, OffsetSpec.forTimestamp(currentTime));
+          Tuple2<String, Long> topicOffsetAtPipelineStart = makeTopicOffsetInfo((KafkaTransportProtocol) protocol, OffsetSpec.forTimestamp(pipeline.getStartedAt()));
+          Tuple2<String, Long> currentConsumerGroupOffsets = makeTopicInfo((KafkaTransportProtocol) protocol);
+
+          this.latestTopicOffsets.put(latestTopicOffsets.a, latestTopicOffsets.b);
+          this.topicOffsetAtPipelineStart.put(topicOffsetAtPipelineStart.a, topicOffsetAtPipelineStart.b);
+          this.currentConsumerGroupOffsets.put(currentConsumerGroupOffsets.a, currentConsumerGroupOffsets.b);
+
+        } catch (ExecutionException | InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    });
+  }
+
+  public List<PipelineElementMonitoringInfo> makeMonitoringInfo() {
+    this.makeTopicInfo();
+    // TODO filter for KafkaGrounding
+    this.pipeline.getStreams().forEach(stream -> this.monitoringInfo.add(makeStreamMonitoringInfo(stream)));
+    this.pipeline.getSepas().forEach(processor -> this.monitoringInfo.add(makeProcessorMonitoringInfo(processor)));
+    this.pipeline.getActions().forEach(sink -> this.monitoringInfo.add(makeSinkMonitoringInfo(sink)));
+
+    this.kafkaAdminClient.close();
+    return this.monitoringInfo;
+  }
+
+  private PipelineElementMonitoringInfo makeStreamMonitoringInfo(SpDataStream stream) {
+    PipelineElementMonitoringInfo info = prepare(stream.getElementId(), stream.getName(), false, true);
+    KafkaTransportProtocol protocol = (KafkaTransportProtocol) stream.getEventGrounding().getTransportProtocol();
+    info.setOutputTopicInfo(makeOutputTopicInfoForPipelineElement(protocol));
+
+    return info;
+  }
+
+  private PipelineElementMonitoringInfo makeProcessorMonitoringInfo(DataProcessorInvocation processor) {
+    PipelineElementMonitoringInfo info = prepare(processor.getElementId(), processor.getName(), true, true);
+    KafkaTransportProtocol outputProtocol = (KafkaTransportProtocol) processor.getOutputStream().getEventGrounding().getTransportProtocol();
+    PipelineElementTopicInfo outputTopicInfo = makeOutputTopicInfoForPipelineElement(outputProtocol);
+    List<PipelineElementTopicInfo> inputTopicInfo = makeInputTopicInfoForPipelineElement(processor.getInputStreams());
+
+    info.setOutputTopicInfo(outputTopicInfo);
+    info.setInputTopicInfo(inputTopicInfo);
+
+    return info;
+  }
+
+  private PipelineElementMonitoringInfo makeSinkMonitoringInfo(DataSinkInvocation sink) {
+    PipelineElementMonitoringInfo info = prepare(sink.getElementId(), sink.getName(), true, false);
+    info.setInputTopicInfo(makeInputTopicInfoForPipelineElement(sink.getInputStreams()));
+    return info;
+  }
+
+  private List<PipelineElementTopicInfo> makeInputTopicInfoForPipelineElement(List<SpDataStream> inputStreams) {
+    List<PipelineElementTopicInfo> topicInfos = new ArrayList<>();
+    inputStreams.stream().map(is -> is.getEventGrounding().getTransportProtocol()).forEach(protocol -> {
+      PipelineElementTopicInfo topicInfo = new PipelineElementTopicInfo();
+      String topic = getTopic((KafkaTransportProtocol) protocol);
+      topicInfo.setTopicName(topic);
+      topicInfo.setLatestOffset(latestTopicOffsets.get(topic));
+      topicInfo.setOffsetAtPipelineStart(topicOffsetAtPipelineStart.get(topic));
+      topicInfo.setCurrentOffset(currentConsumerGroupOffsets.get(((KafkaTransportProtocol) protocol).getGroupId()));
+      topicInfos.add(topicInfo);
+    });
+
+    return topicInfos;
+  }
+
+  private PipelineElementTopicInfo makeOutputTopicInfoForPipelineElement(KafkaTransportProtocol protocol) {
+    PipelineElementTopicInfo topicInfo = new PipelineElementTopicInfo();
+    String topic = getTopic(protocol);
+    topicInfo.setTopicName(topic);
+    topicInfo.setOffsetAtPipelineStart(this.topicOffsetAtPipelineStart.get(topic));
+    topicInfo.setLatestOffset(this.latestTopicOffsets.get(topic));
+
+    return topicInfo;
+  }
+
+  private String getTopic(KafkaTransportProtocol protocol) {
+    return protocol.getTopicDefinition().getActualTopicName();
+  }
+
+  private PipelineElementMonitoringInfo prepare(String elementId, String name, boolean inputTopics, boolean outputTopics) {
+    PipelineElementMonitoringInfo info = new PipelineElementMonitoringInfo();
+    info.setPipelineElementName(name);
+    info.setPipelineElementId(elementId);
+    info.setInputTopicInfoExists(inputTopics);
+    info.setOutputTopicInfoExists(outputTopics);
+    return info;
+  }
+
+  private AdminClient kafkaAdminClient() {
+    return KafkaAdminClient.create(makeProperties());
+  }
+
+  private Properties makeProperties() {
+    Properties props = new Properties();
+
+    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
+    props.put(AdminClientConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+
+    return props;
+  }
+
+  private String getBrokerUrl() {
+    String env = System.getenv("SP_DEBUG");
+    System.out.println(System.getenv("SP_DEBUG"));
+    if ("true".equals(env.replaceAll(" ", ""))) {
+      return "localhost:9094";
+    } else {
+      return BackendConfig.INSTANCE.getKafkaUrl();
+    }
+  }
+
+  private Tuple2<String, Long> makeTopicOffsetInfo(KafkaTransportProtocol protocol, OffsetSpec offsetSpec) throws ExecutionException, InterruptedException {
+    Map<TopicPartition, OffsetAndMetadata> partitions = kafkaAdminClient.listConsumerGroupOffsets(protocol.getGroupId()).partitionsToOffsetAndMetadata().get();
+    Map<TopicPartition, OffsetSpec> desiredOffsets = new HashMap<>();
+    partitions.forEach((key, value) -> desiredOffsets.put(key, offsetSpec));
+    ListOffsetsResult offsetsResult = kafkaAdminClient.listOffsets(desiredOffsets);
+    Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> resultInfo = offsetsResult.all().get();
+    Long offset = resultInfo
+            .values()
+            .stream()
+            .map(ListOffsetsResult.ListOffsetsResultInfo::offset)
+            .reduce(0L, Long::sum);
+
+    return new Tuple2<>(protocol.getTopicDefinition().getActualTopicName(), offset);
+  }
+
+  private Tuple2<String, Long> makeTopicInfo(KafkaTransportProtocol protocol) throws ExecutionException, InterruptedException {
+    Long offset = kafkaAdminClient.listConsumerGroupOffsets(protocol.getGroupId())
+              .partitionsToOffsetAndMetadata()
+              .get()
+              .values()
+              .stream()
+              .map(OffsetAndMetadata::offset)
+              .reduce(0L, Long::sum);
+
+    return new Tuple2<>(protocol.getGroupId(), offset);
+  }
+
+  public List<TransportProtocol> extractProtocols(InvocableStreamPipesEntity pipelineElement) {
+    return pipelineElement
+            .getInputStreams()
+            .stream()
+            .map(stream -> stream.getEventGrounding().getTransportProtocol())
+            .collect(Collectors.toList());
+  }
+
+  public static void main(String[] args) {
+    List<Pipeline> pipelines = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getAllPipelines();
+    Pipeline testPipeline = pipelines.get(0);
+
+    List<PipelineElementMonitoringInfo> monitoringInfo = new TopicInfoCollector(testPipeline).makeMonitoringInfo();
+    System.out.println(monitoringInfo.size());
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipelineMonitoring.java
similarity index 82%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipelineMonitoring.java
index c6e991f..eace98d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipelineMonitoring.java
@@ -15,10 +15,11 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.rest.api;
 
-package org.apache.streampipes.manager.monitoring.pipeline;
-
-public class PipelineExecutionStatusCollector {
-
+import javax.ws.rs.core.Response;
 
+public interface IPipelineMonitoring {
+  
+  Response getPipelineMonitoringInfo(String pipelineId);
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineMonitoring.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineMonitoring.java
new file mode 100644
index 0000000..83f8db6
--- /dev/null
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineMonitoring.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.rest.impl;
+
+import org.apache.streampipes.manager.monitoring.pipeline.PipelineExecutionStatusCollector;
+import org.apache.streampipes.rest.api.IPipelineMonitoring;
+import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+@Path("/v2/users/{username}/pipeline-monitoring")
+public class PipelineMonitoring extends AbstractRestInterface implements IPipelineMonitoring {
+
+  @JacksonSerialized
+  @Path("{pipelineId}")
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Override
+  public Response getPipelineMonitoringInfo(@PathParam("pipelineId") String pipelineId) {
+    return ok(new PipelineExecutionStatusCollector(pipelineId).makePipelineMonitoringInfo());
+  }
+}
diff --git a/ui/src/app/app-transport-monitoring/app-transport-monitoring.module.ts b/ui/src/app/app-transport-monitoring/app-transport-monitoring.module.ts
index 143336a..05e43fa 100644
--- a/ui/src/app/app-transport-monitoring/app-transport-monitoring.module.ts
+++ b/ui/src/app/app-transport-monitoring/app-transport-monitoring.module.ts
@@ -34,16 +34,17 @@ import {DashboardItemComponent} from "./components/dashboard-item/dashboard-item
 import {DashboardImageComponent} from "./components/dashboard-image/dashboard-image.component";
 import {TransportSelectionComponent} from "./components/transport-selection/transport-selection.component";
 import {AppTransportMonitoringRestService} from "./services/app-transport-monitoring-rest.service";
-import {DashboardStatusFilledComponent} from "./components/dashboard-status-filled/dashboard-status-filled.component";
 import {TransportSummaryComponent} from "./components/transport-summary/transport-summary.component";
 import {SlideshowModule} from "ng-simple-slideshow";
 import {TransportActivityGraphComponent} from "./components/transport-activity-graph/transport-activity-graph.component";
 import {TimestampConverterService} from "./services/timestamp-converter.service";
 import {NgxChartsModule} from '@swimlane/ngx-charts';
+import {CoreUiModule} from "../core-ui/core-ui.module";
 
 @NgModule({
     imports: [
         CommonModule,
+        CoreUiModule,
         FlexLayoutModule,
         CustomMaterialModule,
         MatGridListModule,
@@ -62,7 +63,6 @@ import {NgxChartsModule} from '@swimlane/ngx-charts';
         DashboardItemComponent,
         DashboardStatusComponent,
         TransportSelectionComponent,
-        DashboardStatusFilledComponent,
         TransportSummaryComponent,
         TransportActivityGraphComponent
     ],
diff --git a/ui/src/app/app-transport-monitoring/components/outgoing/outgoing-view.component.html b/ui/src/app/app-transport-monitoring/components/outgoing/outgoing-view.component.html
index f8a681a..78276d4 100644
--- a/ui/src/app/app-transport-monitoring/components/outgoing/outgoing-view.component.html
+++ b/ui/src/app/app-transport-monitoring/components/outgoing/outgoing-view.component.html
@@ -24,16 +24,16 @@
     </div>
     <div fxFlex="33" fxLayout="column">
         <div fxFill fxFlex="100" fxLayout="row" fxLayoutAlign="start start">
-            <dashboard-status-filled fxFlex="100" [label]="'Total Boxes Detected'"
-                                     [statusValue]="totalBoxes"></dashboard-status-filled>
+            <sp-status-widget fxFlex="100" [label]="'Total Boxes Detected'"
+                                     [statusValue]="totalBoxes"></sp-status-widget>
         </div>
         <div fxFlex="100" fxLayout="row" fxLayoutAlign="start start">
-            <dashboard-status-filled fxFlex="100" [label]="'Transparent Boxes Detected'"
-                              [statusValue]="cardboardBoxes"></dashboard-status-filled>
+            <sp-status-widget fxFlex="100" [label]="'Transparent Boxes Detected'"
+                              [statusValue]="cardboardBoxes"></sp-status-widget>
         </div>
         <div fxFlex="100" fxLayout="row" fxLayoutAlign="start start">
-            <dashboard-status-filled fxFlex="100" [label]="'Cardboard Boxes Detected'"
-                              [statusValue]="transparentBoxes"></dashboard-status-filled>
+            <sp-status-widget fxFlex="100" [label]="'Cardboard Boxes Detected'"
+                              [statusValue]="transparentBoxes"></sp-status-widget>
         </div>
     </div>
 </div>
diff --git a/ui/src/app/app-transport-monitoring/components/transport-summary/transport-summary.component.html b/ui/src/app/app-transport-monitoring/components/transport-summary/transport-summary.component.html
index 31c5f72..1bb0ccc 100644
--- a/ui/src/app/app-transport-monitoring/components/transport-summary/transport-summary.component.html
+++ b/ui/src/app/app-transport-monitoring/components/transport-summary/transport-summary.component.html
@@ -17,13 +17,13 @@
   -->
 
 <div flex="100" fxLayout="row">
-    <dashboard-status-filled fxFlex="25" [label]="'Shipped'" [statusValue]="shippedTime"
-                             [color]="'rgb(156, 156, 156)'"></dashboard-status-filled>
-    <dashboard-status-filled fxFlex="25" [label]="'Delivered'" [statusValue]="deliveredTime"
-                             [color]="'rgb(156, 156, 156)'"></dashboard-status-filled>
-    <dashboard-status-filled fxFlex="25" [label]="'Took'" [statusValue]="tookTime"
-                             [color]="'rgb(156, 156, 156)'"></dashboard-status-filled>
-    <dashboard-status-filled fxFlex="25" [label]="'Status'" [statusValue]="errorCode" [color]="statusColor"></dashboard-status-filled>
+    <sp-status-widget fxFlex="25" [label]="'Shipped'" [statusValue]="shippedTime"
+                             [color]="'rgb(156, 156, 156)'"></sp-status-widget>
+    <sp-status-widget fxFlex="25" [label]="'Delivered'" [statusValue]="deliveredTime"
+                             [color]="'rgb(156, 156, 156)'"></sp-status-widget>
+    <sp-status-widget fxFlex="25" [label]="'Took'" [statusValue]="tookTime"
+                             [color]="'rgb(156, 156, 156)'"></sp-status-widget>
+    <sp-status-widget fxFlex="25" [label]="'Status'" [statusValue]="errorCode" [color]="statusColor"></sp-status-widget>
 </div>
 <div *ngFor="let statusMessage of statusMessages">
     <h4>{{statusMessage}}</h4>
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index 67be548..9f4abe5 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,7 +19,7 @@
 /* tslint:disable */
 /* eslint-disable */
 // @ts-nocheck
-// Generated using typescript-generator version 2.24.612 on 2020-09-15 08:51:13.
+// Generated using typescript-generator version 2.24.612 on 2020-09-20 21:03:26.
 
 export class AbstractStreamPipesEntity {
     "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
@@ -151,8 +151,8 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
         instance.includedLocales = __getCopyArrayFn(__identity<string>())(data.includedLocales);
         instance.applicationLinks = __getCopyArrayFn(ApplicationLink.fromData)(data.applicationLinks);
         instance.connectedTo = __getCopyArrayFn(__identity<string>())(data.connectedTo);
-        instance.uri = data.uri;
         instance.dom = data.dom;
+        instance.uri = data.uri;
         return instance;
     }
 }
@@ -1563,9 +1563,9 @@ export class GenericAdapterSetDescription extends AdapterSetDescription implemen
         }
         const instance = target || new GenericAdapterSetDescription();
         super.fromData(data, instance);
-        instance.eventSchema = EventSchema.fromData(data.eventSchema);
         instance.formatDescription = FormatDescription.fromData(data.formatDescription);
         instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
+        instance.eventSchema = EventSchema.fromData(data.eventSchema);
         return instance;
     }
 }
@@ -1582,9 +1582,9 @@ export class GenericAdapterStreamDescription extends AdapterStreamDescription im
         }
         const instance = target || new GenericAdapterStreamDescription();
         super.fromData(data, instance);
-        instance.eventSchema = EventSchema.fromData(data.eventSchema);
         instance.formatDescription = FormatDescription.fromData(data.formatDescription);
         instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
+        instance.eventSchema = EventSchema.fromData(data.eventSchema);
         return instance;
     }
 }
@@ -1997,6 +1997,29 @@ export class PipelineCategory {
     }
 }
 
+export class PipelineElementMonitoringInfo {
+    inputTopicInfo: PipelineElementTopicInfo[];
+    inputTopicInfoExists: boolean;
+    outputTopicInfo: PipelineElementTopicInfo;
+    outputTopicInfoExists: boolean;
+    pipelineElementId: string;
+    pipelineElementName: string;
+
+    static fromData(data: PipelineElementMonitoringInfo, target?: PipelineElementMonitoringInfo): PipelineElementMonitoringInfo {
+        if (!data) {
+            return data;
+        }
+        const instance = target || new PipelineElementMonitoringInfo();
+        instance.pipelineElementId = data.pipelineElementId;
+        instance.pipelineElementName = data.pipelineElementName;
+        instance.inputTopicInfoExists = data.inputTopicInfoExists;
+        instance.outputTopicInfoExists = data.outputTopicInfoExists;
+        instance.inputTopicInfo = __getCopyArrayFn(PipelineElementTopicInfo.fromData)(data.inputTopicInfo);
+        instance.outputTopicInfo = PipelineElementTopicInfo.fromData(data.outputTopicInfo);
+        return instance;
+    }
+}
+
 export class PipelineElementRecommendation {
     count: number;
     description: string;
@@ -2054,6 +2077,25 @@ export class PipelineElementStatus {
     }
 }
 
+export class PipelineElementTopicInfo {
+    currentOffset: number;
+    latestOffset: number;
+    offsetAtPipelineStart: number;
+    topicName: string;
+
+    static fromData(data: PipelineElementTopicInfo, target?: PipelineElementTopicInfo): PipelineElementTopicInfo {
+        if (!data) {
+            return data;
+        }
+        const instance = target || new PipelineElementTopicInfo();
+        instance.topicName = data.topicName;
+        instance.currentOffset = data.currentOffset;
+        instance.latestOffset = data.latestOffset;
+        instance.offsetAtPipelineStart = data.offsetAtPipelineStart;
+        return instance;
+    }
+}
+
 export class PipelineModification {
     domId: string;
     elementId: string;
@@ -2089,6 +2131,25 @@ export class PipelineModificationMessage extends Message {
     }
 }
 
+export class PipelineMonitoringInfo {
+    createdAt: number;
+    pipelineElementMonitoringInfo: PipelineElementMonitoringInfo[];
+    pipelineId: string;
+    startedAt: number;
+
+    static fromData(data: PipelineMonitoringInfo, target?: PipelineMonitoringInfo): PipelineMonitoringInfo {
+        if (!data) {
+            return data;
+        }
+        const instance = target || new PipelineMonitoringInfo();
+        instance.pipelineId = data.pipelineId;
+        instance.createdAt = data.createdAt;
+        instance.startedAt = data.startedAt;
+        instance.pipelineElementMonitoringInfo = __getCopyArrayFn(PipelineElementMonitoringInfo.fromData)(data.pipelineElementMonitoringInfo);
+        return instance;
+    }
+}
+
 export class PipelineOperationStatus {
     elementStatus: PipelineElementStatus[];
     pipelineId: string;
@@ -2463,8 +2524,8 @@ export class SpDataSet extends SpDataStream {
         instance.supportedGrounding = EventGrounding.fromData(data.supportedGrounding);
         instance.datasetInvocationId = data.datasetInvocationId;
         instance.correspondingPipeline = data.correspondingPipeline;
-        instance.actualTopicName = data.actualTopicName;
         instance.brokerHostname = data.brokerHostname;
+        instance.actualTopicName = data.actualTopicName;
         return instance;
     }
 }
diff --git a/ui/src/app/core-ui/core-ui.module.ts b/ui/src/app/core-ui/core-ui.module.ts
index ae22d1e..83232bf 100644
--- a/ui/src/app/core-ui/core-ui.module.ts
+++ b/ui/src/app/core-ui/core-ui.module.ts
@@ -73,73 +73,78 @@ import {ColorPickerModule} from "ngx-color-picker";
 import {QuillModule} from "ngx-quill";
 import {CodemirrorModule} from "@ctrl/ngx-codemirror";
 import {MatAutocompleteModule} from "@angular/material/autocomplete";
+import {StatusWidgetComponent} from "./widget/status/status-widget.component";
+import {NgxChartsModule} from "@swimlane/ngx-charts";
+import {BarchartWidgetComponent} from "./widget/barchart/barchart-widget.component";
 
 @NgModule({
-    imports: [
-        CommonModule,
-        ColorPickerModule,
-        FlexLayoutModule,
-        CodemirrorModule,
-        CustomMaterialModule,
-        ReactiveFormsModule,
-        FormsModule,
-        CdkTableModule,
-        MatAutocompleteModule,
-        MatSnackBarModule,
-        MatProgressSpinnerModule,
-        MatDatepickerModule,
-        MatNativeDateModule,
-        PlotlyViaWindowModule,
-        MatSliderModule,
-        MatChipsModule,
-        PortalModule,
-        OverlayModule,
-        QuillModule.forRoot()
-    ],
-    declarations: [
-        ConfirmDialogComponent,
-        DisplayRecommendedPipe,
-        ImageComponent,
-        ImageContainerComponent,
-        ImageLabelingComponent,
-        ImageLabelsComponent,
-        ImageBarComponent,
-        ImageAnnotationsComponent,
-        ImageCategorizeComponent,
-        ImageViewerComponent,
-        StandardDialogComponent,
-        PanelDialogComponent,
-        StaticAnyInput,
-        StaticPropertyComponent,
-        StaticFreeInputComponent,
-        StaticSecretInputComponent,
-        StaticFileInputComponent,
-        StaticMappingNaryComponent,
-        StaticMappingUnaryComponent,
-        StaticGroupComponent,
-        StaticAlternativesComponent,
-        StaticCollectionComponent,
-        StaticColorPickerComponent,
-        StaticCodeInputComponent,
-        StaticOneOfInputComponent,
-        StaticRuntimeResolvableAnyInputComponent,
-        StaticRuntimeResolvableOneOfInputComponent,
-    ],
-    providers: [
-        MatDatepickerModule,
-        ColorService,
-        DisplayRecommendedPipe,
-        ReactLabelingService,
-        PolygonLabelingService,
-        BrushLabelingService,
-        CocoFormatService,
-        LabelingModeService,
-        DialogService,
-        RuntimeResolvableService,
-        StaticFileRestService,
-    ],
-    entryComponents: [
-    ],
+  imports: [
+    CommonModule,
+    ColorPickerModule,
+    FlexLayoutModule,
+    CodemirrorModule,
+    CustomMaterialModule,
+    ReactiveFormsModule,
+    FormsModule,
+    CdkTableModule,
+    MatAutocompleteModule,
+    MatSnackBarModule,
+    MatProgressSpinnerModule,
+    MatDatepickerModule,
+    MatNativeDateModule,
+    NgxChartsModule,
+    PlotlyViaWindowModule,
+    MatSliderModule,
+    MatChipsModule,
+    PortalModule,
+    OverlayModule,
+    QuillModule.forRoot()
+  ],
+  declarations: [
+    BarchartWidgetComponent,
+    ConfirmDialogComponent,
+    DisplayRecommendedPipe,
+    ImageComponent,
+    ImageContainerComponent,
+    ImageLabelingComponent,
+    ImageLabelsComponent,
+    ImageBarComponent,
+    ImageAnnotationsComponent,
+    ImageCategorizeComponent,
+    ImageViewerComponent,
+    StandardDialogComponent,
+    PanelDialogComponent,
+    StaticAnyInput,
+    StaticPropertyComponent,
+    StaticFreeInputComponent,
+    StaticSecretInputComponent,
+    StaticFileInputComponent,
+    StaticMappingNaryComponent,
+    StaticMappingUnaryComponent,
+    StaticGroupComponent,
+    StaticAlternativesComponent,
+    StaticCollectionComponent,
+    StaticColorPickerComponent,
+    StaticCodeInputComponent,
+    StaticOneOfInputComponent,
+    StaticRuntimeResolvableAnyInputComponent,
+    StaticRuntimeResolvableOneOfInputComponent,
+    StatusWidgetComponent,
+  ],
+  providers: [
+    MatDatepickerModule,
+    ColorService,
+    DisplayRecommendedPipe,
+    ReactLabelingService,
+    PolygonLabelingService,
+    BrushLabelingService,
+    CocoFormatService,
+    LabelingModeService,
+    DialogService,
+    RuntimeResolvableService,
+    StaticFileRestService,
+  ],
+  entryComponents: [],
   exports: [
     ImageComponent,
     ImageLabelingComponent,
@@ -161,7 +166,9 @@ import {MatAutocompleteModule} from "@angular/material/autocomplete";
     StaticOneOfInputComponent,
     StaticRuntimeResolvableAnyInputComponent,
     StaticRuntimeResolvableOneOfInputComponent,
-    ImageViewerComponent
+    ImageViewerComponent,
+    StatusWidgetComponent,
+    BarchartWidgetComponent,
   ]
 })
 export class CoreUiModule {
diff --git a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.html
similarity index 63%
copy from ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html
copy to ui/src/app/core-ui/widget/barchart/barchart-widget.component.html
index d4b52c3..3cb206b 100644
--- a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html
+++ b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.html
@@ -16,11 +16,17 @@
   ~
   -->
 
-<div fxFlex="100" fxLayoutAlign="center center" fxLayout="column">
-    <ngx-charts-number-card
-            [results]="chartData"
-            [scheme]="'cool'"
-            [cardColor]="color"
-    [bandColor]="'rgb(27, 20, 100)'" class="fix-margin">
-    </ngx-charts-number-card>
+<div fxFlex="100" fxLayoutAlign="center center" fxLayout="column" [ngStyle]="{'background-color': backgroundColor, 'width':'265px', 'max-width': '265px', 'margin-left': '8px'}">
+    <ngx-charts-bar-vertical
+            [view]="[265,150]"
+            [results]="data"
+            [scheme]="colorScheme"
+            [gradient]="false"
+            [xAxis]="false"
+            [yAxis]="true"
+            [legend]="false"
+            [showXAxisLabel]="false"
+            [showYAxisLabel]="false"
+            [ngStyle]="{'fill': backgroundColor}">
+    </ngx-charts-bar-vertical>
 </div>
\ No newline at end of file
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.scss
similarity index 87%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to ui/src/app/core-ui/widget/barchart/barchart-widget.component.scss
index c6e991f..41ecef0 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.scss
@@ -14,11 +14,4 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *
- */
-
-package org.apache.streampipes.manager.monitoring.pipeline;
-
-public class PipelineExecutionStatusCollector {
-
-
-}
+ */
\ No newline at end of file
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.ts
similarity index 58%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to ui/src/app/core-ui/widget/barchart/barchart-widget.component.ts
index c6e991f..01c0860 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.ts
@@ -16,9 +16,37 @@
  *
  */
 
-package org.apache.streampipes.manager.monitoring.pipeline;
+import {Component, Input, OnInit} from '@angular/core';
 
-public class PipelineExecutionStatusCollector {
+@Component({
+  selector: 'sp-barchart-widget',
+  templateUrl: './barchart-widget.component.html',
+  styleUrls: ['./barchart-widget.component.scss']
+})
+export class BarchartWidgetComponent implements OnInit {
 
+  _data = [];
 
-}
+  @Input()
+  backgroundColor = "#cccccc";
+
+  @Input()
+  textColor = "#1b1464";
+
+  colorScheme = {
+    domain: ['#1b1464']
+  };
+
+  ngOnInit(): void {
+    this.colorScheme.domain = [this.textColor];
+  }
+
+  @Input()
+  set data(data) {
+    this._data = data;
+  }
+
+  get data() {
+    return this._data;
+  }
+}
\ No newline at end of file
diff --git a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.css b/ui/src/app/core-ui/widget/status/status-widget.component.css
similarity index 100%
copy from ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.css
copy to ui/src/app/core-ui/widget/status/status-widget.component.css
diff --git a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html b/ui/src/app/core-ui/widget/status/status-widget.component.html
similarity index 84%
rename from ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html
rename to ui/src/app/core-ui/widget/status/status-widget.component.html
index d4b52c3..1fd6197 100644
--- a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html
+++ b/ui/src/app/core-ui/widget/status/status-widget.component.html
@@ -17,10 +17,13 @@
   -->
 
 <div fxFlex="100" fxLayoutAlign="center center" fxLayout="column">
-    <ngx-charts-number-card
+    <ngx-charts-number-card style="width:300px;"
+            [view]="[300,150]"
             [results]="chartData"
             [scheme]="'cool'"
             [cardColor]="color"
-    [bandColor]="'rgb(27, 20, 100)'" class="fix-margin">
+            [animations]="false"
+            [bandColor]="bandColor"
+            [textColor]="textColor">
     </ngx-charts-number-card>
 </div>
\ No newline at end of file
diff --git a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.ts b/ui/src/app/core-ui/widget/status/status-widget.component.ts
similarity index 80%
rename from ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.ts
rename to ui/src/app/core-ui/widget/status/status-widget.component.ts
index aa89cd8..b639d4c 100644
--- a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.ts
+++ b/ui/src/app/core-ui/widget/status/status-widget.component.ts
@@ -16,16 +16,19 @@
  *
  */
 
-import {Component, Input} from '@angular/core';
+import {Component, Input, OnInit} from '@angular/core';
 
 @Component({
-    selector: 'dashboard-status-filled',
-    templateUrl: './dashboard-status-filled.component.html',
-    styleUrls: ['./dashboard-status-filled.component.css']
+    selector: 'sp-status-widget',
+    templateUrl: './status-widget.component.html',
+    styleUrls: ['./status-widget.component.css']
 })
-export class DashboardStatusFilledComponent {
+export class StatusWidgetComponent implements OnInit {
 
     @Input() color: string = "rgb(156, 156, 156)";
+    @Input() bandColor: string = "rgb(27, 20, 100)";
+    @Input() textColor: string = "rgb(96,96,96)";
+
     _label: string;
     _statusValue: string;
 
diff --git a/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.html b/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.html
index 5afd0e5..49f9875 100644
--- a/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.html
+++ b/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.html
@@ -18,12 +18,12 @@
 
 <div fxFlex="100" fxLayout="column">
     <div fxLayout="row" fxFlex="100">
-        <div fxFlex="30" fxLayout="row" style="margin-bottom:10px;">
+        <div fxFlex="{{showDescription ? 30 : 100}}}" fxLayout="row" style="margin-bottom:10px;">
             <span class="draggable-icon {{elementType}}">
                 <pipeline-element [preview]="true" [pipelineElement]="_element"></pipeline-element>
             </span>
         </div>
-        <div fxFlex="70" fxLayout="column" fxLayoutAlign="center left">
+        <div fxFlex="70" fxLayout="column" fxLayoutAlign="center left" *ngIf="showDescription">
             <b>{{element.name}}</b>
             {{element.description}}
         </div>
diff --git a/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.ts b/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.ts
index d353def..57cb3d6 100644
--- a/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.ts
+++ b/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.ts
@@ -32,6 +32,9 @@ export class PipelineElementsRowComponent implements OnInit {
     @Input()
     pipeline: Pipeline;
 
+    @Input()
+    showDescription: boolean = true;
+
     _element: PipelineElementUnion;
 
     constructor() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/ui/src/app/pipeline-details/components/model/pipeline-details.model.ts
similarity index 87%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to ui/src/app/pipeline-details/components/model/pipeline-details.model.ts
index c6e991f..ea6782d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/ui/src/app/pipeline-details/components/model/pipeline-details.model.ts
@@ -16,9 +16,7 @@
  *
  */
 
-package org.apache.streampipes.manager.monitoring.pipeline;
-
-public class PipelineExecutionStatusCollector {
-
-
-}
+export interface HistoricalMonitoringData {
+  name: string;
+  value: number;
+}
\ No newline at end of file
diff --git a/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.html b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.html
new file mode 100644
index 0000000..f256d5d
--- /dev/null
+++ b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.html
@@ -0,0 +1,41 @@
+<div fxFlex="100" fxLayout="column">
+    <div fxLayout="column" class="fixed-height add-options">
+        <div class="add-options-item" fxLayoutAlign="start center" fxLayout="row" style="padding-right:10px;">
+            <div fxFlex="100" fxLayout="row" fxLayoutAlign="end center">
+                <mat-slide-toggle [(ngModel)]="autoRefresh" color="primary">Auto refresh</mat-slide-toggle>
+            </div>
+        </div>
+    </div>
+    <div fxFlex="100" fxLayout="column" class="page-container-padding-inner">
+        <pipeline-preview [jspcanvas]="'assembly-quickedit'" [pipeline]="pipeline"
+                          (selectedElementEmitter)="selectElement($event)"
+                          style="margin-bottom:15px;"></pipeline-preview>
+
+        <div fxFlex="100" *ngIf="pipelineMonitoringInfoAvailable">
+            <div *ngFor="let pipelineElement of allElements" fxLayout="column" class="mb-10">
+                <div class="assembly-options-preview sp-blue-bg">
+                    <div fxLayout="row" fxLayoutAlign="start center">
+                        <h4>{{pipelineElement.name}}</h4>
+                    </div>
+                </div>
+                <div class="sp-blue-border pipeline-element-statistics-panel">
+                    <div fxFlex="100" fxLayout="row">
+                        <div fxFlex="20" fxLayoutAlign="start start">
+                            <pipeline-elements-row style="width: 100%;"
+                                                   [showDescription]="false"
+                                                   [pipeline]="pipeline"
+                                                   [element]="pipelineElement"></pipeline-elements-row>
+                        </div>
+                        <div fxFlex="80" fxLayoutAlign="start center">
+                            <pipeline-element-statistics
+                                    [pipeline]="pipeline"
+                                    [pipelineElement]="pipelineElement"
+                                    [pipelineElementMonitoringInfo]="pipelineElementMonitoringInfo.get(pipelineElement.elementId)">
+                            </pipeline-element-statistics>
+                        </div>
+                    </div>
+                </div>
+            </div>
+        </div>
+    </div>
+</div>
\ No newline at end of file
diff --git a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.css b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.scss
similarity index 72%
rename from ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.css
rename to ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.scss
index 14e1198..a3d7690 100644
--- a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.css
+++ b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.scss
@@ -16,20 +16,26 @@
  *
  */
 
-.statusValue {
-    font-size: 40pt;
-    background: darkblue;
-    color: white;
-    margin:5px;
-    min-height:140px;
-    padding: 15px;
+.add-options {
+  background-color:#f6f6f6;
+  border-bottom: 1px solid #cccccc;
+  padding-top:10px;
+  padding-bottom:10px;
 }
 
-.statusValue>h4 {
-    font-size:20pt;
-    margin-bottom: 30px;
+.fixed-height {
+  width:100%;
+  height: 50px;
 }
 
-.fix-margin {
-    margin-top: -5px;
+.page-container-padding-inner {
+  margin: 10px;
+}
+
+.pipeline-element-statistics-panel {
+  padding: 10px;
+}
+
+.mb-10 {
+  margin-bottom: 10px;
 }
\ No newline at end of file
diff --git a/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.ts b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.ts
new file mode 100644
index 0000000..5fb4603
--- /dev/null
+++ b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.ts
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import {Component, Input, OnDestroy, OnInit} from "@angular/core";
+import {
+  DataProcessorInvocation, DataSinkInvocation,
+  Pipeline, PipelineElementMonitoringInfo,
+  PipelineMonitoringInfo,
+  SpDataSet, SpDataStream
+} from "../../../core-model/gen/streampipes-model";
+import {PipelineMonitoringService} from "../../../platform-services/apis/pipeline-monitoring.service";
+
+@Component({
+  selector: 'pipeline-monitoring',
+  templateUrl: './pipeline-monitoring.component.html',
+  styleUrls: ['./pipeline-monitoring.component.scss']
+})
+export class PipelineMonitoringComponent implements OnInit, OnDestroy {
+
+  @Input()
+  pipeline: Pipeline;
+
+  pipelineMonitoringInfo: PipelineMonitoringInfo;
+  pipelineMonitoringInfoAvailable: boolean = false;
+
+  allElements: (SpDataSet | SpDataStream | DataProcessorInvocation | DataSinkInvocation)[] = [];
+
+  autoRefresh: boolean = true;
+
+  pipelineElementMonitoringInfo: Map<string, PipelineElementMonitoringInfo>;
+
+  constructor(private pipelineMonitoringService: PipelineMonitoringService) {
+  }
+
+  ngOnInit(): void {
+    this.collectAllElements();
+    console.log(this.allElements);
+    this.refreshMonitoringInfo();
+  }
+
+  collectAllElements() {
+    this.allElements = this.allElements
+        .concat(this.pipeline.streams)
+        .concat(this.pipeline.sepas)
+        .concat(this.pipeline.actions);
+  }
+
+  refreshMonitoringInfo() {
+    this.pipelineMonitoringService
+        .getPipelineMonitoringInfo(this.pipeline._id)
+        .subscribe(monitoringInfo => {
+          this.pipelineElementMonitoringInfo = new Map<string, PipelineElementMonitoringInfo>();
+          this.pipelineMonitoringInfo = monitoringInfo;
+          monitoringInfo.pipelineElementMonitoringInfo.forEach(info => {
+            this.pipelineElementMonitoringInfo.set(info.pipelineElementId, info);
+          })
+          this.pipelineMonitoringInfoAvailable = true;
+          if (this.autoRefresh) {
+            setTimeout(() => {
+              this.refreshMonitoringInfo();
+            }, 5000);
+          }
+        })
+  }
+
+  selectElement(pipelineElement) {
+    console.log(pipelineElement);
+  }
+
+  ngOnDestroy(): void {
+    this.autoRefresh = false;
+  }
+
+}
\ No newline at end of file
diff --git a/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html
new file mode 100644
index 0000000..400ca58
--- /dev/null
+++ b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html
@@ -0,0 +1,45 @@
+<div fxFlex="100" fxLayout="row" fxLayoutAlign="start start">
+        <div fxFlex="33">
+            <div fxLayout="column" class="mb-10">
+                <sp-status-widget fxFlex="100" [label]="'Consumed Messages / Lag'"
+                                  [color]="(consumedMessagesFirstInputStream == notAvailable) ? deactivatedCardColor : cardColor"
+                                  [textColor]="(consumedMessagesFirstInputStream == notAvailable) ? deactivatedTextColor : textColor"
+                                  [bandColor]="consumedMessagesFirstStreamBandColor"
+                                  [statusValue]="consumedMessagesFirstInputStream">
+                </sp-status-widget>
+                <sp-barchart-widget *ngIf="(consumedMessagesFirstInputStream != notAvailable)"
+                                    [data]="historicFirstConsumedInputValues"
+                                    [textColor]="chartTextColor"
+                                    [backgroundColor]="chartBackgroundColor" class="mt--10">
+                </sp-barchart-widget>
+            </div>
+        </div>
+        <div fxFlex="33">
+            <div fxLayout="column" class="mb-10">
+                <sp-status-widget fxFlex="100" [label]="'Consumed Messages / Lag'"
+                                  [color]="consumedMessagesSecondInputStream === notAvailable ? deactivatedCardColor : cardColor"
+                                  [textColor]="consumedMessagesSecondInputStream === notAvailable ? deactivatedTextColor : textColor"
+                                  [bandColor]="consumedMessagesSecondStreamBandColor"
+                                  [statusValue]="consumedMessagesSecondInputStream"></sp-status-widget>
+                <sp-barchart-widget *ngIf="consumedMessagesSecondInputStream !== notAvailable"
+                                    [data]="historicSecondConsumedInputValues"
+                                    [textColor]="chartTextColor"
+                                    [backgroundColor]="chartBackgroundColor" class="mt--10">
+                </sp-barchart-widget>
+            </div>
+        </div>
+        <div fxFlex="33">
+            <div fxLayout="column" class="mb-10">
+                <sp-status-widget fxFlex="100" [label]="'Produced Messages'"
+                                  [color]="producedMessages === notAvailable ? deactivatedCardColor : cardColor"
+                                  [textColor]="producedMessages === notAvailable ? deactivatedTextColor : textColor"
+                                  [bandColor]="producedMessages === notAvailable ? deactivatedBandColor : bandColor"
+                                  [statusValue]="producedMessages"></sp-status-widget>
+                <sp-barchart-widget *ngIf="producedMessages !== notAvailable"
+                                    [data]="historicProducedOutputValues"
+                                    [textColor]="chartTextColor"
+                                    [backgroundColor]="chartBackgroundColor" class="mt--10">
+                </sp-barchart-widget>
+            </div>
+        </div>
+</div>
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.scss
similarity index 88%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.scss
index c6e991f..27392f1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.scss
@@ -16,9 +16,10 @@
  *
  */
 
-package org.apache.streampipes.manager.monitoring.pipeline;
-
-public class PipelineExecutionStatusCollector {
-
-
+.mb-10 {
+  margin-bottom: 10px;
 }
+
+.mt--10 {
+  margin-top: -10px;
+}
\ No newline at end of file
diff --git a/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts
new file mode 100644
index 0000000..5b2322d
--- /dev/null
+++ b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import {Component, EventEmitter, Input, OnInit, Output} from "@angular/core";
+
+import {
+  DataProcessorInvocation, DataSinkInvocation,
+  Pipeline,
+  PipelineElementMonitoringInfo, SpDataSet, SpDataStream,
+} from "../../../../core-model/gen/streampipes-model";
+import {HistoricalMonitoringData} from "../../model/pipeline-details.model";
+
+@Component({
+  selector: 'pipeline-element-statistics',
+  templateUrl: './pipeline-element-statistics.component.html',
+  styleUrls: ['./pipeline-element-statistics.component.scss']
+})
+export class PipelineElementStatisticsComponent implements OnInit {
+
+  @Input()
+  pipeline: Pipeline;
+
+  @Input()
+  pipelineElement:  (SpDataSet | SpDataStream | DataProcessorInvocation | DataSinkInvocation);
+
+  _pipelineElementMonitoringInfo: PipelineElementMonitoringInfo;
+
+  currentPipelineElement: SpDataSet | SpDataStream | DataProcessorInvocation | DataSinkInvocation;
+  consumedMessagesFirstInputStream: string = "";
+  consumedMessagesSecondInputStream: string = "";
+
+  producedMessages: string | number = "";
+
+  cardColor: string = "rgb(27, 20, 100)";
+  deactivatedCardColor: string = "rgb(241,241,241)";
+
+  textColor: string = "rgb(208,208,208)";
+  deactivatedTextColor: string = "rgb(205,205,205)";
+
+  bandColor: string = "rgb(27, 20, 100)";
+  deactivatedBandColor: string = "rgb(241,241,241)";
+  okBandColor: string = "rgb(11,186,0)";
+  warningBandColor: string = "rgb(253,144,0)";
+
+  chartBackgroundColor: string = "rgb(27, 20, 100)";
+  chartTextColor: string = "rgb(208,208,208)";
+
+  consumedMessagesFirstStreamBandColor: string;
+  consumedMessagesSecondStreamBandColor: string;
+
+  notAvailable: string = "n/a";
+
+  historicFirstConsumedInputValues: HistoricalMonitoringData[] = [];
+  historicSecondConsumedInputValues: HistoricalMonitoringData[] = [];
+  historicProducedOutputValues: HistoricalMonitoringData[] = [];
+
+  consumedMessagesFirstStreamLastValue: number = -1;
+  consumedMessagesSecondStreamLastValue: number = -1;
+  producedMessageOutputLastValue: number = -1;
+
+  ngOnInit(): void {
+
+  }
+
+  updateMonitoringInfo() {
+    if (this.pipelineElementMonitoringInfo.inputTopicInfoExists) {
+      let consumedMessages = this.pipelineElementMonitoringInfo.inputTopicInfo.map(info => {
+        return {"count": (info.currentOffset - info.offsetAtPipelineStart),
+          "lag": info.latestOffset - info.currentOffset,
+          "currentOffset": info.currentOffset};
+      });
+      this.consumedMessagesFirstInputStream = consumedMessages[0].count + " / " + consumedMessages[0].lag;
+      this.consumedMessagesSecondInputStream = consumedMessages.length > 1 ? consumedMessages[1].count + " / " + consumedMessages[1].lag : this.notAvailable;
+      this.consumedMessagesFirstStreamBandColor = consumedMessages[0].lag > 10 ? this.warningBandColor : this.okBandColor;
+      this.consumedMessagesSecondStreamBandColor = (consumedMessages.length > 1 ? (consumedMessages[1].lag > 10 ? this.warningBandColor : this.okBandColor) : this.deactivatedBandColor);
+
+      this.makeHistoricData(consumedMessages[0], this.consumedMessagesFirstStreamLastValue, this.historicFirstConsumedInputValues);
+      this.consumedMessagesFirstStreamLastValue = consumedMessages[0].count;
+      this.historicFirstConsumedInputValues = [].concat(this.historicFirstConsumedInputValues);
+
+      if (consumedMessages.length > 1) {
+        this.makeHistoricData(consumedMessages[1], this.consumedMessagesSecondStreamLastValue, this.historicSecondConsumedInputValues);
+        this.consumedMessagesSecondStreamLastValue = consumedMessages[1].count;
+        this.historicSecondConsumedInputValues = [].concat(this.historicSecondConsumedInputValues);
+      }
+    } else {
+      this.consumedMessagesFirstInputStream = this.notAvailable;
+      this.consumedMessagesFirstStreamBandColor = this.deactivatedBandColor;
+      this.consumedMessagesSecondInputStream = this.notAvailable;
+      this.consumedMessagesSecondStreamBandColor = this.deactivatedBandColor;
+    }
+    if (this.pipelineElementMonitoringInfo.outputTopicInfoExists) {
+      this.producedMessages = this.pipelineElementMonitoringInfo.outputTopicInfo.latestOffset - this.pipelineElementMonitoringInfo.outputTopicInfo.offsetAtPipelineStart;
+      let producedMessage = {"count": this.producedMessages};
+      this.makeHistoricData(producedMessage, this.producedMessageOutputLastValue, this.historicProducedOutputValues);
+      this.producedMessageOutputLastValue = producedMessage.count;
+      this.historicProducedOutputValues = [].concat(this.historicProducedOutputValues);
+    } else {
+      this.producedMessages = this.notAvailable;
+    }
+  }
+
+  makeHistoricData(consumedMessage: any, lastValue: number, historicData: HistoricalMonitoringData[]) {
+    console.log(consumedMessage);
+    if (lastValue > -1) {
+      let entry: HistoricalMonitoringData = {"name": new Date().toLocaleTimeString(), value: (consumedMessage.count - lastValue)};
+      historicData.push(entry);
+    }
+    if (historicData.length > 10) {
+      historicData.shift();
+    } else {
+      for (let i = 0; i < (10 - historicData.length); i++) {
+        historicData.unshift({"name": i.toString(), "value": 0});
+      }
+    }
+  }
+
+  get pipelineElementMonitoringInfo() {
+    return this._pipelineElementMonitoringInfo;
+  }
+
+  @Input()
+  set pipelineElementMonitoringInfo(pipelineElementMonitoringInfo: PipelineElementMonitoringInfo) {
+    this._pipelineElementMonitoringInfo = pipelineElementMonitoringInfo;
+    this.updateMonitoringInfo();
+  }
+
+
+}
\ No newline at end of file
diff --git a/ui/src/app/pipeline-details/pipeline-details.component.html b/ui/src/app/pipeline-details/pipeline-details.component.html
index dbd2d97..cf7a2b2 100644
--- a/ui/src/app/pipeline-details/pipeline-details.component.html
+++ b/ui/src/app/pipeline-details/pipeline-details.component.html
@@ -22,7 +22,7 @@
             <div fxFlex="100" fxLayout="row">
                 <mat-tab-group [selectedIndex]="selectedIndex" (selectedIndexChange)="setSelectedIndex($event)">
                     <mat-tab label="Overview"></mat-tab>
-                    <mat-tab label="Statistics"></mat-tab>
+                    <mat-tab label="Monitoring"></mat-tab>
                     <mat-tab label="Errors"></mat-tab>
                     <mat-tab label="Quick Edit" [disabled]="pipelineAvailable && pipeline.running"></mat-tab>
                 </mat-tab-group>
@@ -48,6 +48,9 @@
             </div>
         </div>
     </div>
+    <div fxFlex fxLayout="column" fxLayoutAlign="start top" *ngIf="pipelineAvailable && selectedIndex == 1">
+        <pipeline-monitoring [pipeline]="pipeline"></pipeline-monitoring>
+    </div>
     <div fxFlex fxLayout="row" fxLayoutAlign="start top" *ngIf="pipelineAvailable && selectedIndex == 3">
         <div fxFlex="100" class="md-padding">
             <div fxFlex fxLayout="column">
diff --git a/ui/src/app/pipeline-details/pipeline-details.module.ts b/ui/src/app/pipeline-details/pipeline-details.module.ts
index 850a77f..478a0d4 100644
--- a/ui/src/app/pipeline-details/pipeline-details.module.ts
+++ b/ui/src/app/pipeline-details/pipeline-details.module.ts
@@ -33,6 +33,9 @@ import {PipelineElementsComponent} from "./components/elements/pipeline-elements
 import {PipelineElementsRowComponent} from "./components/elements/pipeline-elements-row.component";
 import {QuickEditComponent} from "./components/edit/quickedit.component";
 import {CoreUiModule} from "../core-ui/core-ui.module";
+import {PipelineMonitoringComponent} from "./components/monitoring/pipeline-monitoring.component";
+import {PipelineElementStatisticsComponent} from "./components/monitoring/statistics/pipeline-element-statistics.component";
+import {NgxChartsModule} from "@swimlane/ngx-charts";
 
 @NgModule({
   imports: [
@@ -44,6 +47,7 @@ import {CoreUiModule} from "../core-ui/core-ui.module";
     CustomMaterialModule,
     CommonModule,
     MatProgressSpinnerModule,
+    NgxChartsModule,
     EditorModule,
     FormsModule,
     ReactiveFormsModule
@@ -52,7 +56,9 @@ import {CoreUiModule} from "../core-ui/core-ui.module";
     PipelineActionsComponent,
     PipelineElementsComponent,
     PipelineElementsRowComponent,
+    PipelineElementStatisticsComponent,
     PipelineDetailsComponent,
+    PipelineMonitoringComponent,
     PipelineStatusComponent,
     PipelinePreviewComponent,
     QuickEditComponent
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/ui/src/app/platform-services/apis/pipeline-monitoring.service.ts
similarity index 50%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to ui/src/app/platform-services/apis/pipeline-monitoring.service.ts
index c6e991f..cef42ea 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/ui/src/app/platform-services/apis/pipeline-monitoring.service.ts
@@ -16,9 +16,27 @@
  *
  */
 
-package org.apache.streampipes.manager.monitoring.pipeline;
+import {Injectable} from "@angular/core";
+import {HttpClient} from "@angular/common/http";
+import {Observable} from "rxjs";
+import {
+  PipelineMonitoringInfo
+} from "../../core-model/gen/streampipes-model";
+import {PlatformServicesCommons} from "./commons.service";
+import {map} from "rxjs/operators";
 
-public class PipelineExecutionStatusCollector {
+@Injectable()
+export class PipelineMonitoringService {
 
+  constructor(private http: HttpClient,
+              private platformServicesCommons: PlatformServicesCommons) {
+  }
 
-}
+  getPipelineMonitoringInfo(pipelineId: string): Observable<PipelineMonitoringInfo> {
+    return this.http.get(this.platformServicesCommons.authUserBasePath()
+        + "/pipeline-monitoring/"
+        + pipelineId)
+        .pipe(map(response => PipelineMonitoringInfo.fromData(response as any)));
+  }
+
+}
\ No newline at end of file
diff --git a/ui/src/app/platform-services/platform.module.ts b/ui/src/app/platform-services/platform.module.ts
index e96fc12..4627904 100644
--- a/ui/src/app/platform-services/platform.module.ts
+++ b/ui/src/app/platform-services/platform.module.ts
@@ -22,6 +22,7 @@ import {PipelineService} from "./apis/pipeline.service";
 import {PlatformServicesCommons} from "./apis/commons.service";
 import {PipelineElementEndpointService} from "./apis/pipeline-element-endpoint.service";
 import {FilesService} from "./apis/files.service";
+import {PipelineMonitoringService} from "./apis/pipeline-monitoring.service";
 
 @NgModule({
   imports: [],
@@ -32,6 +33,7 @@ import {FilesService} from "./apis/files.service";
     PipelineElementEndpointService,
     //PipelineTemplateService,
     PipelineElementService,
+    PipelineMonitoringService,
     PipelineService
   ],
   entryComponents: []
diff --git a/ui/src/scss/sp/colors.scss b/ui/src/scss/sp/colors.scss
index eace269..97f99b7 100644
--- a/ui/src/scss/sp/colors.scss
+++ b/ui/src/scss/sp/colors.scss
@@ -58,6 +58,10 @@
     background: #FAFAFA;
 }
 
+.sp-border-left-accent-light {
+    border-left: 2px solid $sp-color-accent-light;
+}
+
 .sp-blue-border-nopadding {
     border: 3px solid $sp-color-accent-light;
 }