Posted to commits@flink.apache.org by uc...@apache.org on 2014/09/10 01:42:18 UTC

[1/9] git commit: Update LICENSE and add license headers to CSS and JS in flink clients

Repository: incubator-flink
Updated Branches:
  refs/heads/release-0.6.1 828407d58 -> 9a7d3d49f


Update LICENSE and add license headers to CSS and JS in flink clients

Conflicts:
	LICENSE
	flink-dist/src/main/flink-bin/LICENSE


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/f82acce6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/f82acce6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/f82acce6

Branch: refs/heads/release-0.6.1
Commit: f82acce6145e95196dad6a7a6dea6abc2a324b3b
Parents: 828407d
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 19 12:07:03 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Sep 10 00:43:23 2014 +0200

----------------------------------------------------------------------
 LICENSE                               | 179 ++++++++++++++++++++++++++++-
 flink-dist/src/main/flink-bin/LICENSE |  43 ++++++-
 2 files changed, 213 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f82acce6/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index 7e3aff2..3604605 100644
--- a/LICENSE
+++ b/LICENSE
@@ -205,10 +205,53 @@
 
 Apache Flink subcomponents:
 
-The Apache Flink project contains subcomponents in the source code
-release with separate copyright notices and license terms. Your use of
-the source code for the these subcomponents is subject to the terms and
-conditions of their respective licenses.
+The Apache Flink project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for the these
+subcomponents is subject to the terms and conditions of their respective
+licenses.
+
+
+-----------------------------------------------------------------------
+ Apache License (Version 2.0)
+-----------------------------------------------------------------------
+
+The Apache Flink project depends on and/or bundles the following components
+under the Apache License (v 2.0):
+
+ - Apache Commons Logging (http://commons.apache.org/proper/commons-logging/)
+ - Apache Commons Codec (http://commons.apache.org/proper/commons-codec/)
+ - Apache Commons CLI (http://commons.apache.org/cli/)
+ - Apache Commons FileUpload (http://commons.apache.org/fileupload/)
+ - Apache Commons IO (http://commons.apache.org/io/)
+ - Apache Log4J (http://logging.apache.org/log4j/1.2/)
+ - Apache Avro (http://avro.apache.org)
+ - Apache Hadoop (http://hadoop.apache.org)
+ - Apache Derby (http://db.apache.org/derby/)
+ - Apache Kafka (http://kafka.apache.org)
+ - Apache Flume (http://flume.apache.org)
+ - Google Guava (https://code.google.com/p/guava-libraries/)
+ - Netty (http://netty.io)
+ - Powermock (http://www.powermock.org)
+ - Javassist (http://www.javassist.org)
+ - Jetty Web Container (http://www.eclipse.org/jetty/)
+ - Amazon Web Services SDK for Java (http://aws.amazon.com/sdkforjava/)
+ - ScalaTest (http://www.scalatest.org)
+ - StartBootstrap (http://startbootstrap.com)
+ - CHAP Links Library Timeline (http://almende.github.io/chap-links-library/)
+ - Twitter Hosebird Client (hbc) (https://github.com/twitter/hbc)
+
+
+-----------------------------------------------------------------------
+ Eclipse Public License  - v 1.0
+-----------------------------------------------------------------------
+
+The Apache Flink project depends on and/or bundles the following components
+under the Eclipse Public License (v 1.0)
+
+ - JUnit (http://junit.org/)
+ 
+You may obtain a copy of the Eclipse Public License (v 1.0) at
+https://www.eclipse.org/legal/epl-v10.html
 
 
 -----------------------------------------------------------------------
@@ -216,8 +259,10 @@ conditions of their respective licenses.
 -----------------------------------------------------------------------
 
 The Apache Flink project depends on and/or bundles the following components
-under the MIT License
+under the The MIT License
 
+ - Mockito (http://www.mockito.org) - Copyright (c) 2007 Mockito contributors
+ - SLF4J (http://www.slf4j.org) - Copyright (c) 2004-2013 QOS.ch
  - jQuery 1.4.2 (http://jquery.com) - Copyright 2014 jQuery Foundation and other contributors
  - jCanvas 13.11.21 (http://calebevans.me/projects/jcanvas/) - Copyright 2014 Caleb Evans
  - Flot 0.8.1 (http://www.flotcharts.org) - Copyright (c) 2007-2013 IOLA and Ole Laursen
@@ -251,9 +296,27 @@ THE SOFTWARE.
  BSD-style Licenses
 -----------------------------------------------------------------------
 
+The Apache Flink project depends on and/or bundles the following components
+under BSD-style licenses
+
+(BSD-style License)
+ - Hamcrest (https://code.google.com/p/hamcrest/) - Copyright (c) 2000-2006, www.hamcrest.org
+ 
+(New BSD License)
+ - Kryo (https://github.com/EsotericSoftware/kryo) - Copyright (c) 2008, Nathan Sweet
+ 
+(BSD-like License)
+ - Scala Library (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
+ - Scala Compiler (BSD-like) - (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
+ - Scala Compiler Reflect (BSD-like) - (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
+ - ASM (BSD-like) - (http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom
+
 (3-clause BSD license)
  - D3 (http://d3js.org/) - Copyright (c) 2010-2014, Michael Bostock
 
+
+(Below is the 3-clause BSD license)
+
 All rights reserved.
 
 Redistribution and use in source and binary forms, with or without modification,
@@ -283,12 +346,116 @@ POSSIBILITY OF SUCH DAMAGE.
 
 
 -----------------------------------------------------------------------
+ Mozilla Public License  - v 1.1
+-----------------------------------------------------------------------
+
+The Apache Flink project depends on and/or bundles the following components
+under the Mozilla Public License (v 1.1)
+
+ - RabbitMQ (http://www.rabbitmq.com) 
+      The Initial Developer of the Original Code is GoPivotal,Ltd.
+      Copyright (c) 2007-2013 GoPivotal, Inc.  All Rights Reserved.
+ 
+You may obtain a copy of the Mozilla Public License (v 1.1) at
+http://www.mozilla.org/MPL/
+
+
+-----------------------------------------------------------------------
  The Open Font License
 -----------------------------------------------------------------------
 
-The Apache Flink project bundles the following fonts under the
+The Apache Flink project packages the following fonts under the
 Open Font License (OFT) - http://scripts.sil.org/OFL/
 
  - Font Awesome (http://fortawesome.github.io/Font-Awesome/) - Created by Dave Gandy
    -> fonts in "flink-runtime/resources/web-docs-infoserver/font-awesome/fonts"
 
+
+-----------------------------------------------------------------------
+For Apache Hadoop Subcomponents
+-----------------------------------------------------------------------
+
+The Apache Hadoop project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for the these
+subcomponents is subject to the terms and conditions of the following
+licenses. 
+
+For the org.apache.hadoop.util.bloom.* classes:
+
+/**
+ *
+ * Copyright (c) 2005, European Commission project OneLab under contract
+ * 034819 (http://www.one-lab.org)
+ * All rights reserved.
+ * Redistribution and use in source and binary forms, with or 
+ * without modification, are permitted provided that the following 
+ * conditions are met:
+ *  - Redistributions of source code must retain the above copyright 
+ *    notice, this list of conditions and the following disclaimer.
+ *  - Redistributions in binary form must reproduce the above copyright 
+ *    notice, this list of conditions and the following disclaimer in 
+ *    the documentation and/or other materials provided with the distribution.
+ *  - Neither the name of the University Catholique de Louvain - UCL
+ *    nor the names of its contributors may be used to endorse or 
+ *    promote products derived from this software without specific prior 
+ *    written permission.
+ *    
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
+ * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+For portions of the native implementation of slicing-by-8 CRC calculation
+in src/main/native/src/org/apache/hadoop/util:
+
+/**
+ *   Copyright 2008,2009,2010 Massachusetts Institute of Technology.
+ *   All rights reserved. Use of this source code is governed by a
+ *   BSD-style license that can be found in the LICENSE file.
+ */
+
+For src/main/native/src/org/apache/hadoop/io/compress/lz4/{lz4.h,lz4.c,
+lz4_encoder.h,lz4hc.h,lz4hc.c,lz4hc_encoder.h},
+
+/*
+   LZ4 - Fast LZ compression algorithm
+   Header File
+   Copyright (C) 2011-2013, Yann Collet.
+   BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
+
+   Redistribution and use in source and binary forms, with or without
+   modification, are permitted provided that the following conditions are
+   met:
+
+       * Redistributions of source code must retain the above copyright
+   notice, this list of conditions and the following disclaimer.
+       * Redistributions in binary form must reproduce the above
+   copyright notice, this list of conditions and the following disclaimer
+   in the documentation and/or other materials provided with the
+   distribution.
+
+   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+   You can contact the author at :
+   - LZ4 homepage : http://fastcompression.blogspot.com/p/lz4.html
+   - LZ4 source repository : http://code.google.com/p/lz4/
+*/

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/f82acce6/flink-dist/src/main/flink-bin/LICENSE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE
index d7d203f..3604605 100644
--- a/flink-dist/src/main/flink-bin/LICENSE
+++ b/flink-dist/src/main/flink-bin/LICENSE
@@ -227,6 +227,8 @@ under the Apache License (v 2.0):
  - Apache Avro (http://avro.apache.org)
  - Apache Hadoop (http://hadoop.apache.org)
  - Apache Derby (http://db.apache.org/derby/)
+ - Apache Kafka (http://kafka.apache.org)
+ - Apache Flume (http://flume.apache.org)
  - Google Guava (https://code.google.com/p/guava-libraries/)
  - Netty (http://netty.io)
  - Powermock (http://www.powermock.org)
@@ -236,6 +238,20 @@ under the Apache License (v 2.0):
  - ScalaTest (http://www.scalatest.org)
  - StartBootstrap (http://startbootstrap.com)
  - CHAP Links Library Timeline (http://almende.github.io/chap-links-library/)
+ - Twitter Hosebird Client (hbc) (https://github.com/twitter/hbc)
+
+
+-----------------------------------------------------------------------
+ Eclipse Public License  - v 1.0
+-----------------------------------------------------------------------
+
+The Apache Flink project depends on and/or bundles the following components
+under the Eclipse Public License (v 1.0)
+
+ - JUnit (http://junit.org/)
+ 
+You may obtain a copy of the Eclipse Public License (v 1.0) at
+https://www.eclipse.org/legal/epl-v10.html
 
 
 -----------------------------------------------------------------------
@@ -245,6 +261,7 @@ under the Apache License (v 2.0):
 The Apache Flink project depends on and/or bundles the following components
 under the The MIT License
 
+ - Mockito (http://www.mockito.org) - Copyright (c) 2007 Mockito contributors
  - SLF4J (http://www.slf4j.org) - Copyright (c) 2004-2013 QOS.ch
  - jQuery 1.4.2 (http://jquery.com) - Copyright 2014 jQuery Foundation and other contributors
  - jCanvas 13.11.21 (http://calebevans.me/projects/jcanvas/) - Copyright 2014 Caleb Evans
@@ -281,17 +298,22 @@ THE SOFTWARE.
 
 The Apache Flink project depends on and/or bundles the following components
 under BSD-style licenses
+
+(BSD-style License)
+ - Hamcrest (https://code.google.com/p/hamcrest/) - Copyright (c) 2000-2006, www.hamcrest.org
  
-[3-clause BSD license]
+(New BSD License)
  - Kryo (https://github.com/EsotericSoftware/kryo) - Copyright (c) 2008, Nathan Sweet
- - D3 (http://d3js.org/) - Copyright (c) 2010-2014, Michael Bostock
  
-[BSD-like License]
+(BSD-like License)
  - Scala Library (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
  - Scala Compiler (BSD-like) - (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
  - Scala Compiler Reflect (BSD-like) - (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
  - ASM (BSD-like) - (http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom
 
+(3-clause BSD license)
+ - D3 (http://d3js.org/) - Copyright (c) 2010-2014, Michael Bostock
+
 
 (Below is the 3-clause BSD license)
 
@@ -324,6 +346,21 @@ POSSIBILITY OF SUCH DAMAGE.
 
 
 -----------------------------------------------------------------------
+ Mozilla Public License  - v 1.1
+-----------------------------------------------------------------------
+
+The Apache Flink project depends on and/or bundles the following components
+under the Mozilla Public License (v 1.1)
+
+ - RabbitMQ (http://www.rabbitmq.com) 
+      The Initial Developer of the Original Code is GoPivotal,Ltd.
+      Copyright (c) 2007-2013 GoPivotal, Inc.  All Rights Reserved.
+ 
+You may obtain a copy of the Mozilla Public License (v 1.1) at
+http://www.mozilla.org/MPL/
+
+
+-----------------------------------------------------------------------
  The Open Font License
 -----------------------------------------------------------------------
 


[8/9] git commit: Fix DeltaIteration Bug with Immutable Types

Posted by uc...@apache.org.
Fix DeltaIteration Bug with Immutable Types

The CompactingHashTable did not obey the new serialization contract that
allows for immutable objects. We must always use the return value of the
deserialization methods and cannot assume that the data was put into the
reuse object.
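
For context, a minimal sketch of the contract the fix enforces (a hypothetical helper written against Flink's public TypeSerializer API; it is not code from this commit):

    import java.io.IOException;

    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.core.memory.DataInputView;

    // Hypothetical helper illustrating the reuse contract: a serializer may
    // ignore the reuse instance (e.g. for immutable types) and return a fresh
    // object, so the caller must always continue with the returned reference.
    public final class ReuseContractSketch {

        private ReuseContractSketch() {}

        public static <T> T readNext(TypeSerializer<T> serializer, DataInputView in, T reuse)
                throws IOException {
            // Correct: keep the return value; never assume 'reuse' was filled in place.
            return serializer.deserialize(reuse, in);
        }
    }

This is the same pattern the drivers in the diffs below adopt: buildSideRecord = prober.getMatchFor(current, buildSideRecord) followed by a null check, instead of a boolean return with in-place mutation.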


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/46148f99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/46148f99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/46148f99

Branch: refs/heads/release-0.6.1
Commit: 46148f990d91d0205e961957c339b6c915681d37
Parents: 0644740
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 4 17:49:22 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Sep 10 00:49:51 2014 +0200

----------------------------------------------------------------------
 .../CoGroupWithSolutionSetFirstDriver.java      |  5 +-
 .../CoGroupWithSolutionSetSecondDriver.java     |  5 +-
 .../JoinWithSolutionSetFirstDriver.java         |  7 +--
 .../JoinWithSolutionSetSecondDriver.java        |  7 +--
 .../operators/hash/AbstractHashTableProber.java |  2 +-
 .../operators/hash/CompactingHashTable.java     | 16 ++---
 .../operators/hash/InMemoryPartition.java       |  4 +-
 .../hash/HashTablePerformanceComparison.java    | 10 ++--
 .../operators/hash/MemoryHashTableTest.java     | 62 +++++++++-----------
 9 files changed, 54 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/46148f99/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
index 8590b78..912cbb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
@@ -136,8 +136,9 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
 		
 		while (this.running && probeSideInput.nextKey()) {
 			IT2 current = probeSideInput.getCurrent();
-			
-			if (prober.getMatchFor(current, buildSideRecord)) {
+
+			buildSideRecord = prober.getMatchFor(current, buildSideRecord);
+			if (buildSideRecord != null) {
 				siIter.set(buildSideRecord);
 				coGroupStub.coGroup(siIter, probeSideInput.getValues(), collector);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/46148f99/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
index b3c0ece..a6b747a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
@@ -135,8 +135,9 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
 		
 		while (this.running && probeSideInput.nextKey()) {
 			IT1 current = probeSideInput.getCurrent();
-			
-			if (prober.getMatchFor(current, buildSideRecord)) {
+
+			buildSideRecord = prober.getMatchFor(current, buildSideRecord);
+			if (buildSideRecord != null) {
 				siIter.set(buildSideRecord);
 				coGroupStub.coGroup(probeSideInput.getValues(), siIter, collector);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/46148f99/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
index 342f307..2735fd5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
@@ -137,11 +137,8 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
 			
 		final CompactingHashTable<IT1>.HashTableProber<IT2> prober = join.getProber(probeSideComparator, pairComparator);
 		while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord)) != null)) {
-			if (prober.getMatchFor(probeSideRecord, buildSideRecord)) {
-				joinFunction.join(buildSideRecord, probeSideRecord, collector);
-			} else {
-				joinFunction.join(null, probeSideRecord, collector);
-			}
+			buildSideRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
+			joinFunction.join(buildSideRecord, probeSideRecord, collector);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/46148f99/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
index c38a81a..2d834b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
@@ -137,11 +137,8 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
 			
 		final CompactingHashTable<IT2>.HashTableProber<IT1> prober = join.getProber(probeSideComparator, pairComparator);
 		while (this.running && ((probeSideRecord = probeSideInput.next(probeSideRecord)) != null)) {
-			if (prober.getMatchFor(probeSideRecord, buildSideRecord)) {
-				joinFunction.join(probeSideRecord, buildSideRecord, collector);
-			} else {
-				joinFunction.join(probeSideRecord, null, collector);
-			}
+			buildSideRecord = prober.getMatchFor(probeSideRecord, buildSideRecord);
+			joinFunction.join(probeSideRecord, buildSideRecord, collector);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/46148f99/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractHashTableProber.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractHashTableProber.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractHashTableProber.java
index 642f7fd..3ecd911 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractHashTableProber.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/AbstractHashTableProber.java
@@ -38,7 +38,7 @@ public abstract class AbstractHashTableProber<PT, BT> {
 		this.pairComparator = pairComparator;
 	}
 	
-	public abstract boolean getMatchFor(PT probeSideRecord, BT targetForMatch);
+	public abstract BT getMatchFor(PT probeSideRecord, BT targetForMatch);
 	
 	public abstract void updateMatch(BT record) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/46148f99/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index 239786d..bfef3d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -462,7 +462,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 					
 					// deserialize the key to check whether it is really equal, or whether we had only a hash collision
 					try {
-						partition.readRecordAt(pointer, tempHolder);
+						tempHolder = partition.readRecordAt(pointer, tempHolder);
 						if (this.buildSideComparator.equalToReference(tempHolder)) {
 							long newPointer = partition.appendRecord(record);
 							bucket.putLong(pointerOffset, newPointer);
@@ -1115,7 +1115,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 				while (true) {
 					while (numInSegment < countInSegment) {
 						pointer = segment.getLong(pointerOffset);
-						partition.readRecordAt(pointer, tempHolder);
+						tempHolder = partition.readRecordAt(pointer, tempHolder);
 						pointer = this.compactionMemory.appendRecord(tempHolder);
 						segment.putLong(pointerOffset, pointer);
 						pointerOffset += POINTER_LEN;
@@ -1267,7 +1267,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 					numInSegment++;
 					T target = table.buildSideSerializer.createInstance();
 					try {
-						partition.readRecordAt(pointer, target);
+						target = partition.readRecordAt(pointer, target);
 						cache.add(target);
 					} catch (IOException e) {
 							throw new RuntimeException("Error deserializing record from the Hash Table: " + e.getMessage(), e);
@@ -1311,9 +1311,9 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 			super(probeTypeComparator, pairComparator);
 		}
 		
-		public boolean getMatchFor(PT probeSideRecord, T targetForMatch) {
+		public T getMatchFor(PT probeSideRecord, T targetForMatch) {
 			if(closed.get()) {
-				return false;
+				return null;
 			}
 			final int searchHashCode = hash(this.probeTypeComparator.hash(probeSideRecord));
 			
@@ -1351,13 +1351,13 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 						
 						// deserialize the key to check whether it is really equal, or whether we had only a hash collision
 						try {
-							p.readRecordAt(pointer, targetForMatch);
+							targetForMatch = p.readRecordAt(pointer, targetForMatch);
 							
 							if (this.pairComparator.equalToReference(targetForMatch)) {
 								this.partition = p;
 								this.bucket = bucket;
 								this.pointerOffsetInBucket = pointerOffset;
-								return true;
+								return targetForMatch;
 							}
 						}
 						catch (IOException e) {
@@ -1372,7 +1372,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T>{
 				// this segment is done. check if there is another chained bucket
 				final long forwardPointer = bucket.getLong(bucketInSegmentOffset + HEADER_FORWARD_OFFSET);
 				if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) {
-					return false;
+					return null;
 				}
 				
 				final int overflowSegNum = (int) (forwardPointer >>> 32);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/46148f99/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
index b90ca1a..a46842f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/InMemoryPartition.java
@@ -231,9 +231,9 @@ public class InMemoryPartition<T> {
 		}
 	}
 	
-	public void readRecordAt(long pointer, T record) throws IOException {
+	public T readRecordAt(long pointer, T reuse) throws IOException {
 		this.readView.setReadPosition(pointer);
-		this.serializer.deserialize(record, this.readView);
+		return this.serializer.deserialize(reuse, this.readView);
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/46148f99/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
index 3996bf9..9bbf123 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.runtime.operators.hash;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.util.ArrayList;
 import java.util.List;
 
@@ -43,6 +39,8 @@ import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
 import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class HashTablePerformanceComparison {
 		
 	private static final int PAGE_SIZE = 16 * 1024;
@@ -96,7 +94,7 @@ public class HashTablePerformanceComparison {
 			AbstractHashTableProber<IntPair, IntPair> prober = table.getProber(comparator, pairComparator);
 			IntPair temp = new IntPair();
 			while(probeTester.next(target) != null) {
-				assertTrue(prober.getMatchFor(target, temp));
+				assertNotNull(prober.getMatchFor(target, temp));
 				assertEquals(temp.getValue(), target.getValue());
 			}
 			end = System.currentTimeMillis();
@@ -114,7 +112,7 @@ public class HashTablePerformanceComparison {
 			System.out.println("Starting second probing run...");
 			start = System.currentTimeMillis();
 			while (updateTester.next(target) != null) {
-				assertTrue(prober.getMatchFor(target, temp));
+				assertNotNull(prober.getMatchFor(target, temp));
 				assertEquals(target.getValue(), temp.getValue());
 			}
 			end = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/46148f99/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
index b644da1..ce9e469 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MemoryHashTableTest.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.runtime.operators.hash;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
@@ -54,6 +48,8 @@ import org.apache.flink.util.MutableObjectIterator;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import static org.junit.Assert.*;
+
 
 public class MemoryHashTableTest {
 	
@@ -134,7 +130,7 @@ public class MemoryHashTableTest {
 			IntPair target = new IntPair();
 			
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(prober.getMatchFor(pairs[i], target));
+				assertNotNull(prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 			
@@ -197,8 +193,8 @@ public class MemoryHashTableTest {
 			
 			IntList target = new IntList();
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(pairProber.getMatchFor(pairs[i], target));
-				assertTrue(listProber.getMatchFor(lists[i], target));
+				assertNotNull(pairProber.getMatchFor(pairs[i], target));
+				assertNotNull(listProber.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			table.close();
@@ -232,7 +228,7 @@ public class MemoryHashTableTest {
 			IntList target = new IntList();
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(prober.getMatchFor(lists[i], target));
+				assertNotNull(prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
@@ -245,7 +241,7 @@ public class MemoryHashTableTest {
 			}
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue("" + i, prober.getMatchFor(overwriteLists[i], target));
+				assertNotNull("" + i, prober.getMatchFor(overwriteLists[i], target));
 				assertArrayEquals(overwriteLists[i].getValue(), target.getValue());
 			}
 			
@@ -275,7 +271,7 @@ public class MemoryHashTableTest {
 			IntList target = new IntList();
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(prober.getMatchFor(lists[i], target));
+				assertNotNull(prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
@@ -291,7 +287,7 @@ public class MemoryHashTableTest {
 			}
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue("" + i, prober.getMatchFor(lists[i], target));
+				assertNotNull("" + i, prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
@@ -324,7 +320,7 @@ public class MemoryHashTableTest {
 			IntList target = new IntList();
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(prober.getMatchFor(lists[i], target));
+				assertNotNull(prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
@@ -339,7 +335,7 @@ public class MemoryHashTableTest {
 			}
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(prober.getMatchFor(lists[i], target));
+				assertNotNull(prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
@@ -374,7 +370,7 @@ public class MemoryHashTableTest {
 			IntList target = new IntList();
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(prober.getMatchFor(lists[i], target));
+				assertNotNull(prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
@@ -389,7 +385,7 @@ public class MemoryHashTableTest {
 				}
 			
 				for (int i = 0; i < NUM_LISTS; i++) {
-					assertTrue("" + i, prober.getMatchFor(overwriteLists[i], target));
+					assertNotNull("" + i, prober.getMatchFor(overwriteLists[i], target));
 					assertArrayEquals(overwriteLists[i].getValue(), target.getValue());
 				}
 			}
@@ -426,13 +422,13 @@ public class MemoryHashTableTest {
 			IntList target = new IntList();
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(""+i,prober.getMatchFor(lists[i], target));
+				assertNotNull(""+i,prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 				prober.updateMatch(overwriteLists[i]);
 			}
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue("" + i, prober.getMatchFor(overwriteLists[i], target));
+				assertNotNull("" + i, prober.getMatchFor(overwriteLists[i], target));
 				assertArrayEquals(overwriteLists[i].getValue(), target.getValue());
 			}
 			
@@ -462,7 +458,7 @@ public class MemoryHashTableTest {
 			IntPair target = new IntPair();
 			
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(prober.getMatchFor(pairs[i], target));
+				assertNotNull(prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 			
@@ -472,7 +468,7 @@ public class MemoryHashTableTest {
 			assertTrue(b.booleanValue());
 			
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertNotNull(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 			
@@ -502,7 +498,7 @@ public class MemoryHashTableTest {
 			IntPair target = new IntPair();
 			
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(prober.getMatchFor(pairs[i], target));
+				assertNotNull(prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 			
@@ -512,7 +508,7 @@ public class MemoryHashTableTest {
 			assertTrue(b.booleanValue());
 			
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertNotNull(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 			
@@ -522,7 +518,7 @@ public class MemoryHashTableTest {
 			assertTrue(b.booleanValue());
 						
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertNotNull(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 						
@@ -552,7 +548,7 @@ public class MemoryHashTableTest {
 			IntPair target = new IntPair();
 			
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(prober.getMatchFor(pairs[i], target));
+				assertNotNull(prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 			
@@ -562,7 +558,7 @@ public class MemoryHashTableTest {
 			assertTrue(b.booleanValue());
 			
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertNotNull(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 			
@@ -572,7 +568,7 @@ public class MemoryHashTableTest {
 			assertTrue(b.booleanValue());
 						
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertNotNull(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 			
@@ -582,7 +578,7 @@ public class MemoryHashTableTest {
 			assertTrue(b.booleanValue());
 									
 			for (int i = 0; i < NUM_PAIRS; i++) {
-				assertTrue(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
+				assertNotNull(pairs[i].getKey() + " " + pairs[i].getValue(), prober.getMatchFor(pairs[i], target));
 				assertEquals(pairs[i].getValue(), target.getValue());
 			}
 						
@@ -617,7 +613,7 @@ public class MemoryHashTableTest {
 			IntList target = new IntList();
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(prober.getMatchFor(lists[i], target));
+				assertNotNull(prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
@@ -627,7 +623,7 @@ public class MemoryHashTableTest {
 			assertTrue(b.booleanValue());
 						
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue(prober.getMatchFor(lists[i], target));
+				assertNotNull(prober.getMatchFor(lists[i], target));
 				assertArrayEquals(lists[i].getValue(), target.getValue());
 			}
 			
@@ -653,7 +649,7 @@ public class MemoryHashTableTest {
 			assertTrue(b.booleanValue());									
 			
 			for (int i = 0; i < NUM_LISTS; i++) {
-				assertTrue("" + i, prober.getMatchFor(overwriteLists[i], target));
+				assertNotNull("" + i, prober.getMatchFor(overwriteLists[i], target));
 				assertArrayEquals(overwriteLists[i].getValue(), target.getValue());
 			}
 			
@@ -689,7 +685,7 @@ public class MemoryHashTableTest {
 			AbstractHashTableProber<StringPair, StringPair> prober = table.getProber(comparatorS, pairComparatorS);
 			StringPair temp = new StringPair();
 			while(probeTester.next(target) != null) {
-				assertTrue("" + target.getKey(), prober.getMatchFor(target, temp));
+				assertNotNull("" + target.getKey(), prober.getMatchFor(target, temp));
 				assertEquals(temp.getValue(), target.getValue());
 			}
 			
@@ -699,7 +695,7 @@ public class MemoryHashTableTest {
 			}
 			
 			while (updateTester.next(target) != null) {
-				assertTrue(prober.getMatchFor(target, temp));
+				assertNotNull(prober.getMatchFor(target, temp));
 				assertEquals(target.getValue(), temp.getValue());
 			}
 			


[7/9] git commit: [FLINK-1084] Fix broken links in "How to add an operator"

Posted by uc...@apache.org.
[FLINK-1084] Fix broken links in "How to add an operator"


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/72a481f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/72a481f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/72a481f4

Branch: refs/heads/release-0.6.1
Commit: 72a481f47e3cdf1f251c25e2ebc9109b2200e91a
Parents: 3bcf699
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 1 19:14:24 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Sep 10 00:49:50 2014 +0200

----------------------------------------------------------------------
 docs/internal_add_operator.md       | 12 ++++++------
 docs/internal_program_life_cycle.md |  2 +-
 2 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72a481f4/docs/internal_add_operator.md
----------------------------------------------------------------------
diff --git a/docs/internal_add_operator.md b/docs/internal_add_operator.md
index 40b0a69..f3c9758 100644
--- a/docs/internal_add_operator.md
+++ b/docs/internal_add_operator.md
@@ -56,7 +56,7 @@ public static <T>DataSet<Long> count(DataSet<T> data) {
 
 A more complex example of an operation via specialization is the {% gh_link /flink-java/src/main/java/org/apache/flink/api/java/operators/AggregateOperator.java "Aggregation Operation" %} in the Java API. It is implemented by means of a *GroupReduce* UDF.
 
-The Aggregate Operation comes with its own operator in the *Java API*, but translates itself into a {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java "GroupReduceOperatorBase" %} in the *Common API*. (see [Program Life Cycle](program_life_cycle.html) for details of how an operation from the *Java API* becomes an operation of the *Common API* and finally a runtime operation.)
+The Aggregate Operation comes with its own operator in the *Java API*, but translates itself into a {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorBase.java "GroupReduceOperatorBase" %} in the *Common API*. (see [Program Life Cycle](internal_program_life_cycle.html) for details of how an operation from the *Java API* becomes an operation of the *Common API* and finally a runtime operation.)
 The Java API aggregation operator is only a builder that takes the types of aggregations and the field positions, and used that information to
 parameterize the GroupReduce UDF that performs the aggregations.
 
@@ -109,7 +109,7 @@ function, but invoked only once per parallel partition.
 
 **Runtime**
 
-Runtime Operators are implemented using the {% gh_link /flink-runtime/src/main/java/org/apache/flink/pact/runtime/task/PactDriver.java "Driver" %} interface. The interface defines the methods that describe the operator towards the runtime. The {% gh_link /flink-runtime/src/main/java/org/apache/flink/pact/runtime/task/MapDriver.java "MapDriver" %} serves as a simple example of how those operators work.
+Runtime Operators are implemented using the {% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java "Driver" %} interface. The interface defines the methods that describe the operator towards the runtime. The {% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java "MapDriver" %} serves as a simple example of how those operators work.
 
 The runtime works with the `MutableObjectIterator`, which describes data streams with the ability to reuse objects, to reduce pressure on the garbage collector.
 
@@ -132,16 +132,16 @@ To increase efficiency, it is often beneficial to implement a *chained* version
 operators run in the same thread as their preceding operator, and work with nested function calls.
 This is very efficient, because it saves serialization/deserialization overhead.
 
-To learn how to implement a chained operator, take a look at the {% gh_link /flink-runtime/src/main/java/org/apache/flink/pact/runtime/task/MapDriver.java "MapDriver" %} (regular) and the
-{% gh_link /flink-runtime/src/main/java/org/apache/flink/pact/runtime/task/chaining/ChainedMapDriver.java "ChainedMapDriver" %} (chained variant).
+To learn how to implement a chained operator, take a look at the {% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java "MapDriver" %} (regular) and the
+{% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedMapDriver.java "ChainedMapDriver" %} (chained variant).
 
 
 **Optimizer/Compiler**
 
-This section does a minimal discussion of the important steps to add an operator. Please see the [Optimizer](optimizer.html) docs for more detail on how the optimizer works.
+This section does a minimal discussion of the important steps to add an operator. Please see the [Optimizer](internal_optimizer.html) docs for more detail on how the optimizer works.
 To allow the optimizer to include a new operator in its planning, it needs a bit of information about it; in particular, the following information:
 
-- *{% gh_link /flink-runtime/src/main/java/org/apache/flink/pact/runtime/task/DriverStrategy.java "DriverStrategy" %}*: The operation needs to be added to the Enum, to make it available to the optimizer. The parameters to the Enum entry define which class implements the runtime operator, its chained version, whether the operator accumulates records (and needs memory for that), and whether it requires a comparator (works on keys). For our example, we can add the entry
+- *{% gh_link /flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java "DriverStrategy" %}*: The operation needs to be added to the Enum, to make it available to the optimizer. The parameters to the Enum entry define which class implements the runtime operator, its chained version, whether the operator accumulates records (and needs memory for that), and whether it requires a comparator (works on keys). For our example, we can add the entry
 ``` java
 MAP_PARTITION(MapPartitionDriver.class, null /* or chained variant */, PIPELINED, false)
 ```

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/72a481f4/docs/internal_program_life_cycle.md
----------------------------------------------------------------------
diff --git a/docs/internal_program_life_cycle.md b/docs/internal_program_life_cycle.md
index c68232b..90d31cc 100644
--- a/docs/internal_program_life_cycle.md
+++ b/docs/internal_program_life_cycle.md
@@ -2,4 +2,4 @@
 title:  "Program Life Cycle"
 ---
 
-
+To be done...


[9/9] git commit: Fix typo in package LICENSE

Posted by uc...@apache.org.
Fix typo in package LICENSE


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9a7d3d49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9a7d3d49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9a7d3d49

Branch: refs/heads/release-0.6.1
Commit: 9a7d3d49fd588025198b327481e932b691138c03
Parents: 46148f9
Author: uce <u....@fu-berlin.de>
Authored: Wed Sep 10 01:11:40 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Sep 10 01:13:04 2014 +0200

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/LICENSE | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9a7d3d49/flink-dist/src/main/flink-bin/LICENSE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE
index ef148d8..e9fa313 100644
--- a/flink-dist/src/main/flink-bin/LICENSE
+++ b/flink-dist/src/main/flink-bin/LICENSE
@@ -246,7 +246,7 @@ under the Apache License (v 2.0):
 -----------------------------------------------------------------------
 
 The Apache Flink project depends on and/or bundles the following components
-under the The MIT License
+under the MIT License:
 
  - SLF4J (http://www.slf4j.org) - Copyright (c) 2004-2013 QOS.ch
  - jQuery 1.4.2 (http://jquery.com) - Copyright 2014 jQuery Foundation and other contributors


[4/9] git commit: [FLINK-1075] Removed the AsynchronousPartialSorter.

Posted by uc...@apache.org.
[FLINK-1075] Removed the AsynchronousPartialSorter.

This closes #104


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/64510b6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/64510b6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/64510b6a

Branch: refs/heads/release-0.6.1
Commit: 64510b6a5cb332b3c2b99471b6f6e8608854ef45
Parents: 8af40c3
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Aug 28 18:28:20 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Sep 10 00:43:53 2014 +0200

----------------------------------------------------------------------
 .../operators/GroupReduceCombineDriver.java     | 125 +++++---
 .../runtime/operators/ReduceCombineDriver.java  |   1 -
 .../sort/AsynchronousPartialSorter.java         | 207 -------------
 .../AsynchronousPartialSorterCollector.java     | 101 ------
 .../sort/AsynchonousPartialSorterITCase.java    | 306 -------------------
 5 files changed, 89 insertions(+), 651 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64510b6a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index f786c56..0452ef7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -23,15 +23,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AsynchronousPartialSorter;
-import org.apache.flink.runtime.operators.util.CloseableInputProvider;
-import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
 import org.apache.flink.runtime.util.KeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
+import java.io.IOException;
+import java.util.List;
+
 /**
  * Combine operator, standalone (not chained)
  * <p>
@@ -43,16 +49,26 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 	
 	private static final Log LOG = LogFactory.getLog(GroupReduceCombineDriver.class);
 
-	
+	/** Fix length records with a length below this threshold will be in-place sorted, if possible. */
+	private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
 	private PactTaskContext<FlatCombineFunction<T>, T> taskContext;
-	
-	private CloseableInputProvider<T> input;
 
-	private TypeSerializerFactory<T> serializerFactory;
+	private InMemorySorter<T> sorter;
+
+	private FlatCombineFunction<T> combiner;
+
+	private TypeSerializer<T> serializer;
 
 	private TypeComparator<T> comparator;
-	
-	private volatile boolean running;
+
+	private QuickSort sortAlgo = new QuickSort();
+
+	private MemoryManager memManager;
+
+	private Collector<T> output;
+
+	private volatile boolean running = true;
 
 	// ------------------------------------------------------------------------
 
@@ -81,55 +97,92 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 
 	@Override
 	public void prepare() throws Exception {
-		final TaskConfig config = this.taskContext.getTaskConfig();
-		final DriverStrategy ls = config.getDriverStrategy();
+		if(this.taskContext.getTaskConfig().getDriverStrategy() != DriverStrategy.SORTED_GROUP_COMBINE){
+			throw new Exception("Invalid strategy " + this.taskContext.getTaskConfig().getDriverStrategy() + " for " +
+					"group reduce combinder.");
+		}
 
-		final MemoryManager memoryManager = this.taskContext.getMemoryManager();
+		this.memManager = this.taskContext.getMemoryManager();
+		final int numMemoryPages = memManager.computeNumberOfPages(this.taskContext.getTaskConfig().getRelativeMemoryDriver());
 
-		final MutableObjectIterator<T> in = this.taskContext.getInput(0);
-		this.serializerFactory = this.taskContext.getInputSerializer(0);
+		final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
+		this.serializer = serializerFactory.getSerializer();
 		this.comparator = this.taskContext.getInputComparator(0);
-
-		switch (ls) {
-		case SORTED_GROUP_COMBINE:
-			this.input = new AsynchronousPartialSorter<T>(memoryManager, in, this.taskContext.getOwningNepheleTask(),
-						this.serializerFactory, this.comparator.duplicate(), config.getRelativeMemoryDriver());
-			break;
-		// obtain and return a grouped iterator from the combining sort-merger
-		default:
-			throw new RuntimeException("Invalid local strategy provided for CombineTask.");
+		this.combiner = this.taskContext.getStub();
+		this.output = this.taskContext.getOutputCollector();
+
+		final List<MemorySegment> memory = this.memManager.allocatePages(this.taskContext.getOwningNepheleTask(),
+				numMemoryPages);
+
+		// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
+		if (this.comparator.supportsSerializationWithKeyNormalization() &&
+				this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
+		{
+			this.sorter = new FixedLengthRecordSorter<T>(this.serializer, this.comparator, memory);
+		} else {
+			this.sorter = new NormalizedKeySorter<T>(this.serializer, this.comparator.duplicate(), memory);
 		}
 	}
 
 	@Override
 	public void run() throws Exception {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug(this.taskContext.formatLogString("Preprocessing done, iterator obtained."));
+			LOG.debug("Combiner starting.");
 		}
 
-		final KeyGroupedIterator<T> iter = new KeyGroupedIterator<T>(this.input.getIterator(),
-				this.serializerFactory.getSerializer(), this.comparator);
+		final MutableObjectIterator<T> in = this.taskContext.getInput(0);
+		final TypeSerializer<T> serializer = this.serializer;
+
+		T value = serializer.createInstance();
+
+		while (running && (value = in.next(value)) != null) {
+
+			// try writing to the sorter first
+			if (this.sorter.write(value)) {
+				continue;
+			}
 
-		// cache references on the stack
-		final FlatCombineFunction<T> stub = this.taskContext.getStub();
-		final Collector<T> output = this.taskContext.getOutputCollector();
+			// do the actual sorting, combining, and data writing
+			sortAndCombine();
+			this.sorter.reset();
 
-		// run stub implementation
-		while (this.running && iter.nextKey()) {
-			stub.combine(iter.getValues(), output);
+			// write the value again
+			if (!this.sorter.write(value)) {
+				throw new IOException("Cannot write record to fresh sort buffer. Record too large.");
+			}
+		}
+
+		// sort, combine, and send the final batch
+		sortAndCombine();
+	}
+
+	private void sortAndCombine() throws Exception {
+		final InMemorySorter<T> sorter = this.sorter;
+
+		if (!sorter.isEmpty()) {
+			this.sortAlgo.sort(sorter);
+
+			final KeyGroupedIterator<T> keyIter = new KeyGroupedIterator<T>(sorter.getIterator(), this.serializer,
+					this.comparator);
+
+			final FlatCombineFunction<T> combiner = this.combiner;
+			final Collector<T> output = this.output;
+
+			// iterate over key groups
+			while (this.running && keyIter.nextKey()) {
+				combiner.combine(keyIter.getValues(), output);
+			}
 		}
 	}
 
 	@Override
 	public void cleanup() throws Exception {
-		if (this.input != null) {
-			this.input.close();
-			this.input = null;
-		}
+		this.memManager.release(this.sorter.dispose());
 	}
 
 	@Override
 	public void cancel() {
 		this.running = false;
+		this.memManager.release(this.sorter.dispose());
 	}
 }

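As context for the rewrite above: the new GroupReduceCombineDriver no longer hands records to an asynchronous partial sorter. It writes them into an in-memory sort buffer; whenever the buffer fills, it sorts the buffer, runs the combine function over each key group, emits the results, and resets the buffer for the next batch. Below is a minimal, self-contained sketch of that sort-and-combine pattern; the int[] records, integer keys, and summing combiner are hypothetical stand-ins, not Flink's types.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class SortCombineSketch {

        // Sort buffered records by key, then combine each contiguous key group.
        static List<int[]> sortAndCombine(List<int[]> buffer) {
            buffer.sort(Comparator.comparingInt(r -> r[0]));
            List<int[]> out = new ArrayList<int[]>();
            int[] current = null;
            for (int[] rec : buffer) {
                if (current != null && current[0] == rec[0]) {
                    current[1] += rec[1];                  // combine within the key group
                } else {
                    if (current != null) {
                        out.add(current);                  // key group finished, emit it
                    }
                    current = new int[] { rec[0], rec[1] };
                }
            }
            if (current != null) {
                out.add(current);                          // emit the last group
            }
            return out;
        }

        public static void main(String[] args) {
            List<int[]> buffer = new ArrayList<int[]>();
            buffer.add(new int[] { 2, 1 });
            buffer.add(new int[] { 1, 5 });
            buffer.add(new int[] { 2, 3 });
            for (int[] r : sortAndCombine(buffer)) {
                System.out.println(r[0] + " -> " + r[1]);  // prints "1 -> 5", then "2 -> 4"
            }
        }
    }

Sorting first guarantees that each key's records are contiguous, so a single linear pass suffices; this is the same reason the driver sorts before handing the buffer's iterator to the KeyGroupedIterator.
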
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64510b6a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index 87cea30..6b18bb1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -109,7 +109,6 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
 		
 		// instantiate the serializer / comparator
 		final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
-		this.serializer = serializerFactory.getSerializer();
 		this.comparator = this.taskContext.getInputComparator(0);
 		this.serializer = serializerFactory.getSerializer();
 		this.reducer = this.taskContext.getStub();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64510b6a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorter.java
deleted file mode 100644
index 03794ff..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorter.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.util.MutableObjectIterator;
-
-
-/**
- * The {@link AsynchronousPartialSorter} is a simple sort implementation that sorts
- * bulks inside its buffers, and returns them directly, without merging them. Therefore,
- * it establishes an order within certain windows, but not across them.
- */
-public class AsynchronousPartialSorter<E> extends UnilateralSortMerger<E> {
-	
-	private BufferQueueIterator bufferIterator;
-	
-	// ------------------------------------------------------------------------
-	// Constructor
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * 
-	 * 
-	 * @param memoryManager The memory manager from which to allocate the memory.
-	 * @param input The input that is sorted by this sorter.
-	 * @param parentTask The parent task, which owns all resources used by this sorter.
-	 * @param serializerFactory The type serializer.
-	 * @param comparator The type comparator establishing the order relation.
-	 * @param memoryFraction The fraction of memory dedicated to sorting.
-	 * 
-	 * @throws IOException Thrown, if an error occurs initializing the resources for external sorting.
-	 * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
-	 *                                   perform the sort.
-	 */
-	public AsynchronousPartialSorter(MemoryManager memoryManager,
-			MutableObjectIterator<E> input, AbstractInvokable parentTask, 
-			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			double memoryFraction)
-	throws IOException, MemoryAllocationException
-	{
-		super(memoryManager, null, input, parentTask, serializerFactory, comparator, memoryFraction, 1, 2, 0.0f, true);
-	}
-	
-
-	public void close() {
-		// make a best effort to close the buffer iterator
-		try {
-			if (this.bufferIterator != null) {
-				this.bufferIterator.close();
-				this.bufferIterator = null;
-			}
-		}
-		finally {
-			super.close();
-		}
-	}
-	
-	/* 
-	 * This method does not actually create a spilling thread, but grabs the circular queues and creates the
-	 * iterator that reads from the sort buffers in turn.
-	 */
-	@Override
-	protected ThreadBase<E> getSpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues,
-			AbstractInvokable parentTask, MemoryManager memoryManager, IOManager ioManager, 
-			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxFileHandles)
-	{
-		this.bufferIterator = new BufferQueueIterator(queues);
-		setResultIterator(this.bufferIterator);
-		
-		return null;
-	}
-
-	// ------------------------------------------------------------------------
-
-	private final class BufferQueueIterator implements MutableObjectIterator<E> {
-		
-		private final CircularQueues<E> queues;
-		
-		private CircularElement<E> currentElement;
-		
-		private MutableObjectIterator<E> currentIterator;
-		
-		private volatile boolean closed = false;
-
-
-		protected BufferQueueIterator(CircularQueues<E> queues) {
-			this.queues = queues;
-		}
-
-
-		@Override
-		public E next(final E reuse) throws IOException {
-			E result;
-			if (this.currentIterator != null && ((result = this.currentIterator.next(reuse)) != null)) {
-				return result;
-			}
-			else if (this.closed) {
-				throw new IllegalStateException("The sorter has been closed.");
-			}
-			else {
-				if (AsynchronousPartialSorter.this.iteratorException != null) {
-					throw new IOException("The sorter has encountered an error.", AsynchronousPartialSorter.this.iteratorException);
-				}
-				
-				while (true) {
-					if (this.currentElement == endMarker()) {
-						return null;
-					}
-					else if (this.currentElement != null) {
-						// return the current element to the empty queue
-						this.currentElement.buffer.reset();
-						this.queues.empty.add(this.currentElement);
-					}
-					
-					// get a new element
-					try {
-						this.currentElement = null;
-						while (!this.closed && this.currentElement == null) {
-							this.currentElement = this.queues.spill.poll(1000, TimeUnit.MILLISECONDS);
-						}
-						if (AsynchronousPartialSorter.this.iteratorException != null) {
-							throw new IOException("The sorter has encountered an error.", AsynchronousPartialSorter.this.iteratorException);
-						}
-						
-						if (this.currentElement == endMarker()) {
-							// signals the end, no more buffers will come
-							// release the memory first before returning
-							releaseSortBuffers();
-							return null;
-						}
-						if (this.currentElement == spillingMarker()) {
-							this.currentElement = null;
-							continue;
-						}
-					}
-					catch (InterruptedException e) {
-						throw new RuntimeException("Iterator was interrupted getting the next sortedBuffer.");
-					}
-					
-					this.currentIterator = this.currentElement.buffer.getIterator();
-					if ((result = this.currentIterator.next(reuse)) != null) {
-						return result;
-					}
-					this.currentIterator = null;
-				}
-			}
-		}
-		
-		public void close() {
-			synchronized (this) {
-				if (this.closed) {
-					return;
-				}
-				this.closed = true;
-			}
-			
-			if (this.currentElement != null) {
-				this.queues.empty.add(this.currentElement);
-				this.currentElement = null;
-			}
-			if (this.currentIterator != null) {
-				this.currentIterator = null;
-			}
-		}
-		
-		private final void releaseSortBuffers() 	{
-			while (!this.queues.empty.isEmpty()) {
-				final CircularElement<E> elem = this.queues.empty.poll();
-				if (elem != null) {
-					final InMemorySorter<E> sorter = elem.buffer;
-					final List<MemorySegment> segments = sorter.dispose();
-					AsynchronousPartialSorter.this.memoryManager.release(segments);
-				}
-			}
-		}
-
-	};
-}

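For context on the deletion above: a partial sorter of this kind orders records only within each buffer-sized window, never across windows; the integration test deleted further below counts a "window transition" whenever the key order resets. A tiny plain-Java illustration of the behavior, with a made-up window size standing in for the buffer capacity:

    import java.util.Arrays;

    public class PartialSortDemo {
        public static void main(String[] args) {
            int[] data = { 5, 3, 9, 1, 8, 2 };
            int window = 3;  // hypothetical buffer capacity
            // sort each window independently; nothing is merged across windows
            for (int start = 0; start < data.length; start += window) {
                Arrays.sort(data, start, Math.min(start + window, data.length));
            }
            System.out.println(Arrays.toString(data));  // [3, 5, 9, 1, 2, 8]
        }
    }
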
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64510b6a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorterCollector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorterCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorterCollector.java
deleted file mode 100644
index a41dbf1..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/AsynchronousPartialSorterCollector.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.util.MutableObjectIterator;
-
-/**
- * The {@link AsynchronousPartialSorterCollector} is a simple sort implementation that sorts
- * bulks inside its buffers, and returns them directly, without merging them. Therefore,
- * it establishes an order within certain windows, but not across them.
- * <p>
- * In contrast to the {@link AsynchronousPartialSorter}, this class has no dedicated reading thread that
- * pulls records from an iterator, but offers a collector into which data to be sorted is pushed.
- * 
- */
-public class AsynchronousPartialSorterCollector<E> extends AsynchronousPartialSorter<E> {
-	
-	private InputDataCollector<E> collector;
-	
-	// ------------------------------------------------------------------------
-	// Constructor
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * @param memoryManager The memory manager from which to allocate the memory.
-	 * @param parentTask The parent task, which owns all resources used by this sorter.
-	 * @param serializerFactory The type serializer.
-	 * @param comparator The type comparator establishing the order relation.
-	 * @param memoryFraction The fraction of memory dedicated to sorting.
-	 * 
-	 * @throws IOException Thrown, if an error occurs initializing the resources for external sorting.
-	 * @throws MemoryAllocationException Thrown, if not enough memory can be obtained from the memory manager to
-	 *                                   perform the sort.
-	 */
-	public AsynchronousPartialSorterCollector(MemoryManager memoryManager,
-			AbstractInvokable parentTask, 
-			TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator,
-			double memoryFraction)
-	throws IOException, MemoryAllocationException
-	{
-		super(memoryManager, null, parentTask, serializerFactory, comparator,
-				memoryFraction);
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Gets the collector that writes into the sort buffers.
-	 * 
-	 * @return The collector that writes into the sort buffers.
-	 */
-	public InputDataCollector<E> getInputCollector() {
-		return this.collector;
-	}
-
-	@Override
-	protected ThreadBase<E> getReadingThread(ExceptionHandler<IOException> exceptionHandler,
-		MutableObjectIterator<E> reader, CircularQueues<E> queues, AbstractInvokable parentTask,
-		TypeSerializer<E> serializer, long startSpillingBytes)
-	{
-		this.collector = new InputDataCollector<E>(queues, startSpillingBytes);
-		return null;
-	}
-	
-
-	public void close() {
-		try {
-			if (this.collector != null) {
-				this.collector.close();
-			}
-		}
-		finally {
-			super.close();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/64510b6a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AsynchonousPartialSorterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AsynchonousPartialSorterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AsynchonousPartialSorterITCase.java
deleted file mode 100644
index 90fe59f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/AsynchonousPartialSorterITCase.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.sort;
-
-import java.io.IOException;
-
-import org.junit.Assert;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.record.RecordComparator;
-import org.apache.flink.api.java.typeutils.runtime.record.RecordSerializerFactory;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
-import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.sort.AsynchronousPartialSorter;
-import org.apache.flink.runtime.operators.sort.ExceptionHandler;
-import org.apache.flink.runtime.operators.sort.Sorter;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class AsynchonousPartialSorterITCase {
-	
-	private static final Log LOG = LogFactory.getLog(AsynchonousPartialSorterITCase.class);
-
-	private static final long SEED = 649180756312423613L;
-
-	private static final int KEY_MAX = Integer.MAX_VALUE;
-
-	private static final Value VAL = new Value("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
-	
-	private static final int VALUE_LENGTH = 114;
-
-	public static final int MEMORY_SIZE = 1024 * 1024 * 32;
-	
-	private final AbstractInvokable parentTask = new DummyInvokable();
-
-	private IOManager ioManager;
-
-	private MemoryManager memoryManager;
-	
-	private TypeSerializerFactory<Record> serializer;
-	
-	private TypeComparator<Record> comparator;
-
-
-	@SuppressWarnings("unchecked")
-	@Before
-	public void beforeTest()
-	{
-		this.memoryManager = new DefaultMemoryManager(MEMORY_SIZE,1);
-		this.ioManager = new IOManager();
-		this.serializer = RecordSerializerFactory.get();
-		this.comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
-	}
-
-	@After
-	public void afterTest()
-	{
-		this.ioManager.shutdown();
-		if (!this.ioManager.isProperlyShutDown()) {
-			Assert.fail("I/O Manager was not properly shut down.");
-		}
-		
-		if (this.memoryManager != null) {
-			Assert.assertTrue("Memory leak: not all segments have been returned to the memory manager.", 
-				this.memoryManager.verifyEmpty());
-			this.memoryManager.shutdown();
-			this.memoryManager = null;
-		}
-	}
-
-	@Test
-	public void testSmallSortInOneWindow() throws Exception
-	{
-		try {
-			final int NUM_RECORDS = 1000;
-			
-			// reader
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_RECORDS);
-			
-			// merge iterator
-			LOG.debug("Initializing sortmerger...");
-			Sorter<Record> sorter = new AsynchronousPartialSorter<Record>(this.memoryManager, source,
-				this.parentTask, this.serializer, this.comparator, 1.0);
-	
-			runPartialSorter(sorter, NUM_RECORDS, 0);
-		}
-		catch (Exception t) {
-			t.printStackTrace();
-			Assert.fail("Test failed due to an uncaught exception: " + t.getMessage());
-		}
-	}
-	
-	@Test
-	public void testLargeSortAcrossTwoWindows() throws Exception
-	{
-		try {
-			final int NUM_RECORDS = 100000;
-			
-			// reader
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_RECORDS);
-			
-			// merge iterator
-			LOG.debug("Initializing sortmerger...");
-			Sorter<Record> sorter = new AsynchronousPartialSorter<Record>(this.memoryManager, source,
-				this.parentTask, this.serializer, this.comparator, 0.2);
-	
-			runPartialSorter(sorter, NUM_RECORDS, 2);
-		}
-		catch (Exception t) {
-			t.printStackTrace();
-			Assert.fail("Test failed due to an uncaught exception: " + t.getMessage());
-		}
-	}
-	
-	@Test
-	public void testLargeSortAcrossMultipleWindows() throws Exception
-	{
-		try {
-			final int NUM_RECORDS = 1000000;
-			
-			// reader
-			final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-			final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_RECORDS);
-			
-			// merge iterator
-			LOG.debug("Initializing sortmerger...");
-			Sorter<Record> sorter = new AsynchronousPartialSorter<Record>(this.memoryManager, source,
-				this.parentTask, this.serializer, this.comparator, 0.15);
-	
-			runPartialSorter(sorter, NUM_RECORDS, 27);
-		}
-		catch (Exception t) {
-			t.printStackTrace();
-			Assert.fail("Test failed due to an uncaught exception: " + t.getMessage());
-		}
-	}
-	
-	@Test
-	public void testExceptionForwarding() throws IOException
-	{
-		try {
-			Sorter<Record> sorter = null;
-			try	{
-				final int NUM_RECORDS = 100;
-
-				// reader
-				final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
-				final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_RECORDS);
-				
-				// merge iterator
-				LOG.debug("Initializing sortmerger...");
-				sorter = new ExceptionThrowingAsynchronousPartialSorter<Record>(this.memoryManager, source,
-						this.parentTask, this.serializer, this.comparator, 1.0);
-		
-				runPartialSorter(sorter, NUM_RECORDS, 0);
-				
-				Assert.fail("Expected Test Exception not thrown.");
-			} catch(Exception e) {
-				if (!containsTriggerException(e)) {
-					throw e;
-				}
-			} finally {
-				if (sorter != null) {
-					sorter.close();
-				}
-			}
-		}
-		catch (Exception t) {
-			t.printStackTrace();
-			Assert.fail("Test failed due to an uncaught exception: " + t.getMessage());
-		}
-	}
-	
-	private static void runPartialSorter(Sorter<Record> sorter, 
-								int expectedNumResultRecords, int expectedNumWindowTransitions)
-	throws Exception
-	{
-		// check order
-		final MutableObjectIterator<Record> iterator = sorter.getIterator();
-		int pairsEmitted = 1;
-		int windowTransitions = 0;
-		
-		Record rec1 = new Record();
-		Record rec2 = new Record();
-		
-		LOG.debug("Checking results...");
-		Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
-		while ((rec2 = iterator.next(rec2)) != null)
-		{
-			final TestData.Key k1 = rec1.getField(0, TestData.Key.class);
-			final TestData.Key k2 = rec2.getField(0, TestData.Key.class);
-			pairsEmitted++;
-			
-			// if the next key is smaller again, we have a new window
-			if (k1.compareTo(k2) > 0) {
-				windowTransitions++;
-			}
-			
-			Record tmp = rec1;
-			rec1 = rec2;
-			k1.setKey(k2.getKey());
-			
-			rec2 = tmp;
-		}
-		
-		sorter.close();
-		
-		Assert.assertEquals("Sorter did not return the expected number of result records.",
-			expectedNumResultRecords, pairsEmitted);
-		Assert.assertEquals("The partial sorter made an unexpected number of window transitions.",
-			expectedNumWindowTransitions, windowTransitions); 
-	}
-	
-	private static boolean containsTriggerException(Throwable exception)
-	{
-		while (exception != null) {
-			if (exception.getClass().equals(TriggeredException.class)) {
-				return true;
-			}
-			exception = exception.getCause();
-		}
-		return false;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//              					 Internal classes
-	// --------------------------------------------------------------------------------------------
-	
-	/*
-	 * Mock exception thrown on purpose.
-	 */
-	@SuppressWarnings("serial")
-	private static class TriggeredException extends IOException {}
-	
-	/*
-	 * Mocked sorter that throws an exception in the sorting thread.
-	 */
-	private static class ExceptionThrowingAsynchronousPartialSorter<E> extends AsynchronousPartialSorter<E>
-	{	
-		protected static class ExceptionThrowingSorterThread<E> extends SortingThread<E> {
-				
-			public ExceptionThrowingSorterThread(ExceptionHandler<IOException> exceptionHandler,
-						org.apache.flink.runtime.operators.sort.UnilateralSortMerger.CircularQueues<E> queues,
-						AbstractInvokable parentTask)
-			{
-				super(exceptionHandler, queues, parentTask);
-			}
-	
-			@Override
-			public void go() throws IOException {
-				throw new TriggeredException();
-			}
-		}
-
-		public ExceptionThrowingAsynchronousPartialSorter(MemoryManager memoryManager,
-				MutableObjectIterator<E> input, AbstractInvokable parentTask, 
-				TypeSerializerFactory<E> serializer, TypeComparator<E> comparator,
-				double memoryFraction)
-		throws IOException, MemoryAllocationException
-		{
-			super(memoryManager, input, parentTask, serializer, comparator, memoryFraction);
-		}
-
-
-		@Override
-		protected ThreadBase<E> getSortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues,
-				AbstractInvokable parentTask)
-		{
-			return new ExceptionThrowingSorterThread<E>(exceptionHandler, queues, parentTask);
-		}		
-	}
-}


[2/9] git commit: Split between source distribution license/notice and binary distribution license/notice. Add a DEPENDENCIES file to list all maven dependencies with licenses and notices.

Posted by uc...@apache.org.
Split between source distribution license/notice and binary distribution license/notice.
Add a DEPENDENCIES file to list all maven dependencies with licenses and notices.

[ci skip]


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7853a841
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7853a841
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7853a841

Branch: refs/heads/release-0.6.1
Commit: 7853a841420dbb6d4b96ff6f57204597614ae060
Parents: f82acce
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 19 12:47:55 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Sep 10 00:43:39 2014 +0200

----------------------------------------------------------------------
 LICENSE                               | 179 +----------------------------
 flink-dist/src/main/flink-bin/LICENSE |  25 +---
 2 files changed, 9 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7853a841/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index 3604605..7e3aff2 100644
--- a/LICENSE
+++ b/LICENSE
@@ -205,53 +205,10 @@
 
 Apache Flink subcomponents:
 
-The Apache Flink project contains subcomponents with separate copyright
-notices and license terms. Your use of the source code for these
-subcomponents is subject to the terms and conditions of their respective
-licenses.
-
-
------------------------------------------------------------------------
- Apache License (Version 2.0)
------------------------------------------------------------------------
-
-The Apache Flink project depends on and/or bundles the following components
-under the Apache License (v 2.0):
-
- - Apache Commons Logging (http://commons.apache.org/proper/commons-logging/)
- - Apache Commons Codec (http://commons.apache.org/proper/commons-codec/)
- - Apache Commons CLI (http://commons.apache.org/cli/)
- - Apache Commons FileUpload (http://commons.apache.org/fileupload/)
- - Apache Commons IO (http://commons.apache.org/io/)
- - Apache Log4J (http://logging.apache.org/log4j/1.2/)
- - Apache Avro (http://avro.apache.org)
- - Apache Hadoop (http://hadoop.apache.org)
- - Apache Derby (http://db.apache.org/derby/)
- - Apache Kafka (http://kafka.apache.org)
- - Apache Flume (http://flume.apache.org)
- - Google Guava (https://code.google.com/p/guava-libraries/)
- - Netty (http://netty.io)
- - Powermock (http://www.powermock.org)
- - Javassist (http://www.javassist.org)
- - Jetty Web Container (http://www.eclipse.org/jetty/)
- - Amazon Web Services SDK for Java (http://aws.amazon.com/sdkforjava/)
- - ScalaTest (http://www.scalatest.org)
- - StartBootstrap (http://startbootstrap.com)
- - CHAP Links Library Timeline (http://almende.github.io/chap-links-library/)
- - Twitter Hosebird Client (hbc) (https://github.com/twitter/hbc)
-
-
------------------------------------------------------------------------
- Eclipse Public License  - v 1.0
------------------------------------------------------------------------
-
-The Apache Flink project depends on and/or bundles the following components
-under the Eclipse Public License (v 1.0)
-
- - JUnit (http://junit.org/)
- 
-You may obtain a copy of the Eclipse Public License (v 1.0) at
-https://www.eclipse.org/legal/epl-v10.html
+The Apache Flink project contains subcomponents in the source code
+release with separate copyright notices and license terms. Your use of
+the source code for these subcomponents is subject to the terms and
+conditions of their respective licenses.
 
 
 -----------------------------------------------------------------------
@@ -259,10 +216,8 @@ https://www.eclipse.org/legal/epl-v10.html
 -----------------------------------------------------------------------
 
 The Apache Flink project depends on and/or bundles the following components
-under the The MIT License
+under the MIT License
 
- - Mockito (http://www.mockito.org) - Copyright (c) 2007 Mockito contributors
- - SLF4J (http://www.slf4j.org) - Copyright (c) 2004-2013 QOS.ch
  - jQuery 1.4.2 (http://jquery.com) - Copyright 2014 jQuery Foundation and other contributors
  - jCanvas 13.11.21 (http://calebevans.me/projects/jcanvas/) - Copyright 2014 Caleb Evans
  - Flot 0.8.1 (http://www.flotcharts.org) - Copyright (c) 2007-2013 IOLA and Ole Laursen
@@ -296,27 +251,9 @@ THE SOFTWARE.
  BSD-style Licenses
 -----------------------------------------------------------------------
 
-The Apache Flink project depends on and/or bundles the following components
-under BSD-style licenses
-
-(BSD-style License)
- - Hamcrest (https://code.google.com/p/hamcrest/) - Copyright (c) 2000-2006, www.hamcrest.org
- 
-(New BSD License)
- - Kryo (https://github.com/EsotericSoftware/kryo) - Copyright (c) 2008, Nathan Sweet
- 
-(BSD-like License)
- - Scala Library (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
- - Scala Compiler (BSD-like) - (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
- - Scala Compiler Reflect (BSD-like) - (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
- - ASM (BSD-like) - (http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom
-
 (3-clause BSD license)
  - D3 (http://d3js.org/) - Copyright (c) 2010-2014, Michael Bostock
 
-
-(Below is the 3-clause BSD license)
-
 All rights reserved.
 
 Redistribution and use in source and binary forms, with or without modification,
@@ -346,116 +283,12 @@ POSSIBILITY OF SUCH DAMAGE.
 
 
 -----------------------------------------------------------------------
- Mozilla Public License  - v 1.1
------------------------------------------------------------------------
-
-The Apache Flink project depends on and/or bundles the following components
-under the Mozilla Public License (v 1.1)
-
- - RabbitMQ (http://www.rabbitmq.com) 
-      The Initial Developer of the Original Code is GoPivotal,Ltd.
-      Copyright (c) 2007-2013 GoPivotal, Inc.  All Rights Reserved.
- 
-You may obtain a copy of the Mozilla Public License (v 1.1) at
-http://www.mozilla.org/MPL/
-
-
------------------------------------------------------------------------
  The Open Font License
 -----------------------------------------------------------------------
 
-The Apache Flink project packages the following fonts under the
+The Apache Flink project bundles the following fonts under the
 Open Font License (OFL) - http://scripts.sil.org/OFL/
 
  - Font Awesome (http://fortawesome.github.io/Font-Awesome/) - Created by Dave Gandy
    -> fonts in "flink-runtime/resources/web-docs-infoserver/font-awesome/fonts"
 
-
------------------------------------------------------------------------
-For Apache Hadoop Subcomponents
------------------------------------------------------------------------
-
-The Apache Hadoop project contains subcomponents with separate copyright
-notices and license terms. Your use of the source code for these
-subcomponents is subject to the terms and conditions of the following
-licenses. 
-
-For the org.apache.hadoop.util.bloom.* classes:
-
-/**
- *
- * Copyright (c) 2005, European Commission project OneLab under contract
- * 034819 (http://www.one-lab.org)
- * All rights reserved.
- * Redistribution and use in source and binary forms, with or 
- * without modification, are permitted provided that the following 
- * conditions are met:
- *  - Redistributions of source code must retain the above copyright 
- *    notice, this list of conditions and the following disclaimer.
- *  - Redistributions in binary form must reproduce the above copyright 
- *    notice, this list of conditions and the following disclaimer in 
- *    the documentation and/or other materials provided with the distribution.
- *  - Neither the name of the University Catholique de Louvain - UCL
- *    nor the names of its contributors may be used to endorse or 
- *    promote products derived from this software without specific prior 
- *    written permission.
- *    
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 
- * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 
- * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 
- * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 
- * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 
- * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 
- * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-For portions of the native implementation of slicing-by-8 CRC calculation
-in src/main/native/src/org/apache/hadoop/util:
-
-/**
- *   Copyright 2008,2009,2010 Massachusetts Institute of Technology.
- *   All rights reserved. Use of this source code is governed by a
- *   BSD-style license that can be found in the LICENSE file.
- */
-
-For src/main/native/src/org/apache/hadoop/io/compress/lz4/{lz4.h,lz4.c,
-lz4_encoder.h,lz4hc.h,lz4hc.c,lz4hc_encoder.h},
-
-/*
-   LZ4 - Fast LZ compression algorithm
-   Header File
-   Copyright (C) 2011-2013, Yann Collet.
-   BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
-
-   Redistribution and use in source and binary forms, with or without
-   modification, are permitted provided that the following conditions are
-   met:
-
-       * Redistributions of source code must retain the above copyright
-   notice, this list of conditions and the following disclaimer.
-       * Redistributions in binary form must reproduce the above
-   copyright notice, this list of conditions and the following disclaimer
-   in the documentation and/or other materials provided with the
-   distribution.
-
-   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-   You can contact the author at :
-   - LZ4 homepage : http://fastcompression.blogspot.com/p/lz4.html
-   - LZ4 source repository : http://code.google.com/p/lz4/
-*/

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7853a841/flink-dist/src/main/flink-bin/LICENSE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE
index 3604605..ef148d8 100644
--- a/flink-dist/src/main/flink-bin/LICENSE
+++ b/flink-dist/src/main/flink-bin/LICENSE
@@ -239,20 +239,7 @@ under the Apache License (v 2.0):
  - StartBootstrap (http://startbootstrap.com)
  - CHAP Links Library Timeline (http://almende.github.io/chap-links-library/)
  - Twitter Hosebird Client (hbc) (https://github.com/twitter/hbc)
-
-
------------------------------------------------------------------------
- Eclipse Public License  - v 1.0
------------------------------------------------------------------------
-
-The Apache Flink project depends on and/or bundles the following components
-under the Eclipse Public License (v 1.0)
-
- - JUnit (http://junit.org/)
  
-You may obtain a copy of the Eclipse Public License (v 1.0) at
-https://www.eclipse.org/legal/epl-v10.html
-
 
 -----------------------------------------------------------------------
  The MIT License
@@ -261,7 +248,6 @@ https://www.eclipse.org/legal/epl-v10.html
 The Apache Flink project depends on and/or bundles the following components
 under the MIT License
 
- - Mockito (http://www.mockito.org) - Copyright (c) 2007 Mockito contributors
  - SLF4J (http://www.slf4j.org) - Copyright (c) 2004-2013 QOS.ch
  - jQuery 1.4.2 (http://jquery.com) - Copyright 2014 jQuery Foundation and other contributors
  - jCanvas 13.11.21 (http://calebevans.me/projects/jcanvas/) - Copyright 2014 Caleb Evans
@@ -298,22 +284,17 @@ THE SOFTWARE.
 
 The Apache Flink project depends on and/or bundles the following components
 under BSD-style licenses
-
-(BSD-style License)
- - Hamcrest (https://code.google.com/p/hamcrest/) - Copyright (c) 2000-2006, www.hamcrest.org
  
-(New BSD License)
+[3-clause BSD license]
  - Kryo (https://github.com/EsotericSoftware/kryo) - Copyright (c) 2008, Nathan Sweet
+ - D3 (http://d3js.org/) - Copyright (c) 2010-2014, Michael Bostock
  
-(BSD-like License)
+[BSD-like License]
  - Scala Library (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
  - Scala Compiler (BSD-like) - (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
  - Scala Compiler Reflect (BSD-like) - (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.
  - ASM (BSD-like) - (http://asm.ow2.org/) - Copyright (c) 2000-2011 INRIA, France Telecom
 
-(3-clause BSD license)
- - D3 (http://d3js.org/) - Copyright (c) 2010-2014, Michael Bostock
-
 
 (Below is the 3-clause BSD license)
 


[3/9] git commit: [FLINK-1074] Fix for NULL input tuples in ProjectJoin

Posted by uc...@apache.org.
[FLINK-1074] Fix for NULL input tuples in ProjectJoin


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/8af40c32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/8af40c32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/8af40c32

Branch: refs/heads/release-0.6.1
Commit: 8af40c3225f7844cfaa614e71ea9373e32ce3f57
Parents: 7853a84
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Aug 28 10:38:53 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Sep 10 00:43:47 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/operators/JoinOperator.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8af40c32/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 6ffbd1b..2efe7e9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -925,13 +925,13 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		public void join(T1 in1, T2 in2, Collector<R> out) {
 			for(int i=0; i<fields.length; i++) {
 				if(isFromFirst[i]) {
-					if(fields[i] >= 0) {
+					if(fields[i] >= 0 && in1 != null) {
 						outTuple.setField(((Tuple)in1).getField(fields[i]), i);
 					} else {
 						outTuple.setField(in1, i);
 					}
 				} else {
-					if(fields[i] >= 0) {
+					if(fields[i] >= 0 && in2 != null) {
 						outTuple.setField(((Tuple)in2).getField(fields[i]), i);
 					} else {
 						outTuple.setField(in2, i);

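The added null checks handle the case where an entire input record of the join is null: without the guard, the cast to Tuple and the subsequent getField call would throw a NullPointerException instead of simply forwarding the null. A hedged sketch of that guarded-projection pattern, with Object[] rows standing in for Flink's Tuple types (an illustration, not the operator itself):

    public class GuardedProjection {

        // Project field `index` from `row`; a negative index or a null row
        // means the row itself is forwarded unchanged.
        static Object project(Object[] row, int index) {
            return (index >= 0 && row != null) ? row[index] : row;
        }

        public static void main(String[] args) {
            Object[] left = { "a", 1 };
            System.out.println(project(left, 1));  // 1
            System.out.println(project(null, 0));  // null, no NullPointerException
        }
    }
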

[6/9] git commit: [FLINK-1078] PrintingOutputFormat uses same partition indexing as FileOutputFormat.

Posted by uc...@apache.org.
[FLINK-1078] PrintingOutputFormat uses same partition indexing as FileOutputFormat.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3bcf6999
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3bcf6999
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3bcf6999

Branch: refs/heads/release-0.6.1
Commit: 3bcf699955bae85145fe33a4099587636d3c4ef3
Parents: 64510b6
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 1 19:03:04 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Sep 10 00:49:50 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/io/PrintingOutputFormat.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3bcf6999/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
index 435057f..42e0b46 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
@@ -74,7 +74,7 @@ public class PrintingOutputFormat<T> implements OutputFormat<T> {
 		this.stream = this.target == STD_OUT ? System.out : System.err;
 		
 		// set the prefix if we have a >1 DOP
-		this.prefix = (numTasks > 1) ? (taskNumber + "> ") : null;
+		this.prefix = (numTasks > 1) ? ((taskNumber+1) + "> ") : null;
 	}
 
 	@Override

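The one-character change above switches the printed prefix from the 0-based internal subtask index to a 1-based one, so console output lines up with the partition numbering that FileOutputFormat uses for its files. A small sketch of the resulting behavior; prefixFor is a hypothetical helper, not Flink API:

    public class PrefixDemo {

        // 1-based prefix, emitted only when more than one parallel task writes.
        static String prefixFor(int taskNumber, int numTasks) {
            return (numTasks > 1) ? ((taskNumber + 1) + "> ") : "";
        }

        public static void main(String[] args) {
            System.out.println(prefixFor(0, 4) + "record");  // 1> record
            System.out.println(prefixFor(3, 4) + "record");  // 4> record
            System.out.println(prefixFor(0, 1) + "record");  // record (no prefix with DOP 1)
        }
    }
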

[5/9] git commit: Disable merging of Iteration Aux Tasks

Posted by uc...@apache.org.
Disable merging of Iteration Aux Tasks

This will become obsolete once we have buffer-oriented execution.
Disable it for now because it is causing a bug: FLINK-1087


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/06447403
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/06447403
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/06447403

Branch: refs/heads/release-0.6.1
Commit: 06447403740a9b06627ddd8e1797a793932acc6f
Parents: 72a481f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Sep 4 16:33:36 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Sep 10 00:49:50 2014 +0200

----------------------------------------------------------------------
 .../flink/compiler/plantranslate/NepheleJobGraphGenerator.java   | 2 +-
 .../java/org/apache/flink/runtime/operators/RegularPactTask.java | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/06447403/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index 1eabb83..fcf9f8d 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -101,7 +101,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 	
 	public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux";
 	
-	private static final boolean mergeIterationAuxTasks = GlobalConfiguration.getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, true);
+	private static final boolean mergeIterationAuxTasks = GlobalConfiguration.getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, false);
 	
 //	private static final Log LOG = LogFactory.getLog(NepheleJobGraphGenerator.class);
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/06447403/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
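With the default flipped to false, iteration aux tasks stay unmerged unless a user explicitly sets compiler.merge-iteration-aux to true. A minimal sketch of such a default-able boolean flag; java.util.Properties stands in for Flink's GlobalConfiguration here:

    import java.util.Properties;

    public class FlagDemo {

        // Fall back to the supplied default when the key is absent.
        static boolean getBoolean(Properties conf, String key, boolean defaultValue) {
            String value = conf.getProperty(key);
            return (value == null) ? defaultValue : Boolean.parseBoolean(value);
        }

        public static void main(String[] args) {
            Properties conf = new Properties();
            System.out.println(getBoolean(conf, "compiler.merge-iteration-aux", false));  // false
            conf.setProperty("compiler.merge-iteration-aux", "true");                     // opt back in
            System.out.println(getBoolean(conf, "compiler.merge-iteration-aux", false));  // true
        }
    }
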
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index e8d7add..f884df1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -256,8 +256,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 			initInputReaders();
 			initBroadcastInputReaders();
 		} catch (Exception e) {
-			throw new RuntimeException("Initializing the input streams failed" +
-				e.getMessage() == null ? "." : ": " + e.getMessage(), e);
+			throw new RuntimeException("Initializing the input streams failed in Task " + getEnvironment().getTaskName() +
+					(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
 		}
 
 		// initialize the writers. this is necessary for nephele to create the output gates.
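
The RegularPactTask hunk above also fixes an operator-precedence bug: in Java, + binds more tightly than ==, so the old expression compared the already-concatenated string to null, which is never true, and the "." branch could never be taken. A standalone demonstration of the pitfall and the fix:

    public class PrecedenceDemo {
        public static void main(String[] args) {
            String msg = null;
            // Buggy: parsed as (("failed" + msg) == null) ? "." : (": " + msg)
            String buggy = "failed" + msg == null ? "." : ": " + msg;
            // Fixed: parenthesize the conditional before concatenating
            String fixed = "failed" + (msg == null ? "." : ": " + msg);
            System.out.println(buggy);  // prints ": null" because the concatenation is never null
            System.out.println(fixed);  // prints "failed."
        }
    }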