You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/10 21:35:00 UTC
[03/34] git commit: Rework stratosphere-clients tests to not expect a
binary blob in the test resources (build the jar file using maven) add a test
for ensuring the presence of a custom classloader in the CliFrontend Add
missing ClassLoader for the "info" command in the CliFrontend
Rework stratosphere-clients tests to not expect a binary blob in the test resources (build the jar file using maven)
add a test for ensuring the presence of a custom classloader in the CliFrontend
Add missing ClassLoader for the "info" command in the CliFrontend
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6c4b85c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6c4b85c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6c4b85c9
Branch: refs/heads/release-0.5.1
Commit: 6c4b85c94472e6ab7ab1ac46ac462c3237bfc7c6
Parents: bce608c
Author: Robert Metzger <me...@web.de>
Authored: Wed May 28 23:27:02 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Jun 10 21:24:32 2014 +0200
----------------------------------------------------------------------
stratosphere-addons/avro/pom.xml | 76 +++++
.../avro/src/test/assembly/test-assembly.xml | 30 ++
.../api/avro/AvroExternalJarProgram.java | 218 --------------
.../api/avro/AvroExternalJarProgramITCase.java | 4 +-
.../avro/testjar/AvroExternalJarProgram.java | 228 ++++++++++++++
.../avro/src/test/resources/AvroTestProgram.jar | Bin 5713 -> 0 bytes
stratosphere-clients/pom.xml | 86 +++++-
.../src/main/assembly/test-assembly.xml | 30 ++
.../eu/stratosphere/client/program/Client.java | 5 +
.../client/program/PackagedProgram.java | 9 +-
.../client/CliFrontendPackageProgramTest.java | 88 +++++-
.../client/CliFrontendTestUtils.java | 16 +-
.../client/program/PackagedProgramTest.java | 9 +-
.../testjar/JobWithExternalDependency.java | 30 ++
.../stratosphere/client/testjar/WordCount.java | 162 ++++++++++
.../src/test/resources/test.jar | Bin 4929 -> 0 bytes
stratosphere-tests/pom.xml | 67 +++++
.../src/test/assembly/test-assembly.xml | 30 ++
.../PackagedProgramEndToEndITCase.java | 12 +-
.../test/util/testjar/KMeansForTest.java | 294 +++++++++++++++++++
.../src/test/resources/KMeansForTest.jar | Bin 144530 -> 0 bytes
21 files changed, 1154 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-addons/avro/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/pom.xml b/stratosphere-addons/avro/pom.xml
index f772362..e9da4e4 100644
--- a/stratosphere-addons/avro/pom.xml
+++ b/stratosphere-addons/avro/pom.xml
@@ -51,6 +51,82 @@
</dependencies>
+ <build>
+ <plugins>
+ <!-- Exclude ExternalJar contents from regular build -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/src/test/java/eu/stratosphere/api/avro/testjar/*.java</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>create-test-dependency</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>eu.stratosphere.api.avro.testjar.AvroExternalJarProgram</mainClass>
+ </manifest>
+ </archive>
+ <finalName>maven</finalName>
+ <attach>false</attach>
+ <descriptors>
+ <descriptor>src/test/assembly/test-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>
+ org.apache.maven.plugins
+ </groupId>
+ <artifactId>
+ maven-assembly-plugin
+ </artifactId>
+ <versionRange>
+ [2.4,)
+ </versionRange>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
<profiles>
<profile>
<!-- A bug with java6 is causing the javadoc generation
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-addons/avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/assembly/test-assembly.xml b/stratosphere-addons/avro/src/test/assembly/test-assembly.xml
new file mode 100644
index 0000000..ebb1b07
--- /dev/null
+++ b/stratosphere-addons/avro/src/test/assembly/test-assembly.xml
@@ -0,0 +1,30 @@
+<!--
+ Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+
+ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations under the License.
+-->
+
+<assembly>
+ <id>test-jar</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.testOutputDirectory}</directory>
+ <outputDirectory>/</outputDirectory>
+ <!--modify/add include to match your package(s) -->
+ <includes>
+ <include>eu/stratosphere/api/avro/testjar/**</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgram.java b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgram.java
deleted file mode 100644
index 704cf87..0000000
--- a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgram.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
-// ================================================================================================
-// This file defines the classes for the AvroExternalJarProgramITCase.
-// The program is exported into src/test/resources/AvroTestProgram.jar.
-//
-// THIS FILE MUST STAY FULLY COMMENTED SUCH THAT THE HERE DEFINED CLASSES ARE NOT COMPILED
-// AND ADDED TO THE test-classes DIRECTORY. OTHERWISE, THE EXTERNAL CLASS LOADING WILL
-// NOT BE COVERED BY THIS TEST.
-// ================================================================================================
-//package eu.stratosphere.api.avro;
-//
-//import java.util.ArrayList;
-//import java.util.List;
-//
-//import eu.stratosphere.api.java.DataSet;
-//import eu.stratosphere.api.java.ExecutionEnvironment;
-//import eu.stratosphere.api.java.functions.MapFunction;
-//import eu.stratosphere.api.java.functions.ReduceFunction;
-//import eu.stratosphere.api.java.io.AvroInputFormat;
-//import eu.stratosphere.api.java.io.DiscardingOuputFormat;
-//import eu.stratosphere.api.java.tuple.Tuple2;
-//import eu.stratosphere.core.fs.Path;
-//
-//public class AvroExternalJarProgram {
-//
-// public static final class Color {
-//
-// private String name;
-// private double saturation;
-//
-// public Color() {
-// name = "";
-// saturation = 1.0;
-// }
-//
-// public Color(String name, double saturation) {
-// this.name = name;
-// this.saturation = saturation;
-// }
-//
-// public String getName() {
-// return name;
-// }
-//
-// public void setName(String name) {
-// this.name = name;
-// }
-//
-// public double getSaturation() {
-// return saturation;
-// }
-//
-// public void setSaturation(double saturation) {
-// this.saturation = saturation;
-// }
-//
-// @Override
-// public String toString() {
-// return name + '(' + saturation + ')';
-// }
-// }
-//
-// public static final class MyUser {
-//
-// private String name;
-// private List<Color> colors;
-//
-// public MyUser() {
-// name = "unknown";
-// colors = new ArrayList<Color>();
-// }
-//
-// public MyUser(String name, List<Color> colors) {
-// this.name = name;
-// this.colors = colors;
-// }
-//
-// public String getName() {
-// return name;
-// }
-//
-// public List<Color> getColors() {
-// return colors;
-// }
-//
-// public void setName(String name) {
-// this.name = name;
-// }
-//
-// public void setColors(List<Color> colors) {
-// this.colors = colors;
-// }
-//
-// @Override
-// public String toString() {
-// return name + " : " + colors;
-// }
-// }
-//
-//
-// public static final class SUser extends AvroBaseValue<MyUser> {
-//
-// static final long serialVersionUID = 1L;
-//
-// public SUser() {}
-//
-// public SUser(MyUser u) {
-// super(u);
-// }
-// }
-//
-// // --------------------------------------------------------------------------------------------
-//
-// // --------------------------------------------------------------------------------------------
-//
-// public static final class NameExtractor extends MapFunction<MyUser, Tuple2<String, MyUser>> {
-// private static final long serialVersionUID = 1L;
-//
-// @Override
-// public Tuple2<String, MyUser> map(MyUser u) {
-// String namePrefix = u.getName().substring(0, 1);
-// return new Tuple2<String, MyUser>(namePrefix, u);
-// }
-// }
-//
-// public static final class NameGrouper extends ReduceFunction<Tuple2<String, MyUser>> {
-// private static final long serialVersionUID = 1L;
-//
-// @Override
-// public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) {
-// return val1;
-// }
-// }
-//
-// // --------------------------------------------------------------------------------------------
-// // Test Data
-// // --------------------------------------------------------------------------------------------
-//
-// public static final class Generator {
-//
-// private final Random rnd = new Random(2389756789345689276L);
-//
-// public MyUser nextUser() {
-// return randomUser();
-// }
-//
-// private MyUser randomUser() {
-//
-// int numColors = rnd.nextInt(5);
-// ArrayList<Color> colors = new ArrayList<Color>(numColors);
-// for (int i = 0; i < numColors; i++) {
-// colors.add(new Color(randomString(), rnd.nextDouble()));
-// }
-//
-// return new MyUser(randomString(), colors);
-// }
-//
-// private String randomString() {
-// char[] c = new char[this.rnd.nextInt(20) + 5];
-//
-// for (int i = 0; i < c.length; i++) {
-// c[i] = (char) (this.rnd.nextInt(150) + 40);
-// }
-//
-// return new String(c);
-// }
-// }
-//
-// public static void writeTestData(File testFile, int numRecords) throws IOException {
-//
-// DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class);
-// DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter);
-//
-// dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);
-//
-//
-// Generator generator = new Generator();
-//
-// for (int i = 0; i < numRecords; i++) {
-// MyUser user = generator.nextUser();
-// dataFileWriter.append(user);
-// }
-//
-// dataFileWriter.close();
-// }
-//
-// public static void main(String[] args) throws Exception {
-// String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath();
-// writeTestData(new File(testDataFile), 50);
-// }
-//
-// public static void main(String[] args) throws Exception {
-// String inputPath = args[0];
-//
-// ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-//
-// DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class));
-//
-// DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper());
-//
-// result.output(new DiscardingOuputFormat<Tuple2<String,MyUser>>());
-// env.execute();
-// }
-//}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java
index 3140be4..a766fcb 100644
--- a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java
+++ b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java
@@ -31,7 +31,7 @@ public class AvroExternalJarProgramITCase {
private static final int TEST_JM_PORT = 43191;
- private static final String JAR_FILE = "/AvroTestProgram.jar";
+ private static final String JAR_FILE = "target/maven-test-jar.jar";
private static final String TEST_DATA_FILE = "/testdata.avro";
@@ -49,7 +49,7 @@ public class AvroExternalJarProgramITCase {
testMiniCluster.setJobManagerRpcPort(TEST_JM_PORT);
testMiniCluster.start();
- String jarFile = getClass().getResource(JAR_FILE).getFile();
+ String jarFile = JAR_FILE;
String testData = getClass().getResource(TEST_DATA_FILE).toString();
PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/testjar/AvroExternalJarProgram.java b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/testjar/AvroExternalJarProgram.java
new file mode 100644
index 0000000..1db21b3
--- /dev/null
+++ b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/testjar/AvroExternalJarProgram.java
@@ -0,0 +1,228 @@
+package eu.stratosphere.api.avro.testjar;
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+// ================================================================================================
+// This file defines the classes for the AvroExternalJarProgramITCase.
+// The maven-assembly-plugin (src/test/assembly/test-assembly.xml) packages this package into
+// target/maven-test-jar.jar, which the test runs through a PackagedProgram.
+//
+// pom.xml configures the compiler to exclude this package from the regular build, so that the
+// test exercises loading these classes from the external jar via the user-code class loader.
+// ================================================================================================
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+
+import eu.stratosphere.api.avro.AvroBaseValue;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.functions.ReduceFunction;
+import eu.stratosphere.api.java.io.AvroInputFormat;
+import eu.stratosphere.api.java.io.DiscardingOuputFormat;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.core.fs.Path;
+
+public class AvroExternalJarProgram {
+
+ public static final class Color {
+
+ private String name;
+ private double saturation;
+
+ public Color() {
+ name = "";
+ saturation = 1.0;
+ }
+
+ public Color(String name, double saturation) {
+ this.name = name;
+ this.saturation = saturation;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public double getSaturation() {
+ return saturation;
+ }
+
+ public void setSaturation(double saturation) {
+ this.saturation = saturation;
+ }
+
+ @Override
+ public String toString() {
+ return name + '(' + saturation + ')';
+ }
+ }
+
+ public static final class MyUser {
+
+ private String name;
+ private List<Color> colors;
+
+ public MyUser() {
+ name = "unknown";
+ colors = new ArrayList<Color>();
+ }
+
+ public MyUser(String name, List<Color> colors) {
+ this.name = name;
+ this.colors = colors;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public List<Color> getColors() {
+ return colors;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public void setColors(List<Color> colors) {
+ this.colors = colors;
+ }
+
+ @Override
+ public String toString() {
+ return name + " : " + colors;
+ }
+ }
+
+
+ public static final class SUser extends AvroBaseValue<MyUser> {
+
+ static final long serialVersionUID = 1L;
+
+ public SUser() {}
+
+ public SUser(MyUser u) {
+ super(u);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ // --------------------------------------------------------------------------------------------
+
+ public static final class NameExtractor extends MapFunction<MyUser, Tuple2<String, MyUser>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, MyUser> map(MyUser u) {
+ String namePrefix = u.getName().substring(0, 1);
+ return new Tuple2<String, MyUser>(namePrefix, u);
+ }
+ }
+
+ public static final class NameGrouper extends ReduceFunction<Tuple2<String, MyUser>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) {
+ return val1;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Test Data
+ // --------------------------------------------------------------------------------------------
+
+ public static final class Generator {
+
+ private final Random rnd = new Random(2389756789345689276L);
+
+ public MyUser nextUser() {
+ return randomUser();
+ }
+
+ private MyUser randomUser() {
+
+ int numColors = rnd.nextInt(5);
+ ArrayList<Color> colors = new ArrayList<Color>(numColors);
+ for (int i = 0; i < numColors; i++) {
+ colors.add(new Color(randomString(), rnd.nextDouble()));
+ }
+
+ return new MyUser(randomString(), colors);
+ }
+
+ private String randomString() {
+ char[] c = new char[this.rnd.nextInt(20) + 5];
+
+ for (int i = 0; i < c.length; i++) {
+ c[i] = (char) (this.rnd.nextInt(150) + 40);
+ }
+
+ return new String(c);
+ }
+ }
+
+ public static void writeTestData(File testFile, int numRecords) throws IOException {
+
+ DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class);
+ DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter);
+
+ dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);
+
+
+ Generator generator = new Generator();
+
+ for (int i = 0; i < numRecords; i++) {
+ MyUser user = generator.nextUser();
+ dataFileWriter.append(user);
+ }
+
+ dataFileWriter.close();
+ }
+
+// public static void main(String[] args) throws Exception {
+// String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath();
+// writeTestData(new File(testDataFile), 50);
+// }
+
+ public static void main(String[] args) throws Exception {
+ String inputPath = args[0];
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class));
+
+ DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper());
+
+ result.output(new DiscardingOuputFormat<Tuple2<String,MyUser>>());
+ env.execute();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-addons/avro/src/test/resources/AvroTestProgram.jar
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/resources/AvroTestProgram.jar b/stratosphere-addons/avro/src/test/resources/AvroTestProgram.jar
deleted file mode 100644
index eb56b62..0000000
Binary files a/stratosphere-addons/avro/src/test/resources/AvroTestProgram.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-clients/pom.xml b/stratosphere-clients/pom.xml
index 4daf320..d812d3c 100644
--- a/stratosphere-clients/pom.xml
+++ b/stratosphere-clients/pom.xml
@@ -35,6 +35,12 @@
<artifactId>stratosphere-compiler</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>eu.stratosphere</groupId>
+ <artifactId>stratosphere-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
@@ -62,8 +68,7 @@
<scope>compile</scope>
</dependency>
- <!-- commons-io is required by commons-fileupload
- See http://commons.apache.org/proper/commons-fileupload/dependencies.html
+ <!-- commons-io is required by commons-fileupload See http://commons.apache.org/proper/commons-fileupload/dependencies.html
and https://github.com/dimalabs/ozone/pull/157 -->
<dependency>
<groupId>commons-io</groupId>
@@ -73,4 +78,81 @@
</dependency>
</dependencies>
+ <!-- More information on this:
+ http://stackoverflow.com/questions/1401857/using-maven-to-build-separate-jar-files-for-unit-testing-a-custom-class-loader
+ -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/src/test/java/eu/stratosphere/client/testjar/*.java</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>create-test-dependency</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>eu.stratosphere.client.testjar.WordCount</mainClass>
+ </manifest>
+ </archive>
+ <finalName>maven</finalName>
+ <attach>false</attach>
+ <descriptors>
+ <descriptor>src/main/assembly/test-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>
+ org.apache.maven.plugins
+ </groupId>
+ <artifactId>
+ maven-assembly-plugin
+ </artifactId>
+ <versionRange>
+ [2.4,)
+ </versionRange>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/main/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/assembly/test-assembly.xml b/stratosphere-clients/src/main/assembly/test-assembly.xml
new file mode 100644
index 0000000..c45d378
--- /dev/null
+++ b/stratosphere-clients/src/main/assembly/test-assembly.xml
@@ -0,0 +1,30 @@
+<!--
+ Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+
+ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations under the License.
+-->
+
+<assembly>
+ <id>test-jar</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.testOutputDirectory}</directory>
+ <outputDirectory>/</outputDirectory>
+ <!--modify/add include to match your package(s) -->
+ <includes>
+ <include>eu/stratosphere/client/testjar**</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
index 632c854..dfe9fdd 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
@@ -21,6 +21,8 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import com.google.common.base.Preconditions;
+
import eu.stratosphere.api.common.JobExecutionResult;
import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.java.ExecutionEnvironment;
@@ -67,6 +69,7 @@ public class Client {
* @param jobManagerAddress Address and port of the job-manager.
*/
public Client(InetSocketAddress jobManagerAddress, Configuration config) {
+ Preconditions.checkNotNull(config, "Configuration is null");
this.configuration = config;
configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress());
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
@@ -84,6 +87,7 @@ public class Client {
* @param config The config used to obtain the job-manager's address.
*/
public Client(Configuration config) {
+ Preconditions.checkNotNull(config, "Configuration is null");
this.configuration = config;
// instantiate the address to the job manager
@@ -126,6 +130,7 @@ public class Client {
}
public OptimizedPlan getOptimizedPlan(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
+ Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {
return getOptimizedPlan(prog.getPlanWithJars(), parallelism);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
index 2ee1765..a0fc1b7 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
@@ -73,7 +73,7 @@ public class PackagedProgram {
private final List<File> extractedTempLibraries;
- private final ClassLoader userCodeClassLoader;
+ private ClassLoader userCodeClassLoader;
private Plan plan;
@@ -204,6 +204,7 @@ public class PackagedProgram {
* missing parameters for generation.
*/
public String getPreviewPlan() throws ProgramInvocationException {
+ Thread.currentThread().setContextClassLoader(this.getUserCodeClassLoader());
List<DataSinkNode> previewPlan;
if (isUsingProgramEntryPoint()) {
@@ -312,6 +313,12 @@ public class PackagedProgram {
public ClassLoader getUserCodeClassLoader() {
return this.userCodeClassLoader;
}
+
+ public void setUserCodeClassLoader(ClassLoader cl) {
+ this.userCodeClassLoader = cl;
+ }
+
+
public List<File> getAllLibraries() {
List<File> libs = new ArrayList<File>(this.extractedTempLibraries.size() + 1);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendPackageProgramTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendPackageProgramTest.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendPackageProgramTest.java
index 3ff5663..00cda03 100644
--- a/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendPackageProgramTest.java
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendPackageProgramTest.java
@@ -15,9 +15,13 @@
package eu.stratosphere.client;
+import static eu.stratosphere.client.CliFrontendTestUtils.TEST_JAR_CLASSLOADERTEST_CLASS;
+import static eu.stratosphere.client.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
+import static eu.stratosphere.client.CliFrontendTestUtils.getNonJarFilePath;
+import static eu.stratosphere.client.CliFrontendTestUtils.getTestJarPath;
+import static eu.stratosphere.client.CliFrontendTestUtils.pipeSystemOutToNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static eu.stratosphere.client.CliFrontendTestUtils.*;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
@@ -26,7 +30,12 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import eu.stratosphere.client.program.Client;
import eu.stratosphere.client.program.PackagedProgram;
+import eu.stratosphere.client.program.ProgramInvocationException;
+import eu.stratosphere.compiler.CompilerException;
+import eu.stratosphere.configuration.ConfigConstants;
+import eu.stratosphere.configuration.Configuration;
public class CliFrontendPackageProgramTest {
@@ -186,4 +195,81 @@ public class CliFrontendPackageProgramTest {
fail("Program caused an exception: " + e.getMessage());
}
}
+
+ /**
+ * Ensure that we will never have the following error.
+ *
+ * The test works as follows:
+ * - Use the CliFrontend to invoke a jar file that loads a class which is only available
+ * in the jarfile itself (via a custom classloader)
+ * - Change the Usercode classloader of the PackagedProgram to a special classloader for this test
+ * - the classloader will accept the special class (and return a String.class)
+ *
+ * eu.stratosphere.client.program.ProgramInvocationException: The main method caused an error.
+ at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:398)
+ at eu.stratosphere.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:301)
+ at eu.stratosphere.client.program.Client.getOptimizedPlan(Client.java:140)
+ at eu.stratosphere.client.program.Client.getOptimizedPlanAsJson(Client.java:125)
+ at eu.stratosphere.client.CliFrontend.info(CliFrontend.java:439)
+ at eu.stratosphere.client.CliFrontend.parseParameters(CliFrontend.java:931)
+ at eu.stratosphere.client.CliFrontend.main(CliFrontend.java:951)
+ Caused by: java.io.IOException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+ at org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:102)
+ at org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:54)
+ at tlabs.CDR_In_Report.createHCatInputFormat(CDR_In_Report.java:322)
+ at tlabs.CDR_Out_Report.main(CDR_Out_Report.java:380)
+ at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
+ at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
+ at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
+ at java.lang.reflect.Method.invoke(Method.java:622)
+ at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:383)
+
+ */
+ @Test
+ public void testPlanWithExternalClass() throws CompilerException, ProgramInvocationException {
+ final Boolean callme[] = { false }; // create a final object reference, to be able to change its val later
+ try {
+ String[] parameters = {getTestJarPath(), "-c", TEST_JAR_CLASSLOADERTEST_CLASS , "some", "program"};
+ CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), parameters, false);
+
+ CliFrontend frontend = new CliFrontend();
+ Object result = frontend.buildProgram(line);
+ assertTrue(result instanceof PackagedProgram);
+
+ PackagedProgram prog = (PackagedProgram) result;
+
+ Assert.assertArrayEquals(new String[] {"some", "program"}, prog.getArguments());
+ Assert.assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, prog.getMainClassName());
+ prog.setUserCodeClassLoader(new ClassLoader(prog.getUserCodeClassLoader()) {
+ @Override
+ public Class<?> loadClass(String name) throws ClassNotFoundException {
+ assertTrue(name.equals("org.apache.hadoop.hive.ql.io.RCFileInputFormat"));
+ callme[0] = true;
+ return String.class; // Intentionally return the wrong class.
+ }
+ });
+
+ Configuration c = new Configuration();
+ c.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "devil");
+ Client cli = new Client(c);
+
+ cli.getOptimizedPlanAsJson(prog, 666);
+ } catch(ProgramInvocationException pie) {
+ assertTrue("Classloader was not called", callme[0]);
+ // class not found exception is expected as some point
+ if( ! ( pie.getCause() instanceof ClassNotFoundException ) ) {
+ System.err.println(pie.getMessage());
+ pie.printStackTrace();
+ fail("Program caused an exception: " + pie.getMessage());
+ }
+ }
+ catch (Exception e) {
+ assertTrue("Classloader was not called", callme[0]);
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Program caused an exception: " + e.getMessage());
+ }
+
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendTestUtils.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendTestUtils.java
index fa2c300..342b711 100644
--- a/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendTestUtils.java
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendTestUtils.java
@@ -18,15 +18,20 @@ package eu.stratosphere.client;
import static org.junit.Assert.fail;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.PrintStream;
import java.lang.reflect.Field;
+import java.net.MalformedURLException;
import java.util.Map;
import eu.stratosphere.configuration.GlobalConfiguration;
public class CliFrontendTestUtils {
- public static final String TEST_JAR_MAIN_CLASS = "eu.stratosphere.example.java.wordcount.WordCount";
+ public static final String TEST_JAR_MAIN_CLASS = "eu.stratosphere.client.testjar.WordCount";
+
+ public static final String TEST_JAR_CLASSLOADERTEST_CLASS = "eu.stratosphere.client.testjar.JobWithExternalDependency";
+
public static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33";
@@ -37,8 +42,13 @@ public class CliFrontendTestUtils {
public static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
- public static String getTestJarPath() {
- return CliFrontendRunTest.class.getResource("/test.jar").getFile();
+    /**
+     * Returns the absolute path of the test jar that is built by maven ('mvn process-test-classes').
+     *
+     * <p>The jar is looked up relative to the current working directory, so the tests must run
+     * with the stratosphere-clients module as working directory.
+     *
+     * @return the absolute path of {@code target/maven-test-jar.jar}
+     * @throws FileNotFoundException if the jar has not been built (or is not a regular file)
+     * @throws MalformedURLException never thrown here; kept in the signature for existing callers
+     */
+    public static String getTestJarPath() throws FileNotFoundException, MalformedURLException {
+        File f = new File("target/maven-test-jar.jar");
+        if (!f.isFile()) {
+            // include the resolved location: the relative path depends on the working directory
+            throw new FileNotFoundException("Test jar not present at " + f.getAbsolutePath() + ". Invoke tests using maven "
+                    + "or build the jar using 'mvn process-test-classes' in stratosphere-clients");
+        }
+        return f.getAbsolutePath();
+    }
public static String getNonJarFilePath() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/test/java/eu/stratosphere/client/program/PackagedProgramTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/program/PackagedProgramTest.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/program/PackagedProgramTest.java
index 9bd9b43..856d0a1 100644
--- a/stratosphere-clients/src/test/java/eu/stratosphere/client/program/PackagedProgramTest.java
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/program/PackagedProgramTest.java
@@ -15,23 +15,20 @@
package eu.stratosphere.client.program;
import java.io.File;
-import java.net.URL;
import org.junit.Assert;
import org.junit.Test;
+import eu.stratosphere.client.CliFrontendTestUtils;
+
public class PackagedProgramTest {
- private static final String TEST_PROG_FILE_PATH = "/test.jar";
-
@Test
public void testGetPreviewPlan() {
try {
- URL jarFileURL = getClass().getResource(TEST_PROG_FILE_PATH);
-
- PackagedProgram prog = new PackagedProgram(new File(jarFileURL.getFile()));
+ PackagedProgram prog = new PackagedProgram(new File(CliFrontendTestUtils.getTestJarPath()));
Assert.assertNotNull(prog.getPreviewPlan());
}
catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/JobWithExternalDependency.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/JobWithExternalDependency.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/JobWithExternalDependency.java
new file mode 100644
index 0000000..0387e65
--- /dev/null
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/JobWithExternalDependency.java
@@ -0,0 +1,30 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.client.testjar;
+
+/**
+ * Test program whose main method looks up an external class by name through the
+ * thread context classloader (without initializing it).
+ */
+public class JobWithExternalDependency {
+
+    /** Fully qualified name of the class this program tries to load. */
+    public static final String EXTERNAL_CLASS = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
+
+    /**
+     * Resolves {@link #EXTERNAL_CLASS} with the context classloader of the calling thread.
+     *
+     * @param args ignored
+     * @throws ClassNotFoundException if the context classloader cannot provide the class
+     */
+    public static void main(String[] args) throws ClassNotFoundException {
+        Class.forName(EXTERNAL_CLASS, false, Thread.currentThread().getContextClassLoader());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java
new file mode 100644
index 0000000..458c848
--- /dev/null
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java
@@ -0,0 +1,162 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.client.testjar;
+
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.aggregation.Aggregations;
+import eu.stratosphere.api.java.functions.FlatMapFunction;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.util.Collector;
+
+/**
+ * Word count program that is packaged into the client test jar.
+ *
+ * <p>Without arguments it counts the words of a built-in text and prints the result;
+ * with two arguments it reads the text from the first path and writes the counts to the second.
+ */
+public class WordCount {
+
+    // *************************************************************************
+    //     PROGRAM
+    // *************************************************************************
+
+    public static void main(String[] args) throws Exception {
+
+        if (!parseParameters(args)) {
+            return;
+        }
+
+        // set up the execution environment
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        // get input data
+        DataSet<String> text = getTextDataSet(env);
+
+        DataSet<Tuple2<String, Integer>> counts =
+                // split up the lines in pairs (2-tuples) containing: (word,1)
+                text.flatMap(new Tokenizer())
+                // group by the tuple field "0" and sum up tuple field "1"
+                .groupBy(0)
+                .aggregate(Aggregations.SUM, 1);
+
+        // emit result
+        if (fileOutput) {
+            counts.writeAsCsv(outputPath, "\n", " ");
+        } else {
+            counts.print();
+        }
+
+        // execute program
+        env.execute("WordCount Example");
+    }
+
+    // *************************************************************************
+    //     USER FUNCTIONS
+    // *************************************************************************
+
+    /**
+     * Implements the string tokenizer that splits sentences into words as a user-defined
+     * FlatMapFunction. The function takes a line (String) and splits it into
+     * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+     */
+    public static final class Tokenizer extends FlatMapFunction<String, Tuple2<String, Integer>> {
+
+        @Override
+        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+            // normalize and split the line
+            String[] tokens = value.toLowerCase().split("\\W+");
+
+            // emit the pairs
+            for (String token : tokens) {
+                if (token.length() > 0) {
+                    out.collect(new Tuple2<String, Integer>(token, 1));
+                }
+            }
+        }
+    }
+
+    // *************************************************************************
+    //     UTIL METHODS
+    // *************************************************************************
+
+    // set by parseParameters(); fileOutput == true means: read from textPath, write to outputPath
+    private static boolean fileOutput = false;
+    private static String textPath;
+    private static String outputPath;
+
+    /**
+     * Parses the program arguments into the static configuration fields.
+     *
+     * <p>All fields are reset first: if main() runs more than once in the same JVM, a previous
+     * run must not leak its file paths into the next one.
+     *
+     * @param args the program arguments: none, or exactly a text path and a result path
+     * @return false if the arguments are invalid and the program must not run
+     */
+    private static boolean parseParameters(String[] args) {
+        // reset state left over from a previous invocation
+        fileOutput = false;
+        textPath = null;
+        outputPath = null;
+
+        if (args.length > 0) {
+            // parse input arguments
+            if (args.length == 2) {
+                fileOutput = true;
+                textPath = args[0];
+                outputPath = args[1];
+            } else {
+                System.err.println("Usage: WordCount <text path> <result path>");
+                return false;
+            }
+        } else {
+            System.out.println("Executing WordCount example with built-in default data.");
+            System.out.println("  Provide parameters to read input data from a file.");
+            System.out.println("  Usage: WordCount <text path> <result path>");
+        }
+        return true;
+    }
+
+    /** Returns the lines of the input file when paths were given, otherwise the built-in text. */
+    private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
+        if (fileOutput) {
+            // read the text file from given input path
+            return env.readTextFile(textPath);
+        } else {
+            // get default test text data
+            return env.fromElements(
+                    "To be, or not to be,--that is the question:--",
+                    "Whether 'tis nobler in the mind to suffer",
+                    "The slings and arrows of outrageous fortune",
+                    "Or to take arms against a sea of troubles,",
+                    "And by opposing end them?--To die,--to sleep,--",
+                    "No more; and by a sleep to say we end",
+                    "The heartache, and the thousand natural shocks",
+                    "That flesh is heir to,--'tis a consummation",
+                    "Devoutly to be wish'd. To die,--to sleep;--",
+                    "To sleep! perchance to dream:--ay, there's the rub;",
+                    "For in that sleep of death what dreams may come,",
+                    "When we have shuffled off this mortal coil,",
+                    "Must give us pause: there's the respect",
+                    "That makes calamity of so long life;",
+                    "For who would bear the whips and scorns of time,",
+                    "The oppressor's wrong, the proud man's contumely,",
+                    "The pangs of despis'd love, the law's delay,",
+                    "The insolence of office, and the spurns",
+                    "That patient merit of the unworthy takes,",
+                    "When he himself might his quietus make",
+                    "With a bare bodkin? who would these fardels bear,",
+                    "To grunt and sweat under a weary life,",
+                    "But that the dread of something after death,--",
+                    "The undiscover'd country, from whose bourn",
+                    "No traveller returns,--puzzles the will,",
+                    "And makes us rather bear those ills we have",
+                    "Than fly to others that we know not of?",
+                    "Thus conscience does make cowards of us all;",
+                    "And thus the native hue of resolution",
+                    "Is sicklied o'er with the pale cast of thought;",
+                    "And enterprises of great pith and moment,",
+                    "With this regard, their currents turn awry,",
+                    "And lose the name of action.--Soft you now!",
+                    "The fair Ophelia!--Nymph, in thy orisons",
+                    "Be all my sins remember'd."
+                    );
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/test/resources/test.jar
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/resources/test.jar b/stratosphere-clients/src/test/resources/test.jar
deleted file mode 100644
index d0ce39b..0000000
Binary files a/stratosphere-clients/src/test/resources/test.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-tests/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-tests/pom.xml b/stratosphere-tests/pom.xml
index 6029ec3..7dec412 100644
--- a/stratosphere-tests/pom.xml
+++ b/stratosphere-tests/pom.xml
@@ -86,6 +86,11 @@
</goals>
</execution>
</executions>
+ <configuration>
+ <excludes>
+ <exclude>**/src/test/java/eu/stratosphere/test/util/testjar/*.java</exclude>
+ </excludes>
+ </configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -114,6 +119,68 @@
<perCoreThreadCount>false</perCoreThreadCount>
</configuration>
</plugin>
+
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>create-test-dependency</id>
+ <phase>process-test-classes</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>eu.stratosphere.test.util.testjar.KMeansForTest</mainClass>
+ </manifest>
+ </archive>
+ <finalName>maven</finalName>
+ <attach>false</attach>
+ <descriptors>
+ <descriptor>src/test/assembly/test-assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
+
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>
+ org.apache.maven.plugins
+ </groupId>
+ <artifactId>
+ maven-assembly-plugin
+ </artifactId>
+ <versionRange>
+ [2.4,)
+ </versionRange>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-tests/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/assembly/test-assembly.xml b/stratosphere-tests/src/test/assembly/test-assembly.xml
new file mode 100644
index 0000000..2b7f111
--- /dev/null
+++ b/stratosphere-tests/src/test/assembly/test-assembly.xml
@@ -0,0 +1,30 @@
+<!--
+ Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+
+ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations under the License.
+-->
+
+<assembly>
+ <id>test-jar</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${project.build.testOutputDirectory}</directory>
+ <outputDirectory>/</outputDirectory>
+      <!-- only the test program classes (eu/stratosphere/test/util/testjar) go into the test jar -->
+ <includes>
+ <include>eu/stratosphere/test/util/testjar/**</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
index ba86bde..3626ba7 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
@@ -14,7 +14,6 @@ package eu.stratosphere.test.localDistributed;
import java.io.File;
import java.io.FileWriter;
-import java.net.URL;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import org.junit.Assert;
@@ -54,21 +53,20 @@ public class PackagedProgramEndToEndITCase {
fwClusters.write(KMeansData.INITIAL_CENTERS);
fwClusters.close();
- URL jarFileURL = getClass().getResource("/KMeansForTest.jar");
- String jarPath = jarFileURL.getFile();
+ String jarPath = "target/maven-test-jar.jar";
// run KMeans
cluster.setNumTaskManager(2);
cluster.start();
RemoteExecutor ex = new RemoteExecutor("localhost", 6498);
-
+
ex.executeJar(jarPath,
- "eu.stratosphere.examples.scala.testing.KMeansForTest",
- new String[] {"4",
+ "eu.stratosphere.test.util.testjar.KMeansForTest",
+ new String[] {
points.toURI().toString(),
clusters.toURI().toString(),
outFile.toURI().toString(),
- "1"});
+ "25"});
points.delete();
clusters.delete();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
new file mode 100644
index 0000000..c13ddc8
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
@@ -0,0 +1,294 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.util.testjar;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.common.Program;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.IterativeDataSet;
+import eu.stratosphere.api.java.RemoteEnvironment;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.functions.ReduceFunction;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.api.java.tuple.Tuple3;
+import eu.stratosphere.configuration.Configuration;
+
+@SuppressWarnings("serial")
+public class KMeansForTest implements Program{
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+
+ @Override
+ public Plan getPlan(String... args) {
+ if(!parseParameters(args)) {
+ throw new RuntimeException("Unable to parse the arguments");
+ }
+
+ // set up execution environment
+ ExecutionEnvironment env = new RemoteEnvironment("localhost", 1, null);
+
+ // get input data
+ DataSet<Point> points = getPointDataSet(env);
+ DataSet<Centroid> centroids = getCentroidDataSet(env);
+
+ // set number of bulk iterations for KMeans algorithm
+ IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
+
+ DataSet<Centroid> newCentroids = points
+ // compute closest centroid for each point
+ .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
+ // count and sum point coordinates for each centroid
+ .map(new CountAppender())
+ .groupBy(0).reduce(new CentroidAccumulator())
+ // compute new centroids from point counts and coordinate sums
+ .map(new CentroidAverager());
+
+ // feed new centroids back into next iteration
+ DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
+
+ DataSet<Tuple2<Integer, Point>> clusteredPoints = points
+ // assign points to final clusters
+ .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
+
+ // emit result
+ if(fileOutput) {
+ clusteredPoints.writeAsCsv(outputPath, "\n", " ");
+ } else {
+ clusteredPoints.print();
+ }
+ return env.createProgramPlan();
+ }
+
+ // *************************************************************************
+ // DATA TYPES
+ // *************************************************************************
+
+ /**
+ * A simple two-dimensional point.
+ */
+ public static class Point implements Serializable {
+
+ public double x, y;
+
+ public Point() {}
+
+ public Point(double x, double y) {
+ this.x = x;
+ this.y = y;
+ }
+
+ public Point add(Point other) {
+ x += other.x;
+ y += other.y;
+ return this;
+ }
+
+ public Point div(long val) {
+ x /= val;
+ y /= val;
+ return this;
+ }
+
+ public double euclideanDistance(Point other) {
+ return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
+ }
+
+ public void clear() {
+ x = y = 0.0;
+ }
+
+ @Override
+ public String toString() {
+ return x + " " + y;
+ }
+ }
+
+ /**
+ * A simple two-dimensional centroid, basically a point with an ID.
+ */
+ public static class Centroid extends Point {
+
+ public int id;
+
+ public Centroid() {}
+
+ public Centroid(int id, double x, double y) {
+ super(x,y);
+ this.id = id;
+ }
+
+ public Centroid(int id, Point p) {
+ super(p.x, p.y);
+ this.id = id;
+ }
+
+ @Override
+ public String toString() {
+ return id + " " + super.toString();
+ }
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /** Converts a Tuple2<Double,Double> into a Point. */
+ public static final class TuplePointConverter extends MapFunction<Tuple2<Double, Double>, Point> {
+
+ @Override
+ public Point map(Tuple2<Double, Double> t) throws Exception {
+ return new Point(t.f0, t.f1);
+ }
+ }
+
+ /** Converts a Tuple3<Integer, Double,Double> into a Centroid. */
+ public static final class TupleCentroidConverter extends MapFunction<Tuple3<Integer, Double, Double>, Centroid> {
+
+ @Override
+ public Centroid map(Tuple3<Integer, Double, Double> t) throws Exception {
+ return new Centroid(t.f0, t.f1, t.f2);
+ }
+ }
+
+ /** Determines the closest cluster center for a data point. */
+ public static final class SelectNearestCenter extends MapFunction<Point, Tuple2<Integer, Point>> {
+ private Collection<Centroid> centroids;
+
+ /** Reads the centroid values from a broadcast variable into a collection. */
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
+ }
+
+ @Override
+ public Tuple2<Integer, Point> map(Point p) throws Exception {
+
+ double minDistance = Double.MAX_VALUE;
+ int closestCentroidId = -1;
+
+ // check all cluster centers
+ for (Centroid centroid : centroids) {
+ // compute distance
+ double distance = p.euclideanDistance(centroid);
+
+ // update nearest cluster if necessary
+ if (distance < minDistance) {
+ minDistance = distance;
+ closestCentroidId = centroid.id;
+ }
+ }
+
+ // emit a new record with the center id and the data point.
+ return new Tuple2<Integer, Point>(closestCentroidId, p);
+ }
+ }
+
+ /** Appends a count variable to the tuple. */
+ public static final class CountAppender extends MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
+
+ @Override
+ public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> t) {
+ return new Tuple3<Integer, Point, Long>(t.f0, t.f1, 1L);
+ }
+ }
+
+ /** Sums and counts point coordinates. */
+ public static final class CentroidAccumulator extends ReduceFunction<Tuple3<Integer, Point, Long>> {
+
+ @Override
+ public Tuple3<Integer, Point, Long> reduce(Tuple3<Integer, Point, Long> val1, Tuple3<Integer, Point, Long> val2) {
+ return new Tuple3<Integer, Point, Long>(val1.f0, val1.f1.add(val2.f1), val1.f2 + val2.f2);
+ }
+ }
+
+ /** Computes new centroid from coordinate sum and count of points. */
+ public static final class CentroidAverager extends MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
+
+ @Override
+ public Centroid map(Tuple3<Integer, Point, Long> value) {
+ return new Centroid(value.f0, value.f1.div(value.f2));
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String pointsPath = null;
+ private static String centersPath = null;
+ private static String outputPath = null;
+ private static int numIterations = 10;
+
+ private static boolean parseParameters(String[] programArguments) {
+
+ if(programArguments.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if(programArguments.length == 4) {
+ pointsPath = programArguments[0];
+ centersPath = programArguments[1];
+ outputPath = programArguments[2];
+ numIterations = Integer.parseInt(programArguments[3]);
+ } else {
+ System.err.println("Usage: KMeans <points path> <centers path> <result path> <num iterations>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing K-Means example with default parameters and built-in default data.");
+ System.out.println(" Provide parameters to read input data from files.");
+ System.out.println(" See the documentation for the correct format of input files.");
+ System.out.println(" We provide a data generator to create synthetic input files for this program.");
+ System.out.println(" Usage: KMeans <points path> <centers path> <result path> <num iterations>");
+ }
+ return true;
+ }
+
+ private static DataSet<Point> getPointDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ // read points from CSV file
+ return env.readCsvFile(pointsPath)
+ .fieldDelimiter('|')
+ .includeFields(true, true)
+ .types(Double.class, Double.class)
+ .map(new TuplePointConverter());
+ } else {
+ throw new UnsupportedOperationException("Use file output");
+ }
+ }
+
+ private static DataSet<Centroid> getCentroidDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ return env.readCsvFile(centersPath)
+ .fieldDelimiter('|')
+ .includeFields(true, true, true)
+ .types(Integer.class, Double.class, Double.class)
+ .map(new TupleCentroidConverter());
+ } else {
+ throw new UnsupportedOperationException("Use file output");
+ }
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-tests/src/test/resources/KMeansForTest.jar
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/resources/KMeansForTest.jar b/stratosphere-tests/src/test/resources/KMeansForTest.jar
deleted file mode 100644
index 34683ad..0000000
Binary files a/stratosphere-tests/src/test/resources/KMeansForTest.jar and /dev/null differ