You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/09/24 08:03:38 UTC
[incubator-streampipes] 01/01: [STREAMPIPES-245] Add initial
version of pipeline monitoring feature
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch STREAMPIPES-245
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 6a828302501d045a053884c39f8786611afe327c
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Thu Sep 24 10:03:20 2020 +0200
[STREAMPIPES-245] Add initial version of pipeline monitoring feature
---
pom.xml | 2 +-
.../backend/StreamPipesResourceConfig.java | 1 +
.../kafka/config/ConsumerConfigFactory.java | 2 +-
.../monitoring/PipelineElementMonitoringInfo.java | 85 ++++++++
.../model/monitoring/PipelineElementTopicInfo.java | 63 ++++++
.../model/monitoring/PipelineMonitoringInfo.java | 67 ++++++
.../manager/execution/http/PipelineExecutor.java | 31 ++-
.../pipeline/PipelineExecutionStatusCollector.java | 37 ++++
.../monitoring/pipeline/TopicInfoCollector.java | 228 +++++++++++++++++++++
.../streampipes/rest/api/IPipelineMonitoring.java | 9 +-
.../streampipes/rest/impl/PipelineMonitoring.java | 42 ++++
.../app-transport-monitoring.module.ts | 4 +-
.../outgoing/outgoing-view.component.html | 12 +-
.../transport-summary.component.html | 14 +-
ui/src/app/core-model/gen/streampipes-model.ts | 71 ++++++-
ui/src/app/core-ui/core-ui.module.ts | 139 +++++++------
.../barchart/barchart-widget.component.html} | 20 +-
.../widget/barchart/barchart-widget.component.scss | 9 +-
.../widget/barchart/barchart-widget.component.ts | 34 ++-
.../widget/status/status-widget.component.css} | 0
.../widget/status/status-widget.component.html} | 7 +-
.../widget/status/status-widget.component.ts} | 13 +-
.../elements/pipeline-elements-row.component.html | 4 +-
.../elements/pipeline-elements-row.component.ts | 3 +
.../components/model/pipeline-details.model.ts | 10 +-
.../monitoring/pipeline-monitoring.component.html | 41 ++++
.../monitoring/pipeline-monitoring.component.scss} | 30 +--
.../monitoring/pipeline-monitoring.component.ts | 89 ++++++++
.../pipeline-element-statistics.component.html | 45 ++++
.../pipeline-element-statistics.component.scss | 11 +-
.../pipeline-element-statistics.component.ts | 144 +++++++++++++
.../pipeline-details.component.html | 5 +-
.../pipeline-details/pipeline-details.module.ts | 6 +
.../apis/pipeline-monitoring.service.ts | 24 ++-
ui/src/app/platform-services/platform.module.ts | 2 +
ui/src/scss/sp/colors.scss | 4 +
36 files changed, 1152 insertions(+), 156 deletions(-)
diff --git a/pom.xml b/pom.xml
index 5853729..f40d05c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,7 +78,7 @@
<jgrapht.version>1.3.1</jgrapht.version>
<json-path.version>3.1.0</json-path.version>
<jsr305.version>3.0.2</jsr305.version>
- <kafka.version>2.2.0</kafka.version>
+ <kafka.version>2.6.0</kafka.version>
<lightcouch.version>0.2.0</lightcouch.version>
<log4j.version>2.12.1</log4j.version>
<logback-classic.version>1.2.3</logback-classic.version>
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
index 9ed13f6..1b8998d 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
@@ -70,6 +70,7 @@ public class StreamPipesResourceConfig extends ResourceConfig {
register(PipelineElementImportNoUser.class);
register(PipelineElementImport.class);
register(PipelineElementRuntimeInfo.class);
+ register(PipelineMonitoring.class);
register(PipelineNoUserResource.class);
register(PipelineTemplate.class);
register(PipelineWithUserResource.class);
diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
index 1c7cbf4..72faf42 100644
--- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
+++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/config/ConsumerConfigFactory.java
@@ -26,7 +26,7 @@ import java.util.UUID;
public class ConsumerConfigFactory extends AbstractConfigFactory {
private static final String ENABLE_AUTO_COMMIT_CONFIG_DEFAULT = "true";
- private static final String AUTO_COMMIT_INTERVAL_MS_CONFIG_DEFAULT = "10000";
+ private static final String AUTO_COMMIT_INTERVAL_MS_CONFIG_DEFAULT = "5000";
private static final String SESSION_TIMEOUT_MS_CONFIG_DEFAULT = "30000";
private static final Integer FETCH_MAX_BYTES_CONFIG_DEFAULT = 5000012;
private static final String KEY_DESERIALIZER_CLASS_CONFIG_DEFAULT = "org.apache.kafka.common" +
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementMonitoringInfo.java b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementMonitoringInfo.java
new file mode 100644
index 0000000..4812ba2
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementMonitoringInfo.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.model.monitoring;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class PipelineElementMonitoringInfo {
+
+ private String pipelineElementId;
+ private String pipelineElementName;
+
+ private boolean inputTopicInfoExists;
+ private boolean outputTopicInfoExists;
+
+ private List<PipelineElementTopicInfo> inputTopicInfo;
+ private PipelineElementTopicInfo outputTopicInfo;
+
+ public PipelineElementMonitoringInfo() {
+ this.inputTopicInfo = new ArrayList<>();
+ }
+
+ public String getPipelineElementId() {
+ return pipelineElementId;
+ }
+
+ public void setPipelineElementId(String pipelineElementId) {
+ this.pipelineElementId = pipelineElementId;
+ }
+
+ public String getPipelineElementName() {
+ return pipelineElementName;
+ }
+
+ public void setPipelineElementName(String pipelineElementName) {
+ this.pipelineElementName = pipelineElementName;
+ }
+
+ public List<PipelineElementTopicInfo> getInputTopicInfo() {
+ return inputTopicInfo;
+ }
+
+ public void setInputTopicInfo(List<PipelineElementTopicInfo> inputTopicInfo) {
+ this.inputTopicInfo = inputTopicInfo;
+ }
+
+ public PipelineElementTopicInfo getOutputTopicInfo() {
+ return outputTopicInfo;
+ }
+
+ public void setOutputTopicInfo(PipelineElementTopicInfo outputTopicInfo) {
+ this.outputTopicInfo = outputTopicInfo;
+ }
+
+ public boolean isInputTopicInfoExists() {
+ return inputTopicInfoExists;
+ }
+
+ public void setInputTopicInfoExists(boolean inputTopicInfoExists) {
+ this.inputTopicInfoExists = inputTopicInfoExists;
+ }
+
+ public boolean isOutputTopicInfoExists() {
+ return outputTopicInfoExists;
+ }
+
+ public void setOutputTopicInfoExists(boolean outputTopicInfoExists) {
+ this.outputTopicInfoExists = outputTopicInfoExists;
+ }
+}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementTopicInfo.java b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementTopicInfo.java
new file mode 100644
index 0000000..5270668
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineElementTopicInfo.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.model.monitoring;
+
+public class PipelineElementTopicInfo {
+
+ private String topicName;
+
+ private Long currentOffset;
+ private Long latestOffset;
+ private Long offsetAtPipelineStart;
+
+ public PipelineElementTopicInfo() {
+
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ }
+
+ public Long getCurrentOffset() {
+ return currentOffset;
+ }
+
+ public void setCurrentOffset(Long currentOffset) {
+ this.currentOffset = currentOffset;
+ }
+
+ public Long getLatestOffset() {
+ return latestOffset;
+ }
+
+ public void setLatestOffset(Long latestOffset) {
+ this.latestOffset = latestOffset;
+ }
+
+ public Long getOffsetAtPipelineStart() {
+ return offsetAtPipelineStart;
+ }
+
+ public void setOffsetAtPipelineStart(Long offsetAtPipelineStart) {
+ this.offsetAtPipelineStart = offsetAtPipelineStart;
+ }
+}
\ No newline at end of file
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineMonitoringInfo.java b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineMonitoringInfo.java
new file mode 100644
index 0000000..43b6a07
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/monitoring/PipelineMonitoringInfo.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.model.monitoring;
+
+import org.apache.streampipes.model.shared.annotation.TsModel;
+
+import java.util.List;
+
+@TsModel
+public class PipelineMonitoringInfo {
+
+ private String pipelineId;
+ private long createdAt;
+ private long startedAt;
+
+ private List<PipelineElementMonitoringInfo> pipelineElementMonitoringInfo;
+
+ public PipelineMonitoringInfo() {
+ }
+
+ public String getPipelineId() {
+ return pipelineId;
+ }
+
+ public void setPipelineId(String pipelineId) {
+ this.pipelineId = pipelineId;
+ }
+
+ public long getCreatedAt() {
+ return createdAt;
+ }
+
+ public void setCreatedAt(long createdAt) {
+ this.createdAt = createdAt;
+ }
+
+ public List<PipelineElementMonitoringInfo> getPipelineElementMonitoringInfo() {
+ return pipelineElementMonitoringInfo;
+ }
+
+ public void setPipelineElementMonitoringInfo(List<PipelineElementMonitoringInfo> pipelineElementMonitoringInfo) {
+ this.pipelineElementMonitoringInfo = pipelineElementMonitoringInfo;
+ }
+
+ public long getStartedAt() {
+ return startedAt;
+ }
+
+ public void setStartedAt(long startedAt) {
+ this.startedAt = startedAt;
+ }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
index ce16327..d2f66ab 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/http/PipelineExecutor.java
@@ -18,28 +18,26 @@
package org.apache.streampipes.manager.execution.http;
-import org.lightcouch.DocumentConflictException;
import org.apache.streampipes.manager.execution.status.PipelineStatusManager;
import org.apache.streampipes.manager.execution.status.SepMonitoringManager;
import org.apache.streampipes.manager.util.TemporaryGraphStorage;
import org.apache.streampipes.model.SpDataSet;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.pipeline.Pipeline;
-import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
-import org.apache.streampipes.model.message.PipelineStatusMessage;
-import org.apache.streampipes.model.message.PipelineStatusMessageType;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.message.PipelineStatusMessage;
+import org.apache.streampipes.model.message.PipelineStatusMessageType;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
import org.apache.streampipes.model.staticproperty.SecretStaticProperty;
import org.apache.streampipes.storage.api.IPipelineStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.apache.streampipes.user.management.encryption.CredentialsManager;
+import org.lightcouch.DocumentConflictException;
import java.security.GeneralSecurityException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
+import java.util.*;
import java.util.stream.Collectors;
public class PipelineExecutor {
@@ -59,8 +57,12 @@ public class PipelineExecutor {
public PipelineOperationStatus startPipeline() {
+ pipeline.getSepas().forEach(this::updateGroupIds);
+ pipeline.getActions().forEach(this::updateGroupIds);
+
List<DataProcessorInvocation> sepas = pipeline.getSepas();
List<DataSinkInvocation> secs = pipeline.getActions();
+
List<SpDataSet> dataSets = pipeline.getStreams().stream().filter(s -> s instanceof SpDataSet).map(s -> new
SpDataSet((SpDataSet) s)).collect(Collectors.toList());
@@ -74,7 +76,7 @@ public class PipelineExecutor {
List<InvocableStreamPipesEntity> decryptedGraphs = decryptSecrets(graphs);
- graphs.forEach(g -> g.setStreamRequirements(Arrays.asList()));
+ graphs.forEach(g -> g.setStreamRequirements(Collections.emptyList()));
PipelineOperationStatus status = new GraphSubmitter(pipeline.getPipelineId(),
pipeline.getName(), decryptedGraphs, dataSets)
@@ -97,6 +99,15 @@ public class PipelineExecutor {
return status;
}
+ private void updateGroupIds(InvocableStreamPipesEntity entity) {
+ entity.getInputStreams()
+ .stream()
+ .filter(is -> is.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol)
+ .map(is -> is.getEventGrounding().getTransportProtocol())
+ .map(KafkaTransportProtocol.class::cast)
+ .forEach(tp -> tp.setGroupId(UUID.randomUUID().toString()));
+ }
+
private List<InvocableStreamPipesEntity> decryptSecrets(List<InvocableStreamPipesEntity> graphs) {
List<InvocableStreamPipesEntity> decryptedGraphs = new ArrayList<>();
graphs.stream().map(g -> {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
index c6e991f..6a8ad30 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
@@ -18,7 +18,44 @@
package org.apache.streampipes.manager.monitoring.pipeline;
+import org.apache.streampipes.model.monitoring.PipelineElementMonitoringInfo;
+import org.apache.streampipes.model.monitoring.PipelineMonitoringInfo;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import java.util.List;
+
public class PipelineExecutionStatusCollector {
+ private String pipelineId;
+
+ public PipelineExecutionStatusCollector(String pipelineId) {
+ this.pipelineId = pipelineId;
+ }
+
+ public PipelineMonitoringInfo makePipelineMonitoringInfo() {
+ Pipeline pipeline = getPipeline();
+
+ PipelineMonitoringInfo monitoringInfo = new PipelineMonitoringInfo();
+ monitoringInfo.setCreatedAt(pipeline.getCreatedAt());
+ monitoringInfo.setStartedAt(pipeline.getStartedAt());
+ monitoringInfo.setPipelineId(pipelineId);
+
+ monitoringInfo.setPipelineElementMonitoringInfo(makePipelineElementMonitoringInfo(pipeline));
+
+ return monitoringInfo;
+ }
+
+ private List<PipelineElementMonitoringInfo> makePipelineElementMonitoringInfo(Pipeline pipeline) {
+ return new TopicInfoCollector(pipeline).makeMonitoringInfo();
+
+ }
+ private Pipeline getPipeline() {
+ return StorageDispatcher
+ .INSTANCE
+ .getNoSqlStore()
+ .getPipelineStorageAPI()
+ .getPipeline(this.pipelineId);
+ }
}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java
new file mode 100644
index 0000000..0b035a4
--- /dev/null
+++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.manager.monitoring.pipeline;
+
+import org.apache.kafka.clients.admin.*;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.streampipes.config.backend.BackendConfig;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
+import org.apache.streampipes.model.grounding.TransportProtocol;
+import org.apache.streampipes.model.monitoring.PipelineElementMonitoringInfo;
+import org.apache.streampipes.model.monitoring.PipelineElementTopicInfo;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.sdk.helpers.Tuple2;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+public class TopicInfoCollector {
+
+
+ private Pipeline pipeline;
+ private AdminClient kafkaAdminClient;
+
+ private Map<String, Long> latestTopicOffsets;
+ private Map<String, Long> topicOffsetAtPipelineStart;
+ private Map<String, Long> currentConsumerGroupOffsets;
+
+ private List<PipelineElementMonitoringInfo> monitoringInfo;
+
+ public TopicInfoCollector(Pipeline pipeline) {
+ this.pipeline = pipeline;
+ this.kafkaAdminClient = kafkaAdminClient();
+ this.latestTopicOffsets = new HashMap<>();
+ this.topicOffsetAtPipelineStart = new HashMap<>();
+ this.currentConsumerGroupOffsets = new HashMap<>();
+ this.monitoringInfo = new ArrayList<>();
+ }
+
+ private void makeTopicInfo() {
+ long currentTime = Instant.now().minus (1, ChronoUnit.SECONDS).toEpochMilli();
+ List<TransportProtocol> affectedProtocols = new ArrayList<>();
+
+ pipeline.getSepas().forEach(processor -> affectedProtocols.addAll(extractProtocols(processor)));
+ pipeline.getActions().forEach(sink -> affectedProtocols.addAll(extractProtocols(sink)));
+
+ affectedProtocols.forEach(protocol -> {
+ if (protocol instanceof KafkaTransportProtocol) {
+ try {
+ Tuple2<String, Long> latestTopicOffsets = makeTopicOffsetInfo((KafkaTransportProtocol) protocol, OffsetSpec.forTimestamp(currentTime));
+ Tuple2<String, Long> topicOffsetAtPipelineStart = makeTopicOffsetInfo((KafkaTransportProtocol) protocol, OffsetSpec.forTimestamp(pipeline.getStartedAt()));
+ Tuple2<String, Long> currentConsumerGroupOffsets = makeTopicInfo((KafkaTransportProtocol) protocol);
+
+ this.latestTopicOffsets.put(latestTopicOffsets.a, latestTopicOffsets.b);
+ this.topicOffsetAtPipelineStart.put(topicOffsetAtPipelineStart.a, topicOffsetAtPipelineStart.b);
+ this.currentConsumerGroupOffsets.put(currentConsumerGroupOffsets.a, currentConsumerGroupOffsets.b);
+
+ } catch (ExecutionException | InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ public List<PipelineElementMonitoringInfo> makeMonitoringInfo() {
+ this.makeTopicInfo();
+ // TODO filter for KafkaGrounding
+ this.pipeline.getStreams().forEach(stream -> this.monitoringInfo.add(makeStreamMonitoringInfo(stream)));
+ this.pipeline.getSepas().forEach(processor -> this.monitoringInfo.add(makeProcessorMonitoringInfo(processor)));
+ this.pipeline.getActions().forEach(sink -> this.monitoringInfo.add(makeSinkMonitoringInfo(sink)));
+
+ this.kafkaAdminClient.close();
+ return this.monitoringInfo;
+ }
+
+ private PipelineElementMonitoringInfo makeStreamMonitoringInfo(SpDataStream stream) {
+ PipelineElementMonitoringInfo info = prepare(stream.getElementId(), stream.getName(), false, true);
+ KafkaTransportProtocol protocol = (KafkaTransportProtocol) stream.getEventGrounding().getTransportProtocol();
+ info.setOutputTopicInfo(makeOutputTopicInfoForPipelineElement(protocol));
+
+ return info;
+ }
+
+ private PipelineElementMonitoringInfo makeProcessorMonitoringInfo(DataProcessorInvocation processor) {
+ PipelineElementMonitoringInfo info = prepare(processor.getElementId(), processor.getName(), true, true);
+ KafkaTransportProtocol outputProtocol = (KafkaTransportProtocol) processor.getOutputStream().getEventGrounding().getTransportProtocol();
+ PipelineElementTopicInfo outputTopicInfo = makeOutputTopicInfoForPipelineElement(outputProtocol);
+ List<PipelineElementTopicInfo> inputTopicInfo = makeInputTopicInfoForPipelineElement(processor.getInputStreams());
+
+ info.setOutputTopicInfo(outputTopicInfo);
+ info.setInputTopicInfo(inputTopicInfo);
+
+ return info;
+ }
+
+ private PipelineElementMonitoringInfo makeSinkMonitoringInfo(DataSinkInvocation sink) {
+ PipelineElementMonitoringInfo info = prepare(sink.getElementId(), sink.getName(), true, false);
+ info.setInputTopicInfo(makeInputTopicInfoForPipelineElement(sink.getInputStreams()));
+ return info;
+ }
+
+ private List<PipelineElementTopicInfo> makeInputTopicInfoForPipelineElement(List<SpDataStream> inputStreams) {
+ List<PipelineElementTopicInfo> topicInfos = new ArrayList<>();
+ inputStreams.stream().map(is -> is.getEventGrounding().getTransportProtocol()).forEach(protocol -> {
+ PipelineElementTopicInfo topicInfo = new PipelineElementTopicInfo();
+ String topic = getTopic((KafkaTransportProtocol) protocol);
+ topicInfo.setTopicName(topic);
+ topicInfo.setLatestOffset(latestTopicOffsets.get(topic));
+ topicInfo.setOffsetAtPipelineStart(topicOffsetAtPipelineStart.get(topic));
+ topicInfo.setCurrentOffset(currentConsumerGroupOffsets.get(((KafkaTransportProtocol) protocol).getGroupId()));
+ topicInfos.add(topicInfo);
+ });
+
+ return topicInfos;
+ }
+
+ private PipelineElementTopicInfo makeOutputTopicInfoForPipelineElement(KafkaTransportProtocol protocol) {
+ PipelineElementTopicInfo topicInfo = new PipelineElementTopicInfo();
+ String topic = getTopic(protocol);
+ topicInfo.setTopicName(topic);
+ topicInfo.setOffsetAtPipelineStart(this.topicOffsetAtPipelineStart.get(topic));
+ topicInfo.setLatestOffset(this.latestTopicOffsets.get(topic));
+
+ return topicInfo;
+ }
+
+ private String getTopic(KafkaTransportProtocol protocol) {
+ return protocol.getTopicDefinition().getActualTopicName();
+ }
+
+ private PipelineElementMonitoringInfo prepare(String elementId, String name, boolean inputTopics, boolean outputTopics) {
+ PipelineElementMonitoringInfo info = new PipelineElementMonitoringInfo();
+ info.setPipelineElementName(name);
+ info.setPipelineElementId(elementId);
+ info.setInputTopicInfoExists(inputTopics);
+ info.setOutputTopicInfoExists(outputTopics);
+ return info;
+ }
+
+ private AdminClient kafkaAdminClient() {
+ return KafkaAdminClient.create(makeProperties());
+ }
+
+ private Properties makeProperties() {
+ Properties props = new Properties();
+
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
+ props.put(AdminClientConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+
+ return props;
+ }
+
+ private String getBrokerUrl() {
+ String env = System.getenv("SP_DEBUG");
+ System.out.println(System.getenv("SP_DEBUG"));
+ if ("true".equals(env.replaceAll(" ", ""))) {
+ return "localhost:9094";
+ } else {
+ return BackendConfig.INSTANCE.getKafkaUrl();
+ }
+ }
+
+ private Tuple2<String, Long> makeTopicOffsetInfo(KafkaTransportProtocol protocol, OffsetSpec offsetSpec) throws ExecutionException, InterruptedException {
+ Map<TopicPartition, OffsetAndMetadata> partitions = kafkaAdminClient.listConsumerGroupOffsets(protocol.getGroupId()).partitionsToOffsetAndMetadata().get();
+ Map<TopicPartition, OffsetSpec> desiredOffsets = new HashMap<>();
+ partitions.forEach((key, value) -> desiredOffsets.put(key, offsetSpec));
+ ListOffsetsResult offsetsResult = kafkaAdminClient.listOffsets(desiredOffsets);
+ Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> resultInfo = offsetsResult.all().get();
+ Long offset = resultInfo
+ .values()
+ .stream()
+ .map(ListOffsetsResult.ListOffsetsResultInfo::offset)
+ .reduce(0L, Long::sum);
+
+ return new Tuple2<>(protocol.getTopicDefinition().getActualTopicName(), offset);
+ }
+
+ private Tuple2<String, Long> makeTopicInfo(KafkaTransportProtocol protocol) throws ExecutionException, InterruptedException {
+ Long offset = kafkaAdminClient.listConsumerGroupOffsets(protocol.getGroupId())
+ .partitionsToOffsetAndMetadata()
+ .get()
+ .values()
+ .stream()
+ .map(OffsetAndMetadata::offset)
+ .reduce(0L, Long::sum);
+
+ return new Tuple2<>(protocol.getGroupId(), offset);
+ }
+
+ public List<TransportProtocol> extractProtocols(InvocableStreamPipesEntity pipelineElement) {
+ return pipelineElement
+ .getInputStreams()
+ .stream()
+ .map(stream -> stream.getEventGrounding().getTransportProtocol())
+ .collect(Collectors.toList());
+ }
+
+ public static void main(String[] args) {
+ List<Pipeline> pipelines = StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().getAllPipelines();
+ Pipeline testPipeline = pipelines.get(0);
+
+ List<PipelineElementMonitoringInfo> monitoringInfo = new TopicInfoCollector(testPipeline).makeMonitoringInfo();
+ System.out.println(monitoringInfo.size());
+ }
+}
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipelineMonitoring.java
similarity index 82%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipelineMonitoring.java
index c6e991f..eace98d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/api/IPipelineMonitoring.java
@@ -15,10 +15,11 @@
* limitations under the License.
*
*/
+package org.apache.streampipes.rest.api;
-package org.apache.streampipes.manager.monitoring.pipeline;
-
-public class PipelineExecutionStatusCollector {
-
+import javax.ws.rs.core.Response;
+public interface IPipelineMonitoring {
+
+ Response getPipelineMonitoringInfo(String pipelineId);
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineMonitoring.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineMonitoring.java
new file mode 100644
index 0000000..83f8db6
--- /dev/null
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineMonitoring.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.rest.impl;
+
+import org.apache.streampipes.manager.monitoring.pipeline.PipelineExecutionStatusCollector;
+import org.apache.streampipes.rest.api.IPipelineMonitoring;
+import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+@Path("/v2/users/{username}/pipeline-monitoring")
+public class PipelineMonitoring extends AbstractRestInterface implements IPipelineMonitoring {
+
+ @JacksonSerialized
+ @Path("{pipelineId}")
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Override
+ public Response getPipelineMonitoringInfo(@PathParam("pipelineId") String pipelineId) {
+ return ok(new PipelineExecutionStatusCollector(pipelineId).makePipelineMonitoringInfo());
+ }
+}
diff --git a/ui/src/app/app-transport-monitoring/app-transport-monitoring.module.ts b/ui/src/app/app-transport-monitoring/app-transport-monitoring.module.ts
index 143336a..05e43fa 100644
--- a/ui/src/app/app-transport-monitoring/app-transport-monitoring.module.ts
+++ b/ui/src/app/app-transport-monitoring/app-transport-monitoring.module.ts
@@ -34,16 +34,17 @@ import {DashboardItemComponent} from "./components/dashboard-item/dashboard-item
import {DashboardImageComponent} from "./components/dashboard-image/dashboard-image.component";
import {TransportSelectionComponent} from "./components/transport-selection/transport-selection.component";
import {AppTransportMonitoringRestService} from "./services/app-transport-monitoring-rest.service";
-import {DashboardStatusFilledComponent} from "./components/dashboard-status-filled/dashboard-status-filled.component";
import {TransportSummaryComponent} from "./components/transport-summary/transport-summary.component";
import {SlideshowModule} from "ng-simple-slideshow";
import {TransportActivityGraphComponent} from "./components/transport-activity-graph/transport-activity-graph.component";
import {TimestampConverterService} from "./services/timestamp-converter.service";
import {NgxChartsModule} from '@swimlane/ngx-charts';
+import {CoreUiModule} from "../core-ui/core-ui.module";
@NgModule({
imports: [
CommonModule,
+ CoreUiModule,
FlexLayoutModule,
CustomMaterialModule,
MatGridListModule,
@@ -62,7 +63,6 @@ import {NgxChartsModule} from '@swimlane/ngx-charts';
DashboardItemComponent,
DashboardStatusComponent,
TransportSelectionComponent,
- DashboardStatusFilledComponent,
TransportSummaryComponent,
TransportActivityGraphComponent
],
diff --git a/ui/src/app/app-transport-monitoring/components/outgoing/outgoing-view.component.html b/ui/src/app/app-transport-monitoring/components/outgoing/outgoing-view.component.html
index f8a681a..78276d4 100644
--- a/ui/src/app/app-transport-monitoring/components/outgoing/outgoing-view.component.html
+++ b/ui/src/app/app-transport-monitoring/components/outgoing/outgoing-view.component.html
@@ -24,16 +24,16 @@
</div>
<div fxFlex="33" fxLayout="column">
<div fxFill fxFlex="100" fxLayout="row" fxLayoutAlign="start start">
- <dashboard-status-filled fxFlex="100" [label]="'Total Boxes Detected'"
- [statusValue]="totalBoxes"></dashboard-status-filled>
+ <sp-status-widget fxFlex="100" [label]="'Total Boxes Detected'"
+ [statusValue]="totalBoxes"></sp-status-widget>
</div>
<div fxFlex="100" fxLayout="row" fxLayoutAlign="start start">
- <dashboard-status-filled fxFlex="100" [label]="'Transparent Boxes Detected'"
- [statusValue]="cardboardBoxes"></dashboard-status-filled>
+ <sp-status-widget fxFlex="100" [label]="'Transparent Boxes Detected'"
+ [statusValue]="cardboardBoxes"></sp-status-widget>
</div>
<div fxFlex="100" fxLayout="row" fxLayoutAlign="start start">
- <dashboard-status-filled fxFlex="100" [label]="'Cardboard Boxes Detected'"
- [statusValue]="transparentBoxes"></dashboard-status-filled>
+ <sp-status-widget fxFlex="100" [label]="'Cardboard Boxes Detected'"
+ [statusValue]="transparentBoxes"></sp-status-widget>
</div>
</div>
</div>
diff --git a/ui/src/app/app-transport-monitoring/components/transport-summary/transport-summary.component.html b/ui/src/app/app-transport-monitoring/components/transport-summary/transport-summary.component.html
index 31c5f72..1bb0ccc 100644
--- a/ui/src/app/app-transport-monitoring/components/transport-summary/transport-summary.component.html
+++ b/ui/src/app/app-transport-monitoring/components/transport-summary/transport-summary.component.html
@@ -17,13 +17,13 @@
-->
<div flex="100" fxLayout="row">
- <dashboard-status-filled fxFlex="25" [label]="'Shipped'" [statusValue]="shippedTime"
- [color]="'rgb(156, 156, 156)'"></dashboard-status-filled>
- <dashboard-status-filled fxFlex="25" [label]="'Delivered'" [statusValue]="deliveredTime"
- [color]="'rgb(156, 156, 156)'"></dashboard-status-filled>
- <dashboard-status-filled fxFlex="25" [label]="'Took'" [statusValue]="tookTime"
- [color]="'rgb(156, 156, 156)'"></dashboard-status-filled>
- <dashboard-status-filled fxFlex="25" [label]="'Status'" [statusValue]="errorCode" [color]="statusColor"></dashboard-status-filled>
+ <sp-status-widget fxFlex="25" [label]="'Shipped'" [statusValue]="shippedTime"
+ [color]="'rgb(156, 156, 156)'"></sp-status-widget>
+ <sp-status-widget fxFlex="25" [label]="'Delivered'" [statusValue]="deliveredTime"
+ [color]="'rgb(156, 156, 156)'"></sp-status-widget>
+ <sp-status-widget fxFlex="25" [label]="'Took'" [statusValue]="tookTime"
+ [color]="'rgb(156, 156, 156)'"></sp-status-widget>
+ <sp-status-widget fxFlex="25" [label]="'Status'" [statusValue]="errorCode" [color]="statusColor"></sp-status-widget>
</div>
<div *ngFor="let statusMessage of statusMessages">
<h4>{{statusMessage}}</h4>
diff --git a/ui/src/app/core-model/gen/streampipes-model.ts b/ui/src/app/core-model/gen/streampipes-model.ts
index 67be548..9f4abe5 100644
--- a/ui/src/app/core-model/gen/streampipes-model.ts
+++ b/ui/src/app/core-model/gen/streampipes-model.ts
@@ -19,7 +19,7 @@
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
-// Generated using typescript-generator version 2.24.612 on 2020-09-15 08:51:13.
+// Generated using typescript-generator version 2.24.612 on 2020-09-20 21:03:26.
export class AbstractStreamPipesEntity {
"@class": "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStreamDescription" | "org.apache.streampipes.model.connect.adapter.G [...]
@@ -151,8 +151,8 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
instance.includedLocales = __getCopyArrayFn(__identity<string>())(data.includedLocales);
instance.applicationLinks = __getCopyArrayFn(ApplicationLink.fromData)(data.applicationLinks);
instance.connectedTo = __getCopyArrayFn(__identity<string>())(data.connectedTo);
- instance.uri = data.uri;
instance.dom = data.dom;
+ instance.uri = data.uri;
return instance;
}
}
@@ -1563,9 +1563,9 @@ export class GenericAdapterSetDescription extends AdapterSetDescription implemen
}
const instance = target || new GenericAdapterSetDescription();
super.fromData(data, instance);
- instance.eventSchema = EventSchema.fromData(data.eventSchema);
instance.formatDescription = FormatDescription.fromData(data.formatDescription);
instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
+ instance.eventSchema = EventSchema.fromData(data.eventSchema);
return instance;
}
}
@@ -1582,9 +1582,9 @@ export class GenericAdapterStreamDescription extends AdapterStreamDescription im
}
const instance = target || new GenericAdapterStreamDescription();
super.fromData(data, instance);
- instance.eventSchema = EventSchema.fromData(data.eventSchema);
instance.formatDescription = FormatDescription.fromData(data.formatDescription);
instance.protocolDescription = ProtocolDescription.fromData(data.protocolDescription);
+ instance.eventSchema = EventSchema.fromData(data.eventSchema);
return instance;
}
}
@@ -1997,6 +1997,29 @@ export class PipelineCategory {
}
}
+export class PipelineElementMonitoringInfo {
+ inputTopicInfo: PipelineElementTopicInfo[];
+ inputTopicInfoExists: boolean;
+ outputTopicInfo: PipelineElementTopicInfo;
+ outputTopicInfoExists: boolean;
+ pipelineElementId: string;
+ pipelineElementName: string;
+
+ static fromData(data: PipelineElementMonitoringInfo, target?: PipelineElementMonitoringInfo): PipelineElementMonitoringInfo {
+ if (!data) {
+ return data;
+ }
+ const instance = target || new PipelineElementMonitoringInfo();
+ instance.pipelineElementId = data.pipelineElementId;
+ instance.pipelineElementName = data.pipelineElementName;
+ instance.inputTopicInfoExists = data.inputTopicInfoExists;
+ instance.outputTopicInfoExists = data.outputTopicInfoExists;
+ instance.inputTopicInfo = __getCopyArrayFn(PipelineElementTopicInfo.fromData)(data.inputTopicInfo);
+ instance.outputTopicInfo = PipelineElementTopicInfo.fromData(data.outputTopicInfo);
+ return instance;
+ }
+}
+
export class PipelineElementRecommendation {
count: number;
description: string;
@@ -2054,6 +2077,25 @@ export class PipelineElementStatus {
}
}
+export class PipelineElementTopicInfo {
+ currentOffset: number;
+ latestOffset: number;
+ offsetAtPipelineStart: number;
+ topicName: string;
+
+ static fromData(data: PipelineElementTopicInfo, target?: PipelineElementTopicInfo): PipelineElementTopicInfo {
+ if (!data) {
+ return data;
+ }
+ const instance = target || new PipelineElementTopicInfo();
+ instance.topicName = data.topicName;
+ instance.currentOffset = data.currentOffset;
+ instance.latestOffset = data.latestOffset;
+ instance.offsetAtPipelineStart = data.offsetAtPipelineStart;
+ return instance;
+ }
+}
+
export class PipelineModification {
domId: string;
elementId: string;
@@ -2089,6 +2131,25 @@ export class PipelineModificationMessage extends Message {
}
}
+export class PipelineMonitoringInfo {
+ createdAt: number;
+ pipelineElementMonitoringInfo: PipelineElementMonitoringInfo[];
+ pipelineId: string;
+ startedAt: number;
+
+ static fromData(data: PipelineMonitoringInfo, target?: PipelineMonitoringInfo): PipelineMonitoringInfo {
+ if (!data) {
+ return data;
+ }
+ const instance = target || new PipelineMonitoringInfo();
+ instance.pipelineId = data.pipelineId;
+ instance.createdAt = data.createdAt;
+ instance.startedAt = data.startedAt;
+ instance.pipelineElementMonitoringInfo = __getCopyArrayFn(PipelineElementMonitoringInfo.fromData)(data.pipelineElementMonitoringInfo);
+ return instance;
+ }
+}
+
export class PipelineOperationStatus {
elementStatus: PipelineElementStatus[];
pipelineId: string;
@@ -2463,8 +2524,8 @@ export class SpDataSet extends SpDataStream {
instance.supportedGrounding = EventGrounding.fromData(data.supportedGrounding);
instance.datasetInvocationId = data.datasetInvocationId;
instance.correspondingPipeline = data.correspondingPipeline;
- instance.actualTopicName = data.actualTopicName;
instance.brokerHostname = data.brokerHostname;
+ instance.actualTopicName = data.actualTopicName;
return instance;
}
}
diff --git a/ui/src/app/core-ui/core-ui.module.ts b/ui/src/app/core-ui/core-ui.module.ts
index ae22d1e..83232bf 100644
--- a/ui/src/app/core-ui/core-ui.module.ts
+++ b/ui/src/app/core-ui/core-ui.module.ts
@@ -73,73 +73,78 @@ import {ColorPickerModule} from "ngx-color-picker";
import {QuillModule} from "ngx-quill";
import {CodemirrorModule} from "@ctrl/ngx-codemirror";
import {MatAutocompleteModule} from "@angular/material/autocomplete";
+import {StatusWidgetComponent} from "./widget/status/status-widget.component";
+import {NgxChartsModule} from "@swimlane/ngx-charts";
+import {BarchartWidgetComponent} from "./widget/barchart/barchart-widget.component";
@NgModule({
- imports: [
- CommonModule,
- ColorPickerModule,
- FlexLayoutModule,
- CodemirrorModule,
- CustomMaterialModule,
- ReactiveFormsModule,
- FormsModule,
- CdkTableModule,
- MatAutocompleteModule,
- MatSnackBarModule,
- MatProgressSpinnerModule,
- MatDatepickerModule,
- MatNativeDateModule,
- PlotlyViaWindowModule,
- MatSliderModule,
- MatChipsModule,
- PortalModule,
- OverlayModule,
- QuillModule.forRoot()
- ],
- declarations: [
- ConfirmDialogComponent,
- DisplayRecommendedPipe,
- ImageComponent,
- ImageContainerComponent,
- ImageLabelingComponent,
- ImageLabelsComponent,
- ImageBarComponent,
- ImageAnnotationsComponent,
- ImageCategorizeComponent,
- ImageViewerComponent,
- StandardDialogComponent,
- PanelDialogComponent,
- StaticAnyInput,
- StaticPropertyComponent,
- StaticFreeInputComponent,
- StaticSecretInputComponent,
- StaticFileInputComponent,
- StaticMappingNaryComponent,
- StaticMappingUnaryComponent,
- StaticGroupComponent,
- StaticAlternativesComponent,
- StaticCollectionComponent,
- StaticColorPickerComponent,
- StaticCodeInputComponent,
- StaticOneOfInputComponent,
- StaticRuntimeResolvableAnyInputComponent,
- StaticRuntimeResolvableOneOfInputComponent,
- ],
- providers: [
- MatDatepickerModule,
- ColorService,
- DisplayRecommendedPipe,
- ReactLabelingService,
- PolygonLabelingService,
- BrushLabelingService,
- CocoFormatService,
- LabelingModeService,
- DialogService,
- RuntimeResolvableService,
- StaticFileRestService,
- ],
- entryComponents: [
- ],
+ imports: [
+ CommonModule,
+ ColorPickerModule,
+ FlexLayoutModule,
+ CodemirrorModule,
+ CustomMaterialModule,
+ ReactiveFormsModule,
+ FormsModule,
+ CdkTableModule,
+ MatAutocompleteModule,
+ MatSnackBarModule,
+ MatProgressSpinnerModule,
+ MatDatepickerModule,
+ MatNativeDateModule,
+ NgxChartsModule,
+ PlotlyViaWindowModule,
+ MatSliderModule,
+ MatChipsModule,
+ PortalModule,
+ OverlayModule,
+ QuillModule.forRoot()
+ ],
+ declarations: [
+ BarchartWidgetComponent,
+ ConfirmDialogComponent,
+ DisplayRecommendedPipe,
+ ImageComponent,
+ ImageContainerComponent,
+ ImageLabelingComponent,
+ ImageLabelsComponent,
+ ImageBarComponent,
+ ImageAnnotationsComponent,
+ ImageCategorizeComponent,
+ ImageViewerComponent,
+ StandardDialogComponent,
+ PanelDialogComponent,
+ StaticAnyInput,
+ StaticPropertyComponent,
+ StaticFreeInputComponent,
+ StaticSecretInputComponent,
+ StaticFileInputComponent,
+ StaticMappingNaryComponent,
+ StaticMappingUnaryComponent,
+ StaticGroupComponent,
+ StaticAlternativesComponent,
+ StaticCollectionComponent,
+ StaticColorPickerComponent,
+ StaticCodeInputComponent,
+ StaticOneOfInputComponent,
+ StaticRuntimeResolvableAnyInputComponent,
+ StaticRuntimeResolvableOneOfInputComponent,
+ StatusWidgetComponent,
+ ],
+ providers: [
+ MatDatepickerModule,
+ ColorService,
+ DisplayRecommendedPipe,
+ ReactLabelingService,
+ PolygonLabelingService,
+ BrushLabelingService,
+ CocoFormatService,
+ LabelingModeService,
+ DialogService,
+ RuntimeResolvableService,
+ StaticFileRestService,
+ ],
+ entryComponents: [],
exports: [
ImageComponent,
ImageLabelingComponent,
@@ -161,7 +166,9 @@ import {MatAutocompleteModule} from "@angular/material/autocomplete";
StaticOneOfInputComponent,
StaticRuntimeResolvableAnyInputComponent,
StaticRuntimeResolvableOneOfInputComponent,
- ImageViewerComponent
+ ImageViewerComponent,
+ StatusWidgetComponent,
+ BarchartWidgetComponent,
]
})
export class CoreUiModule {
diff --git a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.html
similarity index 63%
copy from ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html
copy to ui/src/app/core-ui/widget/barchart/barchart-widget.component.html
index d4b52c3..3cb206b 100644
--- a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html
+++ b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.html
@@ -16,11 +16,17 @@
~
-->
-<div fxFlex="100" fxLayoutAlign="center center" fxLayout="column">
- <ngx-charts-number-card
- [results]="chartData"
- [scheme]="'cool'"
- [cardColor]="color"
- [bandColor]="'rgb(27, 20, 100)'" class="fix-margin">
- </ngx-charts-number-card>
+<div fxFlex="100" fxLayoutAlign="center center" fxLayout="column" [ngStyle]="{'background-color': backgroundColor, 'width':'265px', 'max-width': '265px', 'margin-left': '8px'}">
+ <ngx-charts-bar-vertical
+ [view]="[265,150]"
+ [results]="data"
+ [scheme]="colorScheme"
+ [gradient]="false"
+ [xAxis]="false"
+ [yAxis]="true"
+ [legend]="false"
+ [showXAxisLabel]="false"
+ [showYAxisLabel]="false"
+ [ngStyle]="{'fill': backgroundColor}">
+ </ngx-charts-bar-vertical>
</div>
\ No newline at end of file
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.scss
similarity index 87%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to ui/src/app/core-ui/widget/barchart/barchart-widget.component.scss
index c6e991f..41ecef0 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.scss
@@ -14,11 +14,4 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*
- */
-
-package org.apache.streampipes.manager.monitoring.pipeline;
-
-public class PipelineExecutionStatusCollector {
-
-
-}
+ */
\ No newline at end of file
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.ts
similarity index 58%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to ui/src/app/core-ui/widget/barchart/barchart-widget.component.ts
index c6e991f..01c0860 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/ui/src/app/core-ui/widget/barchart/barchart-widget.component.ts
@@ -16,9 +16,37 @@
*
*/
-package org.apache.streampipes.manager.monitoring.pipeline;
+import {Component, Input, OnInit} from '@angular/core';
-public class PipelineExecutionStatusCollector {
+@Component({
+ selector: 'sp-barchart-widget',
+ templateUrl: './barchart-widget.component.html',
+ styleUrls: ['./barchart-widget.component.scss']
+})
+export class BarchartWidgetComponent implements OnInit {
+ _data = [];
-}
+ @Input()
+ backgroundColor = "#cccccc";
+
+ @Input()
+ textColor = "#1b1464";
+
+ colorScheme = {
+ domain: ['#1b1464']
+ };
+
+ ngOnInit(): void {
+ this.colorScheme.domain = [this.textColor];
+ }
+
+ @Input()
+ set data(data) {
+ this._data = data;
+ }
+
+ get data() {
+ return this._data;
+ }
+}
\ No newline at end of file
diff --git a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.css b/ui/src/app/core-ui/widget/status/status-widget.component.css
similarity index 100%
copy from ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.css
copy to ui/src/app/core-ui/widget/status/status-widget.component.css
diff --git a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html b/ui/src/app/core-ui/widget/status/status-widget.component.html
similarity index 84%
rename from ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html
rename to ui/src/app/core-ui/widget/status/status-widget.component.html
index d4b52c3..1fd6197 100644
--- a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.html
+++ b/ui/src/app/core-ui/widget/status/status-widget.component.html
@@ -17,10 +17,13 @@
-->
<div fxFlex="100" fxLayoutAlign="center center" fxLayout="column">
- <ngx-charts-number-card
+ <ngx-charts-number-card style="width:300px;"
+ [view]="[300,150]"
[results]="chartData"
[scheme]="'cool'"
[cardColor]="color"
- [bandColor]="'rgb(27, 20, 100)'" class="fix-margin">
+ [animations]="false"
+ [bandColor]="bandColor"
+ [textColor]="textColor">
</ngx-charts-number-card>
</div>
\ No newline at end of file
diff --git a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.ts b/ui/src/app/core-ui/widget/status/status-widget.component.ts
similarity index 80%
rename from ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.ts
rename to ui/src/app/core-ui/widget/status/status-widget.component.ts
index aa89cd8..b639d4c 100644
--- a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.ts
+++ b/ui/src/app/core-ui/widget/status/status-widget.component.ts
@@ -16,16 +16,19 @@
*
*/
-import {Component, Input} from '@angular/core';
+import {Component, Input, OnInit} from '@angular/core';
@Component({
- selector: 'dashboard-status-filled',
- templateUrl: './dashboard-status-filled.component.html',
- styleUrls: ['./dashboard-status-filled.component.css']
+ selector: 'sp-status-widget',
+ templateUrl: './status-widget.component.html',
+ styleUrls: ['./status-widget.component.css']
})
-export class DashboardStatusFilledComponent {
+export class StatusWidgetComponent implements OnInit {
@Input() color: string = "rgb(156, 156, 156)";
+ @Input() bandColor: string = "rgb(27, 20, 100)";
+ @Input() textColor: string = "rgb(96,96,96)";
+
_label: string;
_statusValue: string;
diff --git a/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.html b/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.html
index 5afd0e5..49f9875 100644
--- a/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.html
+++ b/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.html
@@ -18,12 +18,12 @@
<div fxFlex="100" fxLayout="column">
<div fxLayout="row" fxFlex="100">
- <div fxFlex="30" fxLayout="row" style="margin-bottom:10px;">
+ <div fxFlex="{{showDescription ? 30 : 100}}}" fxLayout="row" style="margin-bottom:10px;">
<span class="draggable-icon {{elementType}}">
<pipeline-element [preview]="true" [pipelineElement]="_element"></pipeline-element>
</span>
</div>
- <div fxFlex="70" fxLayout="column" fxLayoutAlign="center left">
+ <div fxFlex="70" fxLayout="column" fxLayoutAlign="center left" *ngIf="showDescription">
<b>{{element.name}}</b>
{{element.description}}
</div>
diff --git a/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.ts b/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.ts
index d353def..57cb3d6 100644
--- a/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.ts
+++ b/ui/src/app/pipeline-details/components/elements/pipeline-elements-row.component.ts
@@ -32,6 +32,9 @@ export class PipelineElementsRowComponent implements OnInit {
@Input()
pipeline: Pipeline;
+ @Input()
+ showDescription: boolean = true;
+
_element: PipelineElementUnion;
constructor() {
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/ui/src/app/pipeline-details/components/model/pipeline-details.model.ts
similarity index 87%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to ui/src/app/pipeline-details/components/model/pipeline-details.model.ts
index c6e991f..ea6782d 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/ui/src/app/pipeline-details/components/model/pipeline-details.model.ts
@@ -16,9 +16,7 @@
*
*/
-package org.apache.streampipes.manager.monitoring.pipeline;
-
-public class PipelineExecutionStatusCollector {
-
-
-}
+export interface HistoricalMonitoringData {
+ name: string;
+ value: number;
+}
\ No newline at end of file
diff --git a/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.html b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.html
new file mode 100644
index 0000000..f256d5d
--- /dev/null
+++ b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.html
@@ -0,0 +1,41 @@
+<div fxFlex="100" fxLayout="column">
+ <div fxLayout="column" class="fixed-height add-options">
+ <div class="add-options-item" fxLayoutAlign="start center" fxLayout="row" style="padding-right:10px;">
+ <div fxFlex="100" fxLayout="row" fxLayoutAlign="end center">
+ <mat-slide-toggle [(ngModel)]="autoRefresh" color="primary">Auto refresh</mat-slide-toggle>
+ </div>
+ </div>
+ </div>
+ <div fxFlex="100" fxLayout="column" class="page-container-padding-inner">
+ <pipeline-preview [jspcanvas]="'assembly-quickedit'" [pipeline]="pipeline"
+ (selectedElementEmitter)="selectElement($event)"
+ style="margin-bottom:15px;"></pipeline-preview>
+
+ <div fxFlex="100" *ngIf="pipelineMonitoringInfoAvailable">
+ <div *ngFor="let pipelineElement of allElements" fxLayout="column" class="mb-10">
+ <div class="assembly-options-preview sp-blue-bg">
+ <div fxLayout="row" fxLayoutAlign="start center">
+ <h4>{{pipelineElement.name}}</h4>
+ </div>
+ </div>
+ <div class="sp-blue-border pipeline-element-statistics-panel">
+ <div fxFlex="100" fxLayout="row">
+ <div fxFlex="20" fxLayoutAlign="start start">
+ <pipeline-elements-row style="width: 100%;"
+ [showDescription]="false"
+ [pipeline]="pipeline"
+ [element]="pipelineElement"></pipeline-elements-row>
+ </div>
+ <div fxFlex="80" fxLayoutAlign="start center">
+ <pipeline-element-statistics
+ [pipeline]="pipeline"
+ [pipelineElement]="pipelineElement"
+ [pipelineElementMonitoringInfo]="pipelineElementMonitoringInfo.get(pipelineElement.elementId)">
+ </pipeline-element-statistics>
+ </div>
+ </div>
+ </div>
+ </div>
+ </div>
+ </div>
+</div>
\ No newline at end of file
diff --git a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.css b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.scss
similarity index 72%
rename from ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.css
rename to ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.scss
index 14e1198..a3d7690 100644
--- a/ui/src/app/app-transport-monitoring/components/dashboard-status-filled/dashboard-status-filled.component.css
+++ b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.scss
@@ -16,20 +16,26 @@
*
*/
-.statusValue {
- font-size: 40pt;
- background: darkblue;
- color: white;
- margin:5px;
- min-height:140px;
- padding: 15px;
+.add-options {
+ background-color:#f6f6f6;
+ border-bottom: 1px solid #cccccc;
+ padding-top:10px;
+ padding-bottom:10px;
}
-.statusValue>h4 {
- font-size:20pt;
- margin-bottom: 30px;
+.fixed-height {
+ width:100%;
+ height: 50px;
}
-.fix-margin {
- margin-top: -5px;
+.page-container-padding-inner {
+ margin: 10px;
+}
+
+.pipeline-element-statistics-panel {
+ padding: 10px;
+}
+
+.mb-10 {
+ margin-bottom: 10px;
}
\ No newline at end of file
diff --git a/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.ts b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.ts
new file mode 100644
index 0000000..5fb4603
--- /dev/null
+++ b/ui/src/app/pipeline-details/components/monitoring/pipeline-monitoring.component.ts
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import {Component, Input, OnDestroy, OnInit} from "@angular/core";
+import {
+ DataProcessorInvocation, DataSinkInvocation,
+ Pipeline, PipelineElementMonitoringInfo,
+ PipelineMonitoringInfo,
+ SpDataSet, SpDataStream
+} from "../../../core-model/gen/streampipes-model";
+import {PipelineMonitoringService} from "../../../platform-services/apis/pipeline-monitoring.service";
+
+@Component({
+ selector: 'pipeline-monitoring',
+ templateUrl: './pipeline-monitoring.component.html',
+ styleUrls: ['./pipeline-monitoring.component.scss']
+})
+export class PipelineMonitoringComponent implements OnInit, OnDestroy {
+
+ @Input()
+ pipeline: Pipeline;
+
+ pipelineMonitoringInfo: PipelineMonitoringInfo;
+ pipelineMonitoringInfoAvailable: boolean = false;
+
+ allElements: (SpDataSet | SpDataStream | DataProcessorInvocation | DataSinkInvocation)[] = [];
+
+ autoRefresh: boolean = true;
+
+ pipelineElementMonitoringInfo: Map<string, PipelineElementMonitoringInfo>;
+
+ constructor(private pipelineMonitoringService: PipelineMonitoringService) {
+ }
+
+ ngOnInit(): void {
+ this.collectAllElements();
+ console.log(this.allElements);
+ this.refreshMonitoringInfo();
+ }
+
+ collectAllElements() {
+ this.allElements = this.allElements
+ .concat(this.pipeline.streams)
+ .concat(this.pipeline.sepas)
+ .concat(this.pipeline.actions);
+ }
+
+ refreshMonitoringInfo() {
+ this.pipelineMonitoringService
+ .getPipelineMonitoringInfo(this.pipeline._id)
+ .subscribe(monitoringInfo => {
+ this.pipelineElementMonitoringInfo = new Map<string, PipelineElementMonitoringInfo>();
+ this.pipelineMonitoringInfo = monitoringInfo;
+ monitoringInfo.pipelineElementMonitoringInfo.forEach(info => {
+ this.pipelineElementMonitoringInfo.set(info.pipelineElementId, info);
+ })
+ this.pipelineMonitoringInfoAvailable = true;
+ if (this.autoRefresh) {
+ setTimeout(() => {
+ this.refreshMonitoringInfo();
+ }, 5000);
+ }
+ })
+ }
+
+ selectElement(pipelineElement) {
+ console.log(pipelineElement);
+ }
+
+ ngOnDestroy(): void {
+ this.autoRefresh = false;
+ }
+
+}
\ No newline at end of file
diff --git a/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html
new file mode 100644
index 0000000..400ca58
--- /dev/null
+++ b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.html
@@ -0,0 +1,45 @@
+<div fxFlex="100" fxLayout="row" fxLayoutAlign="start start">
+ <div fxFlex="33">
+ <div fxLayout="column" class="mb-10">
+ <sp-status-widget fxFlex="100" [label]="'Consumed Messages / Lag'"
+ [color]="(consumedMessagesFirstInputStream == notAvailable) ? deactivatedCardColor : cardColor"
+ [textColor]="(consumedMessagesFirstInputStream == notAvailable) ? deactivatedTextColor : textColor"
+ [bandColor]="consumedMessagesFirstStreamBandColor"
+ [statusValue]="consumedMessagesFirstInputStream">
+ </sp-status-widget>
+ <sp-barchart-widget *ngIf="(consumedMessagesFirstInputStream != notAvailable)"
+ [data]="historicFirstConsumedInputValues"
+ [textColor]="chartTextColor"
+ [backgroundColor]="chartBackgroundColor" class="mt--10">
+ </sp-barchart-widget>
+ </div>
+ </div>
+ <div fxFlex="33">
+ <div fxLayout="column" class="mb-10">
+ <sp-status-widget fxFlex="100" [label]="'Consumed Messages / Lag'"
+ [color]="consumedMessagesSecondInputStream === notAvailable ? deactivatedCardColor : cardColor"
+ [textColor]="consumedMessagesSecondInputStream === notAvailable ? deactivatedTextColor : textColor"
+ [bandColor]="consumedMessagesSecondStreamBandColor"
+ [statusValue]="consumedMessagesSecondInputStream"></sp-status-widget>
+ <sp-barchart-widget *ngIf="consumedMessagesSecondInputStream !== notAvailable"
+ [data]="historicSecondConsumedInputValues"
+ [textColor]="chartTextColor"
+ [backgroundColor]="chartBackgroundColor" class="mt--10">
+ </sp-barchart-widget>
+ </div>
+ </div>
+ <div fxFlex="33">
+ <div fxLayout="column" class="mb-10">
+ <sp-status-widget fxFlex="100" [label]="'Produced Messages'"
+ [color]="producedMessages === notAvailable ? deactivatedCardColor : cardColor"
+ [textColor]="producedMessages === notAvailable ? deactivatedTextColor : textColor"
+ [bandColor]="producedMessages === notAvailable ? deactivatedBandColor : bandColor"
+ [statusValue]="producedMessages"></sp-status-widget>
+ <sp-barchart-widget *ngIf="producedMessages !== notAvailable"
+ [data]="historicProducedOutputValues"
+ [textColor]="chartTextColor"
+ [backgroundColor]="chartBackgroundColor" class="mt--10">
+ </sp-barchart-widget>
+ </div>
+ </div>
+</div>
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.scss
similarity index 88%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.scss
index c6e991f..27392f1 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.scss
@@ -16,9 +16,10 @@
*
*/
-package org.apache.streampipes.manager.monitoring.pipeline;
-
-public class PipelineExecutionStatusCollector {
-
-
+.mb-10 {
+ margin-bottom: 10px;
}
+
+.mt--10 {
+ margin-top: -10px;
+}
\ No newline at end of file
diff --git a/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts
new file mode 100644
index 0000000..5b2322d
--- /dev/null
+++ b/ui/src/app/pipeline-details/components/monitoring/statistics/pipeline-element-statistics.component.ts
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+import {Component, EventEmitter, Input, OnInit, Output} from "@angular/core";
+
+import {
+ DataProcessorInvocation, DataSinkInvocation,
+ Pipeline,
+ PipelineElementMonitoringInfo, SpDataSet, SpDataStream,
+} from "../../../../core-model/gen/streampipes-model";
+import {HistoricalMonitoringData} from "../../model/pipeline-details.model";
+
+@Component({
+ selector: 'pipeline-element-statistics',
+ templateUrl: './pipeline-element-statistics.component.html',
+ styleUrls: ['./pipeline-element-statistics.component.scss']
+})
+export class PipelineElementStatisticsComponent implements OnInit {
+
+ @Input()
+ pipeline: Pipeline;
+
+ @Input()
+ pipelineElement: (SpDataSet | SpDataStream | DataProcessorInvocation | DataSinkInvocation);
+
+ _pipelineElementMonitoringInfo: PipelineElementMonitoringInfo;
+
+ currentPipelineElement: SpDataSet | SpDataStream | DataProcessorInvocation | DataSinkInvocation;
+ consumedMessagesFirstInputStream: string = "";
+ consumedMessagesSecondInputStream: string = "";
+
+ producedMessages: string | number = "";
+
+ cardColor: string = "rgb(27, 20, 100)";
+ deactivatedCardColor: string = "rgb(241,241,241)";
+
+ textColor: string = "rgb(208,208,208)";
+ deactivatedTextColor: string = "rgb(205,205,205)";
+
+ bandColor: string = "rgb(27, 20, 100)";
+ deactivatedBandColor: string = "rgb(241,241,241)";
+ okBandColor: string = "rgb(11,186,0)";
+ warningBandColor: string = "rgb(253,144,0)";
+
+ chartBackgroundColor: string = "rgb(27, 20, 100)";
+ chartTextColor: string = "rgb(208,208,208)";
+
+ consumedMessagesFirstStreamBandColor: string;
+ consumedMessagesSecondStreamBandColor: string;
+
+ notAvailable: string = "n/a";
+
+ historicFirstConsumedInputValues: HistoricalMonitoringData[] = [];
+ historicSecondConsumedInputValues: HistoricalMonitoringData[] = [];
+ historicProducedOutputValues: HistoricalMonitoringData[] = [];
+
+ consumedMessagesFirstStreamLastValue: number = -1;
+ consumedMessagesSecondStreamLastValue: number = -1;
+ producedMessageOutputLastValue: number = -1;
+
+ ngOnInit(): void {
+
+ }
+
+ updateMonitoringInfo() {
+ if (this.pipelineElementMonitoringInfo.inputTopicInfoExists) {
+ let consumedMessages = this.pipelineElementMonitoringInfo.inputTopicInfo.map(info => {
+ return {"count": (info.currentOffset - info.offsetAtPipelineStart),
+ "lag": info.latestOffset - info.currentOffset,
+ "currentOffset": info.currentOffset};
+ });
+ this.consumedMessagesFirstInputStream = consumedMessages[0].count + " / " + consumedMessages[0].lag;
+ this.consumedMessagesSecondInputStream = consumedMessages.length > 1 ? consumedMessages[1].count + " / " + consumedMessages[1].lag : this.notAvailable;
+ this.consumedMessagesFirstStreamBandColor = consumedMessages[0].lag > 10 ? this.warningBandColor : this.okBandColor;
+ this.consumedMessagesSecondStreamBandColor = (consumedMessages.length > 1 ? (consumedMessages[1].lag > 10 ? this.warningBandColor : this.okBandColor) : this.deactivatedBandColor);
+
+ this.makeHistoricData(consumedMessages[0], this.consumedMessagesFirstStreamLastValue, this.historicFirstConsumedInputValues);
+ this.consumedMessagesFirstStreamLastValue = consumedMessages[0].count;
+ this.historicFirstConsumedInputValues = [].concat(this.historicFirstConsumedInputValues);
+
+ if (consumedMessages.length > 1) {
+ this.makeHistoricData(consumedMessages[1], this.consumedMessagesSecondStreamLastValue, this.historicSecondConsumedInputValues);
+ this.consumedMessagesSecondStreamLastValue = consumedMessages[1].count;
+ this.historicSecondConsumedInputValues = [].concat(this.historicSecondConsumedInputValues);
+ }
+ } else {
+ this.consumedMessagesFirstInputStream = this.notAvailable;
+ this.consumedMessagesFirstStreamBandColor = this.deactivatedBandColor;
+ this.consumedMessagesSecondInputStream = this.notAvailable;
+ this.consumedMessagesSecondStreamBandColor = this.deactivatedBandColor;
+ }
+ if (this.pipelineElementMonitoringInfo.outputTopicInfoExists) {
+ this.producedMessages = this.pipelineElementMonitoringInfo.outputTopicInfo.latestOffset - this.pipelineElementMonitoringInfo.outputTopicInfo.offsetAtPipelineStart;
+ let producedMessage = {"count": this.producedMessages};
+ this.makeHistoricData(producedMessage, this.producedMessageOutputLastValue, this.historicProducedOutputValues);
+ this.producedMessageOutputLastValue = producedMessage.count;
+ this.historicProducedOutputValues = [].concat(this.historicProducedOutputValues);
+ } else {
+ this.producedMessages = this.notAvailable;
+ }
+ }
+
+ makeHistoricData(consumedMessage: any, lastValue: number, historicData: HistoricalMonitoringData[]) {
+ console.log(consumedMessage);
+ if (lastValue > -1) {
+ let entry: HistoricalMonitoringData = {"name": new Date().toLocaleTimeString(), value: (consumedMessage.count - lastValue)};
+ historicData.push(entry);
+ }
+ if (historicData.length > 10) {
+ historicData.shift();
+ } else {
+ for (let i = 0; i < (10 - historicData.length); i++) {
+ historicData.unshift({"name": i.toString(), "value": 0});
+ }
+ }
+ }
+
+ get pipelineElementMonitoringInfo() {
+ return this._pipelineElementMonitoringInfo;
+ }
+
+ @Input()
+ set pipelineElementMonitoringInfo(pipelineElementMonitoringInfo: PipelineElementMonitoringInfo) {
+ this._pipelineElementMonitoringInfo = pipelineElementMonitoringInfo;
+ this.updateMonitoringInfo();
+ }
+
+
+}
\ No newline at end of file
diff --git a/ui/src/app/pipeline-details/pipeline-details.component.html b/ui/src/app/pipeline-details/pipeline-details.component.html
index dbd2d97..cf7a2b2 100644
--- a/ui/src/app/pipeline-details/pipeline-details.component.html
+++ b/ui/src/app/pipeline-details/pipeline-details.component.html
@@ -22,7 +22,7 @@
<div fxFlex="100" fxLayout="row">
<mat-tab-group [selectedIndex]="selectedIndex" (selectedIndexChange)="setSelectedIndex($event)">
<mat-tab label="Overview"></mat-tab>
- <mat-tab label="Statistics"></mat-tab>
+ <mat-tab label="Monitoring"></mat-tab>
<mat-tab label="Errors"></mat-tab>
<mat-tab label="Quick Edit" [disabled]="pipelineAvailable && pipeline.running"></mat-tab>
</mat-tab-group>
@@ -48,6 +48,9 @@
</div>
</div>
</div>
+ <div fxFlex fxLayout="column" fxLayoutAlign="start top" *ngIf="pipelineAvailable && selectedIndex == 1">
+ <pipeline-monitoring [pipeline]="pipeline"></pipeline-monitoring>
+ </div>
<div fxFlex fxLayout="row" fxLayoutAlign="start top" *ngIf="pipelineAvailable && selectedIndex == 3">
<div fxFlex="100" class="md-padding">
<div fxFlex fxLayout="column">
diff --git a/ui/src/app/pipeline-details/pipeline-details.module.ts b/ui/src/app/pipeline-details/pipeline-details.module.ts
index 850a77f..478a0d4 100644
--- a/ui/src/app/pipeline-details/pipeline-details.module.ts
+++ b/ui/src/app/pipeline-details/pipeline-details.module.ts
@@ -33,6 +33,9 @@ import {PipelineElementsComponent} from "./components/elements/pipeline-elements
import {PipelineElementsRowComponent} from "./components/elements/pipeline-elements-row.component";
import {QuickEditComponent} from "./components/edit/quickedit.component";
import {CoreUiModule} from "../core-ui/core-ui.module";
+import {PipelineMonitoringComponent} from "./components/monitoring/pipeline-monitoring.component";
+import {PipelineElementStatisticsComponent} from "./components/monitoring/statistics/pipeline-element-statistics.component";
+import {NgxChartsModule} from "@swimlane/ngx-charts";
@NgModule({
imports: [
@@ -44,6 +47,7 @@ import {CoreUiModule} from "../core-ui/core-ui.module";
CustomMaterialModule,
CommonModule,
MatProgressSpinnerModule,
+ NgxChartsModule,
EditorModule,
FormsModule,
ReactiveFormsModule
@@ -52,7 +56,9 @@ import {CoreUiModule} from "../core-ui/core-ui.module";
PipelineActionsComponent,
PipelineElementsComponent,
PipelineElementsRowComponent,
+ PipelineElementStatisticsComponent,
PipelineDetailsComponent,
+ PipelineMonitoringComponent,
PipelineStatusComponent,
PipelinePreviewComponent,
QuickEditComponent
diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java b/ui/src/app/platform-services/apis/pipeline-monitoring.service.ts
similarity index 50%
copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
copy to ui/src/app/platform-services/apis/pipeline-monitoring.service.ts
index c6e991f..cef42ea 100644
--- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/monitoring/pipeline/PipelineExecutionStatusCollector.java
+++ b/ui/src/app/platform-services/apis/pipeline-monitoring.service.ts
@@ -16,9 +16,27 @@
*
*/
-package org.apache.streampipes.manager.monitoring.pipeline;
+import {Injectable} from "@angular/core";
+import {HttpClient} from "@angular/common/http";
+import {Observable} from "rxjs";
+import {
+ PipelineMonitoringInfo
+} from "../../core-model/gen/streampipes-model";
+import {PlatformServicesCommons} from "./commons.service";
+import {map} from "rxjs/operators";
-public class PipelineExecutionStatusCollector {
+@Injectable()
+export class PipelineMonitoringService {
+ constructor(private http: HttpClient,
+ private platformServicesCommons: PlatformServicesCommons) {
+ }
-}
+ getPipelineMonitoringInfo(pipelineId: string): Observable<PipelineMonitoringInfo> {
+ return this.http.get(this.platformServicesCommons.authUserBasePath()
+ + "/pipeline-monitoring/"
+ + pipelineId)
+ .pipe(map(response => PipelineMonitoringInfo.fromData(response as any)));
+ }
+
+}
\ No newline at end of file
diff --git a/ui/src/app/platform-services/platform.module.ts b/ui/src/app/platform-services/platform.module.ts
index e96fc12..4627904 100644
--- a/ui/src/app/platform-services/platform.module.ts
+++ b/ui/src/app/platform-services/platform.module.ts
@@ -22,6 +22,7 @@ import {PipelineService} from "./apis/pipeline.service";
import {PlatformServicesCommons} from "./apis/commons.service";
import {PipelineElementEndpointService} from "./apis/pipeline-element-endpoint.service";
import {FilesService} from "./apis/files.service";
+import {PipelineMonitoringService} from "./apis/pipeline-monitoring.service";
@NgModule({
imports: [],
@@ -32,6 +33,7 @@ import {FilesService} from "./apis/files.service";
PipelineElementEndpointService,
//PipelineTemplateService,
PipelineElementService,
+ PipelineMonitoringService,
PipelineService
],
entryComponents: []
diff --git a/ui/src/scss/sp/colors.scss b/ui/src/scss/sp/colors.scss
index eace269..97f99b7 100644
--- a/ui/src/scss/sp/colors.scss
+++ b/ui/src/scss/sp/colors.scss
@@ -58,6 +58,10 @@
background: #FAFAFA;
}
+.sp-border-left-accent-light {
+ border-left: 2px solid $sp-color-accent-light;
+}
+
.sp-blue-border-nopadding {
border: 3px solid $sp-color-accent-light;
}