You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/11 13:14:22 UTC
[4/4] flink git commit: [FLINK-2277] [scala api] Add flag to set
delta iteration solution set to unmanaged
[FLINK-2277] [scala api] Add flag to set delta iteration solution set to unmanaged
This closes #1005
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f50ae26a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f50ae26a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f50ae26a
Branch: refs/heads/master
Commit: f50ae26a2fb4a0c7f5b390e2f0f5528be9f61730
Parents: b42fbf7
Author: Pieter-Jan Van Aeken <pi...@euranova.eu>
Authored: Mon Aug 10 15:16:08 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Aug 11 13:13:47 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/api/scala/DataSet.scala | 60 ++++++++++++++++++++
1 file changed, 60 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f50ae26a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 167aa26..207bc5d 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -1075,6 +1075,36 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
*
* Note: The syntax of delta iterations are very likely going to change soon.
*/
+ def iterateDelta[R: ClassTag](workset: DataSet[R], maxIterations: Int, keyFields: Array[Int],
+ solutionSetUnManaged: Boolean)(
+ stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])) = {
+ val key = new ExpressionKeys[T](keyFields, javaSet.getType, false)
+
+ val iterativeSet = new DeltaIteration[T, R](
+ javaSet.getExecutionEnvironment,
+ javaSet.getType,
+ javaSet,
+ workset.javaSet,
+ key,
+ maxIterations)
+
+ iterativeSet.setSolutionSetUnManaged(solutionSetUnManaged)
+
+ val (newSolution, newWorkset) = stepFunction(
+ wrap(iterativeSet.getSolutionSet),
+ wrap(iterativeSet.getWorkset))
+ val result = iterativeSet.closeWith(newSolution.javaSet, newWorkset.javaSet)
+ wrap(result)
+ }
+
+ /**
+ * Creates a new DataSet by performing delta (or workset) iterations using the given step
+ * function. At the beginning `this` DataSet is the solution set and `workset` is the Workset.
+ * The iteration step function gets the current solution set and workset and must output the
+ * delta for the solution set and the workset for the next iteration.
+ *
+ * Note: The syntax of delta iterations are very likely going to change soon.
+ */
def iterateDelta[R: ClassTag](workset: DataSet[R], maxIterations: Int, keyFields: Array[String])(
stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])) = {
@@ -1094,6 +1124,36 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
wrap(result)
}
+ /**
+ * Creates a new DataSet by performing delta (or workset) iterations using the given step
+ * function. At the beginning `this` DataSet is the solution set and `workset` is the Workset.
+ * The iteration step function gets the current solution set and workset and must output the
+ * delta for the solution set and the workset for the next iteration.
+ *
+ * Note: The syntax of delta iterations are very likely going to change soon.
+ */
+ def iterateDelta[R: ClassTag](workset: DataSet[R], maxIterations: Int, keyFields: Array[String],
+ solutionSetUnManaged: Boolean)(
+ stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])) = {
+
+ val key = new ExpressionKeys[T](keyFields, javaSet.getType)
+ val iterativeSet = new DeltaIteration[T, R](
+ javaSet.getExecutionEnvironment,
+ javaSet.getType,
+ javaSet,
+ workset.javaSet,
+ key,
+ maxIterations)
+
+ iterativeSet.setSolutionSetUnManaged(solutionSetUnManaged)
+
+ val (newSolution, newWorkset) = stepFunction(
+ wrap(iterativeSet.getSolutionSet),
+ wrap(iterativeSet.getWorkset))
+ val result = iterativeSet.closeWith(newSolution.javaSet, newWorkset.javaSet)
+ wrap(result)
+ }
+
// -------------------------------------------------------------------------------------------
// Custom Operators
// -------------------------------------------------------------------------------------------