Posted to commits@flink.apache.org by tr...@apache.org on 2015/11/20 10:02:18 UTC

flink git commit: [FLINK-3021] Fix class loading issue for streaming sources

Repository: flink
Updated Branches:
  refs/heads/master 2f013a204 -> 177df41bf


[FLINK-3021] Fix class loading issue for streaming sources

Streaming sources were directly assigned their InputFormat in the StreamingJobGraphGenerator. As a consequence, the input formats were directly serialized/deserialized by Akka when the JobGraph was sent to the JobManager. In cases where the user provided a custom input format or an input format with custom types, this could lead to a ClassNotFoundException, because Akka used the system class loader instead of the user code class loader for the deserialization.
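For illustration only (not code from this commit): a minimal sketch of the failure mode in plain Java serialization terms. The class and method names below are hypothetical; the point is that default ObjectInputStream resolution only sees the system/application class path, while resolving against an explicitly supplied class loader (as the user code class loader path does) succeeds.

	import java.io.ByteArrayInputStream;
	import java.io.IOException;
	import java.io.ObjectInputStream;
	import java.io.ObjectStreamClass;

	public class ClassLoadingIllustration {

		// Default resolution: classes are looked up on the system class path, so a
		// user-code class that only lives in the job jar triggers ClassNotFoundException.
		static Object deserializeWithSystemLoader(byte[] bytes)
				throws IOException, ClassNotFoundException {
			try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
				return in.readObject();
			}
		}

		// Resolving classes against an explicitly provided class loader (here the
		// user code class loader) makes the same bytes deserializable.
		static Object deserializeWithUserCodeLoader(byte[] bytes, ClassLoader userCodeLoader)
				throws IOException, ClassNotFoundException {
			try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
				@Override
				protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
					return Class.forName(desc.getName(), false, userCodeLoader);
				}
			}) {
				return in.readObject();
			}
		}
	}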

The problem was fixed by wrapping the InputFormat in a UserCodeObjectWrapper, which is shipped to the JobManager via the JobVertex's configuration. By instantiating stream sources as InputFormatVertices, the corresponding InputFormat is retrieved from the Configuration in the initializeOnMaster method call, where the user code class loader is available.
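A condensed view of the change (the full diff of StreamingJobGraphGenerator follows below); the variables jobVertex, vertex, and vertexID are the local fields of createProcessingVertex shown in the diff.

	// Sources with an InputFormat become InputFormatVertices, and the format travels
	// inside the vertex configuration rather than as a directly serialized field of
	// the JobGraph that Akka would deserialize with the system class loader.
	if (vertex.getInputFormat() != null) {
		jobVertex = new InputFormatVertex(chainedNames.get(vertexID));
		TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
		// Read back on the JobManager in initializeOnMaster, where the user code
		// class loader is used to deserialize the wrapped InputFormat.
		taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(vertex.getInputFormat()));
	} else {
		jobVertex = new JobVertex(chainedNames.get(vertexID));
	}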

This closes #1368.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/177df41b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/177df41b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/177df41b

Branch: refs/heads/master
Commit: 177df41bf94ba18b0174a5973ee12efc195ec17e
Parents: 2f013a2
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 17 14:23:31 2015 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Nov 20 10:00:09 2015 +0100

----------------------------------------------------------------------
 .../api/graph/StreamingJobGraphGenerator.java   |  18 +-
 flink-tests/pom.xml                             |  19 +++
 .../test-streaming-custominput-assembly.xml     |  36 ++++
 .../test/classloading/ClassLoaderITCase.java    |  15 +-
 .../jar/StreamingCustomInputSplitProgram.java   | 169 +++++++++++++++++++
 5 files changed, 249 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/177df41b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index a829a8d..613d381 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -29,10 +29,12 @@ import java.util.Map.Entry;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -41,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -221,10 +224,17 @@ public class StreamingJobGraphGenerator {
 	}
 
 	private StreamConfig createProcessingVertex(Integer vertexID) {
-
-		JobVertex jobVertex = new JobVertex(chainedNames.get(vertexID));
+		JobVertex jobVertex;
 		StreamNode vertex = streamGraph.getStreamNode(vertexID);
 
+		if (vertex.getInputFormat() != null) {
+			jobVertex = new InputFormatVertex(chainedNames.get(vertexID));
+			TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
+			taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(vertex.getInputFormat()));
+		} else {
+			jobVertex = new JobVertex(chainedNames.get(vertexID));
+		}
+
 		jobVertex.setInvokableClass(vertex.getJobVertexClass());
 
 		int parallelism = vertex.getParallelism();
@@ -237,10 +247,6 @@ public class StreamingJobGraphGenerator {
 			LOG.debug("Parallelism set: {} for {}", parallelism, vertexID);
 		}
 
-		if (vertex.getInputFormat() != null) {
-			jobVertex.setInputSplitSource(vertex.getInputFormat());
-		}
-
 		jobVertices.put(vertexID, jobVertex);
 		builtVertices.add(vertexID);
 		jobGraph.addVertex(jobVertex);

http://git-wip-us.apache.org/repos/asf/flink/blob/177df41b/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index d7128d7..9061a1b 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -392,6 +392,25 @@ under the License.
 						</configuration>
 					</execution>
 					<execution>
+						<id>create-streaming-custominputsplit-jar</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+						<configuration>
+							<archive>
+								<manifest>
+									<mainClass>org.apache.flink.test.classloading.jar.StreamingCustomInputSplitProgram</mainClass>
+								</manifest>
+							</archive>
+							<finalName>streaming-customsplit</finalName>
+							<attach>false</attach>
+							<descriptors>
+								<descriptor>src/test/assembly/test-streaming-custominput-assembly.xml</descriptor>
+							</descriptors>
+						</configuration>
+					</execution>
+					<execution>
 						<id>create-streamingclassloader-jar</id>
 						<phase>process-test-classes</phase>
 						<goals>

http://git-wip-us.apache.org/repos/asf/flink/blob/177df41b/flink-tests/src/test/assembly/test-streaming-custominput-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-streaming-custominput-assembly.xml b/flink-tests/src/test/assembly/test-streaming-custominput-assembly.xml
new file mode 100644
index 0000000..474599b
--- /dev/null
+++ b/flink-tests/src/test/assembly/test-streaming-custominput-assembly.xml
@@ -0,0 +1,36 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<assembly>
+	<id>test-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>${project.build.testOutputDirectory}</directory>
+			<outputDirectory>/</outputDirectory>
+			<!--modify/add include to match your package(s) -->
+			<includes>
+				<include>org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.class</include>
+				<include>org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram$*.class</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/177df41b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 3b5295f..10311a9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -37,6 +37,8 @@ public class ClassLoaderITCase {
 
 	private static final String INPUT_SPLITS_PROG_JAR_FILE = "target/customsplit-test-jar.jar";
 
+	private static final String STREAMING_INPUT_SPLITS_PROG_JAR_FILE = "target/streaming-customsplit-test-jar.jar";
+
 	private static final String STREAMING_PROG_JAR_FILE = "target/streamingclassloader-test-jar.jar";
 
 	private static final String STREAMING_CHECKPOINTED_PROG_JAR_FILE = "target/streaming-checkpointed-classloader-test-jar.jar";
@@ -76,6 +78,15 @@ public class ClassLoaderITCase {
 									});
 				inputSplitTestProg.invokeInteractiveModeForExecution();
 
+				PackagedProgram streamingInputSplitTestProg = new PackagedProgram(
+						new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE),
+						new String[] { STREAMING_INPUT_SPLITS_PROG_JAR_FILE,
+								"localhost",
+								String.valueOf(port),
+								"4" // parallelism
+						});
+				streamingInputSplitTestProg.invokeInteractiveModeForExecution();
+
 				String classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL().toString();
 				PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE),
 						new String[] { "",
@@ -89,7 +100,7 @@ public class ClassLoaderITCase {
 				// regular streaming job
 				PackagedProgram streamingProg = new PackagedProgram(
 						new File(STREAMING_PROG_JAR_FILE),
-						new String[] { 
+						new String[] {
 								STREAMING_PROG_JAR_FILE,
 								"localhost",
 								String.valueOf(port)
@@ -102,7 +113,7 @@ public class ClassLoaderITCase {
 					PackagedProgram streamingCheckpointedProg = new PackagedProgram(
 							new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE),
 							new String[] {
-									STREAMING_CHECKPOINTED_PROG_JAR_FILE, 
+									STREAMING_CHECKPOINTED_PROG_JAR_FILE,
 									"localhost",
 									String.valueOf(port)});
 					streamingCheckpointedProg.invokeInteractiveModeForExecution();

http://git-wip-us.apache.org/repos/asf/flink/blob/177df41b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
new file mode 100644
index 0000000..0f0ee0c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingCustomInputSplitProgram.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading.jar;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@SuppressWarnings("serial")
+public class StreamingCustomInputSplitProgram {
+	
+	public static void main(String[] args) throws Exception {
+		final String jarFile = args[0];
+		final String host = args[1];
+		final int port = Integer.parseInt(args[2]);
+		final int parallelism = Integer.parseInt(args[3]);
+
+		Configuration config = new Configuration();
+
+		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, config, jarFile);
+		env.getConfig().disableSysoutLogging();
+		env.setParallelism(parallelism);
+
+		DataStream<Integer> data = env.createInput(new CustomInputFormat());
+
+		data.map(new MapFunction<Integer, Tuple2<Integer, Double>>() {
+			@Override
+			public Tuple2<Integer, Double> map(Integer value) throws Exception {
+				return new Tuple2<Integer, Double>(value, value * 0.5);
+			}
+		}).addSink(new NoOpSink());
+
+		env.execute();
+	}
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class CustomInputFormat implements InputFormat<Integer, CustomInputSplit>, ResultTypeQueryable<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		private Integer value;
+
+		CustomInputSplit split = new CustomInputSplit(0);
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+			return null;
+		}
+
+		@Override
+		public CustomInputSplit[] createInputSplits(int minNumSplits) {
+			CustomInputSplit[] splits = new CustomInputSplit[minNumSplits];
+			for (int i = 0; i < minNumSplits; i++) {
+				splits[i] = new CustomInputSplit(i);
+			}
+			return splits;
+		}
+
+		@Override
+		public InputSplitAssigner getInputSplitAssigner(CustomInputSplit[] inputSplits) {
+			return new CustomSplitAssigner(inputSplits);
+		}
+
+		@Override
+		public void open(CustomInputSplit split) {
+			this.value = split.getSplitNumber();
+		}
+
+		@Override
+		public boolean reachedEnd() {
+			return this.value == null;
+		}
+
+		@Override
+		public Integer nextRecord(Integer reuse) {
+			Integer val = this.value;
+			this.value = null;
+			return val;
+		}
+
+		@Override
+		public void close() {}
+
+		@Override
+		public TypeInformation<Integer> getProducedType() {
+			return BasicTypeInfo.INT_TYPE_INFO;
+		}
+	}
+
+	public static final class CustomInputSplit implements InputSplit {
+
+		private static final long serialVersionUID = 1L;
+
+		private final int splitNumber;
+
+		public CustomInputSplit(int splitNumber) {
+			this.splitNumber = splitNumber;
+		}
+
+		@Override
+		public int getSplitNumber() {
+			return this.splitNumber;
+		}
+	}
+
+	public static final class CustomSplitAssigner implements InputSplitAssigner, Serializable {
+
+		private final List<CustomInputSplit> remainingSplits;
+
+		public CustomSplitAssigner(CustomInputSplit[] splits) {
+			this.remainingSplits = new ArrayList<CustomInputSplit>(Arrays.asList(splits));
+		}
+
+		@Override
+		public InputSplit getNextInputSplit(String host, int taskId) {
+			synchronized (this) {
+				int size = remainingSplits.size();
+				if (size > 0) {
+					return remainingSplits.remove(size - 1);
+				} else {
+					return null;
+				}
+			}
+		}
+	}
+
+	public static class NoOpSink implements SinkFunction<Tuple2<Integer, Double>> {
+		@Override
+		public void invoke(Tuple2<Integer, Double> value) throws Exception {
+		}
+	}
+}