You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:08 UTC
[26/60] Renamed java examples package
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java
deleted file mode 100644
index 10cf748..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.example.java.graph.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/**
- * Provides the default data sets used for the PageRank example program.
- * The default data sets are used if no parameters are given to the program.
- *
- * <p>The default graph consists of {@link #getNumberOfPages()} pages with the IDs 1 to 15,
- * connected by the directed links listed in {@link #EDGES}.
- */
-public class PageRankData {
-
-	/** Directed links of the default graph as {source page ID, target page ID} pairs. */
-	public static final Object[][] EDGES = {
-		{1L, 2L},
-		{1L, 15L},
-		{2L, 3L},
-		{2L, 4L},
-		{2L, 5L},
-		{2L, 6L},
-		{2L, 7L},
-		{3L, 13L},
-		{4L, 2L},
-		{5L, 11L},
-		{5L, 12L},
-		{6L, 1L},
-		{6L, 7L},
-		{6L, 8L},
-		{7L, 1L},
-		{7L, 8L},
-		{8L, 1L},
-		{8L, 9L},
-		{8L, 10L},
-		{9L, 14L},
-		{9L, 1L},
-		{10L, 1L},
-		{10L, 13L},
-		{11L, 12L},
-		{11L, 1L},
-		{12L, 1L},
-		{13L, 14L},
-		{14L, 12L},
-		{15L, 1L},
-	};
-
-	/** Number of pages in the default graph; the page IDs are 1 to this value. */
-	private static final long NUM_PAGES = 15;
-
-	/**
-	 * Creates the default link data set.
-	 *
-	 * @param env the execution environment used to create the data set
-	 * @return one {@code Tuple2(source, target)} per entry of {@link #EDGES}
-	 */
-	public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
-
-		List<Tuple2<Long, Long>> edges = new ArrayList<Tuple2<Long, Long>>(EDGES.length);
-		for (Object[] e : EDGES) {
-			edges.add(new Tuple2<Long, Long>((Long) e[0], (Long) e[1]));
-		}
-		return env.fromCollection(edges);
-	}
-
-	/**
-	 * Creates the default page data set, i.e. the page IDs from 1 to {@link #getNumberOfPages()}.
-	 *
-	 * @param env the execution environment used to create the data set
-	 * @return the sequence of page IDs
-	 */
-	public static DataSet<Long> getDefaultPagesDataSet(ExecutionEnvironment env) {
-		// derived from NUM_PAGES so the ID range and the reported page count cannot drift apart
-		return env.generateSequence(1, NUM_PAGES);
-	}
-
-	/**
-	 * @return the number of pages in the default graph
-	 */
-	public static long getNumberOfPages() {
-		return NUM_PAGES;
-	}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/misc/PiEstimation.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/misc/PiEstimation.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/misc/PiEstimation.java
deleted file mode 100644
index ef336da..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/misc/PiEstimation.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.example.java.misc;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * Estimates the value of Pi using the Monte Carlo method.
- * The area of a circle is Pi * R^2, R being the radius of the circle.
- * The area of a square is 4 * R^2, where the length of the square's edge is 2*R.
- *
- * Thus Pi = 4 * (area of circle / area of square).
- *
- * The idea is to find a way to estimate the circle to square area ratio.
- * The Monte Carlo method suggests collecting random points (within the square)
- * and then counting the number of points that fall within the circle.
- * This program draws its points from the unit square [0, 1) x [0, 1), which contains one
- * quarter of the unit circle around the origin. The quarter circle and the unit square have
- * the same area ratio (Pi / 4), so Pi is estimated as 4 * (points inside / all points).
- * A point is inside the circle if:
- *
- * <pre>
- * {@code
- * x = Math.random()
- * y = Math.random()
- *
- * x * x + y * y < 1
- * }
- * </pre>
- *
- * <p>Usage: <code>PiEstimation [&lt;number of samples&gt;]</code> (default: 1000000 samples)
- */
-@SuppressWarnings("serial")
-public class PiEstimation implements java.io.Serializable {
-
-
-	public static void main(String[] args) throws Exception {
-
-		final long numSamples = args.length > 0 ? Long.parseLong(args[0]) : 1000000;
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// count how many of the samples randomly fall into the unit circle
-		DataSet<Long> count =
-				env.generateSequence(1, numSamples)
-				.map(new Sampler())
-				.reduce(new SumReducer());
-
-		// the fraction of samples inside the circle approximates Pi / 4
-		// (quarter circle area divided by unit square area), so scale it by 4
-		DataSet<Double> pi = count
-				.map(new MapFunction<Long, Double>() {
-					@Override
-					public Double map(Long value) {
-						return value * 4.0 / numSamples;
-					}
-				});
-
-		System.out.println("We estimate Pi to be:");
-		pi.print();
-
-		env.execute();
-	}
-
-	//*************************************************************************
-	//     USER FUNCTIONS
-	//*************************************************************************
-
-
-	/**
-	 * Draws one random point (x, y) from the unit square [0, 1) x [0, 1) per input element.
-	 * Returns 1 if the point lies inside the unit circle around the origin (x^2 + y^2 < 1),
-	 * and 0 otherwise. The input value is ignored; it only determines how many samples are drawn.
-	 */
-	public static class Sampler implements MapFunction<Long, Long> {
-
-		@Override
-		public Long map(Long value) throws Exception {
-			double x = Math.random();
-			double y = Math.random();
-			return (x * x + y * y) < 1 ? 1L : 0L;
-		}
-	}
-
-
-	/**
-	 * Sums up all long values.
-	 */
-	public static final class SumReducer implements ReduceFunction<Long> {
-
-		@Override
-		public Long reduce(Long value1, Long value2) throws Exception {
-			return value1 + value2;
-		}
-	}
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
deleted file mode 100644
index 6ef6270..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/LinearRegression.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.example.java.ml;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.example.java.ml.util.LinearRegressionData;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-
-/**
- * This example implements a basic Linear Regression to solve the y = theta0 + theta1*x problem
- * using the batch gradient descent (BGD) algorithm.
- *
- * <p>
- * Linear Regression with BGD is an iterative optimization algorithm and works as follows:<br>
- * Given a data set and a target set, BGD tries to find the parameters that best fit the data set to the target set.
- * In each iteration, the algorithm computes the gradient of the cost function and uses it to update all the parameters.
- * The algorithm terminates after a fixed number of iterations (as in this implementation).
- * With enough iterations, the algorithm can minimize the cost function and find the best parameters.
- * This is the Wikipedia entry for the <a href = "http://en.wikipedia.org/wiki/Linear_regression">Linear regression</a> and <a href = "http://en.wikipedia.org/wiki/Gradient_descent">Gradient descent algorithm</a>.
- *
- * <p>
- * This implementation works on one-dimensional data and finds the two-dimensional theta
- * (theta0 and theta1) that best fits the target.
- *
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <ul>
- * <li>Data points are represented as two double values separated by a blank character. The first one represents X (the training data) and the second represents Y (the target).
- * Data points are separated by newline characters.<br>
- * For example <code>"-0.02 -0.04\n5.3 10.6\n"</code> gives two data points (x=-0.02, y=-0.04) and (x=5.3, y=10.6).
- * </ul>
- *
- * <p>
- * Usage: <code>LinearRegression [&lt;data path&gt; &lt;result path&gt; &lt;num iterations&gt;]</code><br>
- * Without parameters, the built-in data of {@link LinearRegressionData} is used, 10 iterations are run
- * and the result is printed.
- *
- * <p>
- * This example shows how to use:
- * <ul>
- * <li> Bulk iterations
- * <li> Broadcast variables in bulk iterations
- * <li> Custom Java objects (POJOs)
- * </ul>
- */
-@SuppressWarnings("serial")
-public class LinearRegression {
-
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get input x data from elements
-		DataSet<Data> data = getDataSet(env);
-
-		// get the initial parameters from elements
-		DataSet<Params> parameters = getParamsDataSet(env);
-
-		// set number of bulk iterations for BGD linear regression
-		IterativeDataSet<Params> loop = parameters.iterate(numIterations);
-
-		DataSet<Params> new_parameters = data
-				// compute a single step using every sample
-				.map(new SubUpdate()).withBroadcastSet(loop, "parameters")
-				// sum up all the steps
-				.reduce(new UpdateAccumulator())
-				// average the steps and update all parameters
-				.map(new Update());
-
-		// feed new parameters back into next iteration
-		DataSet<Params> result = loop.closeWith(new_parameters);
-
-		// emit result
-		if (fileOutput) {
-			result.writeAsText(outputPath);
-		} else {
-			result.print();
-		}
-
-		// execute program
-		env.execute("Linear Regression example");
-
-	}
-
-	// *************************************************************************
-	//     DATA TYPES
-	// *************************************************************************
-
-	/**
-	 * A simple data sample: x is the input and y is the target.
-	 */
-	public static class Data implements Serializable {
-		public double x, y;
-
-		public Data() {}
-
-		public Data(double x, double y) {
-			this.x = x;
-			this.y = y;
-		}
-
-		@Override
-		public String toString() {
-			return "(" + x + "|" + y + ")";
-		}
-
-	}
-
-	/**
-	 * A set of model parameters: theta0 (intercept) and theta1 (slope).
-	 */
-	public static class Params implements Serializable {
-
-		private double theta0, theta1;
-
-		public Params() {}
-
-		public Params(double x0, double x1) {
-			this.theta0 = x0;
-			this.theta1 = x1;
-		}
-
-		@Override
-		public String toString() {
-			return theta0 + " " + theta1;
-		}
-
-		public double getTheta0() {
-			return theta0;
-		}
-
-		public double getTheta1() {
-			return theta1;
-		}
-
-		public void setTheta0(double theta0) {
-			this.theta0 = theta0;
-		}
-
-		public void setTheta1(double theta1) {
-			this.theta1 = theta1;
-		}
-
-		/**
-		 * Divides both parameters by {@code a}. Note that this modifies this instance in place.
-		 *
-		 * @param a the divisor
-		 * @return this instance, for chaining
-		 */
-		public Params div(Integer a) {
-			this.theta0 = theta0 / a;
-			this.theta1 = theta1 / a;
-			return this;
-		}
-
-	}
-
-	// *************************************************************************
-	//     USER FUNCTIONS
-	// *************************************************************************
-
-	/** Converts a Tuple2&lt;Double,Double&gt; into a Data. */
-	public static final class TupleDataConverter implements MapFunction<Tuple2<Double, Double>, Data> {
-
-		@Override
-		public Data map(Tuple2<Double, Double> t) throws Exception {
-			return new Data(t.f0, t.f1);
-		}
-	}
-
-	/** Converts a Tuple2&lt;Double,Double&gt; into a Params. */
-	public static final class TupleParamsConverter implements MapFunction<Tuple2<Double, Double>, Params> {
-
-		@Override
-		public Params map(Tuple2<Double, Double> t) throws Exception {
-			return new Params(t.f0, t.f1);
-		}
-	}
-
-	/**
-	 * Computes a single BGD update of the parameters from one data sample.
-	 * The current parameters are read from the broadcast variable "parameters".
-	 * The updated parameters are emitted together with a count of 1, so that the
-	 * updates can be summed up and averaged afterwards.
-	 */
-	public static class SubUpdate extends RichMapFunction<Data, Tuple2<Params, Integer>> {
-
-		/** Learning rate (step size) of the gradient descent. */
-		private static final double LEARNING_RATE = 0.01;
-
-		private Collection<Params> parameters;
-
-		private Params parameter;
-
-		private int count = 1;
-
-		/** Reads the parameters from a broadcast variable into a collection. */
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			this.parameters = getRuntimeContext().getBroadcastVariable("parameters");
-		}
-
-		@Override
-		public Tuple2<Params, Integer> map(Data in) throws Exception {
-
-			// the broadcast set is produced by a reduce and therefore holds a single Params
-			// element; should there be several, the last one wins
-			for (Params p : parameters) {
-				this.parameter = p;
-			}
-
-			// prediction error of the current model for this sample
-			double error = (parameter.theta0 + (parameter.theta1 * in.x)) - in.y;
-
-			double theta_0 = parameter.theta0 - LEARNING_RATE * error;
-			double theta_1 = parameter.theta1 - LEARNING_RATE * (error * in.x);
-
-			return new Tuple2<Params, Integer>(new Params(theta_0, theta_1), count);
-		}
-	}
-
-	/**
-	 * Sums up the parameter updates and their counts.
-	 */
-	public static class UpdateAccumulator implements ReduceFunction<Tuple2<Params, Integer>> {
-
-		@Override
-		public Tuple2<Params, Integer> reduce(Tuple2<Params, Integer> val1, Tuple2<Params, Integer> val2) {
-
-			double new_theta0 = val1.f0.theta0 + val2.f0.theta0;
-			double new_theta1 = val1.f0.theta1 + val2.f0.theta1;
-			Params result = new Params(new_theta0, new_theta1);
-			return new Tuple2<Params, Integer>(result, val1.f1 + val2.f1);
-
-		}
-	}
-
-	/**
-	 * Computes the final update by averaging the summed-up updates over their count.
-	 */
-	public static class Update implements MapFunction<Tuple2<Params, Integer>, Params> {
-
-		@Override
-		public Params map(Tuple2<Params, Integer> arg0) throws Exception {
-
-			return arg0.f0.div(arg0.f1);
-
-		}
-
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String dataPath = null;
-	private static String outputPath = null;
-	private static int numIterations = 10;
-
-	/**
-	 * Parses the program arguments into the static configuration fields.
-	 *
-	 * @return {@code false} if the arguments are invalid and the program should stop
-	 */
-	private static boolean parseParameters(String[] programArguments) {
-
-		if (programArguments.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-			if (programArguments.length == 3) {
-				dataPath = programArguments[0];
-				outputPath = programArguments[1];
-				numIterations = Integer.parseInt(programArguments[2]);
-			} else {
-				System.err.println("Usage: LinearRegression <data path> <result path> <num iterations>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing Linear Regression example with default parameters and built-in default data.");
-			System.out.println(" Provide parameters to read input data from files.");
-			System.out.println(" See the documentation for the correct format of input files.");
-			System.out.println(" We provide a data generator to create synthetic input files for this program.");
-			System.out.println(" Usage: LinearRegression <data path> <result path> <num iterations>");
-		}
-		return true;
-	}
-
-	/** Reads the data points from the input file if one was given, otherwise uses the built-in data. */
-	private static DataSet<Data> getDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			// read blank-separated "x y" pairs
-			return env.readCsvFile(dataPath)
-					.fieldDelimiter(' ')
-					.includeFields(true, true)
-					.types(Double.class, Double.class)
-					.map(new TupleDataConverter());
-		} else {
-			return LinearRegressionData.getDefaultDataDataSet(env);
-		}
-	}
-
-	/** Returns the initial parameters; these always come from the built-in default data. */
-	private static DataSet<Params> getParamsDataSet(ExecutionEnvironment env) {
-
-		return LinearRegressionData.getDefaultParamsDataSet(env);
-
-	}
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/util/LinearRegressionData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/util/LinearRegressionData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/util/LinearRegressionData.java
deleted file mode 100644
index 31e71f5..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/util/LinearRegressionData.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.example.java.ml.util;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.example.java.ml.LinearRegression.Data;
-import org.apache.flink.example.java.ml.LinearRegression.Params;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Default inputs for the Linear Regression example program. They are used when the
- * program is started without any parameters.
- */
-public class LinearRegressionData {
-
-	// Both tables are plain object arrays so that Scala data sources can be generated
-	// from them as well.
-
-	/** Initial model parameters, one {theta0, theta1} row. */
-	public static final Object[][] PARAMS = new Object[][] {
-		new Object[] { 0.0, 0.0 }
-	};
-
-	/** Training samples as {x, y} rows. */
-	public static final Object[][] DATA = new Object[][] {
-		new Object[] { 0.5, 1.0 },
-		new Object[] { 1.0, 2.0 },
-		new Object[] { 2.0, 4.0 },
-		new Object[] { 3.0, 6.0 },
-		new Object[] { 4.0, 8.0 },
-		new Object[] { 5.0, 10.0 },
-		new Object[] { 6.0, 12.0 },
-		new Object[] { 7.0, 14.0 },
-		new Object[] { 8.0, 16.0 },
-		new Object[] { 9.0, 18.0 },
-		new Object[] { 10.0, 20.0 },
-		new Object[] { -0.08, -0.16 },
-		new Object[] { 0.13, 0.26 },
-		new Object[] { -1.17, -2.35 },
-		new Object[] { 1.72, 3.45 },
-		new Object[] { 1.70, 3.41 },
-		new Object[] { 1.20, 2.41 },
-		new Object[] { -0.59, -1.18 },
-		new Object[] { 0.28, 0.57 },
-		new Object[] { 1.65, 3.30 },
-		new Object[] { -0.55, -1.08 }
-	};
-
-	/**
-	 * Builds a data set with one {@link Params} per row of {@link #PARAMS}.
-	 *
-	 * @param env the execution environment used to create the data set
-	 * @return the initial parameters
-	 */
-	public static DataSet<Params> getDefaultParamsDataSet(ExecutionEnvironment env) {
-		final List<Params> result = new LinkedList<Params>();
-		for (int i = 0; i < PARAMS.length; i++) {
-			final Object[] row = PARAMS[i];
-			final double theta0 = (Double) row[0];
-			final double theta1 = (Double) row[1];
-			result.add(new Params(theta0, theta1));
-		}
-		return env.fromCollection(result);
-	}
-
-	/**
-	 * Builds a data set with one {@link Data} sample per row of {@link #DATA}.
-	 *
-	 * @param env the execution environment used to create the data set
-	 * @return the training samples
-	 */
-	public static DataSet<Data> getDefaultDataDataSet(ExecutionEnvironment env) {
-		final List<Data> result = new LinkedList<Data>();
-		for (int i = 0; i < DATA.length; i++) {
-			final Object[] row = DATA[i];
-			final double x = (Double) row[0];
-			final double y = (Double) row[1];
-			result.add(new Data(x, y));
-		}
-		return env.fromCollection(result);
-	}
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/util/LinearRegressionDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/util/LinearRegressionDataGenerator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/util/LinearRegressionDataGenerator.java
deleted file mode 100644
index 28001ba..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/ml/util/LinearRegressionDataGenerator.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.example.java.ml.util;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.Locale;
-import java.util.Random;
-
-/**
- * Generates data for the {@link org.apache.flink.example.java.ml.LinearRegression} example program.
- */
-public class LinearRegressionDataGenerator {
-
-	static {
-		// runs before FORMAT is initialized, so numbers are written with '.' as decimal separator
-		Locale.setDefault(Locale.US);
-	}
-
-	private static final String POINTS_FILE = "data";
-	private static final long DEFAULT_SEED = 4650285087650871364L;
-	private static final int DIMENSIONALITY = 1;
-	private static final DecimalFormat FORMAT = new DecimalFormat("#0.00");
-	private static final char DELIMITER = ' ';
-
-	/**
-	 * Main method to generate data for the {@link org.apache.flink.example.java.ml.LinearRegression} example program.
-	 * <p>
-	 * The generator creates one file:
-	 * <ul>
-	 * <li><code>{tmp.dir}/data</code> for the data points
-	 * </ul>
-	 * Each line holds one point "x y", where x is drawn from a standard normal distribution
-	 * and y is 2 * x plus Gaussian noise with standard deviation 0.01.
-	 *
-	 * @param args
-	 * <ol>
-	 * <li>Int: Number of data points
-	 * <li><b>Optional</b> Long: Random seed
-	 * </ol>
-	 */
-	public static void main(String[] args) throws IOException {
-
-		// check parameter count
-		if (args.length < 1) {
-			System.out.println("LinearRegressionDataGenerator <numberOfDataPoints> [<seed>]");
-			System.exit(1);
-		}
-
-		// parse parameters; the optional seed is the second argument
-		final int numDataPoints = Integer.parseInt(args[0]);
-		final long firstSeed = args.length > 1 ? Long.parseLong(args[1]) : DEFAULT_SEED;
-		final Random random = new Random(firstSeed);
-		final File pointsFile = new File(System.getProperty("java.io.tmpdir"), POINTS_FILE);
-
-		// write the points out
-		BufferedWriter pointsOut = null;
-		try {
-			pointsOut = new BufferedWriter(new FileWriter(pointsFile));
-			StringBuilder buffer = new StringBuilder();
-
-			// DIMENSIONALITY + 1 slots: the x coordinate(s) followed by the target y
-			double[] point = new double[DIMENSIONALITY + 1];
-
-			for (int i = 1; i <= numDataPoints; i++) {
-				point[0] = random.nextGaussian();
-				point[1] = 2 * point[0] + 0.01 * random.nextGaussian();
-				writePoint(point, buffer, pointsOut);
-			}
-
-		}
-		finally {
-			if (pointsOut != null) {
-				pointsOut.close();
-			}
-		}
-
-		System.out.println("Wrote " + numDataPoints + " data points to " + pointsFile.getPath());
-	}
-
-
-	/**
-	 * Writes one point as a single line, formatting each value with {@link #FORMAT} and
-	 * separating the values with {@link #DELIMITER}.
-	 *
-	 * @param data the values of the point
-	 * @param buffer a reusable buffer; its previous content is discarded
-	 * @param out the writer that receives the line
-	 */
-	private static void writePoint(double[] data, StringBuilder buffer, BufferedWriter out) throws IOException {
-		buffer.setLength(0);
-
-		// write coordinates
-		for (int j = 0; j < data.length; j++) {
-			buffer.append(FORMAT.format(data[j]));
-			if (j < data.length - 1) {
-				buffer.append(DELIMITER);
-			}
-		}
-
-		out.write(buffer.toString());
-		out.newLine();
-	}
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
deleted file mode 100644
index a379bf8..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/EmptyFieldsCountAccumulator.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.example.java.relational;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-
-/**
- * This program filters lines with empty fields from a CSV file. In doing so, it counts the number of empty fields per
- * column within a CSV file using a custom accumulator for vectors. In this context, empty fields are those that are
- * {@code null} or contain at most whitespace characters like space and tab.
- * <p>
- * The input file is a plain text CSV file with the semicolon as field separator and three columns.
- * See {@link #getDataSet(ExecutionEnvironment)} for configuration.
- * <p>
- * Usage: <code>EmptyFieldsCountAccumulator [&lt;input file path&gt; [&lt;result path&gt;]]</code> <br>
- * Without an input file, built-in example tuples are used; without a result path, the filtered lines are printed.
- * <p>
- * This example shows how to use:
- * <ul>
- * <li>custom accumulators
- * <li>tuple data types
- * <li>inline-defined functions
- * </ul>
- */
-@SuppressWarnings("serial")
-public class EmptyFieldsCountAccumulator {
-
-	// *************************************************************************
-	// PROGRAM
-	// *************************************************************************
-
-	private static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
-
-	public static void main(final String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get the data set
-		final DataSet<Tuple> file = getDataSet(env);
-
-		// filter lines with empty fields
-		final DataSet<Tuple> filteredLines = file.filter(new EmptyFieldFilter());
-
-		// Here, we could do further processing with the filtered lines...
-
-		// output the filtered lines
-		if (outputPath == null) {
-			filteredLines.print();
-		} else {
-			filteredLines.writeAsCsv(outputPath);
-		}
-
-		// execute program
-		final JobExecutionResult result = env.execute("Accumulator example");
-
-		// get the accumulator result via its registration key
-		final List<Integer> emptyFields = result.getAccumulatorResult(EMPTY_FIELD_ACCUMULATOR);
-		System.out.format("Number of detected empty fields per column: %s%n", emptyFields);
-
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static String filePath;
-	private static String outputPath;
-
-	/**
-	 * Reads the optional input file path and result path from the program arguments.
-	 *
-	 * @return {@code false} if too many arguments were given and the program should stop
-	 */
-	private static boolean parseParameters(final String[] programArguments) {
-
-		if (programArguments.length >= 3) {
-			System.err.println("Usage: EmptyFieldsCountAccumulator [<input file path> [<result path>]]");
-			return false;
-		}
-
-		if (programArguments.length >= 1) {
-			filePath = programArguments[0];
-			if (programArguments.length == 2) {
-				outputPath = programArguments[1];
-			}
-		}
-
-		return true;
-	}
-
-	/**
-	 * Returns the built-in example tuples if no input file was given; otherwise reads the
-	 * input file as semicolon-separated CSV with three string columns.
-	 */
-	@SuppressWarnings("unchecked")
-	private static DataSet<Tuple> getDataSet(final ExecutionEnvironment env) {
-
-		DataSet<? extends Tuple> source;
-		if (filePath == null) {
-			source = env.fromCollection(getExampleInputTuples());
-
-		} else {
-			source = env
-					.readCsvFile(filePath)
-					.fieldDelimiter(';')
-					.types(String.class, String.class, String.class);
-
-		}
-
-		// only the generic Tuple interface is needed downstream
-		return (DataSet<Tuple>) source;
-	}
-
-	/** Example rows, some of which contain null, empty, or blank fields. */
-	private static Collection<Tuple3<String, String, String>> getExampleInputTuples() {
-		Collection<Tuple3<String, String, String>> inputTuples = new ArrayList<Tuple3<String, String, String>>();
-		inputTuples.add(new Tuple3<String, String, String>("John", "Doe", "Foo Str."));
-		inputTuples.add(new Tuple3<String, String, String>("Joe", "Johnson", ""));
-		inputTuples.add(new Tuple3<String, String, String>(null, "Kate Morn", "Bar Blvd."));
-		inputTuples.add(new Tuple3<String, String, String>("Tim", "Rinny", ""));
-		inputTuples.add(new Tuple3<String, String, String>("Alicia", "Jackson", "  "));
-		return inputTuples;
-	}
-
-	/**
-	 * This function filters all incoming tuples that have one or more empty fields.
-	 * In doing so, it also counts the number of empty fields per attribute with an accumulator (registered under
-	 * {@link EmptyFieldsCountAccumulator#EMPTY_FIELD_ACCUMULATOR}).
-	 */
-	public static final class EmptyFieldFilter extends RichFilterFunction<Tuple> {
-
-		// create a new accumulator in each filter function instance
-		// accumulators can be merged later on
-		private final VectorAccumulator emptyFieldCounter = new VectorAccumulator();
-
-		@Override
-		public void open(final Configuration parameters) throws Exception {
-			super.open(parameters);
-
-			// register the accumulator instance
-			getRuntimeContext().addAccumulator(EMPTY_FIELD_ACCUMULATOR,
-					this.emptyFieldCounter);
-		}
-
-		@Override
-		public boolean filter(final Tuple t) {
-			boolean containsEmptyFields = false;
-
-			// iterate over the tuple fields looking for empty ones
-			for (int pos = 0; pos < t.getArity(); pos++) {
-
-				final String field = t.getField(pos);
-				if (field == null || field.trim().isEmpty()) {
-					containsEmptyFields = true;
-
-					// if an empty field is encountered, update the
-					// accumulator
-					this.emptyFieldCounter.add(pos);
-				}
-			}
-
-			return !containsEmptyFields;
-		}
-	}
-
-	/**
-	 * This accumulator lets you increase vector components distributedly. The {@link #add(Integer)} method lets you
-	 * increase the <i>n</i>-th vector component by 1, where <i>n</i> is the method's parameter. The size of the vector
-	 * is automatically managed.
-	 */
-	public static class VectorAccumulator implements Accumulator<Integer, List<Integer>> {
-
-		/** Stores the accumulated vector components. */
-		private final List<Integer> resultVector = new ArrayList<Integer>();
-
-		/**
-		 * Increases the result vector component at the specified position by 1.
-		 */
-		@Override
-		public void add(final Integer position) {
-			updateResultVector(position, 1);
-		}
-
-		/**
-		 * Increases the result vector component at the specified position by the specified delta.
-		 */
-		private void updateResultVector(final int position, final int delta) {
-			// inflate the vector to contain the given position
-			while (this.resultVector.size() <= position) {
-				this.resultVector.add(0);
-			}
-
-			// increment the component value
-			final int component = this.resultVector.get(position);
-			this.resultVector.set(position, component + delta);
-		}
-
-		@Override
-		public List<Integer> getLocalValue() {
-			return this.resultVector;
-		}
-
-		@Override
-		public void resetLocal() {
-			// clear the result vector if the accumulator instance shall be reused
-			this.resultVector.clear();
-		}
-
-		@Override
-		public void merge(final Accumulator<Integer, List<Integer>> other) {
-			// merge two vector accumulators by adding up their vector components
-			final List<Integer> otherVector = other.getLocalValue();
-			for (int index = 0; index < otherVector.size(); index++) {
-				updateResultVector(index, otherVector.get(index));
-			}
-		}
-
-		@Override
-		public void write(final DataOutputView out) throws IOException {
-			// binary serialization of the result vector:
-			// [number of components, component 0, component 1, ...]
-			out.writeInt(this.resultVector.size());
-			for (final Integer component : this.resultVector) {
-				out.writeInt(component);
-			}
-		}
-
-		@Override
-		public void read(final DataInputView in) throws IOException {
-			// binary deserialization of the result vector; replace (not append to) any
-			// previous content so that a reused instance holds exactly what was written
-			this.resultVector.clear();
-			final int size = in.readInt();
-			for (int numReadComponents = 0; numReadComponents < size; numReadComponents++) {
-				final int component = in.readInt();
-				this.resultVector.add(component);
-			}
-		}
-
-	}
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java
deleted file mode 100644
index 08a261c..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.example.java.relational;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * This program implements the following relational query on the TPC-H data set.
- *
- * <p>
- * <code><pre>
- * SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
- * FROM orders, lineitem
- * WHERE l_orderkey = o_orderkey
- * AND o_orderstatus = "X"
- * AND YEAR(o_orderdate) > Y
- * AND o_orderpriority LIKE "Z%"
- * GROUP BY l_orderkey, o_shippriority;
- * </pre></code>
- *
- * <p>
- * In this program the filter values are fixed: X is "F", Y is 1993 and Z is "5"
- * (see STATUS_FILTER, YEAR_FILTER and OPRIO_FILTER).
- *
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator
- * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
- *
- * <p>
- * Usage: <code>RelationalQuery <orders-csv path> <lineitem-csv path> <result path></code><br>
- *
- * <p>
- * This example shows how to use:
- * <ul>
- * <li> tuple data types
- * <li> inline-defined functions
- * <li> projection and join projection
- * <li> built-in aggregation functions
- * </ul>
- */
-@SuppressWarnings("serial")
-public class RelationalQuery {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- // filter values of the query; constants, so they are never reassigned
- private static final String STATUS_FILTER = "F";
- private static final int YEAR_FILTER = 1993;
- private static final String OPRIO_FILTER = "5";
-
- public static void main(String[] args) throws Exception {
-
- if(!parseParameters(args)) {
- return;
- }
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
- DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
-
- // get lineitem data set: (orderkey, extendedprice)
- DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);
-
- // orders filtered by status, year and priority: (orderkey, shippriority)
- DataSet<Tuple2<Integer, Integer>> filteredOrders =
- // filter orders
- orders.filter(
- new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
- @Override
- public boolean filter(Tuple5<Integer, String, String, String, Integer> t) {
- // status filter
- if(!t.f1.equals(STATUS_FILTER)) {
- return false;
- // year filter: keep only orders placed after YEAR_FILTER
- } else if(Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) {
- return false;
- // order priority filter
- } else if(!t.f3.startsWith(OPRIO_FILTER)) {
- return false;
- }
- return true;
- }
- })
- // project fields out that are no longer required
- .project(0,4).types(Integer.class, Integer.class);
-
- // join orders with lineitems: (orderkey, shippriority, extendedprice)
- DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders =
- filteredOrders.joinWithHuge(lineitems)
- .where(0).equalTo(0)
- .projectFirst(0,1).projectSecond(1)
- .types(Integer.class, Integer.class, Double.class);
-
- // extendedprice sums: (orderkey, shippriority, sum(extendedprice))
- DataSet<Tuple3<Integer, Integer, Double>> priceSums =
- // group by order and sum extendedprice
- lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2);
-
- // emit result
- priceSums.writeAsCsv(outputPath);
-
- // execute program
- env.execute("Relational Query Example");
-
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static String ordersPath;
- private static String lineitemPath;
- private static String outputPath;
-
- /**
- * Reads the three input/output paths from the program arguments.
- * Prints usage information and returns false if they are missing or malformed.
- */
- private static boolean parseParameters(String[] programArguments) {
-
- if(programArguments.length > 0) {
- if(programArguments.length == 3) {
- ordersPath = programArguments[0];
- lineitemPath = programArguments[1];
- outputPath = programArguments[2];
- } else {
- System.err.println("Usage: RelationalQuery <orders-csv path> <lineitem-csv path> <result path>");
- return false;
- }
- } else {
- System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
- " Due to legal restrictions, we can not ship generated data.\n" +
- " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
- " Usage: RelationalQuery <orders-csv path> <lineitem-csv path> <result path>");
- return false;
- }
- return true;
- }
-
- // reads columns 0, 2, 4, 5, 7: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
- private static DataSet<Tuple5<Integer, String, String, String, Integer>> getOrdersDataSet(ExecutionEnvironment env) {
- return env.readCsvFile(ordersPath)
- .fieldDelimiter('|')
- .includeFields("101011010")
- .types(Integer.class, String.class, String.class, String.class, Integer.class);
- }
-
- // reads columns 0 and 5: (orderkey, extendedprice)
- private static DataSet<Tuple2<Integer, Double>> getLineitemDataSet(ExecutionEnvironment env) {
- return env.readCsvFile(lineitemPath)
- .fieldDelimiter('|')
- .includeFields("1000010000000000")
- .types(Integer.class, Double.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java
deleted file mode 100644
index 1ff6583..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery10.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.example.java.relational;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * This program implements a modified version of the TPC-H query 10.
- * The original query can be found at
- * <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 45).
- *
- * <p>
- * This program implements the following SQL equivalent:
- *
- * <p>
- * <code><pre>
- * SELECT
- * c_custkey,
- * c_name,
- * c_address,
- * n_name,
- * c_acctbal,
- * SUM(l_extendedprice * (1 - l_discount)) AS revenue
- * FROM
- * customer,
- * orders,
- * lineitem,
- * nation
- * WHERE
- * c_custkey = o_custkey
- * AND l_orderkey = o_orderkey
- * AND YEAR(o_orderdate) > '1990'
- * AND l_returnflag = 'R'
- * AND c_nationkey = n_nationkey
- * GROUP BY
- * c_custkey,
- * c_name,
- * c_acctbal,
- * n_name,
- * c_address
- * </pre></code>
- *
- * <p>
- * Compared to the original TPC-H query this version does not print
- * c_phone and c_comment, only filters by years greater than 1990 instead of
- * a period of 3 months, and does not sort the result by revenue.
- *
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator
- * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
- *
- * <p>
- * Usage: <code>TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path></code><br>
- *
- * <p>
- * This example shows how to use:
- * <ul>
- * <li> tuple data types
- * <li> inline-defined functions
- * <li> projection and join projection
- * <li> built-in aggregation functions
- * </ul>
- */
-@SuppressWarnings("serial")
-public class TPCHQuery10 {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- // orders placed in years strictly greater than this value are kept
- private static final int YEAR_FILTER = 1990;
- // line items with this l_returnflag value are kept
- private static final String RETURN_FLAG_FILTER = "R";
-
- public static void main(String[] args) throws Exception {
-
- if(!parseParameters(args)) {
- return;
- }
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // get customer data set: (custkey, name, address, nationkey, acctbal)
- DataSet<Tuple5<Integer, String, String, Integer, Double>> customers = getCustomerDataSet(env);
-
- // get orders data set: (orderkey, custkey, orderdate)
- DataSet<Tuple3<Integer, Integer, String>> orders = getOrdersDataSet(env);
-
- // get lineitem data set: (orderkey, extendedprice, discount, returnflag)
- DataSet<Tuple4<Integer, Double, Double, String>> lineitems = getLineitemDataSet(env);
-
- // get nation data set: (nationkey, name)
- DataSet<Tuple2<Integer, String>> nations = getNationsDataSet(env);
-
- // orders filtered by year: (orderkey, custkey)
- DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
- // filter by year
- orders.filter(
- new FilterFunction<Tuple3<Integer,Integer, String>>() {
- @Override
- public boolean filter(Tuple3<Integer, Integer, String> t) {
- // the first four characters of the order date are the year
- int year = Integer.parseInt(t.f2.substring(0, 4));
- return year > YEAR_FILTER;
- }
- })
- // project fields out that are no longer required
- .project(0,1).types(Integer.class, Integer.class);
-
- // lineitems filtered by flag: (orderkey, extendedprice, discount)
- DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag =
- // filter by flag
- lineitems.filter(new FilterFunction<Tuple4<Integer, Double, Double, String>>() {
- @Override
- public boolean filter(Tuple4<Integer, Double, Double, String> t)
- throws Exception {
- return t.f3.equals(RETURN_FLAG_FILTER);
- }
- })
- // project fields out that are no longer required
- .project(0,1,2).types(Integer.class, Double.class, Double.class);
-
- // join orders with lineitems: (custkey, extendedprice, discount)
- DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey =
- ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
- .where(0).equalTo(0)
- .projectFirst(1).projectSecond(1,2)
- .types(Integer.class, Double.class, Double.class);
-
- // aggregate for revenue: (custkey, revenue)
- DataSet<Tuple2<Integer, Double>> revenueOfCustomerKey = lineitemsOfCustomerKey
- // calculate the revenue for each item
- .map(new MapFunction<Tuple3<Integer, Double, Double>, Tuple2<Integer, Double>>() {
- @Override
- public Tuple2<Integer, Double> map(Tuple3<Integer, Double, Double> t) {
- // revenue per item = l_extendedprice * (1 - l_discount)
- return new Tuple2<Integer, Double>(t.f0, t.f1 * (1 - t.f2));
- }
- })
- // aggregate the revenues per item to revenue per customer
- .groupBy(0).aggregate(Aggregations.SUM, 1);
-
- // join customer with nation (custkey, name, address, nationname, acctbal)
- DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
- .joinWithTiny(nations)
- .where(3).equalTo(0)
- .projectFirst(0,1,2).projectSecond(1).projectFirst(4)
- .types(Integer.class, String.class, String.class, String.class, Double.class);
-
- // join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
- DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue =
- customerWithNation.join(revenueOfCustomerKey)
- .where(0).equalTo(0)
- .projectFirst(0,1,2,3,4).projectSecond(1)
- .types(Integer.class, String.class, String.class, String.class, Double.class, Double.class);
-
- // emit result
- customerWithRevenue.writeAsCsv(outputPath);
-
- // execute program
- env.execute("TPCH Query 10 Example");
-
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static String customerPath;
- private static String ordersPath;
- private static String lineitemPath;
- private static String nationPath;
- private static String outputPath;
-
- /**
- * Reads the five input/output paths from the program arguments.
- * Prints usage information and returns false if they are missing or malformed.
- */
- private static boolean parseParameters(String[] programArguments) {
-
- if(programArguments.length > 0) {
- if(programArguments.length == 5) {
- customerPath = programArguments[0];
- ordersPath = programArguments[1];
- lineitemPath = programArguments[2];
- nationPath = programArguments[3];
- outputPath = programArguments[4];
- } else {
- System.err.println("Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
- return false;
- }
- } else {
- System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
- " Due to legal restrictions, we can not ship generated data.\n" +
- " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
- " Usage: TPCHQuery10 <customer-csv path> <orders-csv path> <lineitem-csv path> <nation-csv path> <result path>");
- return false;
- }
- return true;
- }
-
- // reads columns 0, 1, 2, 3, 5: (custkey, name, address, nationkey, acctbal)
- private static DataSet<Tuple5<Integer, String, String, Integer, Double>> getCustomerDataSet(ExecutionEnvironment env) {
- return env.readCsvFile(customerPath)
- .fieldDelimiter('|')
- .includeFields("11110100")
- .types(Integer.class, String.class, String.class, Integer.class, Double.class);
- }
-
- // reads columns 0, 1, 4: (orderkey, custkey, orderdate)
- private static DataSet<Tuple3<Integer, Integer, String>> getOrdersDataSet(ExecutionEnvironment env) {
- return env.readCsvFile(ordersPath)
- .fieldDelimiter('|')
- .includeFields("110010000")
- .types(Integer.class, Integer.class, String.class);
- }
-
- // reads columns 0, 5, 6, 8: (orderkey, extendedprice, discount, returnflag)
- private static DataSet<Tuple4<Integer, Double, Double, String>> getLineitemDataSet(ExecutionEnvironment env) {
- return env.readCsvFile(lineitemPath)
- .fieldDelimiter('|')
- .includeFields("1000011010000000")
- .types(Integer.class, Double.class, Double.class, String.class);
- }
-
- // reads columns 0, 1: (nationkey, name)
- private static DataSet<Tuple2<Integer, String>> getNationsDataSet(ExecutionEnvironment env) {
- return env.readCsvFile(nationPath)
- .fieldDelimiter('|')
- .includeFields("1100")
- .types(Integer.class, String.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java
deleted file mode 100644
index 4544fd4..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/TPCHQuery3.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.example.java.relational;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
-/**
- * This program implements a modified version of the TPC-H query 3. The
- * example demonstrates how to assign names to fields by extending the Tuple class.
- * The original query can be found at
- * <a href="http://www.tpc.org/tpch/spec/tpch2.16.0.pdf">http://www.tpc.org/tpch/spec/tpch2.16.0.pdf</a> (page 29).
- *
- * <p>
- * This program implements the following SQL equivalent:
- *
- * <p>
- * <code><pre>
- * SELECT
- * l_orderkey,
- * SUM(l_extendedprice*(1-l_discount)) AS revenue,
- * o_orderdate,
- * o_shippriority
- * FROM customer,
- * orders,
- * lineitem
- * WHERE
- * c_mktsegment = '[SEGMENT]'
- * AND c_custkey = o_custkey
- * AND l_orderkey = o_orderkey
- * AND o_orderdate < date '[DATE]'
- * AND l_shipdate > date '[DATE]'
- * GROUP BY
- * l_orderkey,
- * o_orderdate,
- * o_shippriority;
- * </pre></code>
- *
- * <p>
- * In this program [SEGMENT] is "AUTOMOBILE" and [DATE] is 1995-03-12.
- * Compared to the original TPC-H query this version does not sort the result by revenue
- * and orderdate.
- *
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator
- * as generated by the TPC-H data generator which is available at <a href="http://www.tpc.org/tpch/">http://www.tpc.org/tpch/</a>.
- *
- * <p>
- * Usage: <code>TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path></code><br>
- *
- * <p>
- * This example shows how to use:
- * <ul>
- * <li> custom data type derived from tuple data types
- * <li> inline-defined functions
- * <li> built-in aggregation functions
- * </ul>
- */
-@SuppressWarnings("serial")
-public class TPCHQuery3 {
-
- // *************************************************************************
- // PROGRAM
- // *************************************************************************
-
- public static void main(String[] args) throws Exception {
-
- if(!parseParameters(args)) {
- return;
- }
-
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // get input data
- DataSet<Lineitem> li = getLineitemDataSet(env);
- DataSet<Order> or = getOrdersDataSet(env);
- DataSet<Customer> cust = getCustomerDataSet(env);
-
- // Filter market segment "AUTOMOBILE"
- cust = cust.filter(
- new FilterFunction<Customer>() {
- @Override
- public boolean filter(Customer value) {
- return value.getMktsegment().equals("AUTOMOBILE");
- }
- });
-
- // Filter all Orders with o_orderdate < 12.03.1995
- or = or.filter(
- new FilterFunction<Order>() {
- private DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
- private Date date = getFilterDate();
-
- @Override
- public boolean filter(Order value) throws ParseException {
- Date orderDate = format.parse(value.getOrderdate());
- return orderDate.before(date);
- }
- });
-
- // Filter all Lineitems with l_shipdate > 12.03.1995
- li = li.filter(
- new FilterFunction<Lineitem>() {
- private DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
- private Date date = getFilterDate();
-
- @Override
- public boolean filter(Lineitem value) throws ParseException {
- Date shipDate = format.parse(value.getShipdate());
- return shipDate.after(date);
- }
- });
-
- // Join customers with orders on c_custkey = o_custkey (Customer field 0, Order field 1)
- // and package them into a ShippingPriorityItem
- DataSet<ShippingPriorityItem> customerWithOrders =
- cust.join(or)
- .where(0)
- .equalTo(1)
- .with(
- new JoinFunction<Customer, Order, ShippingPriorityItem>() {
- @Override
- public ShippingPriorityItem join(Customer first, Order second) {
- return new ShippingPriorityItem(0, 0.0, second.getOrderdate(),
- second.getShippriority(), second.getOrderkey());
- }
- });
-
- // Join the last join result with Lineitems on o_orderkey = l_orderkey
- DataSet<ShippingPriorityItem> joined =
- customerWithOrders.join(li)
- .where(4)
- .equalTo(0)
- .with(
- new JoinFunction<ShippingPriorityItem, Lineitem, ShippingPriorityItem>() {
- @Override
- public ShippingPriorityItem join(ShippingPriorityItem first, Lineitem second) {
- first.setL_Orderkey(second.getOrderkey());
- first.setRevenue(second.getExtendedprice() * (1 - second.getDiscount()));
- return first;
- }
- });
-
- // Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum
- joined = joined
- .groupBy(0, 2, 3)
- .aggregate(Aggregations.SUM, 1);
-
- // emit result
- joined.writeAsCsv(outputPath, "\n", "|");
-
- // execute program
- env.execute("TPCH Query 3 Example");
-
- }
-
- // *************************************************************************
- // DATA TYPES
- // *************************************************************************
-
- public static class Lineitem extends Tuple4<Integer, Double, Double, String> {
-
- public Integer getOrderkey() { return this.f0; }
- public Double getDiscount() { return this.f2; }
- public Double getExtendedprice() { return this.f1; }
- public String getShipdate() { return this.f3; }
- }
-
- public static class Customer extends Tuple2<Integer, String> {
-
- public Integer getCustKey() { return this.f0; }
- public String getMktsegment() { return this.f1; }
- }
-
- // fields: (orderkey, custkey, orderdate, shippriority); custkey is needed for the customer join
- public static class Order extends Tuple4<Integer, Integer, String, Integer> {
-
- public Integer getOrderkey() { return this.f0; }
- public Integer getCustKey() { return this.f1; }
- public String getOrderdate() { return this.f2; }
- public Integer getShippriority() { return this.f3; }
- }
-
- public static class ShippingPriorityItem extends Tuple5<Integer, Double, String, Integer, Integer> {
-
- public ShippingPriorityItem() { }
-
- public ShippingPriorityItem(Integer l_orderkey, Double revenue,
- String o_orderdate, Integer o_shippriority, Integer o_orderkey) {
- this.f0 = l_orderkey;
- this.f1 = revenue;
- this.f2 = o_orderdate;
- this.f3 = o_shippriority;
- this.f4 = o_orderkey;
- }
-
- public Integer getL_Orderkey() { return this.f0; }
- public void setL_Orderkey(Integer l_orderkey) { this.f0 = l_orderkey; }
- public Double getRevenue() { return this.f1; }
- public void setRevenue(Double revenue) { this.f1 = revenue; }
-
- public String getOrderdate() { return this.f2; }
- public Integer getShippriority() { return this.f3; }
- public Integer getO_Orderkey() { return this.f4; }
- }
-
- // *************************************************************************
- // UTIL METHODS
- // *************************************************************************
-
- private static String lineitemPath;
- private static String customerPath;
- private static String ordersPath;
- private static String outputPath;
-
- /**
- * Returns the filter date 1995-03-12 at midnight in the default time zone, which is the
- * same zone SimpleDateFormat uses to parse the CSV dates. The calendar is cleared first
- * so that the current time of day does not leak into the date. Calendar.MARCH is used
- * because calendar months are zero-based (a literal 3 would mean April).
- */
- private static Date getFilterDate() {
- Calendar cal = Calendar.getInstance();
- cal.clear();
- cal.set(1995, Calendar.MARCH, 12);
- return cal.getTime();
- }
-
- /**
- * Reads the four input/output paths from the program arguments.
- * Prints usage information and returns false if they are missing or malformed.
- */
- private static boolean parseParameters(String[] programArguments) {
-
- if(programArguments.length > 0) {
- if(programArguments.length == 4) {
- lineitemPath = programArguments[0];
- customerPath = programArguments[1];
- ordersPath = programArguments[2];
- outputPath = programArguments[3];
- } else {
- System.err.println("Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>");
- return false;
- }
- } else {
- System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
- " Due to legal restrictions, we can not ship generated data.\n" +
- " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
- " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>");
- return false;
- }
- return true;
- }
-
- // reads columns 0, 5, 6, 10: (orderkey, extendedprice, discount, shipdate)
- private static DataSet<Lineitem> getLineitemDataSet(ExecutionEnvironment env) {
- return env.readCsvFile(lineitemPath)
- .fieldDelimiter('|')
- .includeFields("1000011000100000")
- .tupleType(Lineitem.class);
- }
-
- // reads columns 0, 6: (custkey, mktsegment)
- private static DataSet<Customer> getCustomerDataSet(ExecutionEnvironment env) {
- return env.readCsvFile(customerPath)
- .fieldDelimiter('|')
- .includeFields("10000010")
- .tupleType(Customer.class);
- }
-
- // reads columns 0, 1, 4, 7: (orderkey, custkey, orderdate, shippriority)
- private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment env) {
- return env.readCsvFile(ordersPath)
- .fieldDelimiter('|')
- .includeFields("110010010")
- .tupleType(Order.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4cce46eb/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
deleted file mode 100644
index 9ca6ea9..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/WebLogAnalysis.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.example.java.relational;
-
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.util.Collector;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.example.java.relational.util.WebLogData;
-import org.apache.flink.example.java.relational.util.WebLogDataGenerator;
-
-/**
- * This program processes web logs and relational data.
- * It implements the following relational query:
- *
- * <pre>{@code
- * SELECT
- *       r.pageRank,
- *       r.pageURL,
- *       r.avgDuration
- * FROM documents d JOIN rankings r
- *                  ON d.url = r.pageURL
- * WHERE CONTAINS(d.contents, [keywords])
- *       AND r.pageRank > [rank]
- *       AND NOT EXISTS
- *           (
- *              SELECT * FROM Visits v
- *              WHERE v.destURL = d.url
- *                    AND YEAR(v.visitDate) = [year]
- *           );
- * }</pre>
- *
- * <p>
- * The query parameters are hard-coded in the user functions below: [keywords] are
- * "editors" and "oscillations" (see {@link FilterDocByKeyWords}), [rank] is 40
- * (see {@link FilterByRank}) and [year] is 2007 (see {@link FilterVisitsByDate}).
- *
- * <p>
- * Input files are plain text CSV files using the pipe character ('|') as field separator.
- * The tables referenced in the query can be generated using the {@link WebLogDataGenerator} and
- * have the following schemas:
- * <pre>{@code
- * CREATE TABLE Documents (
- *                url VARCHAR(100) PRIMARY KEY,
- *                contents TEXT );
- *
- * CREATE TABLE Rankings (
- *                pageRank INT,
- *                pageURL VARCHAR(100) PRIMARY KEY,
- *                avgDuration INT );
- *
- * CREATE TABLE Visits (
- *                sourceIP VARCHAR(16),
- *                destURL VARCHAR(100),
- *                visitDate DATE,
- *                adRevenue FLOAT,
- *                userAgent VARCHAR(64),
- *                countryCode VARCHAR(3),
- *                languageCode VARCHAR(6),
- *                searchWord VARCHAR(32),
- *                duration INT );
- * }</pre>
- * Of the Visits file, only the destURL and visitDate columns are read.
- *
- * <p>
- * Usage: {@code WebLogAnalysis <documents path> <ranks path> <visits path> <result path>}<br>
- * If no parameters are provided, the program is run with default data from {@link WebLogData}
- * and the result is printed; otherwise the result is written as a '|'-separated CSV file.
- *
- * <p>
- * This example shows how to use:
- * <ul>
- * <li> tuple data types
- * <li> projection and join projection
- * <li> the CoGroup transformation for an anti-join
- * </ul>
- *
- */
-@SuppressWarnings("serial")
-public class WebLogAnalysis {
-
-	// *************************************************************************
-	//     PROGRAM
-	// *************************************************************************
-
-	/**
-	 * Builds and executes the WebLogAnalysis job.
-	 *
-	 * @param args either empty (use the built-in data and print the result) or exactly
-	 *             four arguments: documents path, ranks path, visits path, result path
-	 * @throws Exception if the job fails to execute
-	 */
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		DataSet<Tuple2<String, String>> documents = getDocumentsDataSet(env);
-		DataSet<Tuple3<Integer, String, Integer>> ranks = getRanksDataSet(env);
-		DataSet<Tuple2<String, String>> visits = getVisitsDataSet(env);
-
-		// Retain documents that contain all keywords and keep only their URL
-		DataSet<Tuple1<String>> filterDocs = documents
-				.filter(new FilterDocByKeyWords())
-				.project(0).types(String.class);
-
-		// Filter ranks by minimum rank
-		DataSet<Tuple3<Integer, String, Integer>> filterRanks = ranks
-				.filter(new FilterByRank());
-
-		// Retain visits of the filter year and keep only their URL
-		DataSet<Tuple1<String>> filterVisits = visits
-				.filter(new FilterVisitsByDate())
-				.project(0).types(String.class);
-
-		// Join the filtered documents and ranks on the URL, i.e., get all URLs with min rank
-		// and keywords; only the (rank, URL, avg duration) fields of the rank side are kept
-		DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks =
-				filterDocs.join(filterRanks)
-							.where(0).equalTo(1)
-							.projectSecond(0, 1, 2)
-							.types(Integer.class, String.class, Integer.class);
-
-		// Anti-join URLs with visits, i.e., retain all ranks whose URL has NOT been visited
-		// in the filter year
-		DataSet<Tuple3<Integer, String, Integer>> result =
-				joinDocsRanks.coGroup(filterVisits)
-								.where(1).equalTo(0)
-								.with(new AntiJoinVisits());
-
-		// emit result
-		if (fileOutput) {
-			result.writeAsCsv(outputPath, "\n", "|");
-		} else {
-			result.print();
-		}
-
-		// execute program
-		env.execute("WebLogAnalysis Example");
-
-	}
-
-	// *************************************************************************
-	//     USER FUNCTIONS
-	// *************************************************************************
-
-	/**
-	 * FilterFunction that retains documents whose text contains all keywords of a fixed set.
-	 * Each keyword is padded with a space on both sides, so it only matches where it is
-	 * surrounded by spaces in the text; an occurrence at the very start or end of the text,
-	 * or adjacent to punctuation, does not match.
-	 */
-	public static class FilterDocByKeyWords implements FilterFunction<Tuple2<String, String>> {
-
-		private static final String[] KEYWORDS = { " editors ", " oscillations " };
-
-		/**
-		 * Returns {@code true} if the document text contains every keyword.
-		 * Retained records are passed on unchanged; the projection on the URL field
-		 * is applied afterwards in the main program.
-		 *
-		 * Input Format:
-		 * 0: URL
-		 * 1: DOCUMENT_TEXT
-		 */
-		@Override
-		public boolean filter(Tuple2<String, String> value) throws Exception {
-			// Only keep the document if all keywords are contained
-			String docText = value.f1;
-			for (String kw : KEYWORDS) {
-				if (!docText.contains(kw)) {
-					return false;
-				}
-			}
-			return true;
-		}
-	}
-
-	/**
-	 * FilterFunction that retains rank records whose rank is strictly greater than a fixed threshold.
-	 */
-	public static class FilterByRank implements FilterFunction<Tuple3<Integer, String, Integer>> {
-
-		private static final int RANKFILTER = 40;
-
-		/**
-		 * Returns {@code true} if the rank of the record is greater than {@link #RANKFILTER}.
-		 * Retained records are passed on unchanged.
-		 *
-		 * Input Format:
-		 * 0: RANK
-		 * 1: URL
-		 * 2: AVG_DURATION
-		 */
-		@Override
-		public boolean filter(Tuple3<Integer, String, Integer> value) throws Exception {
-			return (value.f0 > RANKFILTER);
-		}
-	}
-
-	/**
-	 * FilterFunction that retains records of the visits relation whose year
-	 * (taken from the date string) equals a fixed value.
-	 */
-	public static class FilterVisitsByDate implements FilterFunction<Tuple2<String, String>> {
-
-		private static final int YEARFILTER = 2007;
-
-		/**
-		 * Returns {@code true} if the year of the visit equals {@link #YEARFILTER}.
-		 * The year is parsed from the first four characters of the date string, which is
-		 * presumably formatted as YYYY-MM-DD. Retained records are passed on unchanged;
-		 * the projection on the URL field is applied afterwards in the main program.
-		 *
-		 * Input Format:
-		 * 0: URL
-		 * 1: DATE
-		 *
-		 * @throws StringIndexOutOfBoundsException if the date string is shorter than four characters
-		 * @throws NumberFormatException if the first four characters are not an integer
-		 */
-		@Override
-		public boolean filter(Tuple2<String, String> value) throws Exception {
-			// Extract the year from the leading YYYY of the date string
-			String dateString = value.f1;
-			int year = Integer.parseInt(dateString.substring(0, 4));
-			return (year == YEARFILTER);
-		}
-	}
-
-
-	/**
-	 * CoGroupFunction that realizes an anti-join.
-	 * For each URL group: if the second input (visits) provides no records, all records of
-	 * the first input (ranks) are emitted. Otherwise, nothing is emitted for that group.
-	 */
-	public static class AntiJoinVisits implements CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, Tuple3<Integer, String, Integer>> {
-
-		/**
-		 * If the visit iterable is empty, all records of the rank iterable are emitted.
-		 * Otherwise, no record is emitted.
-		 *
-		 * Output Format:
-		 * 0: RANK
-		 * 1: URL
-		 * 2: AVG_DURATION
-		 */
-		@Override
-		public void coGroup(Iterable<Tuple3<Integer, String, Integer>> ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
-			// Only emit if there is no entry in the visits relation for this URL
-			if (!visits.iterator().hasNext()) {
-				for (Tuple3<Integer, String, Integer> next : ranks) {
-					// Emit all rank records
-					out.collect(next);
-				}
-			}
-		}
-	}
-
-	// *************************************************************************
-	//     UTIL METHODS
-	// *************************************************************************
-
-	// True if paths were given on the command line: inputs are then read from files and the
-	// result is written as CSV. Otherwise built-in data is used and the result is printed.
-	private static boolean fileOutput = false;
-	private static String documentsPath;
-	private static String ranksPath;
-	private static String visitsPath;
-	private static String outputPath;
-
-	/**
-	 * Parses the command line arguments into the static path fields.
-	 *
-	 * @param args the program arguments; empty, or exactly four paths
-	 * @return {@code false} if arguments were given but not exactly four (usage is printed
-	 *         to stderr), {@code true} otherwise
-	 */
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			fileOutput = true;
-			if (args.length == 4) {
-				documentsPath = args[0];
-				ranksPath = args[1];
-				visitsPath = args[2];
-				outputPath = args[3];
-			} else {
-				System.err.println("Usage: WebLogAnalysis <documents path> <ranks path> <visits path> <result path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WebLog Analysis example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  We provide a data generator to create synthetic input files for this program.");
-			System.out.println("  Usage: WebLogAnalysis <documents path> <ranks path> <visits path> <result path>");
-		}
-		return true;
-	}
-
-	/** Returns the documents relation as (URL, document text) records. */
-	private static DataSet<Tuple2<String, String>> getDocumentsDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(documentsPath)
-						.fieldDelimiter('|')
-						.types(String.class, String.class);
-		} else {
-			return WebLogData.getDocumentDataSet(env);
-		}
-	}
-
-	/** Returns the ranks relation as (rank, URL, average visit duration) records. */
-	private static DataSet<Tuple3<Integer, String, Integer>> getRanksDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(ranksPath)
-						.fieldDelimiter('|')
-						.types(Integer.class, String.class, Integer.class);
-		} else {
-			return WebLogData.getRankDataSet(env);
-		}
-	}
-
-	/**
-	 * Returns the visits relation as (URL, date) records. When reading from a file, only
-	 * columns 1 and 2 (destURL, visitDate) of the nine-column visits file are read.
-	 */
-	private static DataSet<Tuple2<String, String>> getVisitsDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(visitsPath)
-						.fieldDelimiter('|')
-						.includeFields("011000000")
-						.types(String.class, String.class);
-		} else {
-			return WebLogData.getVisitDataSet(env);
-		}
-	}
-
-}