You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in the plain-text conversion.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/03/31 16:07:53 UTC
[incubator-streampipes] 02/02: [hotfix] Remove old spark wrapper module
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 1e3d2aecd935de6fd62e75d4172b46ba90cb1507
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Thu Mar 31 18:07:17 2022 +0200
[hotfix] Remove old spark wrapper module
---
pom.xml | 17 --
streampipes-wrapper-spark/pom.xml | 213 ---------------------
.../wrapper/spark/AbstractSparkDeclarer.java | 52 -----
.../wrapper/spark/SparkDataProcessorDeclarer.java | 28 ---
.../wrapper/spark/SparkDataProcessorRuntime.java | 162 ----------------
.../wrapper/spark/SparkDeploymentConfig.java | 100 ----------
.../streampipes/wrapper/spark/SparkRuntime.java | 188 ------------------
.../wrapper/spark/converter/JsonToMapFormat.java | 52 -----
.../spark/serializer/SimpleKafkaSerializer.java | 69 -------
9 files changed, 881 deletions(-)
diff --git a/pom.xml b/pom.xml
index 80f292d..8eff674 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,7 +109,6 @@
<slf4j.version>1.7.30</slf4j.version>
<snakeyaml.version>1.26</snakeyaml.version>
<snappy-java.version>1.1.7.7</snappy-java.version>
- <spark.version>2.1.2</spark.version>
<spring.version>5.3.14</spring.version>
<spring-boot.version>2.6.2</spring-boot.version>
<spring-security.version>5.6.1</spring-security.version>
@@ -513,21 +512,6 @@ IoT data streams.
<version>${shiro.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
- <version>${spark.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.11</artifactId>
- <version>${spark.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
- <version>${spark.version}</version>
- </dependency>
- <dependency>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-component-annotations</artifactId>
<version>${plexus-component-annotations.version}</version>
@@ -971,7 +955,6 @@ IoT data streams.
<module>streampipes-wrapper-kafka-streams</module>
<module>streampipes-wrapper-python</module>
<module>streampipes-wrapper-siddhi</module>
- <!-- <module>streampipes-wrapper-spark</module>-->
<module>streampipes-wrapper-standalone</module>
<module>streampipes-mail</module>
<module>streampipes-resource-management</module>
diff --git a/streampipes-wrapper-spark/pom.xml b/streampipes-wrapper-spark/pom.xml
deleted file mode 100644
index 1c3e9c9..0000000
--- a/streampipes-wrapper-spark/pom.xml
+++ /dev/null
@@ -1,213 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- ~
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-parent</artifactId>
- <version>0.69.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>streampipes-wrapper-spark</artifactId>
-
- <properties>
- <spark.version>2.4.8</spark.version>
- </properties>
-
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>com.github.luben</groupId>
- <artifactId>zstd-jni</artifactId>
- <version>1.3.8-1</version>
- </dependency>
- <dependency>
- <groupId>com.thoughtworks.paranamer</groupId>
- <artifactId>paranamer</artifactId>
- <version>2.8</version>
- </dependency>
- <dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- <version>1.10</version>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <version>2.6</version>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- <version>3.9.9.Final</version>
- </dependency>
- <dependency>
- <groupId>javax.activation</groupId>
- <artifactId>activation</artifactId>
- <version>1.1.1</version>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.17</version>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- <version>1.8.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- <version>1.8.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-math3</artifactId>
- <version>3.4.1</version>
- </dependency>
- <dependency>
- <groupId>org.lz4</groupId>
- <artifactId>lz4-java</artifactId>
- <version>1.5.0</version>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>2.11.2</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- <version>1.7.25</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.25</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
- <dependencies>
- <!-- StreamPipes dependencies -->
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-model</artifactId>
- <version>0.69.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-container</artifactId>
- <version>0.69.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.streampipes</groupId>
- <artifactId>streampipes-wrapper</artifactId>
- <version>0.69.0-SNAPSHOT</version>
- </dependency>
-
- <!-- External dependencies -->
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </dependency>
- <dependency>
- <groupId>com.github.luben</groupId>
- <artifactId>zstd-jni</artifactId>
- </dependency>
- <dependency>
- <groupId>com.thoughtworks.paranamer</groupId>
- <artifactId>paranamer</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </dependency>
- <dependency>
- <groupId>javax.activation</groupId>
- <artifactId>activation</artifactId>
- </dependency>
- <dependency>
- <groupId>org.lz4</groupId>
- <artifactId>lz4-java</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-math3</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish.jersey.containers</groupId>
- <artifactId>jersey-container-servlet-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish.jersey.core</groupId>
- <artifactId>jersey-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish.jersey.core</groupId>
- <artifactId>jersey-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish.jersey.core</groupId>
- <artifactId>jersey-server</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
- <version>${spark.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.11</artifactId>
- <version>${spark.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
- <version>${spark.version}</version>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </dependency>
- </dependencies>
-</project>
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/AbstractSparkDeclarer.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/AbstractSparkDeclarer.java
deleted file mode 100644
index 5c41055..0000000
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/AbstractSparkDeclarer.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.wrapper.spark;
-
-import org.apache.streampipes.container.declarer.InvocableDeclarer;
-import org.apache.streampipes.model.Response;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.base.NamedStreamPipesEntity;
-
-public abstract class AbstractSparkDeclarer<D extends NamedStreamPipesEntity, I extends InvocableStreamPipesEntity, SR extends SparkRuntime> implements InvocableDeclarer<D, I> {
- protected SR runtime;
- protected I graph;
-
- @Override
- public Response invokeRuntime(I sepaInvocation, String serviceId) {
- runtime = getRuntime(sepaInvocation);
- graph = sepaInvocation;
-
- if (runtime.startExecution()) {
- return new Response(graph.getElementId(), true);
- } else {
- return new Response(graph.getElementId(), false);
- }
- }
-
- @Override
- public Response detachRuntime(String pipelineId, String serviceId) {
- if (runtime.stop()) {
- return new Response(graph.getElementId(), true);
- } else {
- return new Response(graph.getElementId(), false);
- }
- }
-
- protected abstract SR getRuntime(I graph);
-}
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java
deleted file mode 100644
index 91fbf57..0000000
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorDeclarer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.wrapper.spark;
-
-import org.apache.streampipes.container.declarer.SemanticEventProcessingAgentDeclarer;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public abstract class SparkDataProcessorDeclarer<B extends EventProcessorBindingParams>
- extends AbstractSparkDeclarer<DataProcessorDescription, DataProcessorInvocation, SparkDataProcessorRuntime> implements SemanticEventProcessingAgentDeclarer {
-}
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorRuntime.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorRuntime.java
deleted file mode 100644
index 460edb4..0000000
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDataProcessorRuntime.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.wrapper.spark;
-
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.model.grounding.TransportProtocol;
-import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-import org.apache.streampipes.wrapper.spark.serializer.SimpleKafkaSerializer;
-
-import java.io.*;
-import java.lang.reflect.Constructor;
-import java.util.Base64;
-import java.util.Map;
-
-public abstract class SparkDataProcessorRuntime<B extends EventProcessorBindingParams> extends SparkRuntime<DataProcessorInvocation> {
- private static final long serialVersionUID = 1L;
- protected B params;
-
- public SparkDataProcessorRuntime(B params, SparkDeploymentConfig deploymentConfig) {
- super(params.getGraph(), deploymentConfig);
- this.params = params;
- }
-
- protected abstract JavaDStream<Map<String, Object>> getApplicationLogic(JavaDStream<Map<String, Object>>... messageStream);
-
- @Override
- public boolean execute(JavaDStream<Map<String, Object>>... convertedStream) {
- JavaDStream<Map<String, Object>> applicationLogic = getApplicationLogic(convertedStream);
- //applicationLogic.print();
-
- if (isOutputKafkaProtocol()) {
- applicationLogic.foreachRDD(SimpleKafkaSerializer.getInstance(kafkaParams, protocol().getTopicDefinition
- ().getActualTopicName()));
- }
- else {
- //TODO: JMS
- }
-
- thread = new Thread(this);
- thread.start();
-
- return true;
- }
-
- /**
- * Serialize the instance's data to transmit to Spark cluster.
- * The serialized data contains
- * - the class name (as String) of the job's program class (inheriting from this class)
- * - the class name (as String) of the job's parameter class (inheriting from EventProcessorBindingParams),
- * stored in this.params
- * - this.params
- * - this.deplomentconfig
- *
- * This data is deserialized in SparkDataProcessorRuntime.main()
- * @return the serialized data as byte array
- */
- @Override
- protected byte[] getSerializationData() {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream out = null;
- try {
- out = new ObjectOutputStream(bos);
-
- out.writeObject(this.getClass().getName());
- out.writeObject(params.getClass().getName());
- out.writeObject(params);
- out.writeObject(deploymentConfig);
-
- out.flush();
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- try {
- bos.close();
- } catch (IOException ex) {
- // ignore close exception
- }
- }
-
- return bos.toByteArray();
- }
-
- private boolean isOutputKafkaProtocol() {
- return protocol() instanceof KafkaTransportProtocol;
- }
-
- private TransportProtocol protocol() {
- return params
- .getGraph()
- .getOutputStream()
- .getEventGrounding()
- .getTransportProtocol();
- }
-
- /**
- * Entry point for execution on Spark clusters.
- * @param args
- * args[0] contains the serialized and base64-encoded data of the instance to run.
- */
- public static void main(String[] args) {
- String enc = args[0];
- //System.out.println("Received data: '" + enc + "'");
-
- byte[] data = Base64.getDecoder().decode(enc);
-
- String programClassName = null;
- String paramsClassName = null;
- EventProcessorBindingParams params = null;
- SparkDeploymentConfig sparkDeploymentConfig = null;
-
- InputStream fis = null;
- fis = new ByteArrayInputStream(data);
- ObjectInputStream o = null;
- try {
- o = new ObjectInputStream(fis);
- programClassName = (String) o.readObject();
- paramsClassName = (String) o.readObject();
- params = (EventProcessorBindingParams) o.readObject();
-
- sparkDeploymentConfig = (SparkDeploymentConfig) o.readObject();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- if (programClassName != null && paramsClassName != null && params != null && sparkDeploymentConfig != null) {
- try {
- Class<?> programClass = Class.forName(programClassName);
- Class<?> paramsClass = Class.forName(paramsClassName);
-
- Class[] types = new Class[]{paramsClass, SparkDeploymentConfig.class};
- Constructor constructor = programClass.getConstructor(types);
-
- Object[] parameters = {params, sparkDeploymentConfig};
- SparkDataProcessorRuntime prog = (SparkDataProcessorRuntime) constructor.newInstance(parameters);
-
- sparkDeploymentConfig.setRunLocal(true);
-
- prog.startExecution();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
-}
\ No newline at end of file
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDeploymentConfig.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDeploymentConfig.java
deleted file mode 100644
index 1983f3c..0000000
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkDeploymentConfig.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.wrapper.spark;
-
-import java.io.Serializable;
-
-public class SparkDeploymentConfig implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private String jarFile;
- private String appName;
- private String sparkHost;
- private long sparkBatchDuration;
- private String kafkaHost;
- private boolean runLocal;
-
- public SparkDeploymentConfig(String jarFile, String appName, String sparkHost, boolean runLocal, long sparkBatchDuration, String kafkaHost) {
- super();
-
- this.jarFile = jarFile;
- this.appName = appName;
- this.sparkHost = sparkHost;
- this.runLocal = runLocal;
- this.sparkBatchDuration = sparkBatchDuration;
- this.kafkaHost = kafkaHost;//TODO: JMS berücksichtigen
-
- }
-
- public SparkDeploymentConfig(String jarFile, String appName, String sparkHost, long sparkBatchDuration, String kafkaHost) {
- this(jarFile, appName, sparkHost, false, sparkBatchDuration, kafkaHost);
- }
-
- public SparkDeploymentConfig(String jarFile, String appName, String sparkHost, String kafkaHost) {
- this(jarFile, appName, sparkHost, 1000, kafkaHost);
- }
-
- public String getJarFile() {
- return jarFile;
- }
-
- public void setJarFile(String jarFile) {
- this.jarFile = jarFile;
- }
-
- public String getAppName() {
- return appName;
- }
-
- public void setAppName(String appName) {
- this.appName = appName;
- }
-
- public String getSparkHost() {
- return sparkHost;
- }
-
- public void setSparkHost(String sparkHost) {
- this.sparkHost = sparkHost;
- }
-
- public boolean isRunLocal() {
- return runLocal;
- }
-
- public void setRunLocal(boolean runLocal) {
- this.runLocal = runLocal;
- }
-
- public long getSparkBatchDuration() {
- return sparkBatchDuration;
- }
-
- public void setSparkBatchDuration(long sparkBatchDuration) {
- this.sparkBatchDuration = sparkBatchDuration;
- }
-
- public String getKafkaHost() {
- return kafkaHost;
- }
-
- public void setKafkaHost(String kafkaHost) {
- this.kafkaHost = kafkaHost;
- }
-}
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkRuntime.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkRuntime.java
deleted file mode 100644
index fc0b5f6..0000000
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/SparkRuntime.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.wrapper.spark;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.spark.SparkConf;
-import org.apache.spark.launcher.SparkAppHandle;
-import org.apache.spark.launcher.SparkLauncher;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka010.ConsumerStrategies;
-import org.apache.spark.streaming.kafka010.KafkaUtils;
-import org.apache.spark.streaming.kafka010.LocationStrategies;
-import org.apache.streampipes.model.SpDataStream;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
-import org.apache.streampipes.wrapper.spark.converter.JsonToMapFormat;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
-
-public abstract class SparkRuntime<I extends InvocableStreamPipesEntity> implements Runnable, Serializable {
- private static final long serialVersionUID = 1L;
- protected final SparkDeploymentConfig deploymentConfig;
-
- protected Thread thread;
- protected SparkAppHandle appHandle;
- protected SparkLauncher launcher;
- protected JavaStreamingContext streamingContext;
- protected I graph;
- protected Map kafkaParams;
-
- public SparkRuntime(I graph, SparkDeploymentConfig deploymentConfig) {
- this.graph = graph;
- this.deploymentConfig = deploymentConfig;
-
- kafkaParams = new HashMap<>();
- kafkaParams.put("bootstrap.servers", this.deploymentConfig.getKafkaHost());
- kafkaParams.put("key.deserializer", StringDeserializer.class);
- kafkaParams.put("value.deserializer", StringDeserializer.class);
- kafkaParams.put("key.serializer", StringSerializer.class);
- kafkaParams.put("value.serializer", StringSerializer.class);
- kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
- kafkaParams.put("auto.offset.reset", "latest");
- kafkaParams.put("enable.auto.commit", false);
- }
-
- public boolean startExecution() {
- if (this.deploymentConfig.isRunLocal()) {
- try {
- SparkConf conf = new SparkConf().setAppName(this.deploymentConfig.getAppName())
- .setMaster(this.deploymentConfig.getSparkHost());
- streamingContext = new JavaStreamingContext(conf, new Duration(this.deploymentConfig.getSparkBatchDuration()));
-
- JavaDStream<Map<String, Object>> messageStream1 = null;
- JavaInputDStream<ConsumerRecord<String, String>> source1 = getStream1Source(streamingContext);
- if (source1 != null) {
- messageStream1 = source1.flatMap(new JsonToMapFormat(graph));
- } else {
- throw new Exception("At least one source must be defined for a Spark SEPA");
- }
- //TODO: zweiter Stream. Kann das Spark?
-
-
- //TODO: Error Handling
-
- execute(messageStream1);
-
- return true;
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
- else {
- try {
- byte[] data = getSerializationData();
- String enc = Base64.getEncoder().encodeToString(data);
- //System.out.println("Sending data: '" + enc + "'");
-
- launcher = new SparkLauncher()
- .setAppResource(this.deploymentConfig.getJarFile())
- .setMainClass(this.getClass().getName())
- .addAppArgs(enc)
- //.redirectError()
- //.redirectOutput(ProcessBuilder.Redirect.PIPE)
- .setMaster(this.deploymentConfig.getSparkHost())
- .setConf(SparkLauncher.DRIVER_MEMORY, "2g");//TODO
- appHandle = launcher.startApplication();
-
- return true;
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
- }
-
- protected abstract byte[] getSerializationData();
-
- public abstract boolean execute(JavaDStream<Map<String, Object>>... convertedStream);
-
- public void run() {
- try {
- streamingContext.start();
- if (deploymentConfig.isRunLocal()) {
- streamingContext.awaitTermination();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public boolean stop() {
- try {
- appHandle.stop();
- //streamingContext.stop();
- //streamingContext.awaitTermination();
- return true;
-
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
-
- }
-
- private JavaInputDStream<ConsumerRecord<String, String>> getStream1Source(JavaStreamingContext streamingContext) {
- return getStreamSource(0, streamingContext);
- }
-
- private JavaInputDStream<ConsumerRecord<String, String>> getStream2Source(JavaStreamingContext streamingContext) {
- return getStreamSource(1, streamingContext);
- }
-
- /**
- * This method takes the i's input stream and creates a source for the Spark streaming job
- * Currently just kafka is supported as a protocol
- * TODO Add also jms support
- *
- * @param i
- * @param streamingContext
- * @return
- */
- private JavaInputDStream<ConsumerRecord<String, String>> getStreamSource(int i, JavaStreamingContext streamingContext) {
- if (graph.getInputStreams().size() - 1 >= i) {
-
- SpDataStream stream = graph.getInputStreams().get(i);
- if (stream != null) {
- KafkaTransportProtocol protocol = (KafkaTransportProtocol) stream.getEventGrounding().getTransportProtocol();
-
- //System.out.println("Listening on Kafka topic '" + protocol.getTopicName() + "'");
- return KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(),
- ConsumerStrategies.<String, String>Subscribe(Arrays.asList(protocol.getTopicDefinition().getActualTopicName()),
- kafkaParams));
- }
- else {
- return null;
- }
- }
- else {
- return null;
- }
- }
-}
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/converter/JsonToMapFormat.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/converter/JsonToMapFormat.java
deleted file mode 100644
index 7141dc8..0000000
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/converter/JsonToMapFormat.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.wrapper.spark.converter;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.streampipes.logging.impl.EventStatisticLogger;
-import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-public class JsonToMapFormat implements FlatMapFunction<ConsumerRecord<String, String>, Map<String, Object>> {
-
- private static final long serialVersionUID = 1L;
- private ObjectMapper mapper;
- private InvocableStreamPipesEntity graph;
-
- public JsonToMapFormat(InvocableStreamPipesEntity graph) {
- this.mapper = new ObjectMapper();
- this.graph = graph;
- }
-
- @Override
- public Iterator<Map<String, Object>> call(ConsumerRecord<String, String> s) throws Exception {
- HashMap json = mapper.readValue(s.value(), HashMap.class);
-
- System.out.println(s.value());
-
- EventStatisticLogger.log(graph.getName(), graph.getCorrespondingPipeline(), graph.getUri());
- return Arrays.asList((Map<String, Object>)json).iterator();
- }
-}
diff --git a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/serializer/SimpleKafkaSerializer.java b/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/serializer/SimpleKafkaSerializer.java
deleted file mode 100644
index de26761..0000000
--- a/streampipes-wrapper-spark/src/main/java/org/apache/streampipes/wrapper/spark/serializer/SimpleKafkaSerializer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.wrapper.spark.serializer;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.VoidFunction;
-
-import java.util.Map;
-
-/**
- * Created by Jochen Lutz on 2017-12-21.
- */
-public class SimpleKafkaSerializer implements VoidFunction<org.apache.spark.api.java.JavaRDD<java.util.Map<java.lang.String, java.lang.Object>>> {
- private static SimpleKafkaSerializer instance;
-
- private final Map kafkaParams;
- private final String topic;
-
- private SimpleKafkaSerializer(Map kafkaParams, String topicName) {
- this.topic = topicName;
- //System.out.println("Sending output to Kafka topic '" + topicName + "'");
- this.kafkaParams = kafkaParams;
- }
-
- @Override
- public void call(JavaRDD<Map<String, Object>> javaRDD) throws Exception {
- //System.out.println("Sending Kafka output");
-
- javaRDD.foreach(new VoidFunction<Map<String, Object>>() {
- private static final long serialVersionUID = 1L;
-
- private final ObjectMapper objectMapper = new ObjectMapper();
-
- @Override
- public void call(Map<String, Object> map) throws Exception {
- KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaParams);
-
- producer.send(new ProducerRecord<>(topic, objectMapper.writeValueAsString(map)));
- }
- });
- }
-
- public static synchronized SimpleKafkaSerializer getInstance(Map kafkaParams, String topicName) {
- if (SimpleKafkaSerializer.instance == null) {
- SimpleKafkaSerializer.instance = new SimpleKafkaSerializer(kafkaParams, topicName);
- }
-
- return SimpleKafkaSerializer.instance;
- }
-}