You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/04/30 07:11:24 UTC

[incubator-streampipes-extensions] branch dev updated: StringTimer & StringCounter Processor

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/dev by this push:
     new 436df47  StringTimer & StringCounter Processor
     new e313c91  Merge pull request #11 from Madabaru/dev
436df47 is described below

commit 436df4795bf4f85ce3311f85a96ce4acd6bd557d
Author: Madabaru <jo...@axantu.com>
AuthorDate: Thu Apr 30 09:04:43 2020 +0200

    StringTimer & StringCounter Processor
---
 .../pe/jvm/AllPipelineElementsInit.java            |   4 +
 .../transformation/jvm/TransformationJvmInit.java  |   6 +-
 .../stringoperator/counter/StringCounter.java      |  79 ++++++++++++++++++++
 .../counter/StringCounterController.java           |  71 ++++++++++++++++++
 .../counter/StringCounterParameters.java           |  36 +++++++++
 .../stringoperator/timer/StringTimer.java          |  72 ++++++++++++++++++
 .../timer/StringTimerController.java               |  83 +++++++++++++++++++++
 .../timer/StringTimerParameters.java               |  42 +++++++++++
 .../documentation.md                               |  55 ++++++++++++++
 .../icon.png                                       | Bin 0 -> 16458 bytes
 .../strings.en                                     |  16 ++++
 .../documentation.md                               |  54 ++++++++++++++
 .../icon.png                                       | Bin 0 -> 14398 bytes
 .../strings.en                                     |  17 +++++
 14 files changed, 534 insertions(+), 1 deletion(-)

diff --git a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
index b27e745..987ee33 100644
--- a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
+++ b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
@@ -64,6 +64,8 @@ import org.apache.streampipes.processors.transformation.jvm.processor.booloperat
 import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timekeeping.BooleanTimekeepingController;
 import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timer.BooleanTimerController;
 import org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentController;
+import org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter.StringCounterController;
+import org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer.StringTimerController;
 import org.apache.streampipes.processors.transformation.jvm.processor.task.TaskDurationController;
 import org.apache.streampipes.processors.transformation.jvm.processor.timestampextractor.TimestampExtractorController;
 import org.apache.streampipes.processors.transformation.jvm.processor.transformtoboolean.TransformToBooleanController;
@@ -145,6 +147,8 @@ public class AllPipelineElementsInit extends StandaloneModelSubmitter {
             .add(new TaskDurationController())
             .add(new BooleanInverterController())
             .add(new TransformToBooleanController())
+            .add(new StringCounterController())
+            .add(new StringTimerController())
             // streampipes-sinks-brokers-jvm
             .add(new KafkaController())
             .add(new JmsController())
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
index 08bd7d5..9da848f 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
@@ -34,6 +34,8 @@ import org.apache.streampipes.processors.transformation.jvm.processor.booloperat
 import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timekeeping.BooleanTimekeepingController;
 import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timer.BooleanTimerController;
 import org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentController;
+import org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter.StringCounterController;
+import org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer.StringTimerController;
 import org.apache.streampipes.processors.transformation.jvm.processor.task.TaskDurationController;
 import org.apache.streampipes.processors.transformation.jvm.processor.timestampextractor.TimestampExtractorController;
 import org.apache.streampipes.processors.transformation.jvm.processor.transformtoboolean.TransformToBooleanController;
@@ -57,7 +59,9 @@ public class TransformationJvmInit extends StandaloneModelSubmitter {
             .add(new CsvMetadataEnrichmentController())
             .add(new TaskDurationController())
             .add(new BooleanInverterController())
-            .add(new TransformToBooleanController());
+            .add(new TransformToBooleanController())
+            .add(new StringTimerController())
+            .add(new StringCounterController());
 
     DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
             new CborDataFormatFactory(),
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounter.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounter.java
new file mode 100644
index 0000000..a6a6a57
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter;
+
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.runtime.EventProcessor;
+
+import java.util.HashMap;
+
+/** Counts how often the monitored string field changes, separately per (previous, new) value pair. */
+public class StringCounter implements EventProcessor<StringCounterParameters> {
+
+  private static Logger LOG;
+
+  private String selectedFieldName;
+  // Value seen in the previous event; null until a first value has been observed.
+  private String fieldValueOfLastEvent;
+  // Change counts keyed by previous + ">" + new; NOTE(review): values containing '>' can share a key.
+  private HashMap<String, Integer> changeCounter;
+
+  /** Reads the selected field name from the parameters and resets the processor state. */
+  @Override
+  public void onInvocation(StringCounterParameters stringCounterParametersParameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) {
+    LOG = stringCounterParametersParameters.getGraph().getLogger(StringCounter.class);
+    this.selectedFieldName = stringCounterParametersParameters.getSelectedFieldName();
+    // null rather than "" marks "no previous event", so an empty string remains a countable value.
+    this.fieldValueOfLastEvent = null;
+    this.changeCounter = new HashMap<>();
+  }
+
+  /**
+   * Forwards the event, extended by the change pair and its running count, whenever the
+   * selected field differs from the previous event's value. Events that leave the value
+   * unchanged, and events seen before any previous value is known, are not forwarded.
+   */
+  @Override
+  public void onEvent(Event inputEvent, SpOutputCollector out) {
+    String value = inputEvent.getFieldBySelector(selectedFieldName).getAsPrimitive().getAsString();
+    String previous = this.fieldValueOfLastEvent;
+
+    if (previous != null && !previous.equals(value)) {
+      // Starts a pair at 1 or increments its existing count.
+      Integer count = changeCounter.merge(previous + ">" + value, 1, Integer::sum);
+
+      inputEvent.addField(StringCounterController.CHANGE_FROM_FIELD_RUNTIME_NAME, previous);
+      inputEvent.addField(StringCounterController.CHANGE_TO_FIELD_RUNTIME_NAME, value);
+      inputEvent.addField(StringCounterController.COUNT_FIELD_RUNTIME_NAME, count);
+      out.collect(inputEvent);
+    }
+
+    this.fieldValueOfLastEvent = value;
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void onDetach() {
+  }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterController.java
new file mode 100644
index 0000000..290733f
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterController.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter;
+
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
+import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+
+/** Declares the string counter processor and creates its runtime instances. */
+public class StringCounterController extends StandaloneEventProcessingDeclarer<StringCounterParameters> {
+  // Ids passed to Labels.withId; FIELD_ID is also used to read the field mapping in onInvocation.
+  private static final String FIELD_ID = "field";
+  private static final String COUNT_FIELD_ID = "countField";
+  private static final String CHANGE_FROM_FIELD_ID = "changeFromField";
+  private static final String CHANGE_TO_FIELD_ID = "changeToField";
+  // Runtime names of the appended output fields; StringCounter adds its values under these names.
+  public static final String COUNT_FIELD_RUNTIME_NAME = "counter";
+  public static final String CHANGE_FROM_FIELD_RUNTIME_NAME = "change_from";
+  public static final String CHANGE_TO_FIELD_RUNTIME_NAME = "change_to";
+
+  /** Builds the description: one required string property (FIELD_ID) and three appended output fields. */
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.stringoperator.counter")
+            .withLocales(Locales.EN)
+            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+            .requiredStream(StreamRequirementsBuilder.create()
+                    .requiredPropertyWithUnaryMapping(
+                            EpRequirements.stringReq(),
+                            Labels.withId(FIELD_ID),
+                            PropertyScope.NONE)
+                    .build())
+            .outputStrategy(OutputStrategies.append(
+                    EpProperties.stringEp(Labels.withId(CHANGE_FROM_FIELD_ID), CHANGE_FROM_FIELD_RUNTIME_NAME, "http://schema.org/String"),
+                    EpProperties.stringEp(Labels.withId(CHANGE_TO_FIELD_ID), CHANGE_TO_FIELD_RUNTIME_NAME, "http://schema.org/String"),
+                    EpProperties.numberEp(Labels.withId(COUNT_FIELD_ID), COUNT_FIELD_RUNTIME_NAME, "http://schema.org/Number")
+            ))
+            .build();
+  }
+
+  /** Reads the field mapped under FIELD_ID and returns the parameters paired with the StringCounter constructor. */
+  @Override
+  public ConfiguredEventProcessor<StringCounterParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+    String selectedFieldName = extractor.mappingPropertyValue(FIELD_ID);
+    StringCounterParameters params = new StringCounterParameters(graph, selectedFieldName);
+    return new ConfiguredEventProcessor<>(params, StringCounter::new);
+  }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterParameters.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterParameters.java
new file mode 100644
index 0000000..27c0cef
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterParameters.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter;
+
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+
+/** Invocation parameters of the string counter processor. */
+public class StringCounterParameters extends EventProcessorBindingParams {
+    private final String selectedFieldName;
+    /** Binds the invocation graph together with the name of the string field to monitor. */
+    public StringCounterParameters(DataProcessorInvocation graph, String selectedFieldName) {
+        super(graph);
+        this.selectedFieldName = selectedFieldName;
+    }
+
+    public String getSelectedFieldName() {
+        return selectedFieldName;
+    }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimer.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimer.java
new file mode 100644
index 0000000..454284d
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer;
+
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.runtime.EventProcessor;
+
+/** Measures how long the selected string field keeps a value and reports it once the value changes. */
+public class StringTimer implements EventProcessor<StringTimerParameters> {
+
+  private static Logger LOG;
+
+  private String selectedFieldName;
+  private Long timestamp; // time (ms) from which the current value is measured
+  private double outputDivisor; // elapsed milliseconds are divided by this before output
+  private String fieldValueOfLastEvent; // null until a first value has been observed
+
+  /** Takes over the selected field name and the output divisor from the parameters. */
+  @Override
+  public void onInvocation(StringTimerParameters stringTimerParameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) {
+    LOG = stringTimerParameters.getGraph().getLogger(StringTimer.class);
+    this.selectedFieldName = stringTimerParameters.getSelectedFieldName();
+    this.outputDivisor = stringTimerParameters.getOutputDivisor();
+  }
+
+  /** Emits the event with elapsed time and previous value when the field value differs from the last event. */
+  @Override
+  public void onEvent(Event inputEvent, SpOutputCollector out) {
+    String current = inputEvent.getFieldBySelector(selectedFieldName).getAsPrimitive().getAsString();
+    long now = System.currentTimeMillis();
+    String previous = this.fieldValueOfLastEvent;
+    boolean changed = previous != null && !previous.equals(current);
+
+    if (changed) {
+      double elapsed = (now - this.timestamp) / this.outputDivisor;
+      inputEvent.addField(StringTimerController.MEASURED_TIME_FIELD_RUNTIME_NAME, elapsed);
+      inputEvent.addField(StringTimerController.FIELD_VALUE_RUNTIME_NAME, previous);
+      out.collect(inputEvent);
+    }
+    // (Re)start the clock while no previous value is known and after every reported change.
+    if (previous == null || changed) {
+      this.timestamp = now;
+    }
+    this.fieldValueOfLastEvent = current;
+  }
+
+  /** Nothing to clean up. */
+  @Override
+  public void onDetach() {
+  }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerController.java
new file mode 100644
index 0000000..f58cfaf
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerController.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer;
+
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.*;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
+import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+
+/** Declares the string timer processor and creates its runtime instances. */
+public class StringTimerController extends StandaloneEventProcessingDeclarer<StringTimerParameters> {
+  // Ids passed to Labels.withId; FIELD_ID is also used to read the field mapping in onInvocation.
+  public static final String FIELD_ID = "field";
+  public static final String MEASURED_TIME_ID = "measuredTime";
+  public static final String FIELD_VALUE_ID = "fieldValue";
+  // Output-unit setting: its id and the options offered for selection.
+  public static final String OUTPUT_UNIT_ID = "outputUnit";
+  private static final String MILLISECONDS = "Milliseconds";
+  private static final String SECONDS = "Seconds";
+  private static final String MINUTES = "Minutes";
+  // Runtime names of the fields StringTimer appends to emitted events.
+  public static final String MEASURED_TIME_FIELD_RUNTIME_NAME = "measured_time";
+  public static final String FIELD_VALUE_RUNTIME_NAME = "field_value";
+
+  /** Builds the description: one required string property, an output-unit selection and two appended fields. */
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.stringoperator.timer")
+            .withLocales(Locales.EN)
+            .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+            .requiredStream(StreamRequirementsBuilder.create()
+                    .requiredPropertyWithUnaryMapping(
+                            EpRequirements.stringReq(),
+                            Labels.withId(FIELD_ID),
+                            PropertyScope.NONE)
+                    .build())
+            .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID), Options.from(MILLISECONDS, SECONDS, MINUTES))
+            .outputStrategy(OutputStrategies.append(
+                    EpProperties.numberEp(Labels.withId(MEASURED_TIME_ID), MEASURED_TIME_FIELD_RUNTIME_NAME, "http://schema.org/Number"),
+                    EpProperties.stringEp(Labels.withId(FIELD_VALUE_ID), FIELD_VALUE_RUNTIME_NAME, "http://schema.org/String")
+            ))
+            .build();
+  }
+
+  /** Reads the mapped field and the selected unit, and returns the parameters paired with the StringTimer constructor. */
+  @Override
+  public ConfiguredEventProcessor<StringTimerParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+    String selectedFieldName = extractor.mappingPropertyValue(FIELD_ID);
+    String outputUnit = extractor.selectedSingleValue(OUTPUT_UNIT_ID, String.class);
+
+    // StringTimer divides elapsed milliseconds by this value; 1.0 is kept unless Seconds or Minutes was selected.
+    double outputDivisor= 1.0;
+    if (outputUnit.equals(SECONDS)) {
+      outputDivisor = 1000.0;
+    } else if (outputUnit.equals(MINUTES)) {
+      outputDivisor = 60000.0;
+    }
+    StringTimerParameters params = new StringTimerParameters(graph, selectedFieldName, outputDivisor);
+    return new ConfiguredEventProcessor<>(params, StringTimer::new);
+  }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerParameters.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerParameters.java
new file mode 100644
index 0000000..5e26dc9
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerParameters.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer;
+
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
+
+/** Invocation parameters of the string timer processor. */
+public class StringTimerParameters extends EventProcessorBindingParams {
+    private final String selectedFieldName; // name of the string field to monitor
+    private final double outputDivisor; // StringTimer divides elapsed milliseconds by this value
+
+    public StringTimerParameters(DataProcessorInvocation graph, String fieldName, double outputDivisor) {
+        super(graph);
+        this.selectedFieldName = fieldName;
+        this.outputDivisor = outputDivisor;
+    }
+
+    public String getSelectedFieldName() {
+        return selectedFieldName;
+    }
+
+    public double getOutputDivisor() {
+        return outputDivisor;
+    }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.counter/documentation.md b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.counter/documentation.md
new file mode 100644
index 0000000..4996f0e
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.counter/documentation.md
@@ -0,0 +1,55 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+## String Counter
+
+<p align="center"> 
+    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+This processor monitors a string field and counts how often its value changes. A change is characterized by
+the value of the field before and the value after the change, which together form a pair. The processor keeps a separate counter for each pair.
+
+***
+
+## Required input
+
+A string field is required in the data stream and can be selected with the field mapping.
+
+### String Field
+
+The string field to be monitored.
+
+***
+
+## Configuration
+
+(no further configuration required)
+
+## Output
+The following three fields are appended to the event:
+* [counter] numerical field with the current count value for the given value pair
+* [change_from] the value of the string before the change
+* [change_to] the value of the string after the change 
+
+The event is emitted whenever the value of the string field changes.
+
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.counter/icon.png b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.counter/icon.png
new file mode 100644
index 0000000..acf8fd2
Binary files /dev/null and b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.counter/icon.png differ
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.counter/strings.en b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.counter/strings.en
new file mode 100644
index 0000000..6273e45
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.counter/strings.en
@@ -0,0 +1,16 @@
+org.apache.streampipes.processors.transformation.jvm.stringoperator.counter.title=String Counter
+org.apache.streampipes.processors.transformation.jvm.stringoperator.counter.description= Increases a counter on each change of a string value.
+
+field.title=String Field
+field.description=The field of the string to monitor
+
+countField.title=Counter
+countField.description= Number of changes of the given pair
+
+changeFromField.title = Change From Value
+changeFromField.description= Field value before the change
+
+changeToField.title = Change To Value
+changeToField.description= Field value after the change
+
+
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.timer/documentation.md b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.timer/documentation.md
new file mode 100644
index 0000000..8a4bd24
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.timer/documentation.md
@@ -0,0 +1,54 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+## String Timer
+
+<p align="center"> 
+    <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+This processor measures how long the value of a string field remains unchanged. Once the value changes, an event with the measured time and the corresponding string value is emitted.
+
+
+***
+
+## Required input
+
+A string field is required in the data stream.
+
+### Field
+
+The string field which is monitored for any value changes.
+
+***
+
+## Configuration
+
+The output unit of the measured time can be selected: Milliseconds, Seconds or Minutes.
+
+## Output
+The following two fields are appended to the event:
+* [measured_time] the measured time for the string value to not change
+* [field_value] the corresponding string value 
+
+The event is emitted whenever the value of the string field changes.
+
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.timer/icon.png b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.timer/icon.png
new file mode 100644
index 0000000..3272e33
Binary files /dev/null and b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.timer/icon.png differ
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.timer/strings.en b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.timer/strings.en
new file mode 100644
index 0000000..9034741
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.stringoperator.timer/strings.en
@@ -0,0 +1,17 @@
+org.apache.streampipes.processors.transformation.jvm.stringoperator.timer.title=String Timer
+org.apache.streampipes.processors.transformation.jvm.stringoperator.timer.description=Measures how long a string value does not change
+
+field.title=String Field
+field.description=The string field that is monitored
+
+measuredTime.title = Measured Time
+measuredTime.description = The measured time during which the string value did not change
+
+fieldValue.title = Field Value
+fieldValue.description = The corresponding field value
+
+outputUnit.title= Output Unit
+outputUnit.description = The output unit of the measured time
+
+
+