You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2023/01/03 09:25:22 UTC
[streampipes] 02/02: [#1026] Remove flinkspector from tests
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch SP-1026
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 87c820a87205d0ac104ab2fb05ad3619a77e236c
Author: Philipp Zehnder <te...@users.noreply.github.com>
AuthorDate: Tue Jan 3 10:24:49 2023 +0100
[#1026] Remove flinkspector from tests
---
pom.xml | 18 ---
.../pom.xml | 25 ----
.../processor/aggregation/AggregationTestData.java | 67 ----------
.../aggregation/TestTimeAggregationProgram.java | 82 -------------
.../flink/processor/count/TestCountProgram.java | 135 ---------------------
.../flink/processor/rate/TestRateProgram.java | 129 --------------------
.../pom.xml | 14 ---
.../detection/processor/absence/TestAbsence.java | 125 -------------------
.../pattern/detection/processor/and/TestAnd.java | 135 ---------------------
.../pom.xml | 23 ----
.../pom.xml | 15 ---
.../processor/converter/TestConverterProgram.java | 95 ---------------
.../flink/processor/hasher/TestFieldHasher.java | 124 -------------------
.../hasher/TestFieldHasherController.java | 38 ------
.../processor/hasher/TestFieldHasherProgram.java | 84 -------------
.../processor/hasher/TestFieldHasherUtils.java | 44 -------
.../flink/processor/rename/TestRenameProgram.java | 102 ----------------
.../transformation/flink/utils/DummyCollector.java | 48 --------
.../smp/extractor/TestDockerImageExtractor.java | 44 -------
19 files changed, 1347 deletions(-)
diff --git a/pom.xml b/pom.xml
index 1e7c5967c..7e1822550 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,7 +53,6 @@
<elasticsearch.version>6.8.17</elasticsearch.version>
<file-management.version>3.1.0</file-management.version>
<flink.version>1.13.5</flink.version>
- <flinkspector.version>0.9.4</flinkspector.version>
<fogsy-qudt.version>1.0</fogsy-qudt.version>
<fst.version>2.57</fst.version>
<geojson-jackson.version>1.14</geojson-jackson.version>
@@ -993,23 +992,6 @@
<version>${rest-assured.version}</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>io.flinkspector</groupId>
- <artifactId>flinkspector-datastream_2.11</artifactId>
- <version>${flinkspector.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_2.11</artifactId>
- <version>${flink.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/pom.xml b/streampipes-extensions/streampipes-processors-aggregation-flink/pom.xml
index c6bc7aa90..ff25119f8 100644
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/pom.xml
+++ b/streampipes-extensions/streampipes-processors-aggregation-flink/pom.xml
@@ -84,31 +84,6 @@
<!-- Test dependencies -->
- <dependency>
- <groupId>io.flinkspector</groupId>
- <artifactId>flinkspector-datastream_2.11</artifactId>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_2.11</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.11</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_2.11</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_2.11</artifactId>
- <type>test-jar</type>
- </dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-test-utils</artifactId>
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationTestData.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationTestData.java
deleted file mode 100644
index e29b16481..000000000
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationTestData.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.aggregation.flink.processor.aggregation;
-
-import org.apache.streampipes.model.runtime.Event;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class AggregationTestData {
-
- private List<Event> expectedOutput;
- private List<Event> input;
-
- public AggregationTestData() {
- buildOutput();
- buildInput();
- }
-
- private void buildOutput() {
- this.expectedOutput = new ArrayList<>();
- this.expectedOutput.add(buildOutputMap(1.0f, 1.0f));
- this.expectedOutput.add(buildOutputMap(2.0f, 1.5f));
- }
-
- private void buildInput() {
- this.input = new ArrayList<>();
- input.add(buildEvent(1.0f));
- input.add(buildEvent(2.0f));
- }
-
- private Event buildOutputMap(Float value, Float aggregatedValue) {
- Event event = buildEvent(value);
- event.addField("aggregatedValue", aggregatedValue);
- return event;
- }
-
- private Event buildEvent(Float value) {
- Event event = new Event();
- event.addField("sensorId", "a");
- event.addField("value", value);
- return event;
- }
-
- public List<Event> getExpectedOutput() {
- return expectedOutput;
- }
-
- public List<Event> getInput() {
- return input;
- }
-}
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/TestTimeAggregationProgram.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/TestTimeAggregationProgram.java
deleted file mode 100644
index b8bfd6c7f..000000000
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/TestTimeAggregationProgram.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.aggregation.flink.processor.aggregation;
-
-import org.apache.streampipes.extensions.management.config.ConfigExtractor;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.processors.aggregation.flink.AggregationFlinkInit;
-import org.apache.streampipes.test.generator.InvocationGraphGenerator;
-
-import io.flinkspector.core.collection.ExpectedRecords;
-import io.flinkspector.datastream.DataStreamTestBase;
-import io.flinkspector.datastream.input.EventTimeInput;
-import io.flinkspector.datastream.input.EventTimeInputBuilder;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.Arrays;
-
-@Ignore
-//@RunWith(Parameterized.class)
-public class TestTimeAggregationProgram extends DataStreamTestBase {
-
-// @Parameterized.Parameters
-// public static Iterable<Object[]> algorithm() {
-// return Arrays.asList(new Object[][]{
-// {"a", 1},
-// {new Sha1HashAlgorithm(), HashAlgorithmType.SHA1},
-// {new Sha2HashAlgorithm(), HashAlgorithmType.SHA2}
-// });
-// }
-
-
- @Test
- public void testAggregationProgram() {
- AggregationParameters params = makeParams();
- ConfigExtractor configExtractor = ConfigExtractor.from(AggregationFlinkInit.SERVICE_GROUP);
- AggregationProgram program = new AggregationProgram(params, configExtractor, null);
- AggregationTestData testData = new AggregationTestData();
-
- DataStream<Event> stream = program.getApplicationLogic(createTestStream(makeInputData
- (testData)));
-
- ExpectedRecords<Event> expected =
- new ExpectedRecords<Event>().expectAll(testData.getExpectedOutput());
-
- assertStream(stream, expected);
- }
-
- private AggregationParameters makeParams() {
- return new AggregationParameters(
- InvocationGraphGenerator.makeEmptyInvocation(new AggregationController().declareModel()),
- AggregationType.AVG,
- 1,
- Arrays.asList("sensorId"),
- Arrays.asList("value"),
- 10,
- Arrays.asList("value"),
- true);
- }
-
- private EventTimeInput<Event> makeInputData(AggregationTestData testData) {
- return EventTimeInputBuilder.startWith(testData.getInput().get(0))
- .emit(testData.getInput().get(1), after(1, seconds));
- }
-
-}
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/count/TestCountProgram.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/count/TestCountProgram.java
deleted file mode 100644
index 0692a4099..000000000
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/count/TestCountProgram.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.aggregation.flink.processor.count;
-
-import org.apache.streampipes.extensions.management.config.ConfigExtractor;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.processors.aggregation.flink.AggregationFlinkInit;
-import org.apache.streampipes.test.generator.InvocationGraphGenerator;
-
-import io.flinkspector.core.collection.ExpectedRecords;
-import io.flinkspector.datastream.DataStreamTestBase;
-import io.flinkspector.datastream.input.EventTimeInput;
-import io.flinkspector.datastream.input.EventTimeInputBuilder;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-@Ignore
-public class TestCountProgram extends DataStreamTestBase {
-
- @Test
- public void testCountProgram() {
-
- EventTimeInput input = makeInputData(makeTestData(), makeTestData().size());
-
- ExpectedRecords<Event> expected =
- new ExpectedRecords<Event>().expectAll(getOutput());
-
- runProgram(input, expected);
- }
-
- @Test
- public void testOutOfWindow() {
-
- EventTimeInput input = makeInputData(makeTestData(), 2);
-
- ExpectedRecords<Event> expected =
- new ExpectedRecords<Event>().expectAll(getOutOfWindowOutput());
-
- runProgram(input, expected);
- }
-
- private void runProgram(EventTimeInput<Event> input, ExpectedRecords<Event>
- expected) {
- CountParameters params =
- new CountParameters(InvocationGraphGenerator.makeEmptyInvocation(new CountController().declareModel()), 10,
- "SECONDS", "field");
- ConfigExtractor configExtractor = ConfigExtractor.from(AggregationFlinkInit.SERVICE_GROUP);
- CountProgram program = new CountProgram(params, configExtractor, null);
-
- DataStream<Event> stream = program.getApplicationLogic(createTestStream(input));
-
- assertStream(stream, expected);
- }
-
- private Collection<Event> getOutput() {
- List<Event> outRecords = new ArrayList<>();
- outRecords.add(makeOutMap("v1", 1));
- outRecords.add(makeOutMap("v2", 1));
- outRecords.add(makeOutMap("v1", 2));
- outRecords.add(makeOutMap("v3", 1));
- outRecords.add(makeOutMap("v2", 2));
-
- return outRecords;
- }
-
- private Collection<Event> getOutOfWindowOutput() {
- List<Event> outRecords = new ArrayList<>();
- outRecords.add(makeOutMap("v1", 1));
- outRecords.add(makeOutMap("v2", 1));
- outRecords.add(makeOutMap("v1", 1));
- outRecords.add(makeOutMap("v3", 1));
- outRecords.add(makeOutMap("v2", 1));
-
- return outRecords;
- }
-
- private Event makeOutMap(String key, Integer count) {
- Event outEvent = new Event();
- outEvent.addField("value", key);
- outEvent.addField("count", count);
- return outEvent;
- }
-
- private EventTimeInput<Event> makeInputData(List<Event> testData, Integer
- splitIndex) {
- EventTimeInputBuilder<Event> builder = EventTimeInputBuilder.startWith(testData.get(0));
-
- for (int i = 1; i < splitIndex; i++) {
- builder.emit(testData.get(i), after(1, seconds));
- }
-
- for (int j = splitIndex; j < testData.size(); j++) {
- builder.emit(testData.get(j), after(10, seconds));
- }
-
- return builder;
- }
-
- private List<Event> makeTestData() {
- List<Event> inEvent = new ArrayList<>();
- inEvent.add(makeMap("v1"));
- inEvent.add(makeMap("v2"));
- inEvent.add(makeMap("v1"));
- inEvent.add(makeMap("v3"));
- inEvent.add(makeMap("v2"));
-
- return inEvent;
- }
-
- private Event makeMap(String s) {
- Event testEvent = new Event();
- testEvent.addField("field", s);
- return testEvent;
- }
-}
diff --git a/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/rate/TestRateProgram.java b/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/rate/TestRateProgram.java
deleted file mode 100644
index a0c3d77ef..000000000
--- a/streampipes-extensions/streampipes-processors-aggregation-flink/src/test/java/org/apache/streampipes/processors/aggregation/flink/processor/rate/TestRateProgram.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.aggregation.flink.processor.rate;
-
-import org.apache.streampipes.extensions.management.config.ConfigExtractor;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.processors.aggregation.flink.AggregationFlinkInit;
-import org.apache.streampipes.test.generator.InvocationGraphGenerator;
-
-import io.flinkspector.core.collection.ExpectedRecords;
-import io.flinkspector.datastream.DataStreamTestBase;
-import io.flinkspector.datastream.input.EventTimeInput;
-import io.flinkspector.datastream.input.EventTimeInputBuilder;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-@Ignore
-@RunWith(Parameterized.class)
-public class TestRateProgram extends DataStreamTestBase {
-
- @Parameterized.Parameters
- public static Iterable<Object[]> data() {
- return Arrays.asList(new Object[][]{
- {1, 1000, TimeUnit.MILLISECONDS, 1.0f, 1},
- {10, 1000, TimeUnit.MILLISECONDS, 1.0f, 1},
- {100, 1000, TimeUnit.MILLISECONDS, 1.0f, 1},
- {10, 100, TimeUnit.MILLISECONDS, 1.0f, 10},
- {2, 500, TimeUnit.MILLISECONDS, 2.0f, 1},
- {4, 250, TimeUnit.MILLISECONDS, 4.0f, 1},
- {8, 250, TimeUnit.MILLISECONDS, 4.0f, 2},
- });
- }
-
- @Parameterized.Parameter
- public Integer numEvents;
-
- @Parameterized.Parameter(1)
- public Integer waitTime;
-
- @Parameterized.Parameter(2)
- public TimeUnit timeUnit;
-
- @Parameterized.Parameter(3)
- public Float expectedFrequency;
-
- @Parameterized.Parameter(4)
- public Integer timeWindowSize;
-
- @Test
- public void testRateProgram() {
- EventRateParameter params =
- new EventRateParameter(InvocationGraphGenerator.makeEmptyInvocation(new EventRateController().declareModel()),
- timeWindowSize);
- ConfigExtractor configExtractor = ConfigExtractor.from(AggregationFlinkInit.SERVICE_GROUP);
-
- EventRateProgram program = new EventRateProgram(params, configExtractor, null);
-
- DataStream<Event> stream = program.getApplicationLogic(createTestStream(makeInputData
- (numEvents, waitTime, timeUnit)));
-
- ExpectedRecords<Event> expected =
- new ExpectedRecords<Event>().expectAll(getOutput(timeWindowSize, expectedFrequency,
- numEvents));
-
- assertStream(stream, expected);
- }
-
- private Collection<Event> getOutput(Integer timeWindowSize, Float eventsPerSecond, Integer
- numEvents) {
- List<Event> allEvents = new ArrayList<>();
- Event outMap = new Event();
- outMap.addField("rate", eventsPerSecond);
-
- for (int i = 0; i < numEvents % timeWindowSize; i++) {
- allEvents.add(outMap);
- }
-
- return allEvents;
- }
-
- private EventTimeInput<Event> makeInputData(Integer count, Integer time, TimeUnit timeUnit) {
- List<Event> testData = makeTestData(count);
- EventTimeInputBuilder<Event> builder = EventTimeInputBuilder.startWith(testData.get(0));
-
- for (int i = 1; i < testData.size(); i++) {
- builder.emit(testData.get(i), after(time, timeUnit));
- }
-
- return builder;
- }
-
- private List<Event> makeTestData(Integer count) {
- List<Event> allEvents = new ArrayList<>();
- Event event = new Event();
- event.addField("test", 1);
-
- for (int i = 0; i < count; i++) {
- allEvents.add(event);
- }
-
- return allEvents;
- }
-
-
-}
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/pom.xml b/streampipes-extensions/streampipes-processors-pattern-detection-flink/pom.xml
index feb672448..753313463 100644
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/pom.xml
+++ b/streampipes-extensions/streampipes-processors-pattern-detection-flink/pom.xml
@@ -96,20 +96,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>io.flinkspector</groupId>
- <artifactId>flinkspector-datastream_2.11</artifactId>
- <exclusions>
- <exclusion>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_2.11</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.11</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-test-utils</artifactId>
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/absence/TestAbsence.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/absence/TestAbsence.java
deleted file mode 100644
index 1a13836cd..000000000
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/absence/TestAbsence.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.pattern.detection.processor.absence;
-
-import org.apache.streampipes.extensions.management.config.ConfigExtractor;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.processors.pattern.detection.flink.PatternDetectionFlinkInit;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.absence.AbsenceController;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.absence.AbsenceParameters;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.absence.AbsenceProgram;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.and.TimeUnit;
-import org.apache.streampipes.test.generator.InvocationGraphGenerator;
-
-import io.flinkspector.datastream.DataStreamTestBase;
-import io.flinkspector.datastream.input.EventTimeInput;
-import io.flinkspector.datastream.input.EventTimeInputBuilder;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import static org.hamcrest.core.IsEqual.equalTo;
-
-@Ignore
-@RunWith(Parameterized.class)
-public class TestAbsence extends DataStreamTestBase {
-
- @Parameterized.Parameters
- public static Iterable<Object[]> data() {
- return Arrays.asList(new Object[][]{
- {10, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 12},
- {10, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), false, 5},
- {5, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 6},
- });
- }
-
- @Parameterized.Parameter
- public Integer timeWindow;
-
- @Parameterized.Parameter(1)
- public TimeUnit timeUnit;
-
- @Parameterized.Parameter(2)
- public List<String> leftMapping;
-
- @Parameterized.Parameter(3)
- public List<String> rightMapping;
-
- @Parameterized.Parameter(4)
- public Boolean shouldMatch;
-
- @Parameterized.Parameter(5)
- public Integer waitForMs;
-
-
- @Test
- public void testAbsenceProgram() {
- AbsenceParameters params =
- new AbsenceParameters(InvocationGraphGenerator.makeEmptyInvocation(new AbsenceController().declareModel()),
- Arrays.asList("id", "timestamp", "value"), timeWindow, timeUnit);
-
- ConfigExtractor configExtractor = ConfigExtractor.from(PatternDetectionFlinkInit.SERVICE_GROUP);
- AbsenceProgram program = new AbsenceProgram(params, configExtractor, null);
-
- DataStream<Event> stream = program.getApplicationLogic(createTestStream(makeInputData(1, makeMap(), 0)),
- createTestStream(makeInputData(waitForMs, makeMap(), 1)));
-
- assertStream(stream, equalTo(getOutput(shouldMatch)));
- }
-
- private Collection<Event> getOutput(Boolean shouldMatch) {
- List<Event> allEvents = new ArrayList<>();
-
- if (shouldMatch) {
- allEvents.add(makeMap().get(0));
- }
-
- return allEvents;
- }
-
- private EventTimeInput<Event> makeInputData(Integer delayEvent, List<Event> inputMap, Integer i) {
- List<Event> testData = inputMap;
- EventTimeInputBuilder<Event> builder = EventTimeInputBuilder.startWith(testData.get(i), after(delayEvent, seconds));
-
- return builder;
- }
-
- private List<Event> makeMap() {
- List<Event> allEvents = new ArrayList<>();
- Event event1 = new Event();
- event1.addField("id", "a");
- event1.addField("timestamp", 0);
-
- allEvents.add(event1);
-
- Event event2 = new Event();
- event2.addField("id", "a");
- event2.addField("timestamp", waitForMs);
-
- allEvents.add(event2);
-
- return allEvents;
- }
-}
diff --git a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/and/TestAnd.java b/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/and/TestAnd.java
deleted file mode 100644
index 0546c3f57..000000000
--- a/streampipes-extensions/streampipes-processors-pattern-detection-flink/src/test/java/org/apache/streampipes/processors/pattern/detection/processor/and/TestAnd.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.pattern.detection.processor.and;
-import org.apache.streampipes.extensions.management.config.ConfigExtractor;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.processors.pattern.detection.flink.PatternDetectionFlinkInit;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.and.AndController;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.and.AndParameters;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.and.AndProgram;
-import org.apache.streampipes.processors.pattern.detection.flink.processor.and.TimeUnit;
-import org.apache.streampipes.test.generator.InvocationGraphGenerator;
-import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator;
-
-import io.flinkspector.datastream.DataStreamTestBase;
-import io.flinkspector.datastream.input.EventTimeInput;
-import io.flinkspector.datastream.input.EventTimeInputBuilder;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import static org.hamcrest.core.IsEqual.equalTo;
-
-@Ignore
-@RunWith(Parameterized.class)
-public class TestAnd extends DataStreamTestBase {
-
-
- @Parameterized.Parameters
- public static Iterable<Object[]> data() {
- return Arrays.asList(new Object[][]{
- {2, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 1, 1},
- {1, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 1, 1},
- {10, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), false, 1, 12},
- {10, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 3, 4},
- {1, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), false, 1, 2},
- {3600, TimeUnit.Seconds, Arrays.asList("id"), Arrays.asList("id"), true, 10, 3500},
-
- });
- }
-
- @Parameterized.Parameter
- public Integer timeWindow;
-
- @Parameterized.Parameter(1)
- public TimeUnit timeUnit;
-
- @Parameterized.Parameter(2)
- public List<String> leftMapping;
-
- @Parameterized.Parameter(3)
- public List<String> rightMapping;
-
- @Parameterized.Parameter(4)
- public Boolean shouldMatch;
-
- @Parameterized.Parameter(5)
- public Integer delayFirstEvent;
-
- @Parameterized.Parameter(6)
- public Integer delaySecondEvent;
-
- @Test
- public void testAndProgram() {
- DataProcessorDescription description = new AndController().declareModel();
- description.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());
- AndParameters params =
- new AndParameters(InvocationGraphGenerator.makeEmptyInvocation(description), timeUnit,
- timeWindow, leftMapping, rightMapping);
-
- ConfigExtractor configExtractor = ConfigExtractor.from(PatternDetectionFlinkInit.SERVICE_GROUP);
- AndProgram program = new AndProgram(params, configExtractor, null);
-
- DataStream<Event> stream =
- program.getApplicationLogic(createTestStream(makeInputData(delayFirstEvent, makeMap("field1"))),
- createTestStream(makeInputData(delaySecondEvent, makeMap("field2"))));
-
- assertStream(stream, equalTo(getOutput(shouldMatch)));
- }
-
- private Collection<Event> getOutput(Boolean shouldMatch) {
- List<Event> allEvents = new ArrayList<>();
-
- if (shouldMatch) {
- Event outMap = new Event();
- outMap.addField("id", "a");
- outMap.addField("field1", 1);
- outMap.addField("field2", 1);
- allEvents.add(outMap);
- }
-
- return allEvents;
- }
-
- private EventTimeInput<Event> makeInputData(Integer delayEvent, List<Event> inputMap) {
- List<Event> testData = inputMap;
- EventTimeInputBuilder<Event> builder = EventTimeInputBuilder.startWith(testData.get(0), after(delayEvent, seconds));
-
- return builder;
- }
-
- private List<Event> makeMap(String fieldName) {
- List<Event> allEvents = new ArrayList<>();
- Event event = new Event();
- event.addField("id", "a");
- event.addField(fieldName, 1);
-
- allEvents.add(event);
-
- return allEvents;
- }
-
-}
diff --git a/streampipes-extensions/streampipes-processors-text-mining-flink/pom.xml b/streampipes-extensions/streampipes-processors-text-mining-flink/pom.xml
index 18d031adc..45ddbff55 100644
--- a/streampipes-extensions/streampipes-processors-text-mining-flink/pom.xml
+++ b/streampipes-extensions/streampipes-processors-text-mining-flink/pom.xml
@@ -63,29 +63,6 @@
</dependency>
<!-- Test dependencies -->
- <dependency>
- <groupId>io.flinkspector</groupId>
- <artifactId>flinkspector-datastream_2.11</artifactId>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_2.11</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.11</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-test-utils</artifactId>
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/pom.xml b/streampipes-extensions/streampipes-processors-transformation-flink/pom.xml
index a94fcbec2..2469a686e 100644
--- a/streampipes-extensions/streampipes-processors-transformation-flink/pom.xml
+++ b/streampipes-extensions/streampipes-processors-transformation-flink/pom.xml
@@ -64,21 +64,6 @@
</dependency>
<!-- Test dependencies -->
- <dependency>
- <groupId>io.flinkspector</groupId>
- <artifactId>flinkspector-datastream_2.11</artifactId>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_2.11</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.11</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-test-utils</artifactId>
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/converter/TestConverterProgram.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/converter/TestConverterProgram.java
deleted file mode 100644
index e8dd0b4a0..000000000
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/converter/TestConverterProgram.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.transformation.flink.processor.converter;
-
-
-import org.apache.streampipes.extensions.management.config.ConfigExtractor;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.processors.transformation.flink.TransformationFlinkInit;
-import org.apache.streampipes.test.generator.InvocationGraphGenerator;
-
-import io.flinkspector.core.collection.ExpectedRecords;
-import io.flinkspector.datastream.DataStreamTestBase;
-import io.flinkspector.datastream.input.EventTimeInput;
-import io.flinkspector.datastream.input.EventTimeInputBuilder;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-@RunWith(Parameterized.class)
-@Ignore
-public class TestConverterProgram extends DataStreamTestBase {
-
- @Parameterized.Parameters
- public static Iterable<Object[]> data() {
- return Arrays.asList(new Object[][]{
- {"1", 1, "http://www.w3.org/2001/XMLSchema#integer"},
- {"1.0", 1.0f, "http://www.w3.org/2001/XMLSchema#float"},
-
- });
- }
-
- @Parameterized.Parameter
- public String inputValue;
-
- @Parameterized.Parameter(1)
- public Object expectedValue;
-
- @Parameterized.Parameter(2)
- public String targetDatatype;
-
- @Test
- public void testConverterProgram() {
- FieldConverterParameters params = new FieldConverterParameters(
- InvocationGraphGenerator.makeEmptyInvocation(new FieldConverterController().declareModel()), "field",
- targetDatatype);
-
- ConfigExtractor configExtractor = ConfigExtractor.from(TransformationFlinkInit.SERVICE_GROUP);
- FieldConverterProgram program = new FieldConverterProgram(params, configExtractor, null);
-
- DataStream<Event> stream = program.getApplicationLogic(createTestStream(makeInputData(inputValue)));
-
- ExpectedRecords<Event> expected =
- new ExpectedRecords<Event>().expect(makeTestData(expectedValue).get(0));
-
- assertStream(stream, expected);
- }
-
- private EventTimeInput<Event> makeInputData(String inputValue) {
- List<Event> testData = makeTestData(inputValue);
- EventTimeInputBuilder<Event> builder = EventTimeInputBuilder.startWith(testData.get(0));
-
- return builder;
- }
-
- private List<Event> makeTestData(Object inputValue) {
- List<Event> allEvents = new ArrayList<>();
- Event event = new Event();
- event.addField("field", inputValue);
-
- allEvents.add(event);
-
- return allEvents;
- }
-}
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasher.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasher.java
deleted file mode 100644
index e5723d16a..000000000
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasher.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.transformation.flink.processor.hasher;
-
-
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm.HashAlgorithm;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm.Md5HashAlgorithm;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm.Sha1HashAlgorithm;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm.Sha2HashAlgorithm;
-import org.apache.streampipes.processors.transformation.flink.utils.DummyCollector;
-
-import io.flinkspector.datastream.DataStreamTestBase;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@Ignore
-@RunWith(Parameterized.class)
-public class TestFieldHasher extends DataStreamTestBase {
-
- @Parameterized.Parameters
- public static Iterable<Object[]> algorithm() {
- return Arrays.asList(new Object[][]{
- {"field1", "field2", "1"},
- {"field1", "field2", "1"},
- {"field1", "field2", 3},
- });
- }
-
- @Parameterized.Parameter
- public String fieldToHash;
-
- @Parameterized.Parameter(1)
- public String fieldNotToHash;
-
- @Parameterized.Parameter(2)
- public Object valueToHash;
-
- private Event inputMap;
- private Event expectedMap;
-
- @Before
- public void generateMaps() {
- inputMap = new Event();
- inputMap.addField(fieldToHash, valueToHash);
- inputMap.addField(fieldNotToHash, valueToHash);
-
- expectedMap = new Event();
- expectedMap.addField(fieldToHash, valueToHash);
- expectedMap.addField(fieldNotToHash, valueToHash);
- }
-
- @Test
- public void testFieldHasherMd5() {
- HashAlgorithm algorithm = new Md5HashAlgorithm();
- FieldHasher fieldHasher = new FieldHasher(fieldToHash, algorithm);
- expectedMap.addField(fieldToHash, algorithm.toHashValue(valueToHash));
-
- testFieldHasher(fieldHasher);
-
- }
-
- @Test
- public void testFieldHasherSha1() {
- HashAlgorithm algorithm = new Sha1HashAlgorithm();
- FieldHasher fieldHasher = new FieldHasher(fieldToHash, algorithm);
- expectedMap.addField(fieldToHash, algorithm.toHashValue(valueToHash));
-
- testFieldHasher(fieldHasher);
-
- }
-
- @Test
- public void testFieldHasherSha2() {
- HashAlgorithm algorithm = new Sha2HashAlgorithm();
- FieldHasher fieldHasher = new FieldHasher(fieldToHash, algorithm);
- expectedMap.addField(fieldToHash, algorithm.toHashValue(valueToHash));
-
- testFieldHasher(fieldHasher);
-
- }
-
- private void testFieldHasher(FieldHasher fieldHasher) {
- DummyCollector collector = new DummyCollector();
- try {
- fieldHasher.flatMap(inputMap, collector);
-
- List<Event> output = collector.getOutput();
-
- if (output.size() != 1) {
- fail();
- } else {
- assertEquals(expectedMap.getRaw(), output.get(0).getRaw());
- }
- } catch (Exception e) {
- fail();
- }
- }
-
-}
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasherController.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasherController.java
deleted file mode 100644
index 6aace3dd4..000000000
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasherController.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.transformation.flink.processor.hasher;
-
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-
-import org.junit.Test;
-
-public class TestFieldHasherController {
-
- @Test
- public void testDescription() {
- DataProcessorDescription description = new FieldHasherController().declareModel();
-
- }
-
- @Test
- public void testInvocation() {
-
- }
-
-
-}
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasherProgram.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasherProgram.java
deleted file mode 100644
index 2a19bdc3b..000000000
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasherProgram.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.transformation.flink.processor.hasher;
-
-
-import org.apache.streampipes.extensions.management.config.ConfigExtractor;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.processors.transformation.flink.TransformationFlinkInit;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm.HashAlgorithm;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm.HashAlgorithmType;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm.Md5HashAlgorithm;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm.Sha1HashAlgorithm;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm.Sha2HashAlgorithm;
-import org.apache.streampipes.test.generator.InvocationGraphGenerator;
-
-import io.flinkspector.core.collection.ExpectedRecords;
-import io.flinkspector.datastream.DataStreamTestBase;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-
-import static org.apache.streampipes.processors.transformation.flink.processor.hasher.TestFieldHasherUtils.makeTestData;
-
-@RunWith(Parameterized.class)
-@Ignore
-public class TestFieldHasherProgram extends DataStreamTestBase {
-
- @Parameterized.Parameters
- public static Iterable<Object[]> algorithm() {
- return Arrays.asList(new Object[][]{
- {new Md5HashAlgorithm(), HashAlgorithmType.MD5},
- {new Sha1HashAlgorithm(), HashAlgorithmType.SHA1},
- {new Sha2HashAlgorithm(), HashAlgorithmType.SHA2}
- });
- }
-
- @Parameterized.Parameter()
- public HashAlgorithm hashAlgorithm;
-
- @Parameterized.Parameter(1)
- public HashAlgorithmType hashAlgorithmType;
-
- @Test
- public void testFieldHasherProgram() {
-
- FieldHasherParameters params = makeParams();
- ConfigExtractor configExtractor = ConfigExtractor.from(TransformationFlinkInit.SERVICE_GROUP);
- FieldHasherProgram program = new FieldHasherProgram(params, configExtractor, null);
-
- DataStream<Event> stream = program.getApplicationLogic(createTestStream(makeTestData(true, hashAlgorithm)));
-
- ExpectedRecords<Event> expected =
- new ExpectedRecords<Event>().expectAll(makeTestData(false, hashAlgorithm));
-
- assertStream(stream, expected);
- }
-
- private FieldHasherParameters makeParams() {
- return new FieldHasherParameters(
- InvocationGraphGenerator.makeEmptyInvocation(new FieldHasherController().declareModel()), "field",
- hashAlgorithmType);
- }
-
-
-}
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasherUtils.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasherUtils.java
deleted file mode 100644
index 637bbde75..000000000
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasherUtils.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.transformation.flink.processor.hasher;
-
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.processors.transformation.flink.processor.hasher.algorithm.HashAlgorithm;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class TestFieldHasherUtils {
-
- public static List<String> testData = Arrays.asList("test1", "test2", "test3", "test4");
-
- public static List<Event> makeTestData(boolean originalValue, HashAlgorithm hashAlgorithm) {
- List<Event> data = new ArrayList<>();
- for (int i = 0; i < 3; i++) {
- Event event = new Event();
- event.addField("timestamp", i);
- event.addField("field", originalValue ? testData.get(i) : hashAlgorithm.toHashValue
- (testData
- .get(i)));
- data.add(event);
- }
- return data;
- }
-
-}
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/rename/TestRenameProgram.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/rename/TestRenameProgram.java
deleted file mode 100644
index b35321b40..000000000
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/rename/TestRenameProgram.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.transformation.flink.processor.rename;
-
-
-import org.apache.streampipes.extensions.management.config.ConfigExtractor;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.processors.transformation.flink.TransformationFlinkInit;
-import org.apache.streampipes.test.generator.InvocationGraphGenerator;
-
-import io.flinkspector.core.collection.ExpectedRecords;
-import io.flinkspector.datastream.DataStreamTestBase;
-import io.flinkspector.datastream.input.EventTimeInput;
-import io.flinkspector.datastream.input.EventTimeInputBuilder;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runners.Parameterized;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-@Ignore
-public class TestRenameProgram extends DataStreamTestBase {
-
- @Parameterized.Parameters
- public static Iterable<Object[]> data() {
- return Arrays.asList(new Object[][]{
- {"fieldA", "fieldC"},
- {"fieldB", "fieldD"},
-
- });
- }
-
- @Parameterized.Parameter
- public String oldPropertyName;
-
- @Parameterized.Parameter(1)
- public String newPropertyName;
-
- @Test
- public void testConverterProgram() {
- FieldRenamerParameters params = new FieldRenamerParameters(
- InvocationGraphGenerator.makeEmptyInvocation(new FieldRenamerController().declareModel()), oldPropertyName,
- newPropertyName);
-
- ConfigExtractor configExtractor = ConfigExtractor.from(TransformationFlinkInit.SERVICE_GROUP);
- FieldRenamerProgram program = new FieldRenamerProgram(params, configExtractor, null);
-
- DataStream<Event> stream = program.getApplicationLogic(createTestStream(makeInputData()));
-
- ExpectedRecords<Event> expected =
- new ExpectedRecords<Event>().expectAll(getOutput(oldPropertyName, newPropertyName));
-
- assertStream(stream, expected);
- }
-
- private Collection<Event> getOutput(String oldPropertyName, String newPropertyName) {
- List<Event> allEvents = new ArrayList<>();
- Event outMap = makeTestData().get(0);
- Object value = outMap.getFieldBySelector(oldPropertyName);
- outMap.removeFieldBySelector(oldPropertyName);
- outMap.addField(newPropertyName, value);
- allEvents.add(outMap);
-
- return allEvents;
- }
-
- private EventTimeInput<Event> makeInputData() {
- List<Event> testData = makeTestData();
-
- return EventTimeInputBuilder.startWith(testData.get(0));
- }
-
- private List<Event> makeTestData() {
- List<Event> allEvents = new ArrayList<>();
- Event event = new Event();
- event.addField("fieldA", "a");
- event.addField("fieldB", "b");
-
- allEvents.add(event);
-
- return allEvents;
- }
-}
diff --git a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/utils/DummyCollector.java b/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/utils/DummyCollector.java
deleted file mode 100644
index 45e411187..000000000
--- a/streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/utils/DummyCollector.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.processors.transformation.flink.utils;
-
-import org.apache.streampipes.model.runtime.Event;
-
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class DummyCollector implements Collector<Event> {
-
- List<Event> output;
-
- public DummyCollector() {
- this.output = new ArrayList<>();
- }
-
- @Override
- public void collect(Event stringObjectMap) {
- this.output.add(stringObjectMap);
- }
-
- @Override
- public void close() {
-
- }
-
- public List<Event> getOutput() {
- return output;
- }
-}
diff --git a/streampipes-maven-plugin/src/test/java/org/apache/streampipes/smp/extractor/TestDockerImageExtractor.java b/streampipes-maven-plugin/src/test/java/org/apache/streampipes/smp/extractor/TestDockerImageExtractor.java
deleted file mode 100644
index c5370be58..000000000
--- a/streampipes-maven-plugin/src/test/java/org/apache/streampipes/smp/extractor/TestDockerImageExtractor.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.smp.extractor;
-
-import org.apache.commons.io.IOUtils;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.yaml.snakeyaml.Yaml;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestDockerImageExtractor {
-
- @Test
- @Ignore
- public void testDockerImageExtrator() throws IOException {
- ClassLoader classLoader = this.getClass().getClassLoader();
- String yamlContent = IOUtils.toString(classLoader.getResourceAsStream("docker-compose.yml"));
- Yaml yaml = new Yaml();
- String imageName = new DockerImageExtractor(null).extractNameFromYaml(yaml.load(yamlContent));
- System.out.println(imageName);
-
- assertEquals("processors-text-mining-flink", imageName);
-
- }
-}