You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/08/08 18:25:49 UTC

git commit: Remove expression key methods

Repository: incubator-flink
Updated Branches:
  refs/heads/master 1637cb125 -> b63beba56


Remove expression key methods


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b63beba5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b63beba5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b63beba5

Branch: refs/heads/master
Commit: b63beba56fc995dba87d6fc4766af2900c0438d5
Parents: 1637cb1
Author: uce <u....@fu-berlin.de>
Authored: Fri Aug 8 16:40:57 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Fri Aug 8 17:16:27 2014 +0200

----------------------------------------------------------------------
 .../example/java/wordcount/WordCountPOJO.java   | 51 ++++++++++----------
 .../java/org/apache/flink/api/java/DataSet.java |  6 +--
 .../api/java/operators/CoGroupOperator.java     | 24 ++++-----
 .../flink/api/java/operators/JoinOperator.java  | 24 ++++-----
 .../api/java/operator/CoGroupOperatorTest.java  |  8 +--
 .../flink/api/java/operator/GroupingTest.java   |  6 +--
 .../api/java/operator/JoinOperatorTest.java     |  8 +--
 7 files changed, 63 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b63beba5/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java
index d993e60..72d2699 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCountPOJO.java
@@ -19,7 +19,6 @@
 package org.apache.flink.example.java.wordcount;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -62,31 +61,31 @@ public class WordCountPOJO {
 		//
 		// [1] https://mail-archives.apache.org/mod_mbox/incubator-flink-dev/201407.mbox/%3C53D96049.1060509%40cse.uta.edu%3E
 		// ====================================================================
-
-		parseParameters(args);
-
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		DataSet<String> text = getTextDataSet(env);
-
-		DataSet<WC> counts = text 
-					.flatMap(new Tokenizer())
-					.groupBy("word")
-					.reduce(new ReduceFunction<WC>() {
-							public WC reduce(WC value1, WC value2) {
-								return new WC(value1.word, value1.count + value2.count);
-							}
-						});
-
-		// emit result
-		if(fileOutput) {
-			counts.writeAsText(outputPath);
-		} else {
-			counts.print();
-		}
-
-		env.execute("WordCount with custom data types example");
+//
+//		parseParameters(args);
+//
+//		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+//
+//		// get input data
+//		DataSet<String> text = getTextDataSet(env);
+//
+//		DataSet<WC> counts = text
+//					.flatMap(new Tokenizer())
+//					.groupBy("word")
+//					.reduce(new ReduceFunction<WC>() {
+//							public WC reduce(WC value1, WC value2) {
+//								return new WC(value1.word, value1.count + value2.count);
+//							}
+//						});
+//
+//		// emit result
+//		if(fileOutput) {
+//			counts.writeAsText(outputPath);
+//		} else {
+//			counts.print();
+//		}
+//
+//		env.execute("WordCount with custom data types example");
 	}
 
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b63beba5/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 3d1238a..5da572d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -434,9 +434,9 @@ public abstract class DataSet<T> {
 	 * @see org.apache.flink.api.java.operators.GroupReduceOperator
 	 * @see DataSet
 	 */
-	public UnsortedGrouping<T> groupBy(String... fields) {
-		return new UnsortedGrouping<T>(this, new Keys.ExpressionKeys<T>(fields, getType()));
-	}
+//	public UnsortedGrouping<T> groupBy(String... fields) {
+//		return new UnsortedGrouping<T>(this, new Keys.ExpressionKeys<T>(fields, getType()));
+//	}
 	
 	// --------------------------------------------------------------------------------------------
 	//  Joining

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b63beba5/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index a890b32..c1ec8c6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -353,12 +353,12 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 		 * @see Tuple
 		 * @see DataSet
 		 */
-		public CoGroupOperatorSetsPredicate where(String field0, String... fields) {
-			String[] actualFields = new String[fields.length + 1];
-			actualFields[0] = field0;
-			System.arraycopy(fields, 0, actualFields, 1, fields.length);
-			return new CoGroupOperatorSetsPredicate(new Keys.ExpressionKeys<I1>(actualFields, input1.getType()));
-		}
+//		public CoGroupOperatorSetsPredicate where(String field0, String... fields) {
+//			String[] actualFields = new String[fields.length + 1];
+//			actualFields[0] = field0;
+//			System.arraycopy(fields, 0, actualFields, 1, fields.length);
+//			return new CoGroupOperatorSetsPredicate(new Keys.ExpressionKeys<I1>(actualFields, input1.getType()));
+//		}
 
 		/**
 		 * Continues a CoGroup transformation and defines a {@link KeySelector} function for the first co-grouped {@link DataSet}.</br>
@@ -426,12 +426,12 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 			 * @return An incomplete CoGroup transformation.
 			 *           Call {@link org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets.CoGroupOperatorSetsPredicate.CoGroupOperatorWithoutFunction#with(org.apache.flink.api.java.functions.RichCoGroupFunction)} to finalize the CoGroup transformation.
 			 */
-			public CoGroupOperatorWithoutFunction equalTo(String field0, String... fields) {
-				String[] actualFields = new String[fields.length + 1];
-				actualFields[0] = field0;
-				System.arraycopy(fields, 0, actualFields, 1, fields.length);
-				return createCoGroupOperator(new Keys.ExpressionKeys<I2>(actualFields, input2.getType()));
-			}
+//			public CoGroupOperatorWithoutFunction equalTo(String field0, String... fields) {
+//				String[] actualFields = new String[fields.length + 1];
+//				actualFields[0] = field0;
+//				System.arraycopy(fields, 0, actualFields, 1, fields.length);
+//				return createCoGroupOperator(new Keys.ExpressionKeys<I2>(actualFields, input2.getType()));
+//			}
 
 			/**
 			 * Continues a CoGroup transformation and defines a {@link KeySelector} function for the second co-grouped {@link DataSet}.</br>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b63beba5/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index ce0aea7..f446193 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -743,12 +743,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * @see Tuple
 		 * @see DataSet
 		 */
-		public JoinOperatorSetsPredicate where(String field0, String... fields) {
-			String[] actualFields = new String[fields.length + 1];
-			actualFields[0] = field0;
-			System.arraycopy(fields, 0, actualFields, 1, fields.length);
-			return new JoinOperatorSetsPredicate(new Keys.ExpressionKeys<I1>(actualFields, input1.getType()));
-		}
+//		public JoinOperatorSetsPredicate where(String field0, String... fields) {
+//			String[] actualFields = new String[fields.length + 1];
+//			actualFields[0] = field0;
+//			System.arraycopy(fields, 0, actualFields, 1, fields.length);
+//			return new JoinOperatorSetsPredicate(new Keys.ExpressionKeys<I1>(actualFields, input1.getType()));
+//		}
 		
 		/**
 		 * Continues a Join transformation and defines a {@link KeySelector} function for the first join {@link DataSet}.</br>
@@ -825,12 +825,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			 * @param fields The fields of the second join DataSet that should be used as keys.
 			 * @return A DefaultJoin that represents the joined DataSet.
 			 */
-			public DefaultJoin<I1, I2> equalTo(String field0, String... fields) {
-				String[] actualFields = new String[fields.length + 1];
-				actualFields[0] = field0;
-				System.arraycopy(fields, 0, actualFields, 1, fields.length);
-				return createJoinOperator(new Keys.ExpressionKeys<I2>(actualFields, input2.getType()));
-			}
+//			public DefaultJoin<I1, I2> equalTo(String field0, String... fields) {
+//				String[] actualFields = new String[fields.length + 1];
+//				actualFields[0] = field0;
+//				System.arraycopy(fields, 0, actualFields, 1, fields.length);
+//				return createJoinOperator(new Keys.ExpressionKeys<I2>(actualFields, input2.getType()));
+//			}
 
 			/**
 			 * Continues a Join transformation and defines a {@link KeySelector} function for the second join {@link DataSet}.</br>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b63beba5/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
index 360050b..20fea28 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/CoGroupOperatorTest.java
@@ -139,7 +139,7 @@ public class CoGroupOperatorTest {
 
 		// should work
 		try {
-			ds1.coGroup(ds2).where("myInt").equalTo("myInt");
+//			ds1.coGroup(ds2).where("myInt").equalTo("myInt");
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -154,7 +154,7 @@ public class CoGroupOperatorTest {
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
 
 		// should not work, incompatible cogroup key types
-		ds1.coGroup(ds2).where("myInt").equalTo("myString");
+//		ds1.coGroup(ds2).where("myInt").equalTo("myString");
 	}
 
 	@Ignore
@@ -166,7 +166,7 @@ public class CoGroupOperatorTest {
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
 
 		// should not work, incompatible number of cogroup keys
-		ds1.coGroup(ds2).where("myInt", "myString").equalTo("myString");
+//		ds1.coGroup(ds2).where("myInt", "myString").equalTo("myString");
 	}
 
 	@Ignore
@@ -178,7 +178,7 @@ public class CoGroupOperatorTest {
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
 
 		// should not work, cogroup key non-existent
-		ds1.coGroup(ds2).where("myNonExistent").equalTo("myInt");
+//		ds1.coGroup(ds2).where("myNonExistent").equalTo("myInt");
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b63beba5/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index 98b6998..f5cd7b9 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -125,7 +125,7 @@ public class GroupingTest {
 
 		// should work
 		try {
-			ds.groupBy("myInt");
+//			ds.groupBy("myInt");
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -139,7 +139,7 @@ public class GroupingTest {
 
 		DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
 		// should not work: groups on basic type
-		longDs.groupBy("myInt");
+//		longDs.groupBy("myInt");
 	}
 
 	@Ignore
@@ -164,7 +164,7 @@ public class GroupingTest {
 		DataSet<CustomType> ds = env.fromCollection(customTypeData);
 
 		// should not work, key out of tuple bounds
-		ds.groupBy("myNonExistent");
+//		ds.groupBy("myNonExistent");
 	}
 
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b63beba5/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
index 11eca83..fe4414b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
@@ -139,7 +139,7 @@ public class JoinOperatorTest {
 
 		// should work
 		try {
-			ds1.join(ds2).where("myInt").equalTo("myInt");
+//			ds1.join(ds2).where("myInt").equalTo("myInt");
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -154,7 +154,7 @@ public class JoinOperatorTest {
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
 
 		// should not work, incompatible join key types
-		ds1.join(ds2).where("myInt").equalTo("myString");
+//		ds1.join(ds2).where("myInt").equalTo("myString");
 	}
 
 	@Ignore
@@ -166,7 +166,7 @@ public class JoinOperatorTest {
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
 
 		// should not work, incompatible number of join keys
-		ds1.join(ds2).where("myInt", "myString").equalTo("myString");
+//		ds1.join(ds2).where("myInt", "myString").equalTo("myString");
 	}
 
 	@Ignore
@@ -178,7 +178,7 @@ public class JoinOperatorTest {
 		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
 
 		// should not work, join key non-existent
-		ds1.join(ds2).where("myNonExistent").equalTo("myInt");
+//		ds1.join(ds2).where("myNonExistent").equalTo("myInt");
 	}