You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/12/05 04:38:52 UTC

[GitHub] [pulsar] devinbost commented on issue #4721: Flink pulsar sink NotSerializableException

devinbost commented on issue #4721: Flink pulsar sink NotSerializableException
URL: https://github.com/apache/pulsar/issues/4721#issuecomment-561965357
 
 
   Does anyone know of a workaround? I'm stuck on this exact issue. 
   
   You should be able to reproduce it with this example:
   
   
   ```
   import org.apache.flink.api.common.functions.AggregateFunction;
   import org.apache.flink.api.common.functions.FoldFunction;
   import org.apache.flink.api.common.functions.MapFunction;
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.api.common.typeinfo.TypeHint;
   import org.apache.flink.api.common.typeinfo.TypeInformation;
   import org.apache.flink.api.java.tuple.Tuple2;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.api.functions.source.SourceFunction;
   import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
   import org.apache.flink.streaming.api.windowing.time.Time;
   import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
   import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
   import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
   import org.json.simple.JSONObject;
   import org.json.simple.parser.JSONParser;
   import org.json.simple.parser.ParseException;
   
   import static java.nio.charset.StandardCharsets.UTF_8;
   
   /**
    * Flink streaming job that reads JSON messages from a Pulsar topic, keys them by their
    * {@code correlationId} field, collects each key's messages in event-time session windows
    * (20 second gap), concatenates every session into one bracketed, comma-separated string,
    * and writes that string to a second Pulsar topic.
    */
   public class StreamingJob {
   
   	private static final String SERVICE_URL = "pulsar://localhost:6650";
   	private static final String INPUT_TOPIC = "persistent://public/default/test-flink-jaeger-spanner-input";
   	private static final String SUBSCRIPTION_NAME = "test-jaeger-spanner";
   	private static final String OUTPUT_TOPIC = "persistent://public/default/test-flink-jaeger-spanner-output";
   
   	/**
   	 * Parses a JSON message and pairs it with its correlation id.
   	 *
   	 * @param incomingMessage JSON text whose top-level value is a JSON object
   	 * @return a tuple of (the {@code correlationId} field rendered with {@code toString()},
   	 *         or {@code ""} when the field is absent or null; the re-serialized message)
   	 * @throws ParseException if {@code incomingMessage} is not valid JSON
   	 */
   	public static Tuple2<String, String> mapToTuple(String incomingMessage) throws ParseException {
   		JSONObject incomingObj = (JSONObject) new JSONParser().parse(incomingMessage);
   		// Kept as Object on purpose: casting to JSONObject would throw ClassCastException
   		// whenever the id is a plain JSON string or number rather than a nested object.
   		Object correlationIdValue = incomingObj.get("correlationId");
   		// TODO: decide whether a missing correlationId should be rejected instead of mapped to "".
   		String correlationId = correlationIdValue == null ? "" : correlationIdValue.toString();
   		return new Tuple2<>(correlationId, incomingObj.toString());
   	}
   
   	/**
   	 * Concatenates the JSON payloads (field {@code f1}) of all tuples in a window.
   	 * The accumulator is a tuple of (key of the last added value, payloads joined by {@code ", "});
   	 * the final result wraps the joined payloads in square brackets.
   	 */
   	private static class JsonConcatenator
   			implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
   		@Override
   		public Tuple2<String, String> createAccumulator() {
   			return new Tuple2<>("", "");
   		}
   
   		@Override
   		public Tuple2<String, String> add(Tuple2<String, String> value, Tuple2<String, String> accumulator) {
   			return new Tuple2<>(value.f0, join(accumulator.f1, value.f1));
   		}
   
   		@Override
   		public String getResult(Tuple2<String, String> accumulator) {
   			return "[" + accumulator.f1 + "]";
   		}
   
   		@Override
   		public Tuple2<String, String> merge(Tuple2<String, String> a, Tuple2<String, String> b) {
   			return new Tuple2<>(a.f0, join(a.f1, b.f1));
   		}
   
   		/**
   		 * Joins two payload lists with {@code ", "}, skipping the separator when either side is
   		 * empty so that a fresh accumulator does not produce a leading or trailing comma.
   		 */
   		private static String join(String left, String right) {
   			if (left.isEmpty()) {
   				return right;
   			}
   			if (right.isEmpty()) {
   				return left;
   			}
   			return left + ", " + right;
   		}
   	}
   
   	/**
   	 * Builds the source -> map -> keyed session window -> aggregate -> sink pipeline and runs it.
   	 *
   	 * @param args unused
   	 * @throws Exception if the job cannot be built or executed
   	 */
   	public static void main(String[] args) throws Exception {
   		// set up the streaming execution environment
   		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
   		PulsarSourceBuilder<String> builder = PulsarSourceBuilder
   				.builder(new SimpleStringSchema())
   				.serviceUrl(SERVICE_URL)
   				.topic(INPUT_TOPIC)
   				.subscriptionName(SUBSCRIPTION_NAME);
   		SourceFunction<String> src = builder.build();
   		DataStream<String> dataStream = env.addSource(src);
   
   		// NOTE(review): no timestamps or watermarks are assigned anywhere in this job, so the
   		// event-time session windows below may never fire - confirm whether
   		// assignTimestampsAndWatermarks(...) or a processing-time session window is intended.
   		DataStream<String> combinedEnvelopes = dataStream
   				.map(new MapFunction<String, Tuple2<String, String>>() {
   					@Override
   					public Tuple2<String, String> map(String incomingMessage) throws Exception {
   						return mapToTuple(incomingMessage);
   					}
   				})
   				.keyBy(0) // key by tuple field 0, the correlation id
   				.window(EventTimeSessionWindows.withGap(Time.seconds(20)))
   				.aggregate(new JsonConcatenator())
   				.returns(String.class);
   
   		combinedEnvelopes.addSink(new FlinkPulsarProducer<>(
   				SERVICE_URL,
   				OUTPUT_TOPIC,
   				new AuthenticationDisabled(), // probably need to fix //  AuthenticationTls()
   				combinedData -> combinedData.toString().getBytes(UTF_8),
   				combinedData -> null)
   		);
   
   		// execute program
   		env.execute("Flink Streaming Java API Skeleton");
   	}
   }
   ```
   
   with this POM file:
   
   ```
   <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   	<modelVersion>4.0.0</modelVersion>
   
   	<groupId>com.example</groupId>
   	<artifactId>flink-poc</artifactId>
   	<version>1.0-SNAPSHOT</version>
   	<packaging>jar</packaging>
   
   	<name>Flink Quickstart Job</name>
   	<url>http://www.myorganization.org</url>
   
   	<properties>
   		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   		<flink.version>1.9.0</flink.version>
   		<java.version>1.8</java.version>
   		<scala.binary.version>2.11</scala.binary.version>
   		<maven.compiler.source>${java.version}</maven.compiler.source>
   		<maven.compiler.target>${java.version}</maven.compiler.target>
   		<pulsar.version>2.4.0</pulsar.version>
   	</properties>
   
   	<repositories>
   		<repository>
   			<id>apache.snapshots</id>
   			<name>Apache Development Snapshot Repository</name>
   			<url>https://repository.apache.org/content/repositories/snapshots/</url>
   			<releases>
   				<enabled>false</enabled>
   			</releases>
   			<snapshots>
   				<enabled>true</enabled>
   			</snapshots>
   		</repository>
   	</repositories>
   
   	<dependencies>
   		<!-- https://mvnrepository.com/artifact/io.jaegertracing/jaeger-client -->
   		<dependency>
   			<groupId>io.jaegertracing</groupId>
   			<artifactId>jaeger-client</artifactId>
   			<version>1.0.0</version>
   		</dependency>
   		<dependency>
   			<groupId>org.apache.pulsar</groupId>
   			<artifactId>pulsar-functions-api</artifactId>
   			<version>${pulsar.version}</version>
   		</dependency>
   
   		<dependency>
   			<groupId>org.apache.pulsar</groupId>
   			<artifactId>pulsar-io-core</artifactId>
   			<version>${pulsar.version}</version>
   		</dependency>
   
   		<dependency>
   			<groupId>org.apache.pulsar</groupId>
   			<artifactId>pulsar-client</artifactId>
   			<!-- Uses the shared pulsar.version property so all Pulsar artifacts stay on one version. -->
   			<version>${pulsar.version}</version>
   		</dependency>
   
   		<dependency>
   			<groupId>org.apache.pulsar</groupId>
   			<artifactId>pulsar-client-admin</artifactId>
   			<version>${pulsar.version}</version>
   		</dependency>
   		<!-- Apache Flink dependencies -->
   		<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
   		<dependency>
   			<groupId>org.apache.flink</groupId>
   			<artifactId>flink-java</artifactId>
   			<version>${flink.version}</version>
   			<scope>provided</scope>
   		</dependency>
   		<dependency>
   			<groupId>org.apache.flink</groupId>
   			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
   			<version>${flink.version}</version>
   			<scope>provided</scope>
   		</dependency>
   		<dependency>
   			<groupId>com.googlecode.json-simple</groupId>
   			<artifactId>json-simple</artifactId>
   			<version>1.1</version>
   		</dependency>
   		<dependency>
   			<groupId>org.projectlombok</groupId>
   			<artifactId>lombok</artifactId>
   			<version>1.18.6</version>
   			<scope>provided</scope>
   		</dependency>
   		<dependency>
   			<groupId>junit</groupId>
   			<artifactId>junit</artifactId>
   			<version>4.11</version>
   			<scope>test</scope>
   		</dependency>
   
   		<!-- https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-flink -->
   		<dependency>
   			<groupId>org.apache.pulsar</groupId>
   			<artifactId>pulsar-flink</artifactId>
   			<!-- Uses the shared pulsar.version property so the connector matches the Pulsar client version. -->
   			<version>${pulsar.version}</version>
   		</dependency>
   		<dependency>
   			<groupId>org.slf4j</groupId>
   			<artifactId>slf4j-log4j12</artifactId>
   			<version>1.7.7</version>
   			<scope>runtime</scope>
   		</dependency>
   		<dependency>
   			<groupId>log4j</groupId>
   			<artifactId>log4j</artifactId>
   			<version>1.2.17</version>
   			<scope>runtime</scope>
   		</dependency>
   	</dependencies>
   
   	<build>
   		<plugins>
   
   			<!-- Java Compiler -->
   			<plugin>
   				<groupId>org.apache.maven.plugins</groupId>
   				<artifactId>maven-compiler-plugin</artifactId>
   				<version>3.1</version>
   				<configuration>
   					<source>${java.version}</source>
   					<target>${java.version}</target>
   				</configuration>
   			</plugin>
   
   			<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
   			<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
   			<plugin>
   				<groupId>org.apache.maven.plugins</groupId>
   				<artifactId>maven-shade-plugin</artifactId>
   				<version>3.0.0</version>
   				<executions>
   					<!-- Run shade goal on package phase -->
   					<execution>
   						<phase>package</phase>
   						<goals>
   							<goal>shade</goal>
   						</goals>
   						<configuration>
   							<artifactSet>
   								<excludes>
   									<exclude>org.apache.flink:force-shading</exclude>
   									<exclude>com.google.code.findbugs:jsr305</exclude>
   									<exclude>org.slf4j:*</exclude>
   									<exclude>log4j:*</exclude>
   								</excludes>
   							</artifactSet>
   							<filters>
   								<filter>
   									<!-- Do not copy the signatures in the META-INF folder.
   									Otherwise, this might cause SecurityExceptions when using the JAR. -->
   									<artifact>*:*</artifact>
   									<excludes>
   										<exclude>META-INF/*.SF</exclude>
   										<exclude>META-INF/*.DSA</exclude>
   										<exclude>META-INF/*.RSA</exclude>
   									</excludes>
   								</filter>
   							</filters>
   							<transformers>
   								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
   									<mainClass>com.overstock.dataeng.jaeger.spanner.StreamingJob</mainClass>
   								</transformer>
   							</transformers>
   						</configuration>
   					</execution>
   				</executions>
   			</plugin>
   		</plugins>
   
   		<pluginManagement>
   			<plugins>
   				<plugin>
   					<groupId>org.apache.maven.plugins</groupId>
   					<artifactId>maven-assembly-plugin</artifactId>
   					<version>3.1.1</version>
   					<configuration>
   						<descriptorRefs>
   							<descriptorRef>jar-with-dependencies</descriptorRef>
   						</descriptorRefs>
   					</configuration>
   					<executions>
   						<execution>
   							<id>assemble-all</id>
   							<phase>package</phase>
   							<goals>
   								<goal>single</goal>
   							</goals>
   						</execution>
   					</executions>
   				</plugin>
   				<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
   				<plugin>
   					<groupId>org.eclipse.m2e</groupId>
   					<artifactId>lifecycle-mapping</artifactId>
   					<version>1.0.0</version>
   					<configuration>
   						<lifecycleMappingMetadata>
   							<pluginExecutions>
   								<pluginExecution>
   									<pluginExecutionFilter>
   										<groupId>org.apache.maven.plugins</groupId>
   										<artifactId>maven-shade-plugin</artifactId>
   										<versionRange>[3.0.0,)</versionRange>
   										<goals>
   											<goal>shade</goal>
   										</goals>
   									</pluginExecutionFilter>
   									<action>
   										<ignore/>
   									</action>
   								</pluginExecution>
   								<pluginExecution>
   									<pluginExecutionFilter>
   										<groupId>org.apache.maven.plugins</groupId>
   										<artifactId>maven-compiler-plugin</artifactId>
   										<versionRange>[3.1,)</versionRange>
   										<goals>
   											<goal>testCompile</goal>
   											<goal>compile</goal>
   										</goals>
   									</pluginExecutionFilter>
   									<action>
   										<ignore/>
   									</action>
   								</pluginExecution>
   							</pluginExecutions>
   						</lifecycleMappingMetadata>
   					</configuration>
   				</plugin>
   			</plugins>
   		</pluginManagement>
   	</build>
   
   	<!-- This profile helps to make things run out of the box in IntelliJ -->
   	<!-- It adds Flink's core classes to the runtime class path. -->
   	<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
   	<profiles>
   		<profile>
   			<id>add-dependencies-for-IDEA</id>
   
   			<activation>
   				<property>
   					<name>idea.version</name>
   				</property>
   			</activation>
   
   			<dependencies>
   				<dependency>
   					<groupId>org.apache.flink</groupId>
   					<artifactId>flink-java</artifactId>
   					<version>${flink.version}</version>
   					<scope>compile</scope>
   				</dependency>
   				<dependency>
   					<groupId>org.apache.flink</groupId>
   					<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
   					<version>${flink.version}</version>
   					<scope>compile</scope>
   				</dependency>
   			</dependencies>
   		</profile>
   	</profiles>
   
   </project>
   ```
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services