Posted to commits@flink.apache.org by fh...@apache.org on 2015/07/17 01:46:21 UTC

flink git commit: [FLINK-1963] Improve distinct() transformation

Repository: flink
Updated Branches:
  refs/heads/master c06213706 -> 08ca9ffa9


[FLINK-1963] Improve distinct() transformation

This closes #905


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08ca9ffa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08ca9ffa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08ca9ffa

Branch: refs/heads/master
Commit: 08ca9ffa9a95610c073145a09e731311e728c4fd
Parents: c062137
Author: pietro pinoli <pi...@pietros-MBP.lan>
Authored: Mon Jul 13 13:32:20 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Jul 17 01:39:25 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java | 15 ++---
 .../api/java/operators/DistinctOperator.java    | 26 +++-----
 .../api/java/operator/DistinctOperatorTest.java | 65 +++++++++++++++++++-
 .../org/apache/flink/api/scala/DataSet.scala    | 49 ++++++++++-----
 .../test/javaApiOperators/DistinctITCase.java   | 44 +++++++++++++
 .../api/scala/operators/DistinctITCase.scala    | 39 ++++++++++++
 .../scala/operators/DistinctOperatorTest.scala  | 26 ++++++++
 7 files changed, 223 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index c628b04..81ba279 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -606,13 +606,13 @@ public abstract class DataSet<T> {
 	}
 	
 	/**
-	 * Returns a distinct set of a {@link Tuple} {@link DataSet} using expression keys.
-	 * <p>
-	 * The field position keys specify the fields of Tuples or Pojos on which the decision is made if two elements are distinct or
-	 * not.
+	 * Returns a distinct set of a {@link DataSet} using expression keys.
 	 * <p>
+	 * The field expression keys specify the fields of a {@link org.apache.flink.api.common.typeutils.CompositeType}
+	 * (e.g., Tuple or Pojo type) on which the decision is made if two elements are distinct or not.
+	 * In case of a {@link org.apache.flink.api.common.typeinfo.AtomicType}, only the wildcard expression ("*") is valid.
 	 *
-	 * @param fields One or more field positions on which the distinction of the DataSet is decided. 
+	 * @param fields One or more field expressions on which the distinction of the DataSet is decided.
 	 * @return A DistinctOperator that represents the distinct DataSet.
 	 */
 	public DistinctOperator<T> distinct(String... fields) {
@@ -620,9 +620,10 @@ public abstract class DataSet<T> {
 	}
 	
 	/**
-	 * Returns a distinct set of a {@link Tuple} {@link DataSet} using all fields of the tuple.
+	 * Returns a distinct set of a {@link DataSet}.
 	 * <p>
-	 * Note: This operator can only be applied to Tuple DataSets.
+	 * If the input is a {@link org.apache.flink.api.common.typeutils.CompositeType} (Tuple or Pojo type),
+	 * distinct is performed on all fields and each field must be a key type.
 	 * 
 	 * @return A DistinctOperator that represents the distinct DataSet.
 	 */

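The Javadoc above describes the relaxed contract. As a minimal usage sketch (not part of the commit; the class name, variable names, and sample values are illustrative, while the ExecutionEnvironment/DataSet calls are the regular Flink batch API), this is what distinct() accepts after the change:

    import java.util.List;

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class DistinctOnAtomicTypeExample {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Long is an atomic (non-composite) key type; before this change,
            // distinct() without keys was rejected for non-Tuple/Pojo DataSets.
            DataSet<Long> longs = env.fromElements(1L, 2L, 2L, 3L, 3L);

            // distinct() on the whole element ...
            List<Long> viaNoArgs = longs.distinct().collect();
            // ... and the wildcard expression key ("*") are equivalent here.
            List<Long> viaWildcard = longs.distinct("*").collect();

            System.out.println(viaNoArgs);   // 1, 2, 3 in some order
            System.out.println(viaWildcard);
        }
    }

For composite types the existing field-expression form (e.g. distinct on a Pojo field) is unchanged.
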
http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index 686823c..a6eb43e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
@@ -47,28 +48,21 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 	private final Keys<T> keys;
 	
 	private final String distinctLocationName;
-	
+
 	public DistinctOperator(DataSet<T> input, Keys<T> keys, String distinctLocationName) {
 		super(input, input.getType());
 
 		this.distinctLocationName = distinctLocationName;
-		
-		// if keys is null distinction is done on all tuple fields
-		if (keys == null) {
-			if (input.getType() instanceof CompositeType) {
-				keys = new Keys.ExpressionKeys<T>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType());
-			}
-			else {
-				throw new InvalidProgramException("Distinction on all fields is only possible on composite (pojo / tuple) data types.");
-			}
+
+		if (!(input.getType() instanceof CompositeType) &&
+				!(input.getType() instanceof AtomicType && input.getType().isKeyType())){
+			throw new InvalidProgramException("Distinct only possible on composite or atomic key types.");
 		}
-		
-		
-		// FieldPositionKeys can only be applied on Tuples and POJOs
-		if (keys instanceof Keys.ExpressionKeys && !(input.getType() instanceof CompositeType)) {
-			throw new InvalidProgramException("Distinction on field positions is only possible on composite type DataSets.");
+		// if keys is null distinction is done on all fields
+		if (keys == null) {
+			keys = new Keys.ExpressionKeys<T>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType());
 		}
-		
+
 		this.keys = keys;
 	}
 

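The reshuffled constructor now applies the same key-type check whether or not explicit keys were given. Below is a hedged sketch of both sides of that check; DistinctKeyCheckSketch and NotAKey are invented for illustration, under the assumption that Flink analyzes NotAKey as a generic, non-key type because it is neither a valid POJO nor Comparable. The behavior mirrors the tests added further down in this commit.

    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.api.common.InvalidProgramException;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class DistinctKeyCheckSketch {

        // Not a valid POJO (no default constructor, private field) and not Comparable,
        // so its TypeInformation should be an atomic type whose isKeyType() is false.
        public static class NotAKey {
            private final List<Integer> ints;
            public NotAKey(List<Integer> ints) { this.ints = ints; }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Accepted: String is an atomic key type.
            DataSet<String> words = env.fromElements("a", "b", "a");
            System.out.println(words.distinct().collect());

            // Rejected at graph-construction time: neither a CompositeType nor an atomic key type.
            DataSet<NotAKey> bad = env.fromElements(new NotAKey(Collections.singletonList(1)));
            try {
                bad.distinct();
            } catch (InvalidProgramException expected) {
                System.out.println("rejected: " + expected.getMessage());
            }
        }
    }
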
http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
index f4c87c8..f4bd945 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
@@ -114,18 +114,31 @@ public class DistinctOperatorTest {
 	
 	@Test(expected = IllegalArgumentException.class)
 	public void testDistinctByKeyFields6() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
 		// should not work, negative field position
 		tupleDs.distinct(-1);
 	}
+
+	@Test
+	public void testDistinctByKeyFields7(){
+		final ExecutionEnvironment env  = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
+
+		// should work
+		try {
+			longDs.distinct("*");
+		} catch (Exception e){
+			Assert.fail();
+		}
+	}
 	
 	@Test
 	@SuppressWarnings("serial")
 	public void testDistinctByKeySelector1() {
-		
+
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		this.customTypeData.add(new CustomType());
 		
@@ -145,7 +158,53 @@ public class DistinctOperatorTest {
 		}
 		
 	}
-	
+
+	@Test
+	public void  testDistinctByKeyIndices1() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		try {
+			DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
+			// should work
+			longDs.distinct();
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testDistinctOnNotKeyDataType() throws Exception {
+    	/*
+     	* should not work. NotComparable data type cannot be used as key
+     	*/
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		NotComparable a = new NotComparable();
+		List<NotComparable> l = new ArrayList<NotComparable>();
+		l.add(a);
+
+		DataSet<NotComparable> ds = env.fromCollection(l);
+		DataSet<NotComparable> reduceDs = ds.distinct();
+
+	}
+
+	@Test(expected = InvalidProgramException.class)
+	public void testDistinctOnNotKeyDataTypeOnSelectAllChar() throws Exception {
+    	/*
+     	* should not work. NotComparable data type cannot be used as key
+     	*/
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		NotComparable a = new NotComparable();
+		List<NotComparable> l = new ArrayList<NotComparable>();
+		l.add(a);
+
+		DataSet<NotComparable> ds = env.fromCollection(l);
+		DataSet<NotComparable> reduceDs = ds.distinct("*");
+	}
+
+	class NotComparable {
+		public List<Integer> myInts;
+	}
 
 	public static class CustomType implements Serializable {
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index fd1492a..3a0f6d9 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -710,10 +710,12 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   // --------------------------------------------------------------------------------------------
   //  distinct
   // --------------------------------------------------------------------------------------------
-
   /**
    * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether
    * two elements are distinct or not is made using the return value of the given function.
+   *
+   * @param fun The function which extracts the key values from the DataSet on which the
+   *            distinction of the DataSet is decided.
    */
   def distinct[K: TypeInformation](fun: T => K): DataSet[T] = {
     val keyExtractor = new KeySelector[T, K] {
@@ -728,10 +730,24 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
-   * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether
-   * two elements are distinct or not is made based on only the specified tuple fields.
+   * Returns a distinct set of a {@link DataSet}.
+   * <p>
+   * If the input is a composite type (Tuple or Pojo type), distinct is performed on all fields
+   * and each field must be a key type.
+   */
+  def distinct: DataSet[T] = {
+    wrap(new DistinctOperator[T](javaSet, null, getCallLocationName()))
+  }
+
+  /**
+   * Returns a distinct set of a {@link Tuple} {@link DataSet} using field position keys.
+   * <p>
+   * The field position keys specify the fields of Tuples on which the decision is made if
+   * two Tuples are distinct or not.
+   * <p>
+   * Note: Field position keys can only be specified for Tuple DataSets.
    *
-   * This only works on tuple DataSets.
+   * @param fields One or more field positions on which the distinction of the DataSet is decided.
    */
   def distinct(fields: Int*): DataSet[T] = {
     wrap(new DistinctOperator[T](
@@ -741,8 +757,20 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
-   * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether
-   * two elements are distinct or not is made based on only the specified fields.
+   * Returns a distinct set of a {@link DataSet} using expression keys.
+   * <p>
+   * The field expression keys specify the fields of a
+   * {@link org.apache.flink.api.common.typeutils.CompositeType}
+   * (e.g., Tuple or Pojo type) on which the decision is made
+   * if two elements are distinct or not.
+   * <p>
+   * In case of a {@link org.apache.flink.api.common.typeinfo.AtomicType}, only the
+   * wildcard expression ("_") is valid.
+   *
+   * @param firstField First field expression on which the distinction of the DataSet is
+   *                   decided
+   * @param otherFields Zero or more field expressions on which the distinction of the DataSet
+   *                    is decided
    */
   def distinct(firstField: String, otherFields: String*): DataSet[T] = {
     wrap(new DistinctOperator[T](
@@ -751,15 +779,6 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
       getCallLocationName()))
   }
 
-  /**
-   * Creates a new DataSet containing the distinct elements of this DataSet. The decision whether
-   * two elements are distinct or not is made based on all tuple fields.
-   *
-   * This only works if this DataSet contains Tuples.
-   */
-  def distinct: DataSet[T] = {
-    wrap(new DistinctOperator[T](javaSet, null, getCallLocationName()))
-  }
 
   // --------------------------------------------------------------------------------------------
   //  Keyed DataSet

http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
index 02dbb76..d32986d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.test.javaApiOperators;
 
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.avro.generic.GenericData;
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -274,4 +277,45 @@ public class DistinctITCase extends MultipleProgramsTestBase {
 			return (int) value.nestedPojo.longNumber;
 		}
 	}
+
+	@Test
+	public void testCorrectnessOfDistinctOnAtomic() throws Exception {
+		/*
+		 * check correctness of distinct on Integers
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Integer> ds = CollectionDataSets.getIntegerDataSet(env);
+		DataSet<Integer> reduceDs = ds.distinct();
+
+		List<Integer> result = reduceDs.collect();
+
+		String expected = "1\n2\n3\n4\n5";
+
+		compareResultAsText(result, expected);
+	}
+
+	@Test
+	public void testCorrectnessOfDistinctOnAtomicWithSelectAllChar() throws Exception {
+		/*
+		 * check correctness of distinct on Strings, using Keys.ExpressionKeys.SELECT_ALL_CHAR
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<String> ds = CollectionDataSets.getStringDataSet(env);
+		DataSet<String> reduceDs = ds.union(ds).distinct("*");
+
+		List<String> result = reduceDs.collect();
+
+		String expected = "I am fine.\n" +
+				"Luke Skywalker\n" +
+				"LOL\n" +
+				"Hello world, how are you?\n" +
+				"Hi\n" +
+				"Hello world\n" +
+				"Hello\n" +
+				"Random comment\n";
+
+		compareResultAsText(result, expected);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
index cf82ce9..8b1e2fc 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
@@ -174,6 +174,45 @@ class DistinctITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(m
     env.execute()
     expected = "10000\n20000\n30000\n"
   }
+
+  @Test
+  def testCorrectnessOfDistinctOnAtomic(): Unit = {
+    /*
+     * check correctness of distinct on Integers
+     */
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getIntDataSet(env)
+
+    val reduceDs = ds.distinct
+
+    reduceDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "1\n2\n3\n4\n5"
+  }
+
+  @Test
+  def testCorrectnessOfDistinctOnAtomicWithSelectAllChar(): Unit = {
+    /*
+     * check correctness of distinct on Strings, using Keys.ExpressionKeys.SELECT_ALL_CHAR
+     */
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.getStringDataSet(env)
+    val reduceDs = ds.union(ds).distinct("_")
+
+    reduceDs.writeAsText(resultPath, writeMode = WriteMode.OVERWRITE)
+    env.execute()
+    expected = "I am fine.\n" +
+      "Luke Skywalker\n" +
+      "LOL\n" +
+      "Hello world, how are you?\n" +
+      "Hi\n" +
+      "Hello world\n" +
+      "Hello\n" +
+      "Random comment\n"
+  }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/08ca9ffa/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala
index ca93d86..7fc53e5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctOperatorTest.scala
@@ -90,6 +90,19 @@ class DistinctOperatorTest {
   }
 
   @Test
+  def testDistinctByKeyIndices7(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val longDs = env.fromCollection(emptyLongData)
+
+    // should work
+    try {
+      longDs.distinct
+    } catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test
   def testDistinctByKeyFields1(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tupleDs = env.fromCollection(emptyTupleData)
@@ -140,6 +153,19 @@ class DistinctOperatorTest {
   }
 
   @Test
+  def testDistinctByKeyFields6(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val longDs = env.fromCollection(emptyLongData)
+
+    // should work
+    try {
+      longDs.distinct("_")
+    } catch {
+      case e: Exception => Assert.fail()
+    }
+  }
+
+  @Test
   def testDistinctByKeySelector1(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     try {