You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/09/24 08:03:38 UTC

[incubator-streampipes] 01/01: [STREAMPIPES-245] Add initial version of pipeline monitoring feature

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-245
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 6a828302501d045a053884c39f8786611afe327c
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Thu Sep 24 10:03:20 2020 +0200

    [STREAMPIPES-245] Add initial version of pipeline monitoring feature
---
 pom.xml                                            |   2 +-
 .../backend/StreamPipesResourceConfig.java         |   1 +
 .../kafka/config/ConsumerConfigFactory.java        |   2 +-
 .../monitoring/PipelineElementMonitoringInfo.java  |  85 ++++++++
 .../model/monitoring/PipelineElementTopicInfo.java |  63 ++++++
 .../model/monitoring/PipelineMonitoringInfo.java   |  67 ++++++
 .../manager/execution/http/PipelineExecutor.java   |  31 ++-
 .../pipeline/PipelineExecutionStatusCollector.java |  37 ++++
 .../monitoring/pipeline/TopicInfoCollector.java    | 228 +++++++++++++++++++++
 .../streampipes/rest/api/IPipelineMonitoring.java  |   9 +-
 .../streampipes/rest/impl/PipelineMonitoring.java  |  42 ++++
 .../app-transport-monitoring.module.ts             |   4 +-
 .../outgoing/outgoing-view.component.html          |  12 +-
 .../transport-summary.component.html               |  14 +-
 ui/src/app/core-model/gen/streampipes-model.ts     |  71 ++++++-
 ui/src/app/core-ui/core-ui.module.ts               | 139 +++++++------
 .../barchart/barchart-widget.component.html}       |  20 +-
 .../widget/barchart/barchart-widget.component.scss |   9 +-
 .../widget/barchart/barchart-widget.component.ts   |  34 ++-
 .../widget/status/status-widget.component.css}     |   0
 .../widget/status/status-widget.component.html}    |   7 +-
 .../widget/status/status-widget.component.ts}      |  13 +-
 .../elements/pipeline-elements-row.component.html  |   4 +-
 .../elements/pipeline-elements-row.component.ts    |   3 +
 .../components/model/pipeline-details.model.ts     |  10 +-
 .../monitoring/pipeline-monitoring.component.html  |  41 ++++
 .../monitoring/pipeline-monitoring.component.scss} |  30 +--
 .../monitoring/pipeline-monitoring.component.ts    |  89 ++++++++
 .../pipeline-element-statistics.component.html     |  45 ++++
 .../pipeline-element-statistics.component.scss     |  11 +-
 .../pipeline-element-statistics.component.ts       | 144 +++++++++++++
 .../pipeline-details.component.html                |   5 +-
 .../pipeline-details/pipeline-details.module.ts    |   6 +
 .../apis/pipeline-monitoring.service.ts            |  24 ++-
 ui/src/app/platform-services/platform.module.ts    |   2 +
 ui/src/scss/sp/colors.scss                         |   4 +
 36 files changed, 1152 insertions(+), 156 deletions(-)

diff --git a/pom.xml b/pom.xml
index 5853729..f40d05c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,7 +78,7 @@
         <jgrapht.version>1.3.1</jgrapht.version>
         <json-path.version>3.1.0</json-path.version>
         <jsr305.version>3.0.2</jsr305.version>
-        <kafka.version>2.2.0</kafka.version>
+        <kafka.version>2.6.0</kafka.version>
         <lightcouch.version>0.2.0</lightcouch.version>
         <log4j.version>2.12.1</log4j.version>
         <logback-classic.version>1.2.3</logback-classic.version>
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
index 9ed13f6..1b8998d 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
@@ -70,6 +70,7 @@ public class StreamPipesResourceConfig extends ResourceConfig {
     register(PipelineElementImportNoUser.class);
     register(PipelineElementImport.class);
     register(PipelineElementRuntimeInfo.class);
+    register(PipelineMonitoring.class);
     register(PipelineNoUserResource.class);
     register(PipelineTemplate.class);
     register(PipelineWithUserResource.class);
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
index 1c7cbf4..72faf42 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
@@ -26,7 +26,7 @@ import java.util.UUID;
 public class ConsumerConfigFactory extends AbstractConfigFactory {
 
   private static final String ENABLE_AUTO_COMMIT_CONFIG_DEFAULT = "true";
-  private static final String AUTO_COMMIT_INTERVAL_MS_CONFIG_DEFAULT = "10000";
+  private static final String AUTO_COMMIT_INTERVAL_MS_CONFIG_DEFAULT = "5000";
   private static final String SESSION_TIMEOUT_MS_CONFIG_DEFAULT = "30000";
   private static final Integer FETCH_MAX_BYTES_CONFIG_DEFAULT = 5000012;
   private static final String KEY_DESERIALIZER_CLASS_CONFIG_DEFAULT = "org.apache.kafka.common" +
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementMonitoringInfo.java b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementMonitoringInfo.java
new file mode 100644
index 0000000..4812ba2
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementMonitoringInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.model.monitoring;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class PipelineElementMonitoringInfo {
+
+  private String pipelineElementId;
+  private String pipelineElementName;
+
+  private boolean inputTopicInfoExists;
+  private boolean outputTopicInfoExists;
+
+  private List<PipelineElementTopicInfo> inputTopicInfo;
+  private PipelineElementTopicInfo outputTopicInfo;
+
+  public PipelineElementMonitoringInfo() {
+    this.inputTopicInfo = new ArrayList<>();
+  }
+
+  public String getPipelineElementId() {
+    return pipelineElementId;
+  }
+
+  public void setPipelineElementId(String pipelineElementId) {
+    this.pipelineElementId = pipelineElementId;
+  }
+
+  public String getPipelineElementName() {
+    return pipelineElementName;
+  }
+
+  public void setPipelineElementName(String pipelineElementName) {
+    this.pipelineElementName = pipelineElementName;
+  }
+
+  public List<PipelineElementTopicInfo> getInputTopicInfo() {
+    return inputTopicInfo;
+  }
+
+  public void setInputTopicInfo(List<PipelineElementTopicInfo> inputTopicInfo) {
+    this.inputTopicInfo = inputTopicInfo;
+  }
+
+  public PipelineElementTopicInfo getOutputTopicInfo() {
+    return outputTopicInfo;
+  }
+
+  public void setOutputTopicInfo(PipelineElementTopicInfo outputTopicInfo) {
+    this.outputTopicInfo = outputTopicInfo;
+  }
+
+  public boolean isInputTopicInfoExists() {
+    return inputTopicInfoExists;
+  }
+
+  public void setInputTopicInfoExists(boolean inputTopicInfoExists) {
+    this.inputTopicInfoExists = inputTopicInfoExists;
+  }
+
+  public boolean isOutputTopicInfoExists() {
+    return outputTopicInfoExists;
+  }
+
+  public void setOutputTopicInfoExists(boolean outputTopicInfoExists) {
+    this.outputTopicInfoExists = outputTopicInfoExists;
+  }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementTopicInfo.java b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementTopicInfo.java
new file mode 100644
index 0000000..5270668
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementTopicInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.model.monitoring;
+
+public class PipelineElementTopicInfo {
+
+  private String topicName;
+
+  private Long currentOffset;
+  private Long latestOffset;
+  private Long offsetAtPipelineStart;
+
+  public PipelineElementTopicInfo() {
+
+  }
+
+  public String getTopicName() {
+    return topicName;
+  }
+
+  public void setTopicName(String topicName) {
+    this.topicName = topicName;
+  }
+
+  public Long getCurrentOffset() {
+    return currentOffset;
+  }
+
+  public void setCurrentOffset(Long currentOffset) {
+    this.currentOffset = currentOffset;
+  }
+
+  public Long getLatestOffset() {
+    return latestOffset;
+  }
+
+  public void setLatestOffset(Long latestOffset) {
+    this.latestOffset = latestOffset;
+  }
+
+  public Long getOffsetAtPipelineStart() {
+    return offsetAtPipelineStart;
+  }
+
+  public void setOffsetAtPipelineStart(Long offsetAtPipelineStart) {
+    this.offsetAtPipelineStart = offsetAtPipelineStart;
+  }
+}
\ No newline at end of file
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineMonitoringInfo.java b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineMonitoringInfo.java
new file mode 100644
index 0000000..43b6a07
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineMonitoringInfo.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.model.monitoring;
+
+import org.apache.streampipes.model.shared.annotation.TsModel;
+
+import java.util.List;
+
+@TsModel
+public class PipelineMonitoringInfo {
+
+  private String pipelineId;
+  private long createdAt;
+  private long startedAt;
+
+  private List<PipelineElementMonitoringInfo> pipelineElementMonitoringInfo;
+
+  public PipelineMonitoringInfo() {
+  }
+
+  public String getPipelineId() {
+    return pipelineId;
+  }
+
+  public void setPipelineId(String pipelineId) {
+    this.pipelineId = pipelineId;
+  }
+
+  public long getCreatedAt() {
+    return createdAt;
+  }
+
+  public void setCreatedAt(long createdAt) {
+    this.createdAt = createdAt;
+  }
+
+  public List<PipelineElementMonitoringInfo> getPipelineElementMonitoringInfo() {
+    return pipelineElementMonitoringInfo;
+  }
+
+  public void setPipelineElementMonitoringInfo(List<PipelineElementMonitoringInfo> pipelineElementMonitoringInfo) {
+    this.pipelineElementMonitoringInfo = pipelineElementMonitoringInfo;
+  }
+
+  public long getStartedAt() {
+    return startedAt;
+  }
+
+  public void setStartedAt(long startedAt) {
+    this.startedAt = startedAt;
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index ce16327..d2f66ab 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -18,28 +18,26 @@
 
 package org.apache.streampipes.manager.execution.http;
 
-import org.lightcouch.DocumentConflictException;
 import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
 import org.apache.streampipes.manager.execution.status.SepMonitoringManager;
 import org.apache.streampipes.manager.util.TemporaryGraphStorage;
 import org.apache.streampipes.model.SpDataSet;
 import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.model.message.PipelineStatusMessage;
-import org.apache.streampipes.model.message.PipelineStatusMessageType;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.message.PipelineStatusMessage;
+import org.apache.streampipes.model.message.PipelineStatusMessageType;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
 import org.apache.streampipes.storage.api.IPipelineStorage;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.apache.streampipes.user.management.encryption.CredentialsManager;
+import org.lightcouch.DocumentConflictException;
 
 import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
 import java.util.stream.Collectors;
 
 public class PipelineExecutor {
@@ -59,8 +57,12 @@ public class PipelineExecutor {
 
   public PipelineOperationStatus startPipeline() {
 
+    pipeline.getSepas().forEach(this::updateGroupIds);
+    pipeline.getActions().forEach(this::updateGroupIds);
+
     List<DataProcessorInvocation> sepas = pipeline.getSepas();
     List<DataSinkInvocation> secs = pipeline.getActions();
+
     List<SpDataSet> dataSets = pipeline.getStreams().stream().filter(s -> s instanceof SpDataSet).map(s -> new
             SpDataSet((SpDataSet) s)).collect(Collectors.toList());
 
@@ -74,7 +76,7 @@ public class PipelineExecutor {
 
     List<InvocableStreamPipesEntity> decryptedGraphs = decryptSecrets(graphs);
 
-    graphs.forEach(g -> g.setStreamRequirements(Arrays.asList()));
+    graphs.forEach(g -> g.setStreamRequirements(Collections.emptyList()));
 
     PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(),
             pipeline.getName(), decryptedGraphs, dataSets)
@@ -97,6 +99,15 @@ public class PipelineExecutor {
     return status;
   }
 
+  private void updateGroupIds(InvocableStreamPipesEntity entity) {
+    entity.getInputStreams()
+            .stream()
+            .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol)
+            .map(is -> is.getEventGrounding().getTransportProtocol())
+            .map(KafkaTransportProtocol.class::cast)
+            .forEach(tp -> tp.setGroupId(UUID.randomUUID().toString()));
+  }
+
   private List<InvocableStreamPipesEntity> decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
     List<InvocableStreamPipesEntity> decryptedGraphs = new ArrayList<>();
     graphs.stream().map(g -> {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
index c6e991f..6a8ad30 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
@@ -18,7 +18,44 @@
 
 package org.apache.streampipes.manager.monitoring.pipeline;
 
+import org.apache.streampipes.model.monitoring.PipelineElementMonitoringInfo;
+import org.apache.streampipes.model.monitoring.PipelineMonitoringInfo;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import java.util.List;
+
 public class PipelineExecutionStatusCollector {
 
+  private String pipelineId;
+
+  public PipelineExecutionStatusCollector(String pipelineId) {
+    this.pipelineId = pipelineId;
+  }
+
+  public PipelineMonitoringInfo makePipelineMonitoringInfo() {
+    Pipeline pipeline = getPipeline();
+
+    PipelineMonitoringInfo monitoringInfo = new PipelineMonitoringInfo();
+    monitoringInfo.setCreatedAt(pipeline.getCreatedAt());
+    monitoringInfo.setStartedAt(pipeline.getStartedAt());
+    monitoringInfo.setPipelineId(pipelineId);
+
+    monitoringInfo.setPipelineElementMonitoringInfo(makePipelineElementMonitoringInfo(pipeline));
+
+    return monitoringInfo;
+  }
+
+  private List<PipelineElementMonitoringInfo> makePipelineElementMonitoringInfo(Pipeline pipeline) {
+    return new TopicInfoCollector(pipeline).makeMonitoringInfo();
+
+  }
 
+  private Pipeline getPipeline() {
+    return StorageDispatcher
+            .INSTANCE
+            .getNoSqlStore()
+            .getPipelineStorageAPI()
+            .getPipeline(this.pipelineId);
+  }
 }
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java
new file mode 100644
index 0000000..0b035a4
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.monitoring.pipeline;
+
+import org.apache.kafka.clients.admin.*;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.model.monitoring.PipelineElementMonitoringInfo;
+import org.apache.streampipes.model.monitoring.PipelineElementTopicInfo;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.sdk.helpers.Tuple2;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class TopicInfoCollector {
+
+
+  private Pipeline pipeline;
+  private AdminClient kafkaAdminClient;
+
+  private Map<String, Long> latestTopicOffsets;
+  private Map<String, Long> topicOffsetAtPipelineStart;
+  private Map<String, Long> currentConsumerGroupOffsets;
+
+  private List<PipelineElementMonitoringInfo> monitoringInfo;
+
+  public TopicInfoCollector(Pipeline pipeline) {
+    this.pipeline = pipeline;
+    this.kafkaAdminClient = kafkaAdminClient();
+    this.latestTopicOffsets = new HashMap<>();
+    this.topicOffsetAtPipelineStart = new HashMap<>();
+    this.currentConsumerGroupOffsets = new HashMap<>();
+    this.monitoringInfo = new ArrayList<>();
+  }
+
+  private void makeTopicInfo() {
+    long currentTime = Instant.now().minus (1, ChronoUnit.SECONDS).toEpochMilli();
+    List<TransportProtocol> affectedProtocols = new ArrayList<>();
+
+    pipeline.getSepas().forEach(processor -> affectedProtocols.addAll(extractProtocols(processor)));
+    pipeline.getActions().forEach(sink -> affectedProtocols.addAll(extractProtocols(sink)));
+
+    affectedProtocols.forEach(protocol -> {
+      if (protocol instanceof KafkaTransportProtocol) {
+        try {
+          Tuple2<String, Long> latestTopicOffsets = makeTopicOffsetInfo((KafkaTransportProtocol) protocol, OffsetSpec.forTimestamp(currentTime));
+          Tuple2<String, Long> topicOffsetAtPipelineStart = makeTopicOffsetInfo((KafkaTransportProtocol) protocol, OffsetSpec.forTimestamp(pipeline.getStartedAt()));
+          Tuple2<String, Long> currentConsumerGroupOffsets = makeTopicInfo((KafkaTransportProtocol) protocol);
+
+          this.latestTopicOffsets.put(latestTopicOffsets.a, latestTopicOffsets.b);
+          this.topicOffsetAtPipelineStart.put(topicOffsetAtPipelineStart.a, topicOffsetAtPipelineStart.b);
+          this.currentConsumerGroupOffsets.put(currentConsumerGroupOffsets.a, currentConsumerGroupOffsets.b);
+
+        } catch (ExecutionException | InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    });
+  }
+
+  public List<PipelineElementMonitoringInfo> makeMonitoringInfo() {
+    this.makeTopicInfo();
+    // TODO filter for KafkaGrounding
+    this.pipeline.getStreams().forEach(stream -> this.monitoringInfo.add(makeStreamMonitoringInfo(stream)));
+    this.pipeline.getSepas().forEach(processor -> this.monitoringInfo.add(makeProcessorMonitoringInfo(processor)));
+    this.pipeline.getActions().forEach(sink -> this.monitoringInfo.add(makeSinkMonitoringInfo(sink)));
+
+    this.kafkaAdminClient.close();
+    return this.monitoringInfo;
+  }
+
+  private PipelineElementMonitoringInfo makeStreamMonitoringInfo(SpDataStream stream) {
+    PipelineElementMonitoringInfo info = prepare(stream.getElementId(), stream.getName(), false, true);
+    KafkaTransportProtocol protocol = (KafkaTransportProtocol) stream.getEventGrounding().getTransportProtocol();
+    info.setOutputTopicInfo(makeOutputTopicInfoForPipelineElement(protocol));
+
+    return info;
+  }
+
+  private PipelineElementMonitoringInfo makeProcessorMonitoringInfo(DataProcessorInvocation processor) {
+    PipelineElementMonitoringInfo info = prepare(processor.getElementId(), processor.getName(), true, true);
+    KafkaTransportProtocol outputProtocol = (KafkaTransportProtocol) processor.getOutputStream().getEventGrounding().getTransportProtocol();
+    PipelineElementTopicInfo outputTopicInfo = makeOutputTopicInfoForPipelineElement(outputProtocol);
+    List<PipelineElementTopicInfo> inputTopicInfo = makeInputTopicInfoForPipelineElement(processor.getInputStreams());
+
+    info.setOutputTopicInfo(outputTopicInfo);
+    info.setInputTopicInfo(inputTopicInfo);
+
+    return info;
+  }
+
+  private PipelineElementMonitoringInfo makeSinkMonitoringInfo(DataSinkInvocation sink) {
+    PipelineElementMonitoringInfo info = prepare(sink.getElementId(), sink.getName(), true, false);
+    info.setInputTopicInfo(makeInputTopicInfoForPipelineElement(sink.getInputStreams()));
+    return info;
+  }
+
+  private List<PipelineElementTopicInfo> makeInputTopicInfoForPipelineElement(List<SpDataStream> inputStreams) {
+    List<PipelineElementTopicInfo> topicInfos = new ArrayList<>();
+    inputStreams.stream().map(is -> is.getEventGrounding().getTransportProtocol()).forEach(protocol -> {
+      PipelineElementTopicInfo topicInfo = new PipelineElementTopicInfo();
+      String topic = getTopic((KafkaTransportProtocol) protocol);
+      topicInfo.setTopicName(topic);
+      topicInfo.setLatestOffset(latestTopicOffsets.get(topic));
+      topicInfo.setOffsetAtPipelineStart(topicOffsetAtPipelineStart.get(topic));
+      topicInfo.setCurrentOffset(currentConsumerGroupOffsets.get(((KafkaTransportProtocol) protocol).getGroupId()));
+      topicInfos.add(topicInfo);
+    });
+
+    return topicInfos;
+  }
+
+  private PipelineElementTopicInfo makeOutputTopicInfoForPipelineElement(KafkaTransportProtocol protocol) {
+    PipelineElementTopicInfo topicInfo = new PipelineElementTopicInfo();
+    String topic = getTopic(protocol);
+    topicInfo.setTopicName(topic);
+    topicInfo.setOffsetAtPipelineStart(this.topicOffsetAtPipelineStart.get(topic));
+    topicInfo.setLatestOffset(this.latestTopicOffsets.get(topic));
+
+    return topicInfo;
+  }
+
+  private String getTopic(KafkaTransportProtocol protocol) {
+    return protocol.getTopicDefinition().getActualTopicName();
+  }
+
+  private PipelineElementMonitoringInfo prepare(String elementId, String name, boolean inputTopics, boolean outputTopics) {
+    PipelineElementMonitoringInfo info = new PipelineElementMonitoringInfo();
+    info.setPipelineElementName(name);
+    info.setPipelineElementId(elementId);
+    info.setInputTopicInfoExists(inputTopics);
+    info.setOutputTopicInfoExists(outputTopics);
+    return info;
+  }
+
+  private AdminClient kafkaAdminClient() {
+    return KafkaAdminClient.create(makeProperties());
+  }
+
+  private Properties makeProperties() {
+    Properties props = new Properties();
+
+    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
+    props.put(AdminClientConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+
+    return props;
+  }
+
+  private String getBrokerUrl() {
+    String env = System.getenv("SP_DEBUG");
+    System.out.println(System.getenv("SP_DEBUG"));
+    if ("true".equals(env.replaceAll(" ", ""))) {
+      return "localhost:9094";
+    } else {
+      return BackendConfig.INSTANCE.getKafkaUrl();
+    }
+  }
+
+  private Tuple2<String, Long> makeTopicOffsetInfo(KafkaTransportProtocol protocol, OffsetSpec offsetSpec) throws ExecutionException, InterruptedException {
+    Map<TopicPartition, OffsetAndMetadata> partitions = kafkaAdminClient.listConsumerGroupOffsets(protocol.getGroupId()).partitionsToOffsetAndMetadata().get();
+    Map<TopicPartition, OffsetSpec> desiredOffsets = new HashMap<>();
+    partitions.forEach((key, value) -> desiredOffsets.put(key, offsetSpec));
+    ListOffsetsResult offsetsResult = kafkaAdminClient.listOffsets(desiredOffsets);
+    Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> resultInfo = offsetsResult.all().get();
+    Long offset = resultInfo
+            .values()
+            .stream()
+            .map(ListOffsetsResult.ListOffsetsResultInfo::offset)
+            .reduce(0L, Long::sum);
+
+    return new Tuple2<>(protocol.getTopicDefinition().getActualTopicName(), offset);
+  }
+
+  private Tuple2<String, Long> makeTopicInfo(KafkaTransportProtocol protocol) throws ExecutionException, InterruptedException {
+    Long offset = kafkaAdminClient.listConsumerGroupOffsets(protocol.getGroupId())
+              .partitionsToOffsetAndMetadata()
+              .get()
+              .values()
+              .stream()
+              .map(OffsetAndMetadata::offset)
+              .reduce(0L, Long::sum);
+
+    return new Tuple2<>(protocol.getGroupId(), offset);
+  }
+
+  public List<TransportProtocol> extractProtocols(InvocableStreamPipesEntity pipelineElement) {
+    return pipelineElement
+            .getInputStreams()
+            .stream()
+            .map(stream -> stream.getEventGrounding().getTransportProtocol())
+            .collect(Collectors.toList());
+  }
+
+  public static void main(String[] args) {
+    List<Pipeline> pipelines = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getAllPipelines();
+    Pipeline testPipeline = pipelines.get(0);
+
+    List<PipelineElementMonitoringInfo> monitoringInfo = new TopicInfoCollector(testPipeline).makeMonitoringInfo();
+    System.out.println(monitoringInfo.size());
+  }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipelineMonitoring.java
similarity index 82%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipelineMonitoring.java
index c6e991f..eace98d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipelineMonitoring.java
@@ -15,10 +15,11 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.rest.api;
 
-package org.apache.streampipes.manager.monitoring.pipeline;
-
-public class PipelineExecutionStatusCollector {
-
+import javax.ws.rs.core.Response;
 
+public interface IPipelineMonitoring {
+  
+  Response getPipelineMonitoringInfo(String pipelineId);
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineMonitoring.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineMonitoring.java
new file mode 100644
index 0000000..83f8db6
--- /dev/null
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineMonitoring.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.rest.impl;
+
+import org.apache.streampipes.manager.monitoring.pipeline.PipelineExecutionStatusCollector;
+import org.apache.streampipes.rest.api.IPipelineMonitoring;
+import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+@Path("/v2/users/{username}/pipeline-monitoring")
+public class PipelineMonitoring extends AbstractRestInterface implements IPipelineMonitoring {
+
+  @JacksonSerialized
+  @Path("{pipelineId}")
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Override
+  public Response getPipelineMonitoringInfo(@PathParam("pipelineId") String pipelineId) {
+    return ok(new PipelineExecutionStatusCollector(pipelineId).makePipelineMonitoringInfo());
+  }
+}
diff --git a/ui/src/app/app-transport-monitoring/app-transport-monitoring.module.ts b/ui/src/app/app-transport-monitoring/app-transport-monitoring.module.ts
index 143336a..05e43fa 100644
--- a/ui/src/app/app-transport-monitoring/app-transport-monitoring.module.ts
+++ b/ui/src/app/app-transport-monitoring/app-transport-monitoring.module.ts
@@ -34,16 +34,17 @@ import {DashboardItemComponent} from "./components/dashboard-item/dashboard-item
 import {DashboardImageComponent} from "./components/dashboard-image/dashboard-image.component";
 import {TransportSelectionComponent} from "./components/transport-selection/transport-selection.component";
 import {AppTransportMonitoringRestService} from "./services/app-transport-monitoring-rest.service";
-import {DashboardStatusFilledComponent} from "./components/dashboard-status-filled/dashboard-status-filled.component";
 import {TransportSummaryComponent} from "./components/transport-summary/transport-summary.component";
 import {SlideshowModule} from "ng-simple-slideshow";
 import {TransportActivityGraphComponent} from "./components/transport-activity-graph/transport-activity-graph.component";
 import {TimestampConverterService} from "./services/timestamp-converter.service";
 import {NgxChartsModule} from '@swimlane/ngx-charts';
+import {CoreUiModule} from "../core-ui/core-ui.module";
 
 @NgModule({
     imports: [
         CommonModule,
+        CoreUiModule,
         FlexLayoutModule,
         CustomMaterialModule,
         MatGridListModule,
@@ -62,7 +63,6 @@ import {NgxChartsModule} from '@swimlane/ngx-charts';
         DashboardItemComponent,
         DashboardStatusComponent,
         TransportSelectionComponent,
-        DashboardStatusFilledComponent,
         TransportSummaryComponent,
         TransportActivityGraphComponent
     ],
diff --git a/ui/src/app/app-transport-monitoring/components/outgoing/outgoing-view.component.html b/ui/src/app/app-transport-monitoring/components/outgoing/outgoing-view.component.html
index f8a681a..78276d4 100644
--- a/ui/src/app/app-transport-monitoring/components/outgoing/outgoing-view.component.html
+++ b/ui/src/app/app-transport-monitoring/components/outgoing/outgoing-view.component.html
@@ -24,16 +24,16 @@
     </div>
     <div fxFlex="33" fxLayout="column">
         <div fxFill fxFlex="100" fxLayout="row" fxLayoutAlign="start start">
-            <dashboard-status-filled fxFlex="100" [label]="'Total Boxes Detected'"
-                                     [statusValue]="totalBoxes"></dashboard-status-filled>
+            <sp-status-widget fxFlex="100" [label]="'Total Boxes Detected'"
+                                     [statusValue]="totalBoxes"></sp-status-widget>
         </div>
         <div fxFlex="100" fxLayout="row" fxLayoutAlign="start start">
-            <dashboard-status-filled fxFlex="100" [label]="'Transparent Boxes Detected'"
-                              [statusValue]="cardboardBoxes"></dashboard-status-filled>
+            <sp-status-widget fxFlex="100" [label]="'Transparent Boxes Detected'"
+                              [statusValue]="cardboardBoxes"></sp-status-widget>
         </div>
         <div fxFlex="100" fxLayout="row" fxLayoutAlign="start start">
-            <dashboard-status-filled fxFlex="100" [label]="'Cardboard Boxes Detected'"
-                              [statusValue]="transparentBoxes"></dashboard-status-filled>
+            <sp-status-widget fxFlex="100" [label]="'Cardboard Boxes Detected'"
+                              [statusValue]="transparentBoxes"></sp-status-widget>
         </div>
     </div>
 </div>
diff --git a/ui/src/app/app-transport-monitoring/components/transport-summary/transport-summary.component.html b/ui/src/app/app-transport-monitoring/components/transport-summary/transport-summary.component.html
index 31c5f72..1bb0ccc 100644
--- a/ui/src/app/app-transport-monitoring/components/transport-summary/transport-summary.component.html
+++ b/ui/src/app/app-transport-monitoring/components/transport-summary/transport-summary.component.html
@@ -17,13 +17,13 @@
   -->
 
 <div flex="100" fxLayout="row">
-    <dashboard-status-filled fxFlex="25" [label]="'Shipped'" [statusValue]="shippedTime"
-                             [color]="'rgb(156, 156, 156)'"></dashboard-status-filled>
-    <dashboard-status-filled fxFlex="25" [label]="'Delivered'" [statusValue]="deliveredTime"
-                             [color]="'rgb(156, 156, 156)'"></dashboard-status-filled>
-    <dashboard-status-filled fxFlex="25" [label]="'Took'" [statusValue]="tookTime"
-                             [color]="'rgb(156, 156, 156)'"></dashboard-status-filled>
-    <dashboard-status-filled fxFlex="25" [label]="'Status'" [statusValue]="errorCode" [color]="statusColor"></dashboard-status-filled>
+    <sp-status-widget fxFlex="25" [label]="'Shipped'" [statusValue]="shippedTime"
+                             [color]="'rgb(156, 156, 156)'"></sp-status-widget>
+    <sp-status-widget fxFlex="25" [label]="'Delivered'" [statusValue]="deliveredTime"
+                             [color]="'rgb(156, 156, 156)'"></sp-status-widget>
+    <sp-status-widget fxFlex="25" [label]="'Took'" [statusValue]="tookTime"
+                             [color]="'rgb(156, 156, 156)'"></sp-status-widget>
+    <sp-status-widget fxFlex="25" [label]="'Status'" [statusValue]="errorCode" [color]="statusColor"></sp-status-widget>
 </div>
 <div *ngFor="let statusMessage of statusMessages">
     <h4>{{statusMessage}}</h4>
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index 67be548..9f4abe5 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,7 +19,7 @@
 /* tslint:disable */
 /* eslint-disable */
 // @ts-nocheck
-// Generated using typescript-generator version 2.24.612 on 2020-09-15 08:51:13.
+// Generated using typescript-generator version 2.24.612 on 2020-09-20 21:03:26.
 
 export class AbstractStreamPipesEntity {
     "@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
@@ -151,8 +151,8 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
         instance.includedLocales = __getCopyArrayFn(__identity<string>())(data.includedLocales);
         instance.applicationLinks = __getCopyArrayFn(ApplicationLink.fromData)(data.applicationLinks);
         instance.connectedTo = __getCopyArrayFn(__identity<string>())(data.connectedTo);
-        instance.uri = data.uri;
         instance.dom = data.dom;
+        instance.uri = data.uri;
         return instance;
     }
 }
@@ -1563,9 +1563,9 @@ export class GenericAdapterSetDescription extends AdapterSetDescription implemen
         }
         const instance = target || new GenericAdapterSetDescription();
         super.fromData(data, instance);
-        instance.eventSchema = EventSchema.fromData(data.eventSchema);
         instance.formatDescription = FormatDescription.fromData(data.formatDescription);
         instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
+        instance.eventSchema = EventSchema.fromData(data.eventSchema);
         return instance;
     }
 }
@@ -1582,9 +1582,9 @@ export class GenericAdapterStreamDescription extends AdapterStreamDescription im
         }
         const instance = target || new GenericAdapterStreamDescription();
         super.fromData(data, instance);
-        instance.eventSchema = EventSchema.fromData(data.eventSchema);
         instance.formatDescription = FormatDescription.fromData(data.formatDescription);
         instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
+        instance.eventSchema = EventSchema.fromData(data.eventSchema);
         return instance;
     }
 }
@@ -1997,6 +1997,29 @@ export class PipelineCategory {
     }
 }
 
+export class PipelineElementMonitoringInfo {
+    inputTopicInfo: PipelineElementTopicInfo[];
+    inputTopicInfoExists: boolean;
+    outputTopicInfo: PipelineElementTopicInfo;
+    outputTopicInfoExists: boolean;
+    pipelineElementId: string;
+    pipelineElementName: string;
+
+    static fromData(data: PipelineElementMonitoringInfo, target?: PipelineElementMonitoringInfo): PipelineElementMonitoringInfo {
+        if (!data) {
+            return data;
+        }
+        const instance = target || new PipelineElementMonitoringInfo();
+        instance.pipelineElementId = data.pipelineElementId;
+        instance.pipelineElementName = data.pipelineElementName;
+        instance.inputTopicInfoExists = data.inputTopicInfoExists;
+        instance.outputTopicInfoExists = data.outputTopicInfoExists;
+        instance.inputTopicInfo = __getCopyArrayFn(PipelineElementTopicInfo.fromData)(data.inputTopicInfo);
+        instance.outputTopicInfo = PipelineElementTopicInfo.fromData(data.outputTopicInfo);
+        return instance;
+    }
+}
+
 export class PipelineElementRecommendation {
     count: number;
     description: string;
@@ -2054,6 +2077,25 @@ export class PipelineElementStatus {
     }
 }
 
+export class PipelineElementTopicInfo {
+    currentOffset: number;
+    latestOffset: number;
+    offsetAtPipelineStart: number;
+    topicName: string;
+
+    static fromData(data: PipelineElementTopicInfo, target?: PipelineElementTopicInfo): PipelineElementTopicInfo {
+        if (!data) {
+            return data;
+        }
+        const instance = target || new PipelineElementTopicInfo();
+        instance.topicName = data.topicName;
+        instance.currentOffset = data.currentOffset;
+        instance.latestOffset = data.latestOffset;
+        instance.offsetAtPipelineStart = data.offsetAtPipelineStart;
+        return instance;
+    }
+}
+
 export class PipelineModification {
     domId: string;
     elementId: string;
@@ -2089,6 +2131,25 @@ export class PipelineModificationMessage extends Message {
     }
 }
 
+export class PipelineMonitoringInfo {
+    createdAt: number;
+    pipelineElementMonitoringInfo: PipelineElementMonitoringInfo[];
+    pipelineId: string;
+    startedAt: number;
+
+    static fromData(data: PipelineMonitoringInfo, target?: PipelineMonitoringInfo): PipelineMonitoringInfo {
+        if (!data) {
+            return data;
+        }
+        const instance = target || new PipelineMonitoringInfo();
+        instance.pipelineId = data.pipelineId;
+        instance.createdAt = data.createdAt;
+        instance.startedAt = data.startedAt;
+        instance.pipelineElementMonitoringInfo = __getCopyArrayFn(PipelineElementMonitoringInfo.fromData)(data.pipelineElementMonitoringInfo);
+        return instance;
+    }
+}
+
 export class PipelineOperationStatus {
     elementStatus: PipelineElementStatus[];
     pipelineId: string;
@@ -2463,8 +2524,8 @@ export class SpDataSet extends SpDataStream {
         instance.supportedGrounding = EventGrounding.fromData(data.supportedGrounding);
         instance.datasetInvocationId = data.datasetInvocationId;
         instance.correspondingPipeline = data.correspondingPipeline;
-        instance.actualTopicName = data.actualTopicName;
         instance.brokerHostname = data.brokerHostname;
+        instance.actualTopicName = data.actualTopicName;
         return instance;
     }
 }
diff --git a/ui/src/app/core-ui/core-ui.module.ts b/ui/src/app/core-ui/core-ui.module.ts
index ae22d1e..83232bf 100644
--- a/ui/src/app/core-ui/core-ui.module.ts
+++ b/ui/src/app/core-ui/core-ui.module.ts
@@ -73,73 +73,78 @@ import {ColorPickerModule} from "ngx-color-picker";
 import {QuillModule} from "ngx-quill";
 import {CodemirrorModule} from "@ctrl/ngx-codemirror";
 import {MatAutocompleteModule} from "@angular/material/autocomplete";
+import {StatusWidgetComponent} from "./widget/status/status-widget.component";
+import {NgxChartsModule} from "@swimlane/ngx-charts";
+import {BarchartWidgetComponent} from "./widget/barchart/barchart-widget.component";
 
 @NgModule({
-    imports: [
-        CommonModule,
-        ColorPickerModule,
-        FlexLayoutModule,
-        CodemirrorModule,
-        CustomMaterialModule,
-        ReactiveFormsModule,
-        FormsModule,
-        CdkTableModule,
-        MatAutocompleteModule,
-        MatSnackBarModule,
-        MatProgressSpinnerModule,
-        MatDatepickerModule,
-        MatNativeDateModule,
-        PlotlyViaWindowModule,
-        MatSliderModule,
-        MatChipsModule,
-        PortalModule,
-        OverlayModule,
-        QuillModule.forRoot()
-    ],
-    declarations: [
-        ConfirmDialogComponent,
-        DisplayRecommendedPipe,
-        ImageComponent,
-        ImageContainerComponent,
-        ImageLabelingComponent,
-        ImageLabelsComponent,
-        ImageBarComponent,
-        ImageAnnotationsComponent,
-        ImageCategorizeComponent,
-        ImageViewerComponent,
-        StandardDialogComponent,
-        PanelDialogComponent,
-        StaticAnyInput,
-        StaticPropertyComponent,
-        StaticFreeInputComponent,
-        StaticSecretInputComponent,
-        StaticFileInputComponent,
-        StaticMappingNaryComponent,
-        StaticMappingUnaryComponent,
-        StaticGroupComponent,
-        StaticAlternativesComponent,
-        StaticCollectionComponent,
-        StaticColorPickerComponent,
-        StaticCodeInputComponent,
-        StaticOneOfInputComponent,
-        StaticRuntimeResolvableAnyInputComponent,
-        StaticRuntimeResolvableOneOfInputComponent,
-    ],
-    providers: [
-        MatDatepickerModule,
-        ColorService,
-        DisplayRecommendedPipe,
-        ReactLabelingService,
-        PolygonLabelingService,
-        BrushLabelingService,
-        CocoFormatService,
-        LabelingModeService,
-        DialogService,
-        RuntimeResolvableService,
-        StaticFileRestService,
-    ],
-    entryComponents: [
-    ],
+  imports: [
+    CommonModule,
+    ColorPickerModule,
+    FlexLayoutModule,
+    CodemirrorModule,
+    CustomMaterialModule,
+    ReactiveFormsModule,
+    FormsModule,
+    CdkTableModule,
+    MatAutocompleteModule,
+    MatSnackBarModule,
+    MatProgressSpinnerModule,
+    MatDatepickerModule,
+    MatNativeDateModule,
+    NgxChartsModule,
+    PlotlyViaWindowModule,
+    MatSliderModule,
+    MatChipsModule,
+    PortalModule,
+    OverlayModule,
+    QuillModule.forRoot()
+  ],
+  declarations: [
+    BarchartWidgetComponent,
+    ConfirmDialogComponent,
+    DisplayRecommendedPipe,
+    ImageComponent,
+    ImageContainerComponent,
+    ImageLabelingComponent,
+    ImageLabelsComponent,
+    ImageBarComponent,
+    ImageAnnotationsComponent,
+    ImageCategorizeComponent,
+    ImageViewerComponent,
+    StandardDialogComponent,
+    PanelDialogComponent,
+    StaticAnyInput,
+    StaticPropertyComponent,
+    StaticFreeInputComponent,
+    StaticSecretInputComponent,
+    StaticFileInputComponent,
+    StaticMappingNaryComponent,
+    StaticMappingUnaryComponent,
+    StaticGroupComponent,
+    StaticAlternativesComponent,
+    StaticCollectionComponent,
+    StaticColorPickerComponent,
+    StaticCodeInputComponent,
+    StaticOneOfInputComponent,
+    StaticRuntimeResolvableAnyInputComponent,
+    StaticRuntimeResolvableOneOfInputComponent,
+    StatusWidgetComponent,
+  ],
+  providers: [
+    MatDatepickerModule,
+    ColorService,
+    DisplayRecommendedPipe,
+    ReactLabelingService,
+    PolygonLabelingService,
+    BrushLabelingService,
+    CocoFormatService,
+    LabelingModeService,
+    DialogService,
+    RuntimeResolvableService,
+    StaticFileRestService,
+  ],
+  entryComponents: [],
   exports: [
     ImageComponent,
     ImageLabelingComponent,
@@ -161,7 +166,9 @@ import {MatAutocompleteModule} from "@angular/material/autocomplete";
     StaticOneOfInputComponent,
     StaticRuntimeResolvableAnyInputComponent,
     StaticRuntimeResolvableOneOfInputComponent,
-    ImageViewerComponent
+    ImageViewerComponent,
+    StatusWidgetComponent,
+    BarchartWidgetComponent,
   ]
 })
 export class CoreUiModule {
diff --git a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.html
similarity index 63%
copy from ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html
copy to ui/src/app/core-ui/widget/barchart/barchart-widget.component.html
index d4b52c3..3cb206b 100644
--- a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html
+++ b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.html
@@ -16,11 +16,17 @@
   ~
   -->
 
-<div fxFlex="100" fxLayoutAlign="center center" fxLayout="column">
-    <ngx-charts-number-card
-            [results]="chartData"
-            [scheme]="'cool'"
-            [cardColor]="color"
-    [bandColor]="'rgb(27, 20, 100)'" class="fix-margin">
-    </ngx-charts-number-card>
+<div fxFlex="100" fxLayoutAlign="center center" fxLayout="column" [ngStyle]="{'background-color': backgroundColor, 'width':'265px', 'max-width': '265px', 'margin-left': '8px'}">
+    <ngx-charts-bar-vertical
+            [view]="[265,150]"
+            [results]="data"
+            [scheme]="colorScheme"
+            [gradient]="false"
+            [xAxis]="false"
+            [yAxis]="true"
+            [legend]="false"
+            [showXAxisLabel]="false"
+            [showYAxisLabel]="false"
+            [ngStyle]="{'fill': backgroundColor}">
+    </ngx-charts-bar-vertical>
 </div>
\ No newline at end of file
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.scss
similarity index 87%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to ui/src/app/core-ui/widget/barchart/barchart-widget.component.scss
index c6e991f..41ecef0 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.scss
@@ -14,11 +14,4 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  *
- */
-
-package org.apache.streampipes.manager.monitoring.pipeline;
-
-public class PipelineExecutionStatusCollector {
-
-
-}
+ */
\ No newline at end of file
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.ts
similarity index 58%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to ui/src/app/core-ui/widget/barchart/barchart-widget.component.ts
index c6e991f..01c0860 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.ts
@@ -16,9 +16,37 @@
  *
  */
 
-package org.apache.streampipes.manager.monitoring.pipeline;
+import {Component, Input, OnInit} from '@angular/core';
 
-public class PipelineExecutionStatusCollector {
+@Component({
+  selector: 'sp-barchart-widget',
+  templateUrl: './barchart-widget.component.html',
+  styleUrls: ['./barchart-widget.component.scss']
+})
+export class BarchartWidgetComponent implements OnInit {
 
+  _data = [];
 
-}
+  @Input()
+  backgroundColor = "#cccccc";
+
+  @Input()
+  textColor = "#1b1464";
+
+  colorScheme = {
+    domain: ['#1b1464']
+  };
+
+  ngOnInit(): void {
+    this.colorScheme.domain = [this.textColor];
+  }
+
+  @Input()
+  set data(data) {
+    this._data = data;
+  }
+
+  get data() {
+    return this._data;
+  }
+}
\ No newline at end of file
diff --git a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.css b/ui/src/app/core-ui/widget/status/status-widget.component.css
similarity index 100%
copy from ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.css
copy to ui/src/app/core-ui/widget/status/status-widget.component.css
diff --git a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html b/ui/src/app/core-ui/widget/status/status-widget.component.html
similarity index 84%
rename from ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html
rename to ui/src/app/core-ui/widget/status/status-widget.component.html
index d4b52c3..1fd6197 100644
--- a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html
+++ b/ui/src/app/core-ui/widget/status/status-widget.component.html
@@ -17,10 +17,13 @@
   -->
 
 <div fxFlex="100" fxLayoutAlign="center center" fxLayout="column">
-    <ngx-charts-number-card
+    <ngx-charts-number-card style="width:300px;"
+            [view]="[300,150]"
             [results]="chartData"
             [scheme]="'cool'"
             [cardColor]="color"
-    [bandColor]="'rgb(27, 20, 100)'" class="fix-margin">
+            [animations]="false"
+            [bandColor]="bandColor"
+            [textColor]="textColor">
     </ngx-charts-number-card>
 </div>
\ No newline at end of file
diff --git a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.ts b/ui/src/app/core-ui/widget/status/status-widget.component.ts
similarity index 80%
rename from ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.ts
rename to ui/src/app/core-ui/widget/status/status-widget.component.ts
index aa89cd8..b639d4c 100644
--- a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.ts
+++ b/ui/src/app/core-ui/widget/status/status-widget.component.ts
@@ -16,16 +16,19 @@
  *
  */
 
-import {Component, Input} from '@angular/core';
+import {Component, Input, OnInit} from '@angular/core';
 
 @Component({
-    selector: 'dashboard-status-filled',
-    templateUrl: './dashboard-status-filled.component.html',
-    styleUrls: ['./dashboard-status-filled.component.css']
+    selector: 'sp-status-widget',
+    templateUrl: './status-widget.component.html',
+    styleUrls: ['./status-widget.component.css']
 })
-export class DashboardStatusFilledComponent {
+export class StatusWidgetComponent implements OnInit {
 
     @Input() color: string = "rgb(156, 156, 156)";
+    @Input() bandColor: string = "rgb(27, 20, 100)";
+    @Input() textColor: string = "rgb(96,96,96)";
+
     _label: string;
     _statusValue: string;
 
diff --git a/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.html b/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.html
index 5afd0e5..49f9875 100644
--- a/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.html
+++ b/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.html
@@ -18,12 +18,12 @@
 
 <div fxFlex="100" fxLayout="column">
     <div fxLayout="row" fxFlex="100">
-        <div fxFlex="30" fxLayout="row" style="margin-bottom:10px;">
+        <div fxFlex="{{showDescription ? 30 : 100}}}" fxLayout="row" style="margin-bottom:10px;">
             <span class="draggable-icon {{elementType}}">
                 <pipeline-element [preview]="true" [pipelineElement]="_element"></pipeline-element>
             </span>
         </div>
-        <div fxFlex="70" fxLayout="column" fxLayoutAlign="center left">
+        <div fxFlex="70" fxLayout="column" fxLayoutAlign="center left" *ngIf="showDescription">
             <b>{{element.name}}</b>
             {{element.description}}
         </div>
diff --git a/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.ts b/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.ts
index d353def..57cb3d6 100644
--- a/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.ts
+++ b/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.ts
@@ -32,6 +32,9 @@ export class PipelineElementsRowComponent implements OnInit {
     @Input()
     pipeline: Pipeline;
 
+    @Input()
+    showDescription: boolean = true;
+
     _element: PipelineElementUnion;
 
     constructor() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/ui/src/app/pipeline-details/components/model/pipeline-details.model.ts
similarity index 87%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to ui/src/app/pipeline-details/components/model/pipeline-details.model.ts
index c6e991f..ea6782d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/ui/src/app/pipeline-details/components/model/pipeline-details.model.ts
@@ -16,9 +16,7 @@
  *
  */
 
-package org.apache.streampipes.manager.monitoring.pipeline;
-
-public class PipelineExecutionStatusCollector {
-
-
-}
+export interface HistoricalMonitoringData {
+  name: string;
+  value: number;
+}
\ No newline at end of file
diff --git a/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.html b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.html
new file mode 100644
index 0000000..f256d5d
--- /dev/null
+++ b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.html
@@ -0,0 +1,41 @@
+<div fxFlex="100" fxLayout="column">
+    <div fxLayout="column" class="fixed-height add-options">
+        <div class="add-options-item" fxLayoutAlign="start center" fxLayout="row" style="padding-right:10px;">
+            <div fxFlex="100" fxLayout="row" fxLayoutAlign="end center">
+                <mat-slide-toggle [(ngModel)]="autoRefresh" color="primary">Auto refresh</mat-slide-toggle>
+            </div>
+        </div>
+    </div>
+    <div fxFlex="100" fxLayout="column" class="page-container-padding-inner">
+        <pipeline-preview [jspcanvas]="'assembly-quickedit'" [pipeline]="pipeline"
+                          (selectedElementEmitter)="selectElement($event)"
+                          style="margin-bottom:15px;"></pipeline-preview>
+
+        <div fxFlex="100" *ngIf="pipelineMonitoringInfoAvailable">
+            <div *ngFor="let pipelineElement of allElements" fxLayout="column" class="mb-10">
+                <div class="assembly-options-preview sp-blue-bg">
+                    <div fxLayout="row" fxLayoutAlign="start center">
+                        <h4>{{pipelineElement.name}}</h4>
+                    </div>
+                </div>
+                <div class="sp-blue-border pipeline-element-statistics-panel">
+                    <div fxFlex="100" fxLayout="row">
+                        <div fxFlex="20" fxLayoutAlign="start start">
+                            <pipeline-elements-row style="width: 100%;"
+                                                   [showDescription]="false"
+                                                   [pipeline]="pipeline"
+                                                   [element]="pipelineElement"></pipeline-elements-row>
+                        </div>
+                        <div fxFlex="80" fxLayoutAlign="start center">
+                            <pipeline-element-statistics
+                                    [pipeline]="pipeline"
+                                    [pipelineElement]="pipelineElement"
+                                    [pipelineElementMonitoringInfo]="pipelineElementMonitoringInfo.get(pipelineElement.elementId)">
+                            </pipeline-element-statistics>
+                        </div>
+                    </div>
+                </div>
+            </div>
+        </div>
+    </div>
+</div>
\ No newline at end of file
diff --git a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.css b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.scss
similarity index 72%
rename from ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.css
rename to ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.scss
index 14e1198..a3d7690 100644
--- a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.css
+++ b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.scss
@@ -16,20 +16,26 @@
  *
  */
 
-.statusValue {
-    font-size: 40pt;
-    background: darkblue;
-    color: white;
-    margin:5px;
-    min-height:140px;
-    padding: 15px;
+.add-options {
+  background-color:#f6f6f6;
+  border-bottom: 1px solid #cccccc;
+  padding-top:10px;
+  padding-bottom:10px;
 }
 
-.statusValue>h4 {
-    font-size:20pt;
-    margin-bottom: 30px;
+.fixed-height {
+  width:100%;
+  height: 50px;
 }
 
-.fix-margin {
-    margin-top: -5px;
+.page-container-padding-inner {
+  margin: 10px;
+}
+
+.pipeline-element-statistics-panel {
+  padding: 10px;
+}
+
+.mb-10 {
+  margin-bottom: 10px;
 }
\ No newline at end of file
diff --git a/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.ts b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.ts
new file mode 100644
index 0000000..5fb4603
--- /dev/null
+++ b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.ts
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import {Component, Input, OnDestroy, OnInit} from "@angular/core";
+import {
+  DataProcessorInvocation, DataSinkInvocation,
+  Pipeline, PipelineElementMonitoringInfo,
+  PipelineMonitoringInfo,
+  SpDataSet, SpDataStream
+} from "../../../core-model/gen/streampipes-model";
+import {PipelineMonitoringService} from "../../../platform-services/apis/pipeline-monitoring.service";
+
+@Component({
+  selector: 'pipeline-monitoring',
+  templateUrl: './pipeline-monitoring.component.html',
+  styleUrls: ['./pipeline-monitoring.component.scss']
+})
+export class PipelineMonitoringComponent implements OnInit, OnDestroy {
+
+  @Input()
+  pipeline: Pipeline;
+
+  pipelineMonitoringInfo: PipelineMonitoringInfo;
+  pipelineMonitoringInfoAvailable: boolean = false;
+
+  allElements: (SpDataSet | SpDataStream | DataProcessorInvocation | DataSinkInvocation)[] = [];
+
+  autoRefresh: boolean = true;
+
+  pipelineElementMonitoringInfo: Map<string, PipelineElementMonitoringInfo>;
+
+  constructor(private pipelineMonitoringService: PipelineMonitoringService) {
+  }
+
+  ngOnInit(): void {
+    this.collectAllElements();
+    console.log(this.allElements);
+    this.refreshMonitoringInfo();
+  }
+
+  collectAllElements() {
+    this.allElements = this.allElements
+        .concat(this.pipeline.streams)
+        .concat(this.pipeline.sepas)
+        .concat(this.pipeline.actions);
+  }
+
+  refreshMonitoringInfo() {
+    this.pipelineMonitoringService
+        .getPipelineMonitoringInfo(this.pipeline._id)
+        .subscribe(monitoringInfo => {
+          this.pipelineElementMonitoringInfo = new Map<string, PipelineElementMonitoringInfo>();
+          this.pipelineMonitoringInfo = monitoringInfo;
+          monitoringInfo.pipelineElementMonitoringInfo.forEach(info => {
+            this.pipelineElementMonitoringInfo.set(info.pipelineElementId, info);
+          })
+          this.pipelineMonitoringInfoAvailable = true;
+          if (this.autoRefresh) {
+            setTimeout(() => {
+              this.refreshMonitoringInfo();
+            }, 5000);
+          }
+        })
+  }
+
+  selectElement(pipelineElement) {
+    console.log(pipelineElement);
+  }
+
+  ngOnDestroy(): void {
+    this.autoRefresh = false;
+  }
+
+}
\ No newline at end of file
diff --git a/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html
new file mode 100644
index 0000000..400ca58
--- /dev/null
+++ b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html
@@ -0,0 +1,45 @@
+<div fxFlex="100" fxLayout="row" fxLayoutAlign="start start">
+        <div fxFlex="33">
+            <div fxLayout="column" class="mb-10">
+                <sp-status-widget fxFlex="100" [label]="'Consumed Messages / Lag'"
+                                  [color]="(consumedMessagesFirstInputStream == notAvailable) ? deactivatedCardColor : cardColor"
+                                  [textColor]="(consumedMessagesFirstInputStream == notAvailable) ? deactivatedTextColor : textColor"
+                                  [bandColor]="consumedMessagesFirstStreamBandColor"
+                                  [statusValue]="consumedMessagesFirstInputStream">
+                </sp-status-widget>
+                <sp-barchart-widget *ngIf="(consumedMessagesFirstInputStream != notAvailable)"
+                                    [data]="historicFirstConsumedInputValues"
+                                    [textColor]="chartTextColor"
+                                    [backgroundColor]="chartBackgroundColor" class="mt--10">
+                </sp-barchart-widget>
+            </div>
+        </div>
+        <div fxFlex="33">
+            <div fxLayout="column" class="mb-10">
+                <sp-status-widget fxFlex="100" [label]="'Consumed Messages / Lag'"
+                                  [color]="consumedMessagesSecondInputStream === notAvailable ? deactivatedCardColor : cardColor"
+                                  [textColor]="consumedMessagesSecondInputStream === notAvailable ? deactivatedTextColor : textColor"
+                                  [bandColor]="consumedMessagesSecondStreamBandColor"
+                                  [statusValue]="consumedMessagesSecondInputStream"></sp-status-widget>
+                <sp-barchart-widget *ngIf="consumedMessagesSecondInputStream !== notAvailable"
+                                    [data]="historicSecondConsumedInputValues"
+                                    [textColor]="chartTextColor"
+                                    [backgroundColor]="chartBackgroundColor" class="mt--10">
+                </sp-barchart-widget>
+            </div>
+        </div>
+        <div fxFlex="33">
+            <div fxLayout="column" class="mb-10">
+                <sp-status-widget fxFlex="100" [label]="'Produced Messages'"
+                                  [color]="producedMessages === notAvailable ? deactivatedCardColor : cardColor"
+                                  [textColor]="producedMessages === notAvailable ? deactivatedTextColor : textColor"
+                                  [bandColor]="producedMessages === notAvailable ? deactivatedBandColor : bandColor"
+                                  [statusValue]="producedMessages"></sp-status-widget>
+                <sp-barchart-widget *ngIf="producedMessages !== notAvailable"
+                                    [data]="historicProducedOutputValues"
+                                    [textColor]="chartTextColor"
+                                    [backgroundColor]="chartBackgroundColor" class="mt--10">
+                </sp-barchart-widget>
+            </div>
+        </div>
+</div>
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.scss
similarity index 88%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.scss
index c6e991f..27392f1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.scss
@@ -16,9 +16,10 @@
  *
  */
 
-package org.apache.streampipes.manager.monitoring.pipeline;
-
-public class PipelineExecutionStatusCollector {
-
-
+.mb-10 {
+  margin-bottom: 10px;
 }
+
+.mt--10 {
+  margin-top: -10px;
+}
\ No newline at end of file
diff --git a/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts
new file mode 100644
index 0000000..5b2322d
--- /dev/null
+++ b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import {Component, EventEmitter, Input, OnInit, Output} from "@angular/core";
+
+import {
+  DataProcessorInvocation, DataSinkInvocation,
+  Pipeline,
+  PipelineElementMonitoringInfo, SpDataSet, SpDataStream,
+} from "../../../../core-model/gen/streampipes-model";
+import {HistoricalMonitoringData} from "../../model/pipeline-details.model";
+
+@Component({
+  selector: 'pipeline-element-statistics',
+  templateUrl: './pipeline-element-statistics.component.html',
+  styleUrls: ['./pipeline-element-statistics.component.scss']
+})
+export class PipelineElementStatisticsComponent implements OnInit {
+
+  @Input()
+  pipeline: Pipeline;
+
+  @Input()
+  pipelineElement:  (SpDataSet | SpDataStream | DataProcessorInvocation | DataSinkInvocation);
+
+  _pipelineElementMonitoringInfo: PipelineElementMonitoringInfo;
+
+  currentPipelineElement: SpDataSet | SpDataStream | DataProcessorInvocation | DataSinkInvocation;
+  consumedMessagesFirstInputStream: string = "";
+  consumedMessagesSecondInputStream: string = "";
+
+  producedMessages: string | number = "";
+
+  cardColor: string = "rgb(27, 20, 100)";
+  deactivatedCardColor: string = "rgb(241,241,241)";
+
+  textColor: string = "rgb(208,208,208)";
+  deactivatedTextColor: string = "rgb(205,205,205)";
+
+  bandColor: string = "rgb(27, 20, 100)";
+  deactivatedBandColor: string = "rgb(241,241,241)";
+  okBandColor: string = "rgb(11,186,0)";
+  warningBandColor: string = "rgb(253,144,0)";
+
+  chartBackgroundColor: string = "rgb(27, 20, 100)";
+  chartTextColor: string = "rgb(208,208,208)";
+
+  consumedMessagesFirstStreamBandColor: string;
+  consumedMessagesSecondStreamBandColor: string;
+
+  notAvailable: string = "n/a";
+
+  historicFirstConsumedInputValues: HistoricalMonitoringData[] = [];
+  historicSecondConsumedInputValues: HistoricalMonitoringData[] = [];
+  historicProducedOutputValues: HistoricalMonitoringData[] = [];
+
+  consumedMessagesFirstStreamLastValue: number = -1;
+  consumedMessagesSecondStreamLastValue: number = -1;
+  producedMessageOutputLastValue: number = -1;
+
+  ngOnInit(): void {
+
+  }
+
+  updateMonitoringInfo() {
+    if (this.pipelineElementMonitoringInfo.inputTopicInfoExists) {
+      let consumedMessages = this.pipelineElementMonitoringInfo.inputTopicInfo.map(info => {
+        return {"count": (info.currentOffset - info.offsetAtPipelineStart),
+          "lag": info.latestOffset - info.currentOffset,
+          "currentOffset": info.currentOffset};
+      });
+      this.consumedMessagesFirstInputStream = consumedMessages[0].count + " / " + consumedMessages[0].lag;
+      this.consumedMessagesSecondInputStream = consumedMessages.length > 1 ? consumedMessages[1].count + " / " + consumedMessages[1].lag : this.notAvailable;
+      this.consumedMessagesFirstStreamBandColor = consumedMessages[0].lag > 10 ? this.warningBandColor : this.okBandColor;
+      this.consumedMessagesSecondStreamBandColor = (consumedMessages.length > 1 ? (consumedMessages[1].lag > 10 ? this.warningBandColor : this.okBandColor) : this.deactivatedBandColor);
+
+      this.makeHistoricData(consumedMessages[0], this.consumedMessagesFirstStreamLastValue, this.historicFirstConsumedInputValues);
+      this.consumedMessagesFirstStreamLastValue = consumedMessages[0].count;
+      this.historicFirstConsumedInputValues = [].concat(this.historicFirstConsumedInputValues);
+
+      if (consumedMessages.length > 1) {
+        this.makeHistoricData(consumedMessages[1], this.consumedMessagesSecondStreamLastValue, this.historicSecondConsumedInputValues);
+        this.consumedMessagesSecondStreamLastValue = consumedMessages[1].count;
+        this.historicSecondConsumedInputValues = [].concat(this.historicSecondConsumedInputValues);
+      }
+    } else {
+      this.consumedMessagesFirstInputStream = this.notAvailable;
+      this.consumedMessagesFirstStreamBandColor = this.deactivatedBandColor;
+      this.consumedMessagesSecondInputStream = this.notAvailable;
+      this.consumedMessagesSecondStreamBandColor = this.deactivatedBandColor;
+    }
+    if (this.pipelineElementMonitoringInfo.outputTopicInfoExists) {
+      this.producedMessages = this.pipelineElementMonitoringInfo.outputTopicInfo.latestOffset - this.pipelineElementMonitoringInfo.outputTopicInfo.offsetAtPipelineStart;
+      let producedMessage = {"count": this.producedMessages};
+      this.makeHistoricData(producedMessage, this.producedMessageOutputLastValue, this.historicProducedOutputValues);
+      this.producedMessageOutputLastValue = producedMessage.count;
+      this.historicProducedOutputValues = [].concat(this.historicProducedOutputValues);
+    } else {
+      this.producedMessages = this.notAvailable;
+    }
+  }
+
+  makeHistoricData(consumedMessage: any, lastValue: number, historicData: HistoricalMonitoringData[]) {
+    console.log(consumedMessage);
+    if (lastValue > -1) {
+      let entry: HistoricalMonitoringData = {"name": new Date().toLocaleTimeString(), value: (consumedMessage.count - lastValue)};
+      historicData.push(entry);
+    }
+    if (historicData.length > 10) {
+      historicData.shift();
+    } else {
+      for (let i = 0; i < (10 - historicData.length); i++) {
+        historicData.unshift({"name": i.toString(), "value": 0});
+      }
+    }
+  }
+
+  get pipelineElementMonitoringInfo() {
+    return this._pipelineElementMonitoringInfo;
+  }
+
+  @Input()
+  set pipelineElementMonitoringInfo(pipelineElementMonitoringInfo: PipelineElementMonitoringInfo) {
+    this._pipelineElementMonitoringInfo = pipelineElementMonitoringInfo;
+    this.updateMonitoringInfo();
+  }
+
+
+}
\ No newline at end of file
diff --git a/ui/src/app/pipeline-details/pipeline-details.component.html b/ui/src/app/pipeline-details/pipeline-details.component.html
index dbd2d97..cf7a2b2 100644
--- a/ui/src/app/pipeline-details/pipeline-details.component.html
+++ b/ui/src/app/pipeline-details/pipeline-details.component.html
@@ -22,7 +22,7 @@
             <div fxFlex="100" fxLayout="row">
                 <mat-tab-group [selectedIndex]="selectedIndex" (selectedIndexChange)="setSelectedIndex($event)">
                     <mat-tab label="Overview"></mat-tab>
-                    <mat-tab label="Statistics"></mat-tab>
+                    <mat-tab label="Monitoring"></mat-tab>
                     <mat-tab label="Errors"></mat-tab>
                     <mat-tab label="Quick Edit" [disabled]="pipelineAvailable && pipeline.running"></mat-tab>
                 </mat-tab-group>
@@ -48,6 +48,9 @@
             </div>
         </div>
     </div>
+    <div fxFlex fxLayout="column" fxLayoutAlign="start top" *ngIf="pipelineAvailable && selectedIndex == 1">
+        <pipeline-monitoring [pipeline]="pipeline"></pipeline-monitoring>
+    </div>
     <div fxFlex fxLayout="row" fxLayoutAlign="start top" *ngIf="pipelineAvailable && selectedIndex == 3">
         <div fxFlex="100" class="md-padding">
             <div fxFlex fxLayout="column">
diff --git a/ui/src/app/pipeline-details/pipeline-details.module.ts b/ui/src/app/pipeline-details/pipeline-details.module.ts
index 850a77f..478a0d4 100644
--- a/ui/src/app/pipeline-details/pipeline-details.module.ts
+++ b/ui/src/app/pipeline-details/pipeline-details.module.ts
@@ -33,6 +33,9 @@ import {PipelineElementsComponent} from "./components/elements/pipeline-elements
 import {PipelineElementsRowComponent} from "./components/elements/pipeline-elements-row.component";
 import {QuickEditComponent} from "./components/edit/quickedit.component";
 import {CoreUiModule} from "../core-ui/core-ui.module";
+import {PipelineMonitoringComponent} from "./components/monitoring/pipeline-monitoring.component";
+import {PipelineElementStatisticsComponent} from "./components/monitoring/statistics/pipeline-element-statistics.component";
+import {NgxChartsModule} from "@swimlane/ngx-charts";
 
 @NgModule({
   imports: [
@@ -44,6 +47,7 @@ import {CoreUiModule} from "../core-ui/core-ui.module";
     CustomMaterialModule,
     CommonModule,
     MatProgressSpinnerModule,
+    NgxChartsModule,
     EditorModule,
     FormsModule,
     ReactiveFormsModule
@@ -52,7 +56,9 @@ import {CoreUiModule} from "../core-ui/core-ui.module";
     PipelineActionsComponent,
     PipelineElementsComponent,
     PipelineElementsRowComponent,
+    PipelineElementStatisticsComponent,
     PipelineDetailsComponent,
+    PipelineMonitoringComponent,
     PipelineStatusComponent,
     PipelinePreviewComponent,
     QuickEditComponent
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/ui/src/app/platform-services/apis/pipeline-monitoring.service.ts
similarity index 50%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to ui/src/app/platform-services/apis/pipeline-monitoring.service.ts
index c6e991f..cef42ea 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/ui/src/app/platform-services/apis/pipeline-monitoring.service.ts
@@ -16,9 +16,27 @@
  *
  */
 
-package org.apache.streampipes.manager.monitoring.pipeline;
+import {Injectable} from "@angular/core";
+import {HttpClient} from "@angular/common/http";
+import {Observable} from "rxjs";
+import {
+  PipelineMonitoringInfo
+} from "../../core-model/gen/streampipes-model";
+import {PlatformServicesCommons} from "./commons.service";
+import {map} from "rxjs/operators";
 
-public class PipelineExecutionStatusCollector {
+@Injectable()
+export class PipelineMonitoringService {
 
+  constructor(private http: HttpClient,
+              private platformServicesCommons: PlatformServicesCommons) {
+  }
 
-}
+  getPipelineMonitoringInfo(pipelineId: string): Observable<PipelineMonitoringInfo> {
+    return this.http.get(this.platformServicesCommons.authUserBasePath()
+        + "/pipeline-monitoring/"
+        + pipelineId)
+        .pipe(map(response => PipelineMonitoringInfo.fromData(response as any)));
+  }
+
+}
\ No newline at end of file
diff --git a/ui/src/app/platform-services/platform.module.ts b/ui/src/app/platform-services/platform.module.ts
index e96fc12..4627904 100644
--- a/ui/src/app/platform-services/platform.module.ts
+++ b/ui/src/app/platform-services/platform.module.ts
@@ -22,6 +22,7 @@ import {PipelineService} from "./apis/pipeline.service";
 import {PlatformServicesCommons} from "./apis/commons.service";
 import {PipelineElementEndpointService} from "./apis/pipeline-element-endpoint.service";
 import {FilesService} from "./apis/files.service";
+import {PipelineMonitoringService} from "./apis/pipeline-monitoring.service";
 
 @NgModule({
   imports: [],
@@ -32,6 +33,7 @@ import {FilesService} from "./apis/files.service";
     PipelineElementEndpointService,
     //PipelineTemplateService,
     PipelineElementService,
+    PipelineMonitoringService,
     PipelineService
   ],
   entryComponents: []
diff --git a/ui/src/scss/sp/colors.scss b/ui/src/scss/sp/colors.scss
index eace269..97f99b7 100644
--- a/ui/src/scss/sp/colors.scss
+++ b/ui/src/scss/sp/colors.scss
@@ -58,6 +58,10 @@
     background: #FAFAFA;
 }
 
+.sp-border-left-accent-light {
+    border-left: 2px solid $sp-color-accent-light;
+}
+
 .sp-blue-border-nopadding {
     border: 3px solid $sp-color-accent-light;
 }