You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/07/22 12:41:26 UTC

[86/92] [abbrv] git commit: [FLINK-1026] Fix PojoComparator

[FLINK-1026] Fix PojoComparator

The PojoComparator was not using the user code class loader and therefore
could not clone the serializer that it uses internally.

Also modify KMeansForTest to make use of the PojoComparator to prevent
such bugs in the future.

Also fix PackagedProgramEndToEndITCase, AvroExternalJarProgramITCase and
the external jar file class loader tests in flink-clients. The pom was
not configured to remove the code that should only be in the external
jar from the test-classes directory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/152dcde0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/152dcde0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/152dcde0

Branch: refs/heads/travis_test
Commit: 152dcde0bb0d8a56bf878dde6f4aae2d2db6c8ba
Parents: c04c9bb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Jul 17 13:38:56 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Jul 17 15:31:01 2014 +0200

----------------------------------------------------------------------
 flink-addons/flink-avro/pom.xml                 |  38 ++++--
 flink-clients/pom.xml                           |  38 ++++--
 .../src/main/assembly/test-assembly.xml         |  36 ------
 .../src/test/assembly/test-assembly.xml         |  36 ++++++
 .../java/typeutils/runtime/PojoComparator.java  |   8 +-
 flink-tests/pom.xml                             |  34 +++++-
 .../PackagedProgramEndToEndITCase.java          |   1 +
 .../flink/test/util/testjar/KMeansForTest.java  | 119 +++++++++++--------
 8 files changed, 194 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/152dcde0/flink-addons/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/pom.xml b/flink-addons/flink-avro/pom.xml
index bf2cec8..33dd01d 100644
--- a/flink-addons/flink-avro/pom.xml
+++ b/flink-addons/flink-avro/pom.xml
@@ -71,16 +71,6 @@ under the License.
 
 	<build>
 		<plugins>
-		<!-- Exclude ExternalJar contents from regular build -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<configuration>
-					<excludes>
-						<exclude>**/src/test/java/org/apache/flink/api/avro/testjar/*.java</exclude>
-					</excludes>
-				</configuration>
-			</plugin>
 			<plugin>
 				<artifactId>maven-assembly-plugin</artifactId>
 				<executions>
@@ -105,6 +95,34 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+			<!--Remove the AvroExternalJarProgram code from the test-classes directory since it mustn't be in the
+			classpath when running the tests to actually test whether the user code class loader
+			is properly used.-->
+			<plugin>
+				<artifactId>maven-clean-plugin</artifactId>
+				<version>2.5</version>
+				<executions>
+					<execution>
+						<id>remove-avroexternalprogram</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>clean</goal>
+						</goals>
+						<configuration>
+							<excludeDefaultDirectories>true</excludeDefaultDirectories>
+							<filesets>
+								<fileset>
+									<directory>${project.build.testOutputDirectory}</directory>
+									<includes>
+										<include>**/testjar/*.class</include>
+									</includes>
+								</fileset>
+							</filesets>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
 		</plugins>
 		
 		<pluginManagement>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/152dcde0/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 4c48476..8944385 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -103,15 +103,6 @@ under the License.
 	<build>
 		<plugins>
 			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<configuration>
-					<excludes>
-						<exclude>**/src/test/java/org/apache/flink/client/testjar/*.java</exclude>
-					</excludes>
-				</configuration>
-			</plugin>
-			<plugin>
 				<artifactId>maven-assembly-plugin</artifactId>
 				<version>2.4</version><!--$NO-MVN-MAN-VER$-->
 				<executions>
@@ -130,12 +121,39 @@ under the License.
 							<finalName>maven</finalName>
 							<attach>false</attach>
 							<descriptors>
-								<descriptor>src/main/assembly/test-assembly.xml</descriptor>
+								<descriptor>src/test/assembly/test-assembly.xml</descriptor>
 							</descriptors>
 						</configuration>
 					</execution>
 				</executions>
 			</plugin>
+			<!--Remove the external jar test code from the test-classes directory since it mustn't be in the
+			classpath when running the tests to actually test whether the user code class loader
+			is properly used.-->
+			<plugin>
+				<artifactId>maven-clean-plugin</artifactId>
+				<version>2.5</version>
+				<executions>
+					<execution>
+						<id>remove-externaltestclasses</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>clean</goal>
+						</goals>
+						<configuration>
+							<excludeDefaultDirectories>true</excludeDefaultDirectories>
+							<filesets>
+								<fileset>
+									<directory>${project.build.testOutputDirectory}</directory>
+									<includes>
+										<include>**/testjar/*.class</include>
+									</includes>
+								</fileset>
+							</filesets>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 		
 		<pluginManagement>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/152dcde0/flink-clients/src/main/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/assembly/test-assembly.xml b/flink-clients/src/main/assembly/test-assembly.xml
deleted file mode 100644
index bd66710..0000000
--- a/flink-clients/src/main/assembly/test-assembly.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<assembly>
-	<id>test-jar</id>
-	<formats>
-		<format>jar</format>
-	</formats>
-	<includeBaseDirectory>false</includeBaseDirectory>
-	<fileSets>
-		<fileSet>
-			<directory>${project.build.testOutputDirectory}</directory>
-			<outputDirectory>/</outputDirectory>
-			<!--modify/add include to match your package(s) -->
-			<includes>
-				<include>org/apache/flink/client/testjar**</include>
-			</includes>
-		</fileSet>
-	</fileSets>
-</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/152dcde0/flink-clients/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/assembly/test-assembly.xml b/flink-clients/src/test/assembly/test-assembly.xml
new file mode 100644
index 0000000..aa7b7d1
--- /dev/null
+++ b/flink-clients/src/test/assembly/test-assembly.xml
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<assembly>
+	<id>test-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>${project.build.testOutputDirectory}</directory>
+			<outputDirectory>/</outputDirectory>
+			<!--modify/add include to match your package(s) -->
+			<includes>
+				<include>org/apache/flink/client/testjar/**</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/152dcde0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
index b25507e..41f73c4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
@@ -118,15 +118,17 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
 		this.invertNormKey = toClone.invertNormKey;
 
 		this.type = toClone.type;
+
 		try {
 			this.serializer = (TypeSerializer<T>) InstantiationUtil.deserializeObject(
-					InstantiationUtil.serializeObject(toClone.serializer), toClone.serializer.getClass().getClassLoader());
+					InstantiationUtil.serializeObject(toClone.serializer), Thread.currentThread().getContextClassLoader());
 		} catch (IOException e) {
-			e.printStackTrace();
+			throw new RuntimeException("Cannot copy serializer", e);
 		} catch (ClassNotFoundException e) {
-			e.printStackTrace();
+			throw new RuntimeException("Cannot copy serializer", e);
 		}
 
+
 	}
 
 	private void writeObject(ObjectOutputStream out)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/152dcde0/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 8a8c798..80139c6 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -104,11 +104,6 @@ under the License.
 						</goals>
 					</execution>
 				</executions>
-				<configuration>
-					<excludes>
-						<exclude>**/src/test/java/org/apache/flink/test/util/testjar/*.java</exclude>
-					</excludes>
-				</configuration>
 			</plugin>
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
@@ -161,6 +156,35 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+
+			<!--Remove the KMeansForTest code from the test-classes directory since it mustn't be in the
+			classpath when running the tests to actually test whether the user code class loader
+			is properly used.-->
+			<plugin>
+				<artifactId>maven-clean-plugin</artifactId>
+				<version>2.5</version>
+				<executions>
+					<execution>
+						<id>remove-kmeansfortest</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>clean</goal>
+						</goals>
+						<configuration>
+							<excludeDefaultDirectories>true</excludeDefaultDirectories>
+							<filesets>
+								<fileset>
+									<directory>${project.build.testOutputDirectory}</directory>
+									<includes>
+										<include>**/testjar/*.class</include>
+									</includes>
+								</fileset>
+							</filesets>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
 		</plugins>
 		
 		<pluginManagement>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/152dcde0/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
index d244811..0e93f2a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
@@ -64,6 +64,7 @@ public class PackagedProgramEndToEndITCase {
 			fwClusters.close();
 
 			String jarPath = "target/maven-test-jar.jar";
+//			String jarPath = "/home/aljoscha/maven-test-jar.jar";
 
 			// run KMeans
 			cluster.setNumTaskTracker(2);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/152dcde0/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
index 7dfff46..97367f7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/testjar/KMeansForTest.java
@@ -36,139 +36,139 @@ import org.apache.flink.api.java.IterativeDataSet;
 
 @SuppressWarnings("serial")
 public class KMeansForTest implements Program {
-	
+
 	// *************************************************************************
 	//     PROGRAM
 	// *************************************************************************
-	
-	
+
+
 
 	@Override
 	public Plan getPlan(String... args) {
 		if (args.length < 4) {
 			throw new IllegalArgumentException("Missing parameters");
 		}
-		
+
 		final String pointsPath = args[0];
 		final String centersPath = args[1];
 		final String outputPath = args[2];
 		final int numIterations = Integer.parseInt(args[3]);
-		
-		
+
+
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setDegreeOfParallelism(4);
-		
+
 		// get input data
 		DataSet<Point> points = env.readCsvFile(pointsPath)
 				.fieldDelimiter('|')
 				.includeFields(true, true)
 				.types(Double.class, Double.class)
 				.map(new TuplePointConverter());
-		
+
 		DataSet<Centroid> centroids = env.readCsvFile(centersPath)
 				.fieldDelimiter('|')
 				.includeFields(true, true, true)
 				.types(Integer.class, Double.class, Double.class)
 				.map(new TupleCentroidConverter());
-		
+
 		// set number of bulk iterations for KMeans algorithm
 		IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
-		
+
 		DataSet<Centroid> newCentroids = points
 			// compute closest centroid for each point
 			.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
 			// count and sum point coordinates for each centroid
 			.map(new CountAppender())
-			.groupBy(0).reduce(new CentroidAccumulator())
+			.groupBy("f0").reduce(new CentroidAccumulator())
 			// compute new centroids from point counts and coordinate sums
 			.map(new CentroidAverager());
-		
+
 		// feed new centroids back into next iteration
 		DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
-		
+
 		DataSet<Tuple2<Integer, Point>> clusteredPoints = points
 				// assign points to final clusters
 				.map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
-		
+
 		// emit result
 		clusteredPoints.writeAsCsv(outputPath, "\n", " ");
 
 		return env.createProgramPlan();
 	}
-	
+
 	// *************************************************************************
 	//     DATA TYPES
 	// *************************************************************************
-	
+
 	/**
 	 * A simple two-dimensional point.
 	 */
 	public static class Point implements Serializable {
-		
+
 		public double x, y;
-		
+
 		public Point() {}
 
 		public Point(double x, double y) {
 			this.x = x;
 			this.y = y;
 		}
-		
+
 		public Point add(Point other) {
 			x += other.x;
 			y += other.y;
 			return this;
 		}
-		
+
 		public Point div(long val) {
 			x /= val;
 			y /= val;
 			return this;
 		}
-		
+
 		public double euclideanDistance(Point other) {
 			return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
 		}
-		
+
 		public void clear() {
 			x = y = 0.0;
 		}
-		
+
 		@Override
 		public String toString() {
 			return x + " " + y;
 		}
 	}
-	
+
 	/**
-	 * A simple two-dimensional centroid, basically a point with an ID. 
+	 * A simple two-dimensional centroid, basically a point with an ID.
 	 */
 	public static class Centroid extends Point {
-		
+
 		public int id;
-		
+
 		public Centroid() {}
-		
+
 		public Centroid(int id, double x, double y) {
 			super(x,y);
 			this.id = id;
 		}
-		
+
 		public Centroid(int id, Point p) {
 			super(p.x, p.y);
 			this.id = id;
 		}
-		
+
 		@Override
 		public String toString() {
 			return id + " " + super.toString();
 		}
 	}
-	
+
 	// *************************************************************************
 	//     USER FUNCTIONS
 	// *************************************************************************
-	
+
 	/** Converts a Tuple2<Double,Double> into a Point. */
 	public static final class TuplePointConverter extends MapFunction<Tuple2<Double, Double>, Point> {
 
@@ -177,7 +177,7 @@ public class KMeansForTest implements Program {
 			return new Point(t.f0, t.f1);
 		}
 	}
-	
+
 	/** Converts a Tuple3<Integer, Double,Double> into a Centroid. */
 	public static final class TupleCentroidConverter extends MapFunction<Tuple3<Integer, Double, Double>, Centroid> {
 
@@ -186,7 +186,7 @@ public class KMeansForTest implements Program {
 			return new Centroid(t.f0, t.f1, t.f2);
 		}
 	}
-	
+
 	/** Determines the closest cluster center for a data point. */
 	public static final class SelectNearestCenter extends MapFunction<Point, Tuple2<Integer, Point>> {
 		private Collection<Centroid> centroids;
@@ -196,19 +196,19 @@ public class KMeansForTest implements Program {
 		public void open(Configuration parameters) throws Exception {
 			this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
 		}
-		
+
 		@Override
 		public Tuple2<Integer, Point> map(Point p) throws Exception {
-			
+
 			double minDistance = Double.MAX_VALUE;
 			int closestCentroidId = -1;
-			
+
 			// check all cluster centers
 			for (Centroid centroid : centroids) {
 				// compute distance
 				double distance = p.euclideanDistance(centroid);
-				
-				// update nearest cluster if necessary 
+
+				// update nearest cluster if necessary
 				if (distance < minDistance) {
 					minDistance = distance;
 					closestCentroidId = centroid.id;
@@ -219,30 +219,45 @@ public class KMeansForTest implements Program {
 			return new Tuple2<Integer, Point>(closestCentroidId, p);
 		}
 	}
-	
-	/** Appends a count variable to the tuple. */ 
-	public static final class CountAppender extends MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
+
+	// Use this so that we can check whether POJOs and the POJO comparator also work
+	public static final class DummyTuple3IntPointLong {
+		public Integer f0;
+		public Point f1;
+		public Long f2;
+
+		public DummyTuple3IntPointLong() {}
+
+		DummyTuple3IntPointLong(Integer f0, Point f1, Long f2) {
+			this.f0 = f0;
+			this.f1 = f1;
+			this.f2 = f2;
+		}
+	}
+
+	/** Appends a count variable to the tuple. */
+	public static final class CountAppender extends MapFunction<Tuple2<Integer, Point>, DummyTuple3IntPointLong> {
 
 		@Override
-		public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> t) {
-			return new Tuple3<Integer, Point, Long>(t.f0, t.f1, 1L);
-		} 
+		public DummyTuple3IntPointLong map(Tuple2<Integer, Point> t) {
+			return new DummyTuple3IntPointLong(t.f0, t.f1, 1L);
+		}
 	}
-	
+
 	/** Sums and counts point coordinates. */
-	public static final class CentroidAccumulator extends ReduceFunction<Tuple3<Integer, Point, Long>> {
+	public static final class CentroidAccumulator extends ReduceFunction<DummyTuple3IntPointLong> {
 
 		@Override
-		public Tuple3<Integer, Point, Long> reduce(Tuple3<Integer, Point, Long> val1, Tuple3<Integer, Point, Long> val2) {
-			return new Tuple3<Integer, Point, Long>(val1.f0, val1.f1.add(val2.f1), val1.f2 + val2.f2);
+		public DummyTuple3IntPointLong reduce(DummyTuple3IntPointLong val1, DummyTuple3IntPointLong val2) {
+			return new DummyTuple3IntPointLong(val1.f0, val1.f1.add(val2.f1), val1.f2 + val2.f2);
 		}
 	}
-	
+
 	/** Computes new centroid from coordinate sum and count of points. */
-	public static final class CentroidAverager extends MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
+	public static final class CentroidAverager extends MapFunction<DummyTuple3IntPointLong, Centroid> {
 
 		@Override
-		public Centroid map(Tuple3<Integer, Point, Long> value) {
+		public Centroid map(DummyTuple3IntPointLong value) {
 			return new Centroid(value.f0, value.f1.div(value.f2));
 		}
 	}