Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/21 14:13:30 UTC

[1/6] flink git commit: [FLINK-2743] Add XORShiftRandom and use it in RandomSamplers.

Repository: flink
Updated Branches:
  refs/heads/master 22510f0e2 -> 4c1cffd9d


[FLINK-2743] Add XORShiftRandom and use it in RandomSamplers.

This closes #1170


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5fb1c479
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5fb1c479
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5fb1c479

Branch: refs/heads/master
Commit: 5fb1c479f5e3f00917ebc2c623126b8966e46315
Parents: 22510f0
Author: chengxiang li <ch...@intel.com>
Authored: Wed Sep 23 18:08:06 2015 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 21 11:43:58 2015 +0200

----------------------------------------------------------------------
 .../flink/util/RandomGeneratorBenchmark.java    | 82 ++++++++++++++++++++
 flink-core/pom.xml                              |  2 +-
 .../org/apache/flink/util/XORShiftRandom.java   | 61 +++++++++++++++
 .../api/java/sampling/BernoulliSampler.java     |  5 +-
 .../flink/api/java/sampling/PoissonSampler.java |  5 +-
 .../ReservoirSamplerWithReplacement.java        |  5 +-
 .../ReservoirSamplerWithoutReplacement.java     |  5 +-
 7 files changed, 156 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5fb1c479/flink-benchmark/src/main/java/org/apache/flink/util/RandomGeneratorBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-benchmark/src/main/java/org/apache/flink/util/RandomGeneratorBenchmark.java b/flink-benchmark/src/main/java/org/apache/flink/util/RandomGeneratorBenchmark.java
new file mode 100644
index 0000000..65b2434
--- /dev/null
+++ b/flink-benchmark/src/main/java/org/apache/flink/util/RandomGeneratorBenchmark.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+public class RandomGeneratorBenchmark {
+
+	@BenchmarkMode(Mode.Throughput)
+	@Fork(1)
+	@State(Scope.Thread)
+	@OutputTimeUnit(TimeUnit.SECONDS)
+	public static abstract class AbstractRandomBench {
+		private final static long ITERATOR_NUMBER = 10000000;
+		protected Random random;
+
+		@Setup
+		public abstract void init();
+
+		@Benchmark
+		@Warmup(iterations = 5)
+		@Measurement(iterations = 5)
+		public void bench() {
+			for (int i = 0; i < ITERATOR_NUMBER; i++) {
+				random.nextInt();
+			}
+		}
+	}
+
+	public static class RandomBench extends AbstractRandomBench {
+		@Override
+		public void init() {
+			this.random = new Random(11);
+		}
+	}
+
+	public static class XORShiftRandomBench extends AbstractRandomBench {
+
+		@Override
+		public void init() {
+			this.random = new XORShiftRandom(11);
+		}
+	}
+
+	public static void main(String[] args) throws RunnerException {
+		Options opt = new OptionsBuilder().include(".*" + RandomGeneratorBenchmark.class.getSimpleName() +
+			".*").build();
+		new Runner(opt).run();
+	}
+}
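
A usage note (not part of the commit): JMH selects benchmarks by regex, so the include pattern above can be narrowed to run only one of the two variants. A minimal sketch, assuming the same JMH runner API used in main() above:

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    public class RunXORShiftBenchOnly {
        public static void main(String[] args) throws RunnerException {
            // Match only the XORShiftRandomBench nested class.
            Options opt = new OptionsBuilder()
                .include(".*XORShiftRandomBench.*")
                .build();
            new Runner(opt).run();
        }
    }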

http://git-wip-us.apache.org/repos/asf/flink/blob/5fb1c479/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 7ff0fbc..94a4082 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -70,7 +70,7 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
-    </dependencies>
+	</dependencies>
 
 	<build>
 		<plugins>

http://git-wip-us.apache.org/repos/asf/flink/blob/5fb1c479/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java b/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java
new file mode 100644
index 0000000..fa68442
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/XORShiftRandom.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import com.google.common.hash.Hashing;
+
+import java.util.Random;
+
+/**
+ * Implements a random number generator based on the XORShift algorithm discovered by George Marsaglia.
+ * This RNG has been observed to be about 4.5 times faster than {@link java.util.Random} in benchmarks,
+ * at the cost of thread-safety. It is therefore recommended to create a new {@link XORShiftRandom}
+ * for each thread.
+ *
+ * @see <a href="http://www.jstatsoft.org/v08/i14/paper">XORShift Algorithm Paper</a>
+ */
+public class XORShiftRandom extends Random {
+
+	private long seed;
+
+	public XORShiftRandom() {
+		this(System.nanoTime());
+	}
+
+	public XORShiftRandom(long input) {
+		super(input);
+		this.seed = Hashing.murmur3_128().hashLong(input).asLong();
+	}
+
+	/**
+	 * All other methods, such as nextInt() and nextDouble(), depend on this one, so we only need
+	 * to override this method.
+	 *
+	 * @param bits Random bits
+	 * @return The next pseudorandom value from this random number
+	 * generator's sequence
+	 */
+	@Override
+	public int next(int bits) {
+		long nextSeed = seed ^ (seed << 21);
+		nextSeed ^= (nextSeed >>> 35);
+		nextSeed ^= (nextSeed << 4);
+		seed = nextSeed;
+		return (int) (nextSeed & ((1L << bits) - 1));
+	}
+}
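
Since XORShiftRandom only overrides next(int), every derived method of java.util.Random (nextInt(), nextLong(), nextDouble(), ...) draws from the XORShift sequence. As the class is not thread-safe, one instance per thread is the recommended pattern; a minimal sketch (the ThreadLocal wrapper is illustrative, not part of the commit):

    import org.apache.flink.util.XORShiftRandom;

    public class PerThreadRandom {
        // One generator per thread, since XORShiftRandom is not thread-safe.
        private static final ThreadLocal<XORShiftRandom> RND =
            new ThreadLocal<XORShiftRandom>() {
                @Override
                protected XORShiftRandom initialValue() {
                    return new XORShiftRandom();
                }
            };

        public static int nextInt(int bound) {
            // Delegates to java.util.Random#nextInt(int), which internally
            // calls the overridden next(int bits).
            return RND.get().nextInt(bound);
        }
    }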

http://git-wip-us.apache.org/repos/asf/flink/blob/5fb1c479/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
index 99ea5de..b9aef66 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java
@@ -18,6 +18,7 @@
 package org.apache.flink.api.java.sampling;
 
 import com.google.common.base.Preconditions;
+import org.apache.flink.util.XORShiftRandom;
 
 import java.util.Iterator;
 import java.util.Random;
@@ -44,7 +45,7 @@ public class BernoulliSampler<T> extends RandomSampler<T> {
 	 * @param fraction Sample fraction, aka the Bernoulli sampler possibility.
 	 */
 	public BernoulliSampler(double fraction) {
-		this(fraction, new Random());
+		this(fraction, new XORShiftRandom());
 	}
 	
 	/**
@@ -54,7 +55,7 @@ public class BernoulliSampler<T> extends RandomSampler<T> {
 	 * @param seed     Random number generator seed.
 	 */
 	public BernoulliSampler(double fraction, long seed) {
-		this(fraction, new Random(seed));
+		this(fraction, new XORShiftRandom(seed));
 	}
 	
 	/**
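
For callers nothing changes: both constructors keep their signatures and only swap the underlying generator. A hypothetical usage sketch (the sample() method is assumed from the RandomSampler base class):

    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.flink.api.java.sampling.BernoulliSampler;

    public class BernoulliSamplerSketch {
        public static void main(String[] args) {
            Iterator<String> input = Arrays.asList("a", "b", "c", "d").iterator();
            // The seeded constructor now builds a XORShiftRandom(42L) internally.
            BernoulliSampler<String> sampler = new BernoulliSampler<String>(0.5, 42L);
            Iterator<String> sampled = sampler.sample(input);
            while (sampled.hasNext()) {
                System.out.println(sampled.next());
            }
        }
    }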

http://git-wip-us.apache.org/repos/asf/flink/blob/5fb1c479/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
index 8701167..e132882 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java
@@ -19,6 +19,7 @@ package org.apache.flink.api.java.sampling;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.math3.distribution.PoissonDistribution;
+import org.apache.flink.util.XORShiftRandom;
 
 import java.util.Iterator;
 import java.util.Random;
@@ -53,7 +54,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
 			this.poissonDistribution = new PoissonDistribution(fraction);
 			this.poissonDistribution.reseedRandomGenerator(seed);
 		}
-		this.random = new Random(seed);
+		this.random = new XORShiftRandom(seed);
 	}
 	
 	/**
@@ -67,7 +68,7 @@ public class PoissonSampler<T> extends RandomSampler<T> {
 		if (this.fraction > 0) {
 			this.poissonDistribution = new PoissonDistribution(fraction);
 		}
-		this.random = new Random();
+		this.random = new XORShiftRandom();
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/5fb1c479/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java
index 9c37154..634f60d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.java
@@ -18,6 +18,7 @@
 package org.apache.flink.api.java.sampling;
 
 import com.google.common.base.Preconditions;
+import org.apache.flink.util.XORShiftRandom;
 
 import java.util.Iterator;
 import java.util.PriorityQueue;
@@ -45,7 +46,7 @@ public class ReservoirSamplerWithReplacement<T> extends DistributedRandomSampler
 	 * @param numSamples Number of selected elements, must be non-negative.
 	 */
 	public ReservoirSamplerWithReplacement(int numSamples) {
-		this(numSamples, new Random());
+		this(numSamples, new XORShiftRandom());
 	}
 	
 	/**
@@ -55,7 +56,7 @@ public class ReservoirSamplerWithReplacement<T> extends DistributedRandomSampler
 	 * @param seed       Random number generator seed
 	 */
 	public ReservoirSamplerWithReplacement(int numSamples, long seed) {
-		this(numSamples, new Random(seed));
+		this(numSamples, new XORShiftRandom(seed));
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/5fb1c479/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java
index b953bff..139859b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.java
@@ -18,6 +18,7 @@
 package org.apache.flink.api.java.sampling;
 
 import com.google.common.base.Preconditions;
+import org.apache.flink.util.XORShiftRandom;
 
 import java.util.Iterator;
 import java.util.PriorityQueue;
@@ -60,7 +61,7 @@ public class ReservoirSamplerWithoutReplacement<T> extends DistributedRandomSamp
 	 * @param numSamples Maximum number of samples to retain in reservoir, must be non-negative.
 	 */
 	public ReservoirSamplerWithoutReplacement(int numSamples) {
-		this(numSamples, new Random());
+		this(numSamples, new XORShiftRandom());
 	}
 	
 	/**
@@ -71,7 +72,7 @@ public class ReservoirSamplerWithoutReplacement<T> extends DistributedRandomSamp
 	 */
 	public ReservoirSamplerWithoutReplacement(int numSamples, long seed) {
 		
-		this(numSamples, new Random(seed));
+		this(numSamples, new XORShiftRandom(seed));
 	}
 	
 	@Override


[6/6] flink git commit: [FLINK-2820] Configuration not passed to JobGraphGenerator

Posted by fh...@apache.org.
[FLINK-2820] Configuration not passed to JobGraphGenerator

This was previously reported as FLINK-2625 (commit 8a84937215ea575fa94a00d11c2517902d252756).
The Client class was concurrently refactored with FLINK-2097 (commit 71bf2f570861daae53b24bfcf1d06aedb85311b9).

This closes #1278


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4c1cffd9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4c1cffd9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4c1cffd9

Branch: refs/heads/master
Commit: 4c1cffd9d02d9ddaaf433a1882098c8423d97c28
Parents: 6666ea5
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Oct 20 12:13:10 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 21 13:41:54 2015 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/client/program/Client.java  | 6 +++---
 .../java/org/apache/flink/client/web/JobSubmissionServlet.java | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4c1cffd9/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 322c73d..1cc1a54 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -521,16 +521,16 @@ public class Client {
 		return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
 	}
 
-	public static JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
+	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
 		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths());
 	}
 
-	private static JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths) {
+	private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths) {
 		JobGraph job;
 		if (optPlan instanceof StreamingPlan) {
 			job = ((StreamingPlan) optPlan).getJobGraph();
 		} else {
-			JobGraphGenerator gen = new JobGraphGenerator();
+			JobGraphGenerator gen = new JobGraphGenerator(this.config);
 			job = gen.compileJobGraph((OptimizedPlan) optPlan);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4c1cffd9/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
index f83b97c..472c8d5 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
@@ -313,7 +313,7 @@ public class JobSubmissionServlet extends HttpServlet {
 			// submit the job
 			try {
 				Client client = new Client(GlobalConfiguration.getConfiguration());
-				client.runDetached(Client.getJobGraph(job.f0, job.f1), job.f0.getUserCodeClassLoader());
+				client.runDetached(client.getJobGraph(job.f0, job.f1), job.f0.getUserCodeClassLoader());
 			}
 			catch (Exception ex) {
 				LOG.error("Error submitting job to the job-manager.", ex);
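
The servlet fix above shows the intended call pattern: obtain the JobGraph from the same Client instance that holds the Configuration, so that new JobGraphGenerator(this.config) is used instead of the default constructor. A minimal sketch (variable names hypothetical, not runnable standalone):

    // 'program' is a PackagedProgram and 'optimizedPlan' a FlinkPlan from the
    // optimizer, as elsewhere in Client.
    Client client = new Client(GlobalConfiguration.getConfiguration());
    JobGraph jobGraph = client.getJobGraph(program, optimizedPlan);
    client.runDetached(jobGraph, program.getUserCodeClassLoader());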


[5/6] flink git commit: [FLINK-2876] Minutiae

Posted by fh...@apache.org.
[FLINK-2876] Minutiae

A collection of small documentation and grammar updates.

This closes #1277


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6666ea58
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6666ea58
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6666ea58

Branch: refs/heads/master
Commit: 6666ea58de3ef94032014d891c2de5671361a494
Parents: 7a959bc
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Oct 20 11:57:33 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 21 13:37:03 2015 +0200

----------------------------------------------------------------------
 docs/apis/dataset_transformations.md            | 47 +++++++++-----------
 docs/apis/programming_guide.md                  | 12 ++---
 .../api/common/accumulators/Accumulator.java    |  2 +-
 .../common/accumulators/AverageAccumulator.java |  4 +-
 .../api/common/typeutils/TypeComparator.java    |  4 +-
 .../relational/EmptyFieldsCountAccumulator.java |  5 +--
 .../flink/api/java/ExecutionEnvironment.java    |  2 +-
 .../translation/Tuple3UnwrappingIterator.java   |  2 +-
 .../plantranslate/JobGraphGenerator.java        |  2 +-
 .../apache/flink/runtime/client/JobClient.java  |  2 +-
 .../iomanager/AsynchronousFileIOChannel.java    |  2 +-
 .../task/IterationIntermediateTask.java         |  2 +-
 .../flink/runtime/operators/BatchTask.java      |  2 +-
 .../operators/hash/MutableHashTable.java        |  2 +-
 .../sort/PartialOrderPriorityQueue.java         |  2 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  2 +-
 pom.xml                                         |  2 +-
 17 files changed, 45 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/docs/apis/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/apis/dataset_transformations.md b/docs/apis/dataset_transformations.md
index feed121..cfa138c 100644
--- a/docs/apis/dataset_transformations.md
+++ b/docs/apis/dataset_transformations.md
@@ -240,18 +240,6 @@ This problem can be overcome by hinting the return type of `project` operator li
 DataSet<Tuple1<String>> ds2 = ds.<Tuple1<String>>project(0).distinct(0);
 ~~~
 
-### Transformations on Grouped DataSet
-
-The reduce operations can operate on grouped data sets. Specifying the key to
-be used for grouping can be done in many ways:
-
-- key expressions
-- a key-selector function
-- one or more field position keys (Tuple DataSet only)
-- Case Class fields (Case Classes only)
-
-Please look at the reduce examples to see how the grouping keys are specified.
-
 </div>
 <div data-lang="python" markdown="1">
 
@@ -259,15 +247,20 @@ Please look at the reduce examples to see how the grouping keys are specified.
 out = in.project(2,0);
 ~~~
 
+</div>
+</div>
+
 ### Transformations on Grouped DataSet
 
 The reduce operations can operate on grouped data sets. Specifying the key to
-be used for grouping can be done using one or more field position keys (Tuple DataSet only).
+be used for grouping can be done in many ways:
 
-Please look at the reduce examples to see how the grouping keys are specified.
+- key expressions
+- a key-selector function
+- one or more field position keys (Tuple DataSet only)
+- Case Class fields (Case Classes only)
 
-</div>
-</div>
+Please look at the reduce examples to see how the grouping keys are specified.
 
 ### Reduce on Grouped DataSet
 
@@ -679,9 +672,10 @@ an alternative WordCount implementation. In the implementation,
 
 ~~~java
 DataSet<String> input = [..] // The words received as input
-DataSet<String> groupedInput = input.groupBy(0); // group identical words
 
-DataSet<Tuple2<String, Integer>> combinedWords = groupedInput.combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>() {
+DataSet<Tuple2<String, Integer>> combinedWords = input
+  .groupBy(0) // group identical words
+  .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>>() {
 
     public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>>) { // combine
         int count = 0;
@@ -692,9 +686,9 @@ DataSet<Tuple2<String, Integer>> combinedWords = groupedInput.combineGroup(new G
     }
 });
 
-DataSet<Tuple2<String, Integer>> groupedCombinedWords = combinedWords.groupBy(0); // group by words again
-
-DataSet<Tuple2<String, Integer>> output = combinedWords.reduceGroup(new GroupReduceFunction() { // group reduce with full data exchange
+DataSet<Tuple2<String, Integer>> output = combinedWords
+  .groupBy(0)                              // group by words again
+  .reduceGroup(new GroupReduceFunction() { // group reduce with full data exchange
 
     public void reduce(Iterable<Tuple2<String, Integer>>, Collector<Tuple2<String, Integer>>) {
         int count = 0;
@@ -711,9 +705,10 @@ DataSet<Tuple2<String, Integer>> output = combinedWords.reduceGroup(new GroupRed
 
 ~~~scala
 val input: DataSet[String] = [..] // The words received as input
-val groupedInput: DataSet[String] = input.groupBy(0)
 
-val combinedWords: DataSet[(String, Int)] = groupedInput.combineGroup {
+val combinedWords: DataSet[(String, Int)] = input
+  .groupBy(0)
+  .combineGroup {
     (words, out: Collector[(String, Int)]) =>
         var count = 0
         for (word <- words) {
@@ -722,9 +717,9 @@ val combinedWords: DataSet[(String, Int)] = groupedInput.combineGroup {
         out.collect(word, count)
 }
 
-val groupedCombinedWords: DataSet[(String, Int)] = combinedWords.groupBy(0)
-
-val output: DataSet[(String, Int)] = groupedInput.reduceGroup {
+val output: DataSet[(String, Int)] = combinedWords
+  .groupBy(0)
+  .reduceGroup {
     (words, out: Collector[(String, Int)]) =>
         var count = 0
         for ((word, Int) <- words) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index a89f736..102b137 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -120,17 +120,17 @@ manually create the project, you can use the archetype and create a project by c
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight bash %}
-mvn archetype:generate /
-    -DarchetypeGroupId=org.apache.flink/
-    -DarchetypeArtifactId=flink-quickstart-java /
+mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-quickstart-java \
     -DarchetypeVersion={{site.version }}
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight bash %}
-mvn archetype:generate /
-    -DarchetypeGroupId=org.apache.flink/
-    -DarchetypeArtifactId=flink-quickstart-scala /
+mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-quickstart-scala \
     -DarchetypeVersion={{site.version }}
 {% endhighlight %}
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
index e49cc04..2ee5fa0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
@@ -25,7 +25,7 @@ import java.io.Serializable;
  * and operators. Each parallel instance creates and updates its own accumulator object,
  * and the different parallel instances of the accumulator are later merged.
  * merged by the system at the end of the job. The result can be obtained from the
- * result of a job execution, or from teh web runtime monitor.
+ * result of a job execution, or from the web runtime monitor.
  *
  * The accumulators are inspired by the Hadoop/MapReduce counters.
  * 

http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
index 5ed3c26..0db1942 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AverageAccumulator.java
@@ -19,8 +19,8 @@
 package org.apache.flink.api.common.accumulators;
 
 /**
- * An accumulator that get the average values.
- * Input can be {@code long}, {@code integer}, {@code double} as the result is {@code double}.
+ * An accumulator that computes the average value.
+ * Input can be {@code long}, {@code integer}, or {@code double} and the result is {@code double}.
  */
 public class AverageAccumulator implements SimpleAccumulator<Double> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
index d017694..3a545e4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeComparator.java
@@ -38,7 +38,7 @@ import org.apache.flink.core.memory.MemorySegment;
  * Implementing classes are stateful, because several methods require to set one record as the reference for
  * comparisons and later comparing a candidate against it. Therefore, the classes implementing this interface are
  * not thread safe. The runtime will ensure that no instance is used twice in different threads, but will create
- * a copy for that purpose. It is hence imperative that the copied created by the {@link #duplicate()} method
+ * a copy for that purpose. It is hence imperative that the copies created by the {@link #duplicate()} method
  * share no state with the instance from which they were copied: They have to be deep copies.  
  *
  * @see java.lang.Object#hashCode()
@@ -60,7 +60,7 @@ public abstract class TypeComparator<T> implements Serializable {
 	 * results in a rather uniform value distribution.
 	 * However, any collisions produced by this method cannot be undone. While it is NOT
 	 * important to create hash codes that cover the full spectrum of bits in the integer, it IS important 
-	 * to avoid collisions when combining two value as good as possible.
+	 * to avoid collisions when combining two values as much as possible.
 	 * 
 	 * @param record The record to be hashed.
 	 * @return A hash value for the record.

http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
index 7fd5799..e7ac474 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java
@@ -183,9 +183,8 @@ public class EmptyFieldsCountAccumulator {
 	}
 
 	/**
-	 * This accumulator lets you increase vector components distributedly. The {@link #add(Integer)} method lets you
-	 * increase the <i>n</i>-th vector component by 1, whereat <i>n</i> is the methods parameter. The size of the vector
-	 * is automatically managed.
+	 * This accumulator maintains a vector of counts. Calling {@link #add(Integer)} increments the
+	 * <i>n</i>-th vector component. The size of the vector is automatically managed.
 	 */
 	public static class VectorAccumulator implements Accumulator<Integer, ArrayList<Integer>> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index aa7c0c4..283d6d4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -791,7 +791,7 @@ public abstract class ExecutionEnvironment {
 	
 	/**
 	 * Creates a new data set that contains a sequence of numbers. The data set will be created in parallel,
-	 * so there is no guarantee about the oder of the elements.
+	 * so there is no guarantee about the order of the elements.
 	 * 
 	 * @param from The number to start at (inclusive).
 	 * @param to The number to stop at (inclusive).

http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
index 7e054dd..2ee55bf 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/Tuple3UnwrappingIterator.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.TraversableOnceException;
 
 /**
- * An iterator that reads 3-tuples (groupKey, sortKey, value) and returns only the values (thrid field).
+ * An iterator that reads 3-tuples (groupKey, sortKey, value) and returns only the values (third field).
  * The iterator also tracks the groupKeys, as the triples flow though it.
  */
 public class Tuple3UnwrappingIterator<T, K1, K2> implements Iterator<T>, Iterable<T>, java.io.Serializable {

http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index afd0682..c9140a5 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -150,7 +150,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 	
 	public JobGraphGenerator(Configuration config) {
 		this.defaultMaxFan = config.getInteger(ConfigConstants.DEFAULT_SPILLING_MAX_FAN_KEY, 
-				ConfigConstants.DEFAULT_SPILLING_MAX_FAN);
+			ConfigConstants.DEFAULT_SPILLING_MAX_FAN);
 		this.defaultSortSpillingThreshold = config.getFloat(ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD_KEY,
 			ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 0105632..b908eb1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -190,7 +190,7 @@ public class JobClient {
 
 	/**
 	 * Submits a job in detached mode. The method sends the JobGraph to the
-	 * JobManager and waits for the answer whether teh job could be started or not.
+	 * JobManager and waits for the answer whether the job could be started or not.
 	 *
 	 * @param jobManagerGateway Gateway to the JobManager which will execute the jobs
 	 * @param jobGraph The job

http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
index aefeddb..a41be64 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
@@ -467,7 +467,7 @@ final class FileSegmentReadRequest implements ReadRequest {
 
 			fileSegment = new FileSegment(fileChannel, position, length, isBuffer);
 
-			// Skip the binary dataa
+			// Skip the binary data
 			fileChannel.position(position + length);
 
 			hasReachedEndOfFile.set(fileChannel.size() - fileChannel.position() == 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
index 60f0dcf..0d266c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 
 /**
- * An intermediate iteration task, which runs a Driver}inside.
+ * An intermediate iteration task, which runs a {@link org.apache.flink.runtime.operators.PactDriver} inside.
  * <p>
  * It will propagate {@link EndOfSuperstepEvent}s and {@link TerminationEvent}s to it's connected tasks. Furthermore
  * intermediate tasks can also update the iteration state, either the workset or the solution set.

http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index 3f94109..c570458 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -1189,7 +1189,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 	/**
 	 * Creates the {@link Collector} for the given task, as described by the given configuration. The
 	 * output collector contains the writers that forward the data to the different tasks that the given task
-	 * is connected to. Each writer applies a the partitioning as described in the configuration.
+	 * is connected to. Each writer applies the partitioning as described in the configuration.
 	 *
 	 * @param task The task that the output collector is created for.
 	 * @param config The configuration describing the output shipping strategies.

http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index efaceea..0bf4433 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -49,7 +49,7 @@ import org.apache.flink.util.MutableObjectIterator;
  * spilling contents to disk, when the memory is not sufficient. It does not need to know a priori 
  * how large the input will be.
  * 
- * <p>The design of this class follows on many parts the design presented in
+ * <p>The design of this class follows in many parts the design presented in
  * "Hash joins and hash teams in Microsoft SQL Server", by Goetz Graefe et al. In its current state, the
  * implementation lacks features like dynamic role reversal, partition tuning, or histogram guided partitioning.</p>
  * 

http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/PartialOrderPriorityQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/PartialOrderPriorityQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/PartialOrderPriorityQueue.java
index 8fd9188..82d3868 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/PartialOrderPriorityQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/PartialOrderPriorityQueue.java
@@ -33,7 +33,7 @@ import java.util.Queue;
 
 /**
  * This class implements a priority-queue, which maintains a partial
- * ordering of its elements such that the+ least element can always be found
+ * ordering of its elements such that the least element can always be found
  * in constant time. Put()'s and pop()'s require log(size) time.
  * 
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index d9d9596..41533ea 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1943,7 +1943,7 @@ object TaskManager {
    * directories (not files), and are writable.
    *
    * @param tmpDirs The array of directory paths to check.
-   * @throws Exception Thrown if any of the directories doe not exist or is not writable
+   * @throws Exception Thrown if any of the directories does not exist or is not writable
    *                   or is a file, rather than a directory.
    */
   @throws(classOf[IOException])

http://git-wip-us.apache.org/repos/asf/flink/blob/6666ea58/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 00952c7..cf1a917 100644
--- a/pom.xml
+++ b/pom.xml
@@ -810,7 +810,7 @@ under the License.
 				<configuration>
 					<source>1.7</source>
 					<target>1.7</target>
-					<!-- The output of Xlint is not show by default, but we activate it for the QA bot
+					<!-- The output of Xlint is not shown by default, but we activate it for the QA bot
 					to be able to get more warnings -->
 					<compilerArgument>-Xlint:all</compilerArgument>
 				</configuration>


[2/6] flink git commit: [docs] Fix documentation for building Flink with Scala 2.11 or 2.10

Posted by fh...@apache.org.
[docs] Fix documentation for building Flink with Scala 2.11 or 2.10

This closes #1260


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3cabe7c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3cabe7c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3cabe7c

Branch: refs/heads/master
Commit: e3cabe7c715cc0ddc1ed8b65ca03b0701928ebcf
Parents: 5fb1c47
Author: Alexander Alexandrov <al...@gmail.com>
Authored: Fri Oct 16 00:21:23 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 21 11:45:01 2015 +0200

----------------------------------------------------------------------
 docs/apis/programming_guide.md |  2 +-
 docs/index.md                  |  2 +-
 docs/setup/building.md         | 16 +++++++++++-----
 3 files changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3cabe7c/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index aa88ce9..a89f736 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -197,7 +197,7 @@ to run your program on Flink with Scala 2.11, you need to add a `_2.11` suffix t
 values of the Flink modules in your dependencies section.
 
 If you are looking for building Flink with Scala 2.11, please check
-[build guide](../setup/building.html#build-flink-for-scala-211).
+[build guide](../setup/building.html#build-flink-for-a-specific-scala-version).
 
 #### Hadoop Dependency Versions
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3cabe7c/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 31aaa2b..748b441 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -64,7 +64,7 @@ You can download the latest pre-built snapshot version from the [downloads]({{ s
 <!--The Scala API uses Scala {{ site.scala_version }}. Please make sure to use a compatible version.
 
 The Scala API uses Scala 2.10, but you can use the API with Scala 2.11. To use Flink with
-Scala 2.11, please check [build guide](/setup/building.html#build-flink-for-scala-211)
+Scala 2.11, please check [build guide](/setup/building.html#build-flink-for-a-specific-scala-version)
 and [programming guide](/apis/programming_guide.html#scala-dependency-versions).-->
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3cabe7c/docs/setup/building.md
----------------------------------------------------------------------
diff --git a/docs/setup/building.md b/docs/setup/building.md
index 2f51da6..01600f9 100644
--- a/docs/setup/building.md
+++ b/docs/setup/building.md
@@ -103,19 +103,25 @@ So if you are building Flink for Hadoop `2.0.0-alpha`, use the following command
 Flink has APIs, libraries, and runtime modules written in [Scala](http://scala-lang.org). Users of the Scala API and libraries may have to match the Scala version of Flink with the Scala version
 of their projects (because Scala is not strictly backwards compatible).
 
-By default, Flink is built with Scala *2.10*. To build Flink with Scala *2.11*, append the `-Dscala-2.11` option to your build command:
+By default, Flink is built with Scala *2.10*. To build Flink with Scala *2.11*, you need to change the default Scala *binary version* using a build script:
 
 ~~~bash
-mvn clean install -DskipTests -Dscala-2.11
+# Switch Scala binary version between 2.10 and 2.11
+tools/change-scala-version.sh 2.11
+# Build and install locally
+mvn clean install -DskipTests
 ~~~
 
-To build against custom Scala versions, you need to supply the *language version* and the *binary version* as properties to the build:
+To build against custom Scala versions, you need to switch to the appropriate binary version and supply the *language version* as an additional build property. For example, to build against Scala 2.11.4, you have to execute:
 
 ~~~bash
-mvn clean install -DskipTests -Dscala-2.11 -Dscala.version=2.11.4 -Dscala.binary.version=2.11
+# Switch Scala binary version to 2.11
+tools/change-scala-version.sh 2.11
+# Build with custom Scala version 2.11.4
+mvn clean install -DskipTests -Dscala.version=2.11.4
 ~~~
 
-Flink is developed against Scala *2.10*, and tested additionally against Scala *2.11*. These two versions are known to be compatible. Earlier versions (like Scala *2.9*) are *not* compatible.
+Flink is developed against Scala *2.10* and tested additionally against Scala *2.11*. These two versions are known to be compatible. Earlier versions (like Scala *2.9*) are *not* compatible.
 
 Newer versions may be compatible, depending on breaking changes in the language features used by Flink, and the availability of Flink's dependencies in those Scala versions. The dependencies written in Scala include for example *Kafka*, *Akka*, *Scalatest*, and *scopt*.
 


[3/6] flink git commit: [FLINK-2834] Global round-robin for temporary directories

Posted by fh...@apache.org.
[FLINK-2834] Global round-robin for temporary directories

Multiple TaskManager filesystems can be used by configuring multiple temporary directories.
This patch changes the process of spilling files from a per-operator round-robin to a global
round-robin such that each directory is written to in turn across all operators, reducing
unbalanced I/O due to bunching.

This closes #1272


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dd3264e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dd3264e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dd3264e6

Branch: refs/heads/master
Commit: dd3264e6360130c935267a13ff23db9ec3128d18
Parents: e3cabe7
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Oct 20 10:47:58 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 21 11:46:06 2015 +0200

----------------------------------------------------------------------
 .../runtime/io/disk/iomanager/FileIOChannel.java    | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dd3264e6/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
index f9ee90c..fd8e8e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/FileIOChannel.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.flink.util.StringUtils;
 
@@ -153,21 +154,26 @@ public interface FileIOChannel {
 	 */
 	public static final class Enumerator {
 
+		private static AtomicInteger globalCounter = new AtomicInteger();
+
 		private final File[] paths;
-		
+
 		private final String namePrefix;
 
-		private int counter;
+		private int localCounter;
 
 		protected Enumerator(File[] basePaths, Random random) {
 			this.paths = basePaths;
 			this.namePrefix = ID.randomString(random);
-			this.counter = 0;
+			this.localCounter = 0;
 		}
 
 		public ID next() {
-			int threadNum = counter % paths.length;
-			String filename = String.format("%s.%06d.channel", namePrefix, (counter++));
+			// The local counter is used to increment file names while the global counter is used
+			// for indexing the directory and associated read and write threads. This performs a
+			// round-robin among all spilling operators and avoids I/O bunching.
+			int threadNum = globalCounter.getAndIncrement() % paths.length;
+			String filename = String.format("%s.%06d.channel", namePrefix, (localCounter++));
 			return new ID(new File(paths[threadNum], filename), threadNum);
 		}
 	}
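
To see why the old scheme bunched I/O, consider several operators spilling concurrently with two temporary directories: every per-operator counter starts at zero, so all of their first spill files land in the same directory. A small illustrative program (hypothetical, not part of the commit):

    import java.util.concurrent.atomic.AtomicInteger;

    public class RoundRobinDemo {
        public static void main(String[] args) {
            String[] dirs = {"/tmp/a", "/tmp/b"};
            AtomicInteger global = new AtomicInteger();
            for (int operator = 0; operator < 3; operator++) {
                int local = 0; // fresh per-operator counter (old behavior)
                String perOp = dirs[local % dirs.length];                     // always /tmp/a
                String shared = dirs[global.getAndIncrement() % dirs.length]; // alternates
                System.out.println("operator " + operator
                    + ": per-operator -> " + perOp + ", global -> " + shared);
            }
        }
    }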


[4/6] flink git commit: [FLINK-2880] [streaming] Allow DeserializationSchema to forward exceptions.

Posted by fh...@apache.org.
[FLINK-2880] [streaming] Allow DeserializationSchema to forward exceptions.

This closes #1275


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a959bcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a959bcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a959bcb

Branch: refs/heads/master
Commit: 7a959bcb405e0e4e6655dc87824c4ab27da317c2
Parents: dd3264e
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Oct 20 19:17:59 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Oct 21 11:52:39 2015 +0200

----------------------------------------------------------------------
 .../streaming/connectors/kafka/KafkaConsumerTestBase.java     | 7 +++++--
 .../streaming/util/serialization/DeserializationSchema.java   | 3 ++-
 2 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a959bcb/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index e9a5728..ffb6818 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -76,6 +76,7 @@ import org.junit.Assert;
 import org.junit.Rule;
 import scala.collection.Seq;
 
+import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1012,7 +1013,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 
 	private static void printTopic(String topicName, ConsumerConfig config,
 								DeserializationSchema<?> deserializationSchema,
-								int stopAfter) {
+								int stopAfter) throws IOException {
 
 		List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
 		LOG.info("Printing contents of topic {} in consumer grouo {}", topicName, config.groupId());
@@ -1023,7 +1024,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		}
 	}
 
-	private static void printTopic(String topicName, int elements,DeserializationSchema<?> deserializer) {
+	private static void printTopic(String topicName, int elements, DeserializationSchema<?> deserializer)
+			throws IOException
+	{
 		// write the sequence to log for debugging purposes
 		Properties stdProps = standardCC.props().props();
 		Properties newProps = new Properties(stdProps);
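
The interface change below relaxes deserialize() to declare IOException. A hypothetical implementation that takes advantage of it (the class name and corrupt-input check are illustrative; isEndOfStream() and getProducedType() are assumed from the existing interface):

    import java.io.IOException;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    public class Utf8StringSchema implements DeserializationSchema<String> {

        @Override
        public String deserialize(byte[] message) throws IOException {
            if (message == null) {
                // Forward corrupt input as an exception instead of a sentinel value.
                throw new IOException("Received a null message");
            }
            return new String(message, "UTF-8");
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }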

http://git-wip-us.apache.org/repos/asf/flink/blob/7a959bcb/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
index f0e4477..cd25e83 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.util.serialization;
 
+import java.io.IOException;
 import java.io.Serializable;
 
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -36,7 +37,7 @@ public interface DeserializationSchema<T> extends Serializable, ResultTypeQuerya
 	 * @param message The message, as a byte array.
 	 * @return The deserialized message as an object.
 	 */
-	T deserialize(byte[] message);
+	T deserialize(byte[] message) throws IOException;
 
 	/**
 	 * Method to decide whether the element signals the end of the stream. If