You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/03/31 16:07:53 UTC

[incubator-streampipes] 02/02: [hotfix] Remove old spark wrapper module

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 1e3d2aecd935de6fd62e75d4172b46ba90cb1507
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Thu Mar 31 18:07:17 2022 +0200

    [hotfix] Remove old spark wrapper module
---
 pom.xml                                            |  17 --
 streampipes-wrapper-spark/pom.xml                  | 213 ---------------------
 .../wrapper/spark/AbstractSparkDeclarer.java       |  52 -----
 .../wrapper/spark/SparkDataProcessorDeclarer.java  |  28 ---
 .../wrapper/spark/SparkDataProcessorRuntime.java   | 162 ----------------
 .../wrapper/spark/SparkDeploymentConfig.java       | 100 ----------
 .../streampipes/wrapper/spark/SparkRuntime.java    | 188 ------------------
 .../wrapper/spark/converter/JsonToMapFormat.java   |  52 -----
 .../spark/serializer/SimpleKafkaSerializer.java    |  69 -------
 9 files changed, 881 deletions(-)

diff --git a/pom.xml b/pom.xml
index 80f292d..8eff674 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,7 +109,6 @@
 	<slf4j.version>1.7.30</slf4j.version>
 	<snakeyaml.version>1.26</snakeyaml.version>
 	<snappy-java.version>1.1.7.7</snappy-java.version>
-	<spark.version>2.1.2</spark.version>
 	<spring.version>5.3.14</spring.version>
 	<spring-boot.version>2.6.2</spring-boot.version>
 	<spring-security.version>5.6.1</spring-security.version>
@@ -513,21 +512,6 @@ IoT data streams.
 					<version>${shiro.version}</version>
 				</dependency>
 				<dependency>
-					<groupId>org.apache.spark</groupId>
-					<artifactId>spark-core_2.11</artifactId>
-					<version>${spark.version}</version>
-				</dependency>
-				<dependency>
-					<groupId>org.apache.spark</groupId>
-					<artifactId>spark-streaming_2.11</artifactId>
-					<version>${spark.version}</version>
-				</dependency>
-				<dependency>
-					<groupId>org.apache.spark</groupId>
-					<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
-					<version>${spark.version}</version>
-				</dependency>
-				<dependency>
 					<groupId>org.codehaus.plexus</groupId>
 					<artifactId>plexus-component-annotations</artifactId>
 					<version>${plexus-component-annotations.version}</version>
@@ -971,7 +955,6 @@ IoT data streams.
 			<module>streampipes-wrapper-kafka-streams</module>
 			<module>streampipes-wrapper-python</module>
 			<module>streampipes-wrapper-siddhi</module>
-			<!--        <module>streampipes-wrapper-spark</module>-->
 			<module>streampipes-wrapper-standalone</module>
 			<module>streampipes-mail</module>
 			<module>streampipes-resource-management</module>
diff --git a/streampipes-wrapper-spark/pom.xml b/streampipes-wrapper-spark/pom.xml
deleted file mode 100644
index 1c3e9c9..0000000
--- a/streampipes-wrapper-spark/pom.xml
+++ /dev/null
@@ -1,213 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  ~
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.streampipes</groupId>
-        <artifactId>streampipes-parent</artifactId>
-        <version>0.69.0-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>streampipes-wrapper-spark</artifactId>
-
-    <properties>
-        <spark.version>2.4.8</spark.version>
-    </properties>
-
-    <dependencyManagement>
-        <dependencies>
-            <dependency>
-                <groupId>com.github.luben</groupId>
-                <artifactId>zstd-jni</artifactId>
-                <version>1.3.8-1</version>
-            </dependency>
-            <dependency>
-                <groupId>com.thoughtworks.paranamer</groupId>
-                <artifactId>paranamer</artifactId>
-                <version>2.8</version>
-            </dependency>
-            <dependency>
-                <groupId>commons-codec</groupId>
-                <artifactId>commons-codec</artifactId>
-                <version>1.10</version>
-            </dependency>
-            <dependency>
-                <groupId>commons-lang</groupId>
-                <artifactId>commons-lang</artifactId>
-                <version>2.6</version>
-            </dependency>
-            <dependency>
-                <groupId>io.netty</groupId>
-                <artifactId>netty</artifactId>
-                <version>3.9.9.Final</version>
-            </dependency>
-            <dependency>
-                <groupId>javax.activation</groupId>
-                <artifactId>activation</artifactId>
-                <version>1.1.1</version>
-            </dependency>
-            <dependency>
-                <groupId>log4j</groupId>
-                <artifactId>log4j</artifactId>
-                <version>1.2.17</version>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.avro</groupId>
-                <artifactId>avro</artifactId>
-                <version>1.8.2</version>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.commons</groupId>
-                <artifactId>commons-compress</artifactId>
-                <version>1.8.1</version>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.commons</groupId>
-                <artifactId>commons-math3</artifactId>
-                <version>3.4.1</version>
-            </dependency>
-            <dependency>
-                <groupId>org.lz4</groupId>
-                <artifactId>lz4-java</artifactId>
-                <version>1.5.0</version>
-            </dependency>
-            <dependency>
-                <groupId>org.scala-lang</groupId>
-                <artifactId>scala-library</artifactId>
-                <version>2.11.2</version>
-            </dependency>
-            <dependency>
-                <groupId>org.slf4j</groupId>
-                <artifactId>jcl-over-slf4j</artifactId>
-                <version>1.7.25</version>
-            </dependency>
-            <dependency>
-                <groupId>org.slf4j</groupId>
-                <artifactId>slf4j-log4j12</artifactId>
-                <version>1.7.25</version>
-            </dependency>
-        </dependencies>
-    </dependencyManagement>
-
-    <dependencies>
-        <!-- StreamPipes dependencies -->
-        <dependency>
-            <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-model</artifactId>
-            <version>0.69.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-container</artifactId>
-            <version>0.69.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.streampipes</groupId>
-            <artifactId>streampipes-wrapper</artifactId>
-            <version>0.69.0-SNAPSHOT</version>
-        </dependency>
-
-        <!-- External dependencies -->
-        <dependency>
-            <groupId>commons-lang</groupId>
-            <artifactId>commons-lang</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.github.luben</groupId>
-            <artifactId>zstd-jni</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.thoughtworks.paranamer</groupId>
-            <artifactId>paranamer</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-codec</groupId>
-            <artifactId>commons-codec</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>javax.activation</groupId>
-            <artifactId>activation</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.lz4</groupId>
-            <artifactId>lz4-java</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-compress</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-math3</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.glassfish.jersey.containers</groupId>
-            <artifactId>jersey-container-servlet-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.glassfish.jersey.core</groupId>
-            <artifactId>jersey-common</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.glassfish.jersey.core</groupId>
-            <artifactId>jersey-client</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.glassfish.jersey.core</groupId>
-            <artifactId>jersey-server</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.11</artifactId>
-            <version>${spark.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-streaming_2.11</artifactId>
-            <version>${spark.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
-            <version>${spark.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-library</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>jcl-over-slf4j</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-        </dependency>
-    </dependencies>
-</project>
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/AbstractSparkDeclarer.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/AbstractSparkDeclarer.java
deleted file mode 100644
index 5c41055..0000000
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/AbstractSparkDeclarer.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.wrapper.spark;
-
-import org.apache.streampipes.container.declarer.InvocableDeclarer;
-import org.apache.streampipes.model.Response;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-
-public abstract class AbstractSparkDeclarer<D extends NamedStreamPipesEntity, I extends InvocableStreamPipesEntity, SR extends SparkRuntime> implements InvocableDeclarer<D, I> {
-  protected SR runtime;
-  protected I graph;
-
-  @Override
-  public Response invokeRuntime(I sepaInvocation, String serviceId) {
-    runtime = getRuntime(sepaInvocation);
-    graph = sepaInvocation;
-
-    if (runtime.startExecution()) {
-      return new Response(graph.getElementId(), true);
-    } else {
-      return new Response(graph.getElementId(), false);
-    }
-  }
-
-  @Override
-  public Response detachRuntime(String pipelineId, String serviceId) {
-    if (runtime.stop()) {
-      return new Response(graph.getElementId(), true);
-    } else {
-      return new Response(graph.getElementId(), false);
-    }
-  }
-
-  protected abstract SR getRuntime(I graph);
-}
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java
deleted file mode 100644
index 91fbf57..0000000
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.wrapper.spark;
-
-import org.apache.streampipes.container.declarer.SemanticEventProcessingAgentDeclarer;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public abstract class SparkDataProcessorDeclarer<B extends EventProcessorBindingParams>
-    extends AbstractSparkDeclarer<DataProcessorDescription, DataProcessorInvocation, SparkDataProcessorRuntime> implements SemanticEventProcessingAgentDeclarer {
-}
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorRuntime.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorRuntime.java
deleted file mode 100644
index 460edb4..0000000
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorRuntime.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.wrapper.spark;
-
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.grounding.TransportProtocol;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-import org.apache.streampipes.wrapper.spark.serializer.SimpleKafkaSerializer;
-
-import java.io.*;
-import java.lang.reflect.Constructor;
-import java.util.Base64;
-import java.util.Map;
-
-public abstract class SparkDataProcessorRuntime<B extends EventProcessorBindingParams> extends SparkRuntime<DataProcessorInvocation> {
-    private static final long serialVersionUID = 1L;
-    protected B params;
-
-    public SparkDataProcessorRuntime(B params, SparkDeploymentConfig deploymentConfig) {
-        super(params.getGraph(), deploymentConfig);
-        this.params = params;
-    }
-
-    protected abstract JavaDStream<Map<String, Object>> getApplicationLogic(JavaDStream<Map<String, Object>>... messageStream);
-
-    @Override
-    public boolean execute(JavaDStream<Map<String, Object>>... convertedStream) {
-        JavaDStream<Map<String, Object>> applicationLogic = getApplicationLogic(convertedStream);
-        //applicationLogic.print();
-
-        if (isOutputKafkaProtocol()) {
-            applicationLogic.foreachRDD(SimpleKafkaSerializer.getInstance(kafkaParams, protocol().getTopicDefinition
-                    ().getActualTopicName()));
-        }
-        else {
-            //TODO: JMS
-        }
-
-        thread = new Thread(this);
-        thread.start();
-
-        return true;
-    }
-
-    /**
-     * Serialize the instance's data to transmit to Spark cluster.
-     * The serialized data contains
-     *   - the class name (as String) of the job's program class (inheriting from this class)
-     *   - the class name (as String) of the job's parameter class (inheriting from EventProcessorBindingParams),
-     *     stored in this.params
-     *   - this.params
-     *   - this.deplomentconfig
-     *
-     * This data is deserialized in SparkDataProcessorRuntime.main()
-     * @return the serialized data as byte array
-     */
-    @Override
-    protected byte[] getSerializationData() {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        ObjectOutputStream out = null;
-        try {
-            out = new ObjectOutputStream(bos);
-
-            out.writeObject(this.getClass().getName());
-            out.writeObject(params.getClass().getName());
-            out.writeObject(params);
-            out.writeObject(deploymentConfig);
-
-            out.flush();
-        } catch (IOException e) {
-            e.printStackTrace();
-        } finally {
-            try {
-                bos.close();
-            } catch (IOException ex) {
-                // ignore close exception
-            }
-        }
-
-    return bos.toByteArray();
-    }
-
-    private boolean isOutputKafkaProtocol() {
-        return protocol() instanceof KafkaTransportProtocol;
-    }
-
-    private TransportProtocol protocol() {
-        return params
-                .getGraph()
-                .getOutputStream()
-                .getEventGrounding()
-                .getTransportProtocol();
-    }
-
-    /**
-     * Entry point for execution on Spark clusters.
-     * @param args
-     * args[0] contains the serialized and base64-encoded data of the instance to run.
-     */
-    public static void main(String[] args) {
-        String enc = args[0];
-        //System.out.println("Received data: '" + enc + "'");
-
-        byte[] data = Base64.getDecoder().decode(enc);
-
-        String programClassName = null;
-        String paramsClassName = null;
-        EventProcessorBindingParams params = null;
-        SparkDeploymentConfig sparkDeploymentConfig = null;
-
-        InputStream fis = null;
-        fis = new ByteArrayInputStream(data);
-        ObjectInputStream o = null;
-        try {
-            o = new ObjectInputStream(fis);
-            programClassName = (String) o.readObject();
-            paramsClassName = (String) o.readObject();
-            params = (EventProcessorBindingParams) o.readObject();
-
-            sparkDeploymentConfig = (SparkDeploymentConfig) o.readObject();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-        if (programClassName != null && paramsClassName != null && params != null && sparkDeploymentConfig != null) {
-            try {
-                Class<?> programClass = Class.forName(programClassName);
-                Class<?> paramsClass = Class.forName(paramsClassName);
-
-                Class[] types = new Class[]{paramsClass, SparkDeploymentConfig.class};
-                Constructor constructor = programClass.getConstructor(types);
-
-                Object[] parameters = {params, sparkDeploymentConfig};
-                SparkDataProcessorRuntime prog = (SparkDataProcessorRuntime) constructor.newInstance(parameters);
-
-                sparkDeploymentConfig.setRunLocal(true);
-
-                prog.startExecution();
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-}
\ No newline at end of file
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDeploymentConfig.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDeploymentConfig.java
deleted file mode 100644
index 1983f3c..0000000
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDeploymentConfig.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.wrapper.spark;
-
-import java.io.Serializable;
-
-public class SparkDeploymentConfig implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private String jarFile;
-    private String appName;
-    private String sparkHost;
-    private long sparkBatchDuration;
-    private String kafkaHost;
-    private boolean runLocal;
-
-    public SparkDeploymentConfig(String jarFile, String appName, String sparkHost, boolean runLocal, long sparkBatchDuration, String kafkaHost) {
-        super();
-
-        this.jarFile = jarFile;
-        this.appName = appName;
-        this.sparkHost = sparkHost;
-        this.runLocal = runLocal;
-        this.sparkBatchDuration = sparkBatchDuration;
-        this.kafkaHost = kafkaHost;//TODO: JMS berücksichtigen
-
-    }
-
-    public SparkDeploymentConfig(String jarFile, String appName, String sparkHost, long sparkBatchDuration, String kafkaHost) {
-        this(jarFile, appName, sparkHost, false, sparkBatchDuration, kafkaHost);
-    }
-
-    public SparkDeploymentConfig(String jarFile, String appName, String sparkHost, String kafkaHost) {
-        this(jarFile, appName, sparkHost, 1000, kafkaHost);
-    }
-
-    public String getJarFile() {
-        return jarFile;
-    }
-
-    public void setJarFile(String jarFile) {
-        this.jarFile = jarFile;
-    }
-
-    public String getAppName() {
-        return appName;
-    }
-
-    public void setAppName(String appName) {
-        this.appName = appName;
-    }
-
-    public String getSparkHost() {
-        return sparkHost;
-    }
-
-    public void setSparkHost(String sparkHost) {
-        this.sparkHost = sparkHost;
-    }
-
-    public boolean isRunLocal() {
-        return runLocal;
-    }
-
-    public void setRunLocal(boolean runLocal) {
-        this.runLocal = runLocal;
-    }
-
-    public long getSparkBatchDuration() {
-        return sparkBatchDuration;
-    }
-
-    public void setSparkBatchDuration(long sparkBatchDuration) {
-        this.sparkBatchDuration = sparkBatchDuration;
-    }
-
-    public String getKafkaHost() {
-        return kafkaHost;
-    }
-
-    public void setKafkaHost(String kafkaHost) {
-        this.kafkaHost = kafkaHost;
-    }
-}
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkRuntime.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkRuntime.java
deleted file mode 100644
index fc0b5f6..0000000
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkRuntime.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.wrapper.spark;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.spark.SparkConf;
-import org.apache.spark.launcher.SparkAppHandle;
-import org.apache.spark.launcher.SparkLauncher;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka010.ConsumerStrategies;
-import org.apache.spark.streaming.kafka010.KafkaUtils;
-import org.apache.spark.streaming.kafka010.LocationStrategies;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.wrapper.spark.converter.JsonToMapFormat;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
-
-public abstract class SparkRuntime<I extends InvocableStreamPipesEntity> implements Runnable, Serializable {
-    private static final long serialVersionUID = 1L;
-    protected final SparkDeploymentConfig deploymentConfig;
-
-    protected Thread thread;
-    protected SparkAppHandle appHandle;
-    protected SparkLauncher launcher;
-    protected JavaStreamingContext streamingContext;
-    protected I graph;
-    protected Map kafkaParams;
-
-    public SparkRuntime(I graph, SparkDeploymentConfig deploymentConfig) {
-        this.graph = graph;
-        this.deploymentConfig = deploymentConfig;
-
-        kafkaParams = new HashMap<>();
-        kafkaParams.put("bootstrap.servers", this.deploymentConfig.getKafkaHost());
-        kafkaParams.put("key.deserializer", StringDeserializer.class);
-        kafkaParams.put("value.deserializer", StringDeserializer.class);
-        kafkaParams.put("key.serializer", StringSerializer.class);
-        kafkaParams.put("value.serializer", StringSerializer.class);
-        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
-        kafkaParams.put("auto.offset.reset", "latest");
-        kafkaParams.put("enable.auto.commit", false);
-    }
-
-    public boolean startExecution() {
-        if (this.deploymentConfig.isRunLocal()) {
-            try {
-                SparkConf conf = new SparkConf().setAppName(this.deploymentConfig.getAppName())
-                        .setMaster(this.deploymentConfig.getSparkHost());
-                streamingContext = new JavaStreamingContext(conf, new Duration(this.deploymentConfig.getSparkBatchDuration()));
-
-                JavaDStream<Map<String, Object>> messageStream1 = null;
-                JavaInputDStream<ConsumerRecord<String, String>> source1 = getStream1Source(streamingContext);
-                if (source1 != null) {
-                    messageStream1 = source1.flatMap(new JsonToMapFormat(graph));
-                } else {
-                    throw new Exception("At least one source must be defined for a Spark SEPA");
-                }
-                //TODO: zweiter Stream. Kann das Spark?
-
-
-                //TODO: Error Handling
-
-                execute(messageStream1);
-
-                return true;
-            } catch (Exception e) {
-                e.printStackTrace();
-                return false;
-            }
-        }
-        else {
-            try {
-                byte[] data = getSerializationData();
-                String enc = Base64.getEncoder().encodeToString(data);
-                //System.out.println("Sending data: '" + enc + "'");
-
-                launcher = new SparkLauncher()
-                        .setAppResource(this.deploymentConfig.getJarFile())
-                        .setMainClass(this.getClass().getName())
-                        .addAppArgs(enc)
-                        //.redirectError()
-                        //.redirectOutput(ProcessBuilder.Redirect.PIPE)
-                        .setMaster(this.deploymentConfig.getSparkHost())
-                        .setConf(SparkLauncher.DRIVER_MEMORY, "2g");//TODO
-                appHandle = launcher.startApplication();
-
-                return true;
-            } catch (Exception e) {
-                e.printStackTrace();
-                return false;
-            }
-        }
-    }
-
-    protected abstract byte[] getSerializationData();
-
-    public abstract boolean execute(JavaDStream<Map<String, Object>>... convertedStream);
-
-    public void run() {
-        try {
-            streamingContext.start();
-            if (deploymentConfig.isRunLocal()) {
-                streamingContext.awaitTermination();
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    public boolean stop() {
-        try {
-            appHandle.stop();
-            //streamingContext.stop();
-            //streamingContext.awaitTermination();
-            return true;
-
-        } catch (Exception e) {
-            e.printStackTrace();
-            return false;
-        }
-
-    }
-
-    private JavaInputDStream<ConsumerRecord<String, String>> getStream1Source(JavaStreamingContext streamingContext) {
-        return getStreamSource(0, streamingContext);
-    }
-
-    private JavaInputDStream<ConsumerRecord<String, String>> getStream2Source(JavaStreamingContext streamingContext) {
-        return getStreamSource(1, streamingContext);
-    }
-
-    /**
-     * This method takes the i's input stream and creates a source for the Spark streaming job
-     * Currently just kafka is supported as a protocol
-     * TODO Add also jms support
-     *
-     * @param i
-     * @param streamingContext
-     * @return
-     */
-    private JavaInputDStream<ConsumerRecord<String, String>> getStreamSource(int i, JavaStreamingContext streamingContext) {
-        if (graph.getInputStreams().size() - 1 >= i) {
-
-            SpDataStream stream = graph.getInputStreams().get(i);
-            if (stream != null) {
-                KafkaTransportProtocol protocol = (KafkaTransportProtocol) stream.getEventGrounding().getTransportProtocol();
-
-                //System.out.println("Listening on Kafka topic '" + protocol.getTopicName() + "'");
-                return KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(),
-                        ConsumerStrategies.<String, String>Subscribe(Arrays.asList(protocol.getTopicDefinition().getActualTopicName()),
-                                kafkaParams));
-            }
-            else {
-                return null;
-            }
-        }
-        else {
-            return null;
-        }
-    }
-}
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/converter/JsonToMapFormat.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/converter/JsonToMapFormat.java
deleted file mode 100644
index 7141dc8..0000000
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/converter/JsonToMapFormat.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.wrapper.spark.converter;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.streampipes.logging.impl.EventStatisticLogger;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-public class JsonToMapFormat implements FlatMapFunction<ConsumerRecord<String, String>, Map<String, Object>> {
-
-    private static final long serialVersionUID = 1L;
-    private ObjectMapper mapper;
-    private InvocableStreamPipesEntity graph;
-
-    public JsonToMapFormat(InvocableStreamPipesEntity graph) {
-        this.mapper = new ObjectMapper();
-        this.graph = graph;
-    }
-
-    @Override
-    public Iterator<Map<String, Object>> call(ConsumerRecord<String, String> s) throws Exception {
-        HashMap json = mapper.readValue(s.value(), HashMap.class);
-
-        System.out.println(s.value());
-
-        EventStatisticLogger.log(graph.getName(), graph.getCorrespondingPipeline(), graph.getUri());
-        return Arrays.asList((Map<String, Object>)json).iterator();
-    }
-}
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/serializer/SimpleKafkaSerializer.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/serializer/SimpleKafkaSerializer.java
deleted file mode 100644
index de26761..0000000
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/serializer/SimpleKafkaSerializer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.wrapper.spark.serializer;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.VoidFunction;
-
-import java.util.Map;
-
-/**
- * Created by Jochen Lutz on 2017-12-21.
- */
-public class SimpleKafkaSerializer implements VoidFunction<org.apache.spark.api.java.JavaRDD<java.util.Map<java.lang.String, java.lang.Object>>> {
-  private static SimpleKafkaSerializer instance;
-
-  private final Map kafkaParams;
-  private final String topic;
-
-  private SimpleKafkaSerializer(Map kafkaParams, String topicName) {
-    this.topic = topicName;
-    //System.out.println("Sending output to Kafka topic '" + topicName + "'");
-    this.kafkaParams = kafkaParams;
-  }
-
-  @Override
-  public void call(JavaRDD<Map<String, Object>> javaRDD) throws Exception {
-    //System.out.println("Sending Kafka output");
-
-    javaRDD.foreach(new VoidFunction<Map<String, Object>>() {
-      private static final long serialVersionUID = 1L;
-
-      private final ObjectMapper objectMapper = new ObjectMapper();
-
-      @Override
-      public void call(Map<String, Object> map) throws Exception {
-        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaParams);
-
-        producer.send(new ProducerRecord<>(topic, objectMapper.writeValueAsString(map)));
-      }
-    });
-  }
-
-  public static synchronized SimpleKafkaSerializer getInstance(Map kafkaParams, String topicName) {
-    if (SimpleKafkaSerializer.instance == null) {
-      SimpleKafkaSerializer.instance = new SimpleKafkaSerializer(kafkaParams, topicName);
-    }
-
-    return SimpleKafkaSerializer.instance;
-  }
-}