You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:08 UTC

[26/60] Renamed java examples package

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java
deleted file mode 100644
index 10cf748..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.example.java.graph.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/**
- * Provides the default data sets used for the PageRank example program.
- * The default data sets are used, if no parameters are given to the program.
- *
- */
-public class PageRankData {
-
-	public static final Object[][] EDGES = {
-		{1L, 2L},
-		{1L, 15L},
-		{2L, 3L},
-		{2L, 4L},
-		{2L, 5L},
-		{2L, 6L},
-		{2L, 7L},
-		{3L, 13L},
-		{4L, 2L},
-		{5L, 11L},
-		{5L, 12L},
-		{6L, 1L},
-		{6L, 7L},
-		{6L, 8L},
-		{7L, 1L},
-		{7L, 8L},
-		{8L, 1L},
-		{8L, 9L},
-		{8L, 10L},
-		{9L, 14L},
-		{9L, 1L},
-		{10L, 1L},
-		{10L, 13L},
-		{11L, 12L},
-		{11L, 1L},
-		{12L, 1L},
-		{13L, 14L},
-		{14L, 12L},
-		{15L, 1L},
-	};
-	
-	private static long numPages = 15;
-	
-	public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
-		
-		List<Tuple2<Long, Long>> edges = new ArrayList<Tuple2<Long, Long>>();
-		for(Object[] e : EDGES) {
-			edges.add(new Tuple2<Long, Long>((Long)e[0], (Long)e[1]));
-		}
-		return env.fromCollection(edges);
-	}
-	
-	public static DataSet<Long> getDefaultPagesDataSet(ExecutionEnvironment env) {
-		return env.generateSequence(1, 15);
-	}
-	
-	public static long getNumberOfPages() {
-		return numPages;
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/misc/PiEstimation.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/misc/PiEstimation.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/misc/PiEstimation.java
deleted file mode 100644
index ef336da..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/misc/PiEstimation.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.example.java.misc;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/** 
- * Estimates the value of Pi using the Monte Carlo method.
- * The area of a circle is Pi * R^2, R being the radius of the circle 
- * The area of a square is 4 * R^2, where the length of the square's edge is 2*R.
- * 
- * Thus Pi = 4 * (area of circle / area of square).
- * 
- * The idea is to find a way to estimate the circle to square area ratio.
- * The Monte Carlo method suggests collecting random points (within the square)
- * and then counting the number of points that fall within the circle
- * 
- * <pre>
- * {@code
- * x = Math.random()
- * y = Math.random()
- * 
- * x * x + y * y < 1
- * }
- * </pre>
- */
-@SuppressWarnings("serial")
-public class PiEstimation implements java.io.Serializable {
-	
-	
-	public static void main(String[] args) throws Exception {
-
-		final long numSamples = args.length > 0 ? Long.parseLong(args[0]) : 1000000;
-		
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		
-		// count how many of the samples would randomly fall into
-		// the unit circle
-		DataSet<Long> count = 
-				env.generateSequence(1, numSamples)
-				.map(new Sampler())
-				.reduce(new SumReducer());
-
-		// the ratio of the unit circle surface to 4 times the unit square is pi
-		DataSet<Double> pi = count
-				.map(new MapFunction<Long, Double>() {
-					public Double map(Long value) {
-						return value * 4.0 / numSamples;
-					}
-				});
-
-		System.out.println("We estimate Pi to be:");
-		pi.print();
-
-		env.execute();
-	}
-
-	//*************************************************************************
-	//     USER FUNCTIONS
-	//*************************************************************************
-	
-	
-	/** 
-	 * Sampler randomly emits points that fall within a square of edge x * y.
-	 * It calculates the distance to the center of a virtually centered circle of radius x = y = 1
-	 * If the distance is less than 1, then and only then does it returns a 1.
-	 */
-	public static class Sampler implements MapFunction<Long, Long> {
-
-		@Override
-		public Long map(Long value) throws Exception{
-			double x = Math.random();
-			double y = Math.random();
-			return (x * x + y * y) < 1 ? 1L : 0L;
-		}
-	}
-
-	
-	/** 
-	 * Simply sums up all long values.
-	 */
-	public static final class SumReducer implements ReduceFunction<Long>{
-
-		@Override
-		public Long reduce(Long value1, Long value2) throws Exception {
-			return value1 + value2;
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
deleted file mode 100644
index 6ef6270..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.example.java.ml;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.example.java.ml.util.LinearRegressionData;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-
-/**
- * This example implements a basic Linear Regression  to solve the y = theta0 + theta1*x problem using batch gradient descent algorithm.
- *
- * <p>
- * Linear Regression with BGD(batch gradient descent) algorithm is an iterative clustering algorithm and works as follows:<br>
- * Giving a data set and target set, the BGD try to find out the best parameters for the data set to fit the target set.
- * In each iteration, the algorithm computes the gradient of the cost function and use it to update all the parameters.
- * The algorithm terminates after a fixed number of iterations (as in this implementation)
- * With enough iteration, the algorithm can minimize the cost function and find the best parameters
- * This is the Wikipedia entry for the <a href = "http://en.wikipedia.org/wiki/Linear_regression">Linear regression</a> and <a href = "http://en.wikipedia.org/wiki/Gradient_descent">Gradient descent algorithm</a>.
- * 
- * <p>
- * This implementation works on one-dimensional data. And find the two-dimensional theta.<br>
- * It find the best Theta parameter to fit the target.
- * 
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <ul>
- * <li>Data points are represented as two double values separated by a blank character. The first one represent the X(the training data) and the second represent the Y(target).
- * Data points are separated by newline characters.<br>
- * For example <code>"-0.02 -0.04\n5.3 10.6\n"</code> gives two data points (x=-0.02, y=-0.04) and (x=5.3, y=10.6).
- * </ul>
- * 
- * <p>
- * This example shows how to use:
- * <ul>
- * <li> Bulk iterations
- * <li> Broadcast variables in bulk iterations
- * <li> Custom Java objects (PoJos)
- * </ul>
- */
-@SuppressWarnings("serial")
-public class LinearRegression {
-
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception{
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		// set up execution environment
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get input x data from elements
-		DataSet<Data> data = getDataSet(env);
-
-		// get the parameters from elements
-		DataSet<Params> parameters = getParamsDataSet(env);
-
-		// set number of bulk iterations for SGD linear Regression
-		IterativeDataSet<Params> loop = parameters.iterate(numIterations);
-
-		DataSet<Params> new_parameters = data
-				// compute a single step using every sample
-				.map(new SubUpdate()).withBroadcastSet(loop, "parameters")
-				// sum up all the steps
-				.reduce(new UpdateAccumulator())
-				// average the steps and update all parameters
-				.map(new Update());
-
-		// feed new parameters back into next iteration
-		DataSet<Params> result = loop.closeWith(new_parameters);
-
-		// emit result
-		if(fileOutput) {
-			result.writeAsText(outputPath);
-		} else {
-			result.print();
-		}
-
-		// execute program
-		env.execute("Linear Regression example");
-
-	}
-
-	// *************************************************************************
-	//     DATA TYPES
-	// *************************************************************************
-
-	/**
-	 * A simple data sample, x means the input, and y means the target.
-	 */
-	public static class Data implements Serializable{
-		public double x,y;
-
-		public Data() {};
-
-		public Data(double x ,double y){
-			this.x = x;
-			this.y = y;
-		}
-
-		@Override
-		public String toString() {
-			return "(" + x + "|" + y + ")";
-		}
-
-	}
-
-	/**
-	 * A set of parameters -- theta0, theta1.
-	 */
-	public static class Params implements Serializable{
-
-		private double theta0,theta1;
-
-		public Params(){};
-
-		public Params(double x0, double x1){
-			this.theta0 = x0;
-			this.theta1 = x1;
-		}
-
-		@Override
-		public String toString() {
-			return theta0 + " " + theta1;
-		}
-
-		public double getTheta0() {
-			return theta0;
-		}
-
-		public double getTheta1() {
-			return theta1;
-		}
-
-		public void setTheta0(double theta0) {
-			this.theta0 = theta0;
-		}
-
-		public void setTheta1(double theta1) {
-			this.theta1 = theta1;
-		}
-
-		public Params div(Integer a){
-			this.theta0 = theta0 / a ;
-			this.theta1 = theta1 / a ;
-			return this;
-		}
-
-	}
-
-	// *************************************************************************
-	//     USER FUNCTIONS
-	// *************************************************************************
-
-	/** Converts a Tuple2<Double,Double> into a Data. */
-	public static final class TupleDataConverter implements MapFunction<Tuple2<Double, Double>, Data> {
-
-		@Override
-		public Data map(Tuple2<Double, Double> t) throws Exception {
-			return new Data(t.f0, t.f1);
-		}
-	}
-
-	/** Converts a Tuple2<Double,Double> into a Params. */
-	public static final class TupleParamsConverter implements MapFunction<Tuple2<Double, Double>,Params> {
-
-		@Override
-		public Params map(Tuple2<Double, Double> t)throws Exception {
-			return new Params(t.f0,t.f1);
-		}
-	}
-
-	/**
-	 * Compute a single BGD type update for every parameters.
-	 */
-	public static class SubUpdate extends RichMapFunction<Data,Tuple2<Params,Integer>> {
-
-		private Collection<Params> parameters; 
-
-		private Params parameter;
-
-		private int count = 1;
-
-		/** Reads the parameters from a broadcast variable into a collection. */
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			this.parameters = getRuntimeContext().getBroadcastVariable("parameters");
-		}
-
-		@Override
-		public Tuple2<Params, Integer> map(Data in) throws Exception {
-
-			for(Params p : parameters){
-				this.parameter = p; 
-			}
-
-			double theta_0 = parameter.theta0 - 0.01*((parameter.theta0 + (parameter.theta1*in.x)) - in.y);
-			double theta_1 = parameter.theta1 - 0.01*(((parameter.theta0 + (parameter.theta1*in.x)) - in.y) * in.x);
-
-			return new Tuple2<Params,Integer>(new Params(theta_0,theta_1),count);
-		}
-	}
-
-	/**  
-	 * Accumulator all the update.
-	 * */
-	public static class UpdateAccumulator implements ReduceFunction<Tuple2<Params, Integer>> {
-
-		@Override
-		public Tuple2<Params, Integer> reduce(Tuple2<Params, Integer> val1, Tuple2<Params, Integer> val2) {
-
-			double new_theta0 = val1.f0.theta0 + val2.f0.theta0;
-			double new_theta1 = val1.f0.theta1 + val2.f0.theta1;
-			Params result = new Params(new_theta0,new_theta1);
-			return new Tuple2<Params, Integer>( result, val1.f1 + val2.f1);
-
-		}
-	}
-
-	/**
-	 * Compute the final update by average them.
-	 */
-	public static class Update implements MapFunction<Tuple2<Params, Integer>,Params> {
-
-		@Override
-		public Params map(Tuple2<Params, Integer> arg0) throws Exception {
-
-			return arg0.f0.div(arg0.f1);
-
-		}
-
-	}
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String dataPath = null;
-	private static String outputPath = null;
-	private static int numIterations = 10;
-
-	private static boolean parseParameters(String[] programArguments) {
-
-		if(programArguments.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if(programArguments.length == 3) {
-				dataPath = programArguments[0];
-				outputPath = programArguments[1];
-				numIterations = Integer.parseInt(programArguments[2]);
-			} else {
-				System.err.println("Usage: LinearRegression <data path> <result path> <num iterations>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing Linear Regression example with default parameters and built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  We provide a data generator to create synthetic input files for this program.");
-			System.out.println("  Usage: LinearRegression <data path> <result path> <num iterations>");
-		}
-		return true;
-	}
-
-	private static DataSet<Data> getDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			// read data from CSV file
-			return env.readCsvFile(dataPath)
-					.fieldDelimiter(' ')
-					.includeFields(true, true)
-					.types(Double.class, Double.class)
-					.map(new TupleDataConverter());
-		} else {
-			return LinearRegressionData.getDefaultDataDataSet(env);
-		}
-	}
-
-	private static DataSet<Params> getParamsDataSet(ExecutionEnvironment env) {
-
-		return LinearRegressionData.getDefaultParamsDataSet(env);
-
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/util/LinearRegressionData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/util/LinearRegressionData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/util/LinearRegressionData.java
deleted file mode 100644
index 31e71f5..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/util/LinearRegressionData.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.example.java.ml.util;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.example.java.ml.LinearRegression.Data;
-import org.apache.flink.example.java.ml.LinearRegression.Params;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Provides the default data sets used for the Linear Regression example
- * program. The default data sets are used, if no parameters are given to the
- * program.
- */
-public class LinearRegressionData {
-
-	// We have the data as object arrays so that we can also generate Scala Data
-	// Sources from it.
-	public static final Object[][] PARAMS = new Object[][] { new Object[] {
-			0.0, 0.0 } };
-
-	public static final Object[][] DATA = new Object[][] {
-			new Object[] { 0.5, 1.0 }, new Object[] { 1.0, 2.0 },
-			new Object[] { 2.0, 4.0 }, new Object[] { 3.0, 6.0 },
-			new Object[] { 4.0, 8.0 }, new Object[] { 5.0, 10.0 },
-			new Object[] { 6.0, 12.0 }, new Object[] { 7.0, 14.0 },
-			new Object[] { 8.0, 16.0 }, new Object[] { 9.0, 18.0 },
-			new Object[] { 10.0, 20.0 }, new Object[] { -0.08, -0.16 },
-			new Object[] { 0.13, 0.26 }, new Object[] { -1.17, -2.35 },
-			new Object[] { 1.72, 3.45 }, new Object[] { 1.70, 3.41 },
-			new Object[] { 1.20, 2.41 }, new Object[] { -0.59, -1.18 },
-			new Object[] { 0.28, 0.57 }, new Object[] { 1.65, 3.30 },
-			new Object[] { -0.55, -1.08 } };
-
-	public static DataSet<Params> getDefaultParamsDataSet(
-			ExecutionEnvironment env) {
-		List<Params> paramsList = new LinkedList<Params>();
-		for (Object[] params : PARAMS) {
-			paramsList.add(new Params((Double) params[0], (Double) params[1]));
-		}
-		return env.fromCollection(paramsList);
-	}
-
-	public static DataSet<Data> getDefaultDataDataSet(ExecutionEnvironment env) {
-
-		List<Data> dataList = new LinkedList<Data>();
-		for (Object[] data : DATA) {
-			dataList.add(new Data((Double) data[0], (Double) data[1]));
-		}
-		return env.fromCollection(dataList);
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/util/LinearRegressionDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/util/LinearRegressionDataGenerator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/util/LinearRegressionDataGenerator.java
deleted file mode 100644
index 28001ba..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/util/LinearRegressionDataGenerator.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.example.java.ml.util;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.Locale;
-import java.util.Random;
-
-/**
- * Generates data for the {@link org.apache.flink.example.java.ml.LinearRegression} example program.
- */
-public class LinearRegressionDataGenerator {
-
-	static {
-		Locale.setDefault(Locale.US);
-	}
-
-	private static final String POINTS_FILE = "data";
-	private static final long DEFAULT_SEED = 4650285087650871364L;
-	private static final int DIMENSIONALITY = 1;
-	private static final DecimalFormat FORMAT = new DecimalFormat("#0.00");
-	private static final char DELIMITER = ' ';
-
-	/**
-	 * Main method to generate data for the {@link org.apache.flink.example.java.ml.LinearRegression} example program.
-	 * <p>
-	 * The generator creates to files:
-	 * <ul>
-	 * <li><code>{tmp.dir}/data</code> for the data points
-	 * </ul> 
-	 * 
-	 * @param args 
-	 * <ol>
-	 * <li>Int: Number of data points
-	 * <li><b>Optional</b> Long: Random seed
-	 * </ol>
-	 */
-	public static void main(String[] args) throws IOException {
-
-		// check parameter count
-		if (args.length < 1) {
-			System.out.println("LinearRegressionDataGenerator <numberOfDataPoints> [<seed>]");
-			System.exit(1);
-		}
-
-		// parse parameters
-		final int numDataPoints = Integer.parseInt(args[0]);
-		final long firstSeed = args.length > 1 ? Long.parseLong(args[4]) : DEFAULT_SEED;
-		final Random random = new Random(firstSeed);
-		final String tmpDir = System.getProperty("java.io.tmpdir");
-
-		// write the points out
-		BufferedWriter pointsOut = null;
-		try {
-			pointsOut = new BufferedWriter(new FileWriter(new File(tmpDir+"/"+POINTS_FILE)));
-			StringBuilder buffer = new StringBuilder();
-
-			// DIMENSIONALITY + 1 means that the number of x(dimensionality) and target y
-			double[] point = new double[DIMENSIONALITY+1];
-
-			for (int i = 1; i <= numDataPoints; i++) {
-				point[0] = random.nextGaussian();
-				point[1] = 2 * point[0] + 0.01*random.nextGaussian();
-				writePoint(point, buffer, pointsOut);
-			}
-
-		}
-		finally {
-			if (pointsOut != null) {
-				pointsOut.close();
-			}
-		}
-
-		System.out.println("Wrote "+numDataPoints+" data points to "+tmpDir+"/"+POINTS_FILE);
-	}
-
-
-	private static void writePoint(double[] data, StringBuilder buffer, BufferedWriter out) throws IOException {
-		buffer.setLength(0);
-
-		// write coordinates
-		for (int j = 0; j < data.length; j++) {
-			buffer.append(FORMAT.format(data[j]));
-			if(j < data.length - 1) {
-				buffer.append(DELIMITER);
-			}
-		}
-
-		out.write(buffer.toString());
-		out.newLine();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
deleted file mode 100644
index a379bf8..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.example.java.relational;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-
-/**
- * This program filters lines from a CSV file with empty fields. In doing so, it counts the number of empty fields per
- * column within a CSV file using a custom accumulator for vectors. In this context, empty fields are those, that at
- * most contain whitespace characters like space and tab.
- * <p>
- * The input file is a plain text CSV file with the semicolon as field separator and double quotes as field delimiters
- * and three columns. See {@link #getDataSet(ExecutionEnvironment)} for configuration.
- * <p>
- * Usage: <code>FilterAndCountIncompleteLines [&lt;input file path&gt; [&lt;result path&gt;]]</code> <br>
- * <p>
- * This example shows how to use:
- * <ul>
- * <li>custom accumulators
- * <li>tuple data types
- * <li>inline-defined functions
- * </ul>
- */
-@SuppressWarnings("serial")
-public class EmptyFieldsCountAccumulator {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get the data set
-		final DataSet<Tuple> file = getDataSet(env);
-
-		// filter lines with empty fields
-		final DataSet<Tuple> filteredLines = file.filter(new EmptyFieldFilter());
-
-		// Here, we could do further processing with the filtered lines...
-		
-		// output the filtered lines
-		if (outputPath == null) {
-			filteredLines.print();
-		} else {
-			filteredLines.writeAsCsv(outputPath);
-		}
-
-		// execute program
-		final JobExecutionResult result = env.execute("Accumulator example");
-
-		// get the accumulator result via its registration key
-		final List<Integer> emptyFields = result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
-		System.out.format("Number of detected empty fields per column: %s\n", emptyFields);
-
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static String filePath;
-	private static String outputPath;
-
-	private static boolean parseParameters(final String[] programArguments) {
-
-		if (programArguments.length >= 3) {
-			System.err.println("Usage: FilterAndCountIncompleteLines [<input file path> [<result path>]]");
-			return false;
-		}
-
-		if (programArguments.length >= 1) {
-			filePath = programArguments[0];
-			if (programArguments.length == 2) {
-				outputPath = programArguments[1];
-			}
-		}
-
-		return true;
-	}
-
-	@SuppressWarnings("unchecked")
-	private static DataSet<Tuple> getDataSet(final ExecutionEnvironment env) {
-
-		DataSet<? extends Tuple> source;
-		if (filePath == null) {
-			source = env.fromCollection(getExampleInputTuples());
-
-		} else {
-			source = env
-					.readCsvFile(filePath)
-					.fieldDelimiter(';')
-					.types(String.class, String.class, String.class);
-
-		}
-
-		return (DataSet<Tuple>) source;
-	}
-
-	private static Collection<Tuple3<String, String, String>> getExampleInputTuples() {
-		Collection<Tuple3<String, String, String>> inputTuples = new ArrayList<Tuple3<String, String, String>>();
-		inputTuples.add(new Tuple3<String, String, String>("John", "Doe", "Foo Str."));
-		inputTuples.add(new Tuple3<String, String, String>("Joe", "Johnson", ""));
-		inputTuples.add(new Tuple3<String, String, String>(null, "Kate Morn", "Bar Blvd."));
-		inputTuples.add(new Tuple3<String, String, String>("Tim", "Rinny", ""));
-		inputTuples.add(new Tuple3<String, String, String>("Alicia", "Jackson", "  "));
-		return inputTuples;
-	}
-
-	/**
-	 * This function filters all incoming tuples that have one or more empty fields.
-	 * In doing so, it also counts the number of empty fields per attribute with an accumulator (registered under 
-	 * {@link EmptyFieldsCountAccumulator#EMPTY_FIELD_ACCUMULATOR}).
-	 */
-	public static final class EmptyFieldFilter extends RichFilterFunction<Tuple> {
-
-		// create a new accumulator in each filter function instance
-		// accumulators can be merged later on
-		private final VectorAccumulator emptyFieldCounter = new VectorAccumulator();
-
-		@Override
-		public void open(final Configuration parameters) throws Exception {
-			super.open(parameters);
-
-			// register the accumulator instance
-			getRuntimeContext().addAccumulator(EMPTY_FIELD_ACCUMULATOR,
-					this.emptyFieldCounter);
-		}
-
-		@Override
-		public boolean filter(final Tuple t) {
-			boolean containsEmptyFields = false;
-
-			// iterate over the tuple fields looking for empty ones
-			for (int pos = 0; pos < t.getArity(); pos++) {
-
-				final String field = t.getField(pos);
-				if (field == null || field.trim().isEmpty()) {
-					containsEmptyFields = true;
-
-					// if an empty field is encountered, update the
-					// accumulator
-					this.emptyFieldCounter.add(pos);
-				}
-			}
-
-			return !containsEmptyFields;
-		}
-	}
-
-	/**
-	 * This accumulator lets you increase vector components distributedly. The {@link #add(Integer)} method lets you
-	 * increase the <i>n</i>-th vector component by 1, whereat <i>n</i> is the methods parameter. The size of the vector
-	 * is automatically managed.
-	 */
-	public static class VectorAccumulator implements Accumulator<Integer, List<Integer>> {
-
-		/** Stores the accumulated vector components. */
-		private final List<Integer> resultVector = new ArrayList<Integer>();
-
-		/**
-		 * Increases the result vector component at the specified position by 1.
-		 */
-		@Override
-		public void add(final Integer position) {
-			updateResultVector(position, 1);
-		}
-
-		/**
-		 * Increases the result vector component at the specified position by the specified delta.
-		 */
-		private void updateResultVector(final int position, final int delta) {
-			// inflate the vector to contain the given position
-			while (this.resultVector.size() <= position) {
-				this.resultVector.add(0);
-			}
-
-			// increment the component value
-			final int component = this.resultVector.get(position);
-			this.resultVector.set(position, component + delta);
-		}
-
-		@Override
-		public List<Integer> getLocalValue() {
-			return this.resultVector;
-		}
-
-		@Override
-		public void resetLocal() {
-			// clear the result vector if the accumulator instance shall be reused
-			this.resultVector.clear();
-		}
-
-		@Override
-		public void merge(final Accumulator<Integer, List<Integer>> other) {
-			// merge two vector accumulators by adding their up their vector components
-			final List<Integer> otherVector = other.getLocalValue();
-			for (int index = 0; index < otherVector.size(); index++) {
-				updateResultVector(index, otherVector.get(index));
-			}
-		}
-
-		@Override
-		public void write(final DataOutputView out) throws IOException {
-			// binary serialization of the result vector:
-			// [number of components, component 0, component 1, ...]
-			out.writeInt(this.resultVector.size());
-			for (final Integer component : this.resultVector) {
-				out.writeInt(component);
-			}
-		}
-
-		@Override
-		public void read(final DataInputView in) throws IOException {
-			// binary deserialization of the result vector
-			final int size = in.readInt();
-			for (int numReadComponents = 0; numReadComponents < size; numReadComponents++) {
-				final int component = in.readInt();
-				this.resultVector.add(component);
-			}
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java
deleted file mode 100644
index 08a261c..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.example.java.relational;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * This program implements the following relational query on the TPC-H data set.
- * 
- * <p>
- * <code><pre>
- * SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
- *   FROM orders, lineitem
- *   WHERE l_orderkey = o_orderkey
- *     AND o_orderstatus = "X"
- *     AND YEAR(o_orderdate) > Y
- *     AND o_orderpriority LIKE "Z%"
- *   GROUP BY l_orderkey, o_shippriority;
- * </pre></code>
- *        
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator 
- * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
- * 
- * <p>
- * Usage: <code>RelationalQuery &lt;orders-csv path&gt; &lt;lineitem-csv path&gt; &lt;result path&gt;</code><br>
- *  
- * <p>
- * This example shows how to use:
- * <ul>
- * <li> tuple data types
- * <li> inline-defined functions
- * <li> projection and join projection
- * <li> build-in aggregation functions
- * </ul>
- */
-@SuppressWarnings("serial")
-public class RelationalQuery {
-	
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-	
-	private static String STATUS_FILTER = "F";
-	private static int YEAR_FILTER = 1993;
-	private static String OPRIO_FILTER = "5";
-	
-	public static void main(String[] args) throws Exception {
-		
-		if(!parseParameters(args)) {
-			return;
-		}
-		
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
-		DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
-
-		// get lineitem data set: (orderkey, extendedprice)
-		DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);
-
-		// orders filtered by year: (orderkey, custkey)
-		DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
-				// filter orders
-				orders.filter(
-								new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
-									@Override
-									public boolean filter(Tuple5<Integer, String, String, String, Integer> t) {
-										// status filter
-										if(!t.f1.equals(STATUS_FILTER)) {
-											return false;
-										// year filter
-										} else if(Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) {
-											return false;
-										// order priority filter
-										} else if(!t.f3.startsWith(OPRIO_FILTER)) {
-											return false;
-										}
-										return true;
-									}
-								})
-				// project fields out that are no longer required
-				.project(0,4).types(Integer.class, Integer.class);
-
-		// join orders with lineitems: (orderkey, shippriority, extendedprice)
-		DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders = 
-				ordersFilteredByYear.joinWithHuge(lineitems)
-									.where(0).equalTo(0)
-									.projectFirst(0,1).projectSecond(1)
-									.types(Integer.class, Integer.class, Double.class);
-
-		// extendedprice sums: (orderkey, shippriority, sum(extendedprice))
-		DataSet<Tuple3<Integer, Integer, Double>> priceSums =
-				// group by order and sum extendedprice
-				lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2);
-
-		// emit result
-		priceSums.writeAsCsv(outputPath);
-		
-		// execute program
-		env.execute("Relational Query Example");
-		
-	}
-	
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-	
-	private static String ordersPath;
-	private static String lineitemPath;
-	private static String outputPath;
-	
-	private static boolean parseParameters(String[] programArguments) {
-		
-		if(programArguments.length > 0) {
-			if(programArguments.length == 3) {
-				ordersPath = programArguments[0];
-				lineitemPath = programArguments[1];
-				outputPath = programArguments[2];
-			} else {
-				System.err.println("Usage: RelationalQuery <orders-csv path> <lineitem-csv path> <result path>");
-				return false;
-			}
-		} else {
-			System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
-								"  Due to legal restrictions, we can not ship generated data.\n" +
-								"  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + 
-								"  Usage: RelationalQuery <orders-csv path> <lineitem-csv path> <result path>");
-			return false;
-		}
-		return true;
-	}
-	
-	private static DataSet<Tuple5<Integer, String, String, String, Integer>> getOrdersDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(ordersPath)
-					.fieldDelimiter('|')
-					.includeFields("101011010")
-					.types(Integer.class, String.class, String.class, String.class, Integer.class);
-	}
-
-	private static DataSet<Tuple2<Integer, Double>> getLineitemDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(lineitemPath)
-					.fieldDelimiter('|')
-					.includeFields("1000010000000000")
-					.types(Integer.class, Double.class);
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java
deleted file mode 100644
index 1ff6583..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.example.java.relational;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * This program implements a modified version of the TPC-H query 10.
- * The original query can be found at
- * <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 45).
- * 
- * <p>
- * This program implements the following SQL equivalent:
- * 
- * <p>
- * <code><pre>
- * SELECT 
- *        c_custkey,
- *        c_name, 
- *        c_address,
- *        n_name, 
- *        c_acctbal
- *        SUM(l_extendedprice * (1 - l_discount)) AS revenue,  
- * FROM   
- *        customer, 
- *        orders, 
- *        lineitem, 
- *        nation 
- * WHERE 
- *        c_custkey = o_custkey 
- *        AND l_orderkey = o_orderkey 
- *        AND YEAR(o_orderdate) > '1990' 
- *        AND l_returnflag = 'R' 
- *        AND c_nationkey = n_nationkey 
- * GROUP BY 
- *        c_custkey, 
- *        c_name, 
- *        c_acctbal, 
- *        n_name, 
- *        c_address
- * </pre></code>
- *        
- * <p>
- * Compared to the original TPC-H query this version does not print 
- * c_phone and c_comment, only filters by years greater than 1990 instead of
- * a period of 3 months, and does not sort the result by revenue.
- * 
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator 
- * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
- * 
- * <p>
- * Usage: <code>TPCHQuery10 &lt;customer-csv path&gt; &lt;orders-csv path&gt; &lt;lineitem-csv path&gt; &lt;nation-csv path&gt; &lt;result path&gt;</code><br>
- *  
- * <p>
- * This example shows how to use:
- * <ul>
- * <li> tuple data types
- * <li> inline-defined functions
- * <li> projection and join projection
- * <li> build-in aggregation functions
- * </ul>
- */
-@SuppressWarnings("serial")
-public class TPCHQuery10 {
-	
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-	
-	public static void main(String[] args) throws Exception {
-		
-		if(!parseParameters(args)) {
-			return;
-		}
-		
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get customer data set: (custkey, name, address, nationkey, acctbal) 
-		DataSet<Tuple5<Integer, String, String, Integer, Double>> customers = getCustomerDataSet(env);
-
-		// get orders data set: (orderkey, custkey, orderdate)
-		DataSet<Tuple3<Integer, Integer, String>> orders = getOrdersDataSet(env);
-
-		// get lineitem data set: (orderkey, extendedprice, discount, returnflag)
-		DataSet<Tuple4<Integer, Double, Double, String>> lineitems = getLineitemDataSet(env);
-
-		// get nation data set: (nationkey, name)
-		DataSet<Tuple2<Integer, String>> nations = getNationsDataSet(env);
-
-		// orders filtered by year: (orderkey, custkey)
-		DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
-				// filter by year
-				orders.filter(
-								new FilterFunction<Tuple3<Integer,Integer, String>>() {
-									@Override
-									public boolean filter(Tuple3<Integer, Integer, String> t) {
-										int year = Integer.parseInt(t.f2.substring(0, 4));
-										return year > 1990;
-									}
-								})
-				// project fields out that are no longer required
-				.project(0,1).types(Integer.class, Integer.class);
-
-		// lineitems filtered by flag: (orderkey, extendedprice, discount)
-		DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag = 
-				// filter by flag
-				lineitems.filter(new FilterFunction<Tuple4<Integer, Double, Double, String>>() {
-										@Override
-										public boolean filter(Tuple4<Integer, Double, Double, String> t)
-												throws Exception {
-											return t.f3.equals("R");
-										}
-								})
-				// project fields out that are no longer required
-				.project(0,1,2).types(Integer.class, Double.class, Double.class);
-
-		// join orders with lineitems: (custkey, extendedprice, discount)
-		DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey = 
-				ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
-									.where(0).equalTo(0)
-									.projectFirst(1).projectSecond(1,2)
-									.types(Integer.class, Double.class, Double.class);
-
-		// aggregate for revenue: (custkey, revenue)
-		DataSet<Tuple2<Integer, Double>> revenueOfCustomerKey = lineitemsOfCustomerKey
-				// calculate the revenue for each item
-				.map(new MapFunction<Tuple3<Integer, Double, Double>, Tuple2<Integer, Double>>() {
-							@Override
-							public Tuple2<Integer, Double> map(Tuple3<Integer, Double, Double> t) {
-								// revenue per item = l_extendedprice * (1 - l_discount)
-								return new Tuple2<Integer, Double>(t.f0, t.f1 * (1 - t.f2));
-							}
-					})
-				// aggregate the revenues per item to revenue per customer
-				.groupBy(0).aggregate(Aggregations.SUM, 1);
-
-		// join customer with nation (custkey, name, address, nationname, acctbal)
-		DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
-						.joinWithTiny(nations)
-						.where(3).equalTo(0)
-						.projectFirst(0,1,2).projectSecond(1).projectFirst(4)
-						.types(Integer.class, String.class, String.class, String.class, Double.class);
-
-		// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
-		DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue = 
-				customerWithNation.join(revenueOfCustomerKey)
-				.where(0).equalTo(0)
-				.projectFirst(0,1,2,3,4).projectSecond(1)
-				.types(Integer.class, String.class, String.class, String.class, Double.class, Double.class);
-
-		// emit result
-		customerWithRevenue.writeAsCsv(outputPath);
-		
-		// execute program
-		env.execute("TPCH Query 10 Example");
-		
-	}
-	
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-	
-	private static String customerPath;
-	private static String ordersPath;
-	private static String lineitemPath;
-	private static String nationPath;
-	private static String outputPath;
-	
-	private static boolean parseParameters(String[] programArguments) {
-		
-		if(programArguments.length > 0) {
-			if(programArguments.length == 5) {
-				customerPath = programArguments[0];
-				ordersPath = programArguments[1];
-				lineitemPath = programArguments[2];
-				nationPath = programArguments[3];
-				outputPath = programArguments[4];
-			} else {
-				System.err.println("Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
-				return false;
-			}
-		} else {
-			System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
-								"  Due to legal restrictions, we can not ship generated data.\n" +
-								"  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + 
-								"  Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
-			return false;
-		}
-		return true;
-	}
-	
-	private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(customerPath)
-					.fieldDelimiter('|')
-					.includeFields("11110100")
-					.types(Integer.class, String.class, String.class, Integer.class, Double.class);
-	}
-	
-	private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(ordersPath)
-					.fieldDelimiter('|')
-					.includeFields("110010000")
-					.types(Integer.class, Integer.class, String.class);
-	}
-
-	private static DataSet<Tuple4<Integer, Double, Double, String>> getLineitemDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(lineitemPath)
-					.fieldDelimiter('|')
-					.includeFields("1000011010000000")
-					.types(Integer.class, Double.class, Double.class, String.class);
-	}
-	
-	private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(nationPath)
-					.fieldDelimiter('|')
-					.includeFields("1100")
-					.types(Integer.class, String.class);
-	}
-			
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java
deleted file mode 100644
index 4544fd4..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.example.java.relational;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * This program implements a modified version of the TPC-H query 3. The
- * example demonstrates how to assign names to fields by extending the Tuple class.
- * The original query can be found at
- * <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 29).
- *
- * <p>
- * This program implements the following SQL equivalent:
- *
- * <p>
- * <code><pre>
- * SELECT 
- *      l_orderkey, 
- *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
- *      o_orderdate, 
- *      o_shippriority 
- * FROM customer, 
- *      orders, 
- *      lineitem 
- * WHERE
- *      c_mktsegment = '[SEGMENT]' 
- *      AND c_custkey = o_custkey
- *      AND l_orderkey = o_orderkey
- *      AND o_orderdate < date '[DATE]'
- *      AND l_shipdate > date '[DATE]'
- * GROUP BY
- *      l_orderkey, 
- *      o_orderdate, 
- *      o_shippriority;
- * </pre></code>
- *
- * <p>
- * Compared to the original TPC-H query this version does not sort the result by revenue
- * and orderdate.
- *
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator 
- * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
- *
- *  <p>
- * Usage: <code>TPCHQuery3 &lt;lineitem-csv path&gt; &lt;customer-csv path&gt; &lt;orders-csv path&gt; &lt;result path&gt;</code><br>
- *  
- * <p>
- * This example shows how to use:
- * <ul>
- * <li> custom data type derived from tuple data types
- * <li> inline-defined functions
- * <li> build-in aggregation functions
- * </ul>
- */
-@SuppressWarnings("serial")
-public class TPCHQuery3 {
-
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-	
-	public static void main(String[] args) throws Exception {
-		
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		DataSet<Lineitem> li = getLineitemDataSet(env);
-		DataSet<Order> or = getOrdersDataSet(env);
-		DataSet<Customer> cust = getCustomerDataSet(env);
-		
-		// Filter market segment "AUTOMOBILE"
-		cust = cust.filter(
-							new FilterFunction<Customer>() {
-								@Override
-								public boolean filter(Customer value) {
-									return value.getMktsegment().equals("AUTOMOBILE");
-								}
-							});
-
-		// Filter all Orders with o_orderdate < 12.03.1995
-		or = or.filter(
-						new FilterFunction<Order>() {
-							private DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
-							private Date date;
-							
-							{	
-								Calendar cal = Calendar.getInstance();
-								cal.set(1995, 3, 12);
-								date = cal.getTime(); 
-							}
-							
-							@Override
-							public boolean filter(Order value) throws ParseException {
-								Date orderDate = format.parse(value.getOrderdate());
-								return orderDate.before(date);
-							}
-						});
-		
-		// Filter all Lineitems with l_shipdate > 12.03.1995
-		li = li.filter(
-						new FilterFunction<Lineitem>() {
-							private DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
-							private Date date;
-							
-							{
-								Calendar cal = Calendar.getInstance();
-								cal.set(1995, 3, 12);
-								date = cal.getTime();
-							}
-							
-							@Override
-							public boolean filter(Lineitem value) throws ParseException {
-								Date shipDate = format.parse(value.getShipdate());
-								return shipDate.after(date);
-							}
-						});
-
-		// Join customers with orders and package them into a ShippingPriorityItem
-		DataSet<ShippingPriorityItem> customerWithOrders = 
-				cust.join(or)
-					.where(0)
-					.equalTo(0)
-					.with(
-							new JoinFunction<Customer, Order, ShippingPriorityItem>() {
-								@Override
-								public ShippingPriorityItem join(Customer first, Order second) {
-									return new ShippingPriorityItem(0, 0.0, second.getOrderdate(),
-											second.getShippriority(), second.getOrderkey());
-								}
-							});
-		
-		// Join the last join result with Lineitems
-		DataSet<ShippingPriorityItem> joined = 
-				customerWithOrders.join(li)
-									.where(4)
-									.equalTo(0)
-									.with(
-											new JoinFunction<ShippingPriorityItem, Lineitem, ShippingPriorityItem>() {
-												@Override
-												public ShippingPriorityItem join(ShippingPriorityItem first, Lineitem second) {
-													first.setL_Orderkey(second.getOrderkey());
-													first.setRevenue(second.getExtendedprice() * (1 - second.getDiscount()));
-													return first;
-												}
-											});
-		
-		// Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
-		joined = joined
-				.groupBy(0, 2, 3)
-				.aggregate(Aggregations.SUM, 1);
-		
-		// emit result
-		joined.writeAsCsv(outputPath, "\n", "|");
-		
-		// execute program
-		env.execute("TPCH Query 3 Example");
-		
-	}
-	
-	// *************************************************************************
-	//     DATA TYPES
-	// *************************************************************************
-	
-	public static class Lineitem extends Tuple4<Integer, Double, Double, String> {
-
-		public Integer getOrderkey() { return this.f0; }
-		public Double getDiscount() { return this.f2; }
-		public Double getExtendedprice() { return this.f1; }
-		public String getShipdate() { return this.f3; }
-	}
-
-	public static class Customer extends Tuple2<Integer, String> {
-		
-		public Integer getCustKey() { return this.f0; }
-		public String getMktsegment() { return this.f1; }
-	}
-
-	public static class Order extends Tuple3<Integer, String, Integer> {
-		
-		public Integer getOrderkey() { return this.f0; }
-		public String getOrderdate() { return this.f1; }
-		public Integer getShippriority() { return this.f2; }
-	}
-
-	public static class ShippingPriorityItem extends Tuple5<Integer, Double, String, Integer, Integer> {
-
-		public ShippingPriorityItem() { }
-
-		public ShippingPriorityItem(Integer l_orderkey, Double revenue,
-				String o_orderdate, Integer o_shippriority, Integer o_orderkey) {
-			this.f0 = l_orderkey;
-			this.f1 = revenue;
-			this.f2 = o_orderdate;
-			this.f3 = o_shippriority;
-			this.f4 = o_orderkey;
-		}
-		
-		public Integer getL_Orderkey() { return this.f0; }
-		public void setL_Orderkey(Integer l_orderkey) { this.f0 = l_orderkey; }
-		public Double getRevenue() { return this.f1; }
-		public void setRevenue(Double revenue) { this.f1 = revenue; }
-		
-		public String getOrderdate() { return this.f2; }
-		public Integer getShippriority() { return this.f3; }
-		public Integer getO_Orderkey() { return this.f4; }
-	}
-	
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-	
-	private static String lineitemPath;
-	private static String customerPath;
-	private static String ordersPath;
-	private static String outputPath;
-	
-	private static boolean parseParameters(String[] programArguments) {
-		
-		if(programArguments.length > 0) {
-			if(programArguments.length == 4) {
-				lineitemPath = programArguments[0];
-				customerPath = programArguments[1];
-				ordersPath = programArguments[2];
-				outputPath = programArguments[3];
-			} else {
-				System.err.println("Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>");
-				return false;
-			}
-		} else {
-			System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
-								"  Due to legal restrictions, we can not ship generated data.\n" +
-								"  You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + 
-								"  Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>");
-			return false;
-		}
-		return true;
-	}
-	
-	private static DataSet<Lineitem> getLineitemDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(lineitemPath)
-					.fieldDelimiter('|')
-					.includeFields("1000011000100000")
-					.tupleType(Lineitem.class);
-	}
-	
-	private static DataSet<Customer> getCustomerDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(customerPath)
-					.fieldDelimiter('|')
-					.includeFields("10000010")
-					.tupleType(Customer.class);
-	}
-	
-	private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment env) {
-		return env.readCsvFile(ordersPath)
-					.fieldDelimiter('|')
-					.includeFields("100010010")
-					.tupleType(Order.class);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
deleted file mode 100644
index 9ca6ea9..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.example.java.relational;
-
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.example.java.relational.util.WebLogData;
-import org.apache.flink.example.java.relational.util.WebLogDataGenerator;
-
-/**
- * This program processes web logs and relational data. 
- * It implements the following relational query:
- *
- * <code><pre>
- * SELECT 
- *       r.pageURL, 
- *       r.pageRank, 
- *       r.avgDuration
- * FROM documents d JOIN rankings r
- *                  ON d.url = r.url
- * WHERE CONTAINS(d.text, [keywords]) 
- *       AND r.rank > [rank] 
- *       AND NOT EXISTS 
- *           (
- *              SELECT * FROM Visits v
- *              WHERE v.destUrl = d.url 
- *                    AND v.visitDate < [date]
- *           );
- * </pre></code>
- *
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator.
- * The tables referenced in the query can be generated using the {@link WebLogDataGenerator} and 
- * have the following schemas
- * <code><pre>
- * CREATE TABLE Documents (
- *                url VARCHAR(100) PRIMARY KEY,
- *                contents TEXT );
- *
- * CREATE TABLE Rankings (
- *                pageRank INT,
- *                pageURL VARCHAR(100) PRIMARY KEY,
- *                avgDuration INT );
- *
- * CREATE TABLE Visits (
- *                sourceIP VARCHAR(16),
- *                destURL VARCHAR(100),
- *                visitDate DATE,
- *                adRevenue FLOAT,
- *                userAgent VARCHAR(64),
- *                countryCode VARCHAR(3),
- *                languageCode VARCHAR(6),
- *                searchWord VARCHAR(32),
- *                duration INT );
- * </pre></code>
- * 
- * <p>
- * Usage: <code>WebLogAnalysis &lt;documents path&gt; &lt;ranks path&gt; &lt;visits path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WebLogData}.
- * 
- * <p>
- * This example shows how to use:
- * <ul>
- * <li> tuple data types
- * <li> projection and join projection
- * <li> the CoGroup transformation for an anti-join
- * </ul>
- * 
- */
-@SuppressWarnings("serial")
-public class WebLogAnalysis {
-	
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-	
-	public static void main(String[] args) throws Exception {
-		
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		DataSet<Tuple2<String, String>> documents = getDocumentsDataSet(env);
-		DataSet<Tuple3<Integer, String, Integer>> ranks = getRanksDataSet(env);
-		DataSet<Tuple2<String, String>> visits = getVisitsDataSet(env);
-		
-		// Retain documents with keywords
-		DataSet<Tuple1<String>> filterDocs = documents
-				.filter(new FilterDocByKeyWords())
-				.project(0).types(String.class);
-
-		// Filter ranks by minimum rank
-		DataSet<Tuple3<Integer, String, Integer>> filterRanks = ranks
-				.filter(new FilterByRank());
-
-		// Filter visits by visit date
-		DataSet<Tuple1<String>> filterVisits = visits
-				.filter(new FilterVisitsByDate())
-				.project(0).types(String.class);
-
-		// Join the filtered documents and ranks, i.e., get all URLs with min rank and keywords
-		DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks = 
-				filterDocs.join(filterRanks)
-							.where(0).equalTo(1)
-							.projectSecond(0,1,2)
-							.types(Integer.class, String.class, Integer.class);
-
-		// Anti-join urls with visits, i.e., retain all URLs which have NOT been visited in a certain time
-		DataSet<Tuple3<Integer, String, Integer>> result = 
-				joinDocsRanks.coGroup(filterVisits)
-								.where(1).equalTo(0)
-								.with(new AntiJoinVisits());
-
-		// emit result
-		if(fileOutput) {
-			result.writeAsCsv(outputPath, "\n", "|");
-		} else {
-			result.print();
-		}
-
-		// execute program
-		env.execute("WebLogAnalysis Example");
-		
-	}
-	
-	// *************************************************************************
-	//     USER FUNCTIONS
-	// *************************************************************************
-	
-	/**
-	 * MapFunction that filters for documents that contain a certain set of
-	 * keywords.
-	 */
-	public static class FilterDocByKeyWords implements FilterFunction<Tuple2<String, String>> {
-
-		private static final String[] KEYWORDS = { " editors ", " oscillations " };
-
-		/**
-		 * Filters for documents that contain all of the given keywords and projects the records on the URL field.
-		 *
-		 * Output Format:
-		 * 0: URL
-		 * 1: DOCUMENT_TEXT
-		 */
-		@Override
-		public boolean filter(Tuple2<String, String> value) throws Exception {
-			// FILTER
-			// Only collect the document if all keywords are contained
-			String docText = value.f1;
-			for (String kw : KEYWORDS) {
-				if (!docText.contains(kw)) {
-					return false;
-				}
-			}
-			return true;
-		}
-	}
-
-	/**
-	 * MapFunction that filters for records where the rank exceeds a certain threshold.
-	 */
-	public static class FilterByRank implements FilterFunction<Tuple3<Integer, String, Integer>> {
-
-		private static final int RANKFILTER = 40;
-
-		/**
-		 * Filters for records of the rank relation where the rank is greater
-		 * than the given threshold.
-		 *
-		 * Output Format:
-		 * 0: RANK
-		 * 1: URL
-		 * 2: AVG_DURATION
-		 */
-		@Override
-		public boolean filter(Tuple3<Integer, String, Integer> value) throws Exception {
-			return (value.f0 > RANKFILTER);
-		}
-	}
-
-	/**
-	 * MapFunction that filters for records of the visits relation where the year
-	 * (from the date string) is equal to a certain value.
-	 */
-	public static class FilterVisitsByDate implements FilterFunction<Tuple2<String, String>> {
-
-		private static final int YEARFILTER = 2007;
-
-		/**
-		 * Filters for records of the visits relation where the year of visit is equal to a
-		 * specified value. The URL of all visit records passing the filter is emitted.
-		 *
-		 * Output Format:
-		 * 0: URL
-		 * 1: DATE
-		 */
-		@Override
-		public boolean filter(Tuple2<String, String> value) throws Exception {
-			// Parse date string with the format YYYY-MM-DD and extract the year
-			String dateString = value.f1;
-			int year = Integer.parseInt(dateString.substring(0,4));
-			return (year == YEARFILTER);
-		}
-	}
-
-
-	/**
-	 * CoGroupFunction that realizes an anti-join.
-	 * If the first input does not provide any pairs, all pairs of the second input are emitted.
-	 * Otherwise, no pair is emitted.
-	 */
-	public static class AntiJoinVisits implements CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, Tuple3<Integer, String, Integer>> {
-
-		/**
-		 * If the visit iterator is empty, all pairs of the rank iterator are emitted.
-		 * Otherwise, no pair is emitted.
-		 *
-		 * Output Format:
-		 * 0: RANK
-		 * 1: URL
-		 * 2: AVG_DURATION
-		 */
-		@Override
-		public void coGroup(Iterable<Tuple3<Integer, String, Integer>> ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
-			// Check if there is a entry in the visits relation
-			if (!visits.iterator().hasNext()) {
-				for (Tuple3<Integer, String, Integer> next : ranks) {
-					// Emit all rank pairs
-					out.collect(next);
-				}
-			}
-		}
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-	
-	private static boolean fileOutput = false;
-	private static String documentsPath;
-	private static String ranksPath;
-	private static String visitsPath;
-	private static String outputPath;
-	
-	private static boolean parseParameters(String[] args) {
-		
-		if(args.length > 0) {
-			fileOutput = true;
-			if(args.length == 4) {
-				documentsPath = args[0];
-				ranksPath = args[1];
-				visitsPath = args[2];
-				outputPath = args[3];
-			} else {
-				System.err.println("Usage: WebLogAnalysis <documents path> <ranks path> <visits path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WebLog Analysis example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  We provide a data generator to create synthetic input files for this program.");
-			System.out.println("  Usage: WebLogAnalysis <documents path> <ranks path> <visits path> <result path>");
-		}
-		return true;
-	}
-	
-	private static DataSet<Tuple2<String, String>> getDocumentsDataSet(ExecutionEnvironment env) {
-		// Create DataSet for documents relation (URL, Doc-Text)
-		if(fileOutput) {
-			return env.readCsvFile(documentsPath)
-						.fieldDelimiter('|')
-						.types(String.class, String.class);
-		} else {
-			return WebLogData.getDocumentDataSet(env);
-		}
-	}
-	
-	private static DataSet<Tuple3<Integer, String, Integer>> getRanksDataSet(ExecutionEnvironment env) {
-		// Create DataSet for ranks relation (Rank, URL, Avg-Visit-Duration)
-		if(fileOutput) {
-			return env.readCsvFile(ranksPath)
-						.fieldDelimiter('|')
-						.types(Integer.class, String.class, Integer.class);
-		} else {
-			return WebLogData.getRankDataSet(env);
-		}
-	}
-
-	private static DataSet<Tuple2<String, String>> getVisitsDataSet(ExecutionEnvironment env) {
-		// Create DataSet for visits relation (URL, Date)
-		if(fileOutput) {
-			return env.readCsvFile(visitsPath)
-						.fieldDelimiter('|')
-						.includeFields("011000000")
-						.types(String.class, String.class);
-		} else {
-			return WebLogData.getVisitDataSet(env);
-		}
-	}
-		
-}