You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/11 13:14:22 UTC

[4/4] flink git commit: [FLINK-2277] [scala api] Add flag to set delta iteration solution set to unmanaged

[FLINK-2277] [scala api] Add flag to set delta iteration solution set to unmanaged

This closes #1005


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f50ae26a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f50ae26a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f50ae26a

Branch: refs/heads/master
Commit: f50ae26a2fb4a0c7f5b390e2f0f5528be9f61730
Parents: b42fbf7
Author: Pieter-Jan Van Aeken <pi...@euranova.eu>
Authored: Mon Aug 10 15:16:08 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Aug 11 13:13:47 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/api/scala/DataSet.scala    | 60 ++++++++++++++++++++
 1 file changed, 60 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f50ae26a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 167aa26..207bc5d 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -1075,6 +1075,36 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    *
    * Note: The syntax of delta iterations are very likely going to change soon.
    */
+  def iterateDelta[R: ClassTag](workset: DataSet[R], maxIterations: Int, keyFields: Array[Int],
+                                 solutionSetUnManaged: Boolean)(
+    stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])) = {
+    // Build the solution set key from the Int key fields.
+    val solutionKeys = new ExpressionKeys[T](keyFields, javaSet.getType, false)
+    val iteration = new DeltaIteration[T, R](
+      javaSet.getExecutionEnvironment,
+      javaSet.getType,
+      javaSet,
+      workset.javaSet,
+      solutionKeys,
+      maxIterations)
+
+    // Forward the caller's flag before the step function is applied.
+    iteration.setSolutionSetUnManaged(solutionSetUnManaged)
+
+    val currentSolution = wrap(iteration.getSolutionSet)
+    val currentWorkset = wrap(iteration.getWorkset)
+    val (solutionDelta, nextWorkset) = stepFunction(currentSolution, currentWorkset)
+    wrap(iteration.closeWith(solutionDelta.javaSet, nextWorkset.javaSet))
+  }
+
+  /**
+   * Creates a new DataSet by performing delta (or workset) iterations using the given step
+   * function. At the beginning `this` DataSet is the solution set and `workset` is the Workset.
+   * The iteration step function gets the current solution set and workset and must output the
+   * delta for the solution set and the workset for the next iteration.
+   *
+   * Note: The syntax of delta iterations is very likely going to change soon.
+   */
   def iterateDelta[R: ClassTag](workset: DataSet[R], maxIterations: Int, keyFields: Array[String])(
     stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])) = {
 
@@ -1094,6 +1124,36 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
     wrap(result)
   }
 
+  /**
+   * Creates a new DataSet by running a delta (or workset) iteration. At the beginning `this`
+   * DataSet is the solution set and `workset` is the workset. `stepFunction` receives the
+   * current solution set and workset and must return the solution set delta together with the
+   * workset for the next iteration. `keyFields` is used to build the key of the solution set,
+   * and `solutionSetUnManaged` is forwarded to `DeltaIteration.setSolutionSetUnManaged`.
+   *
+   * Note: The syntax of delta iterations is very likely going to change soon.
+   */
+  def iterateDelta[R: ClassTag](workset: DataSet[R], maxIterations: Int, keyFields: Array[String],
+                                 solutionSetUnManaged: Boolean)(
+    stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])) = {
+
+    val solutionKeys = new ExpressionKeys[T](keyFields, javaSet.getType)
+    val iteration = new DeltaIteration[T, R](
+      javaSet.getExecutionEnvironment,
+      javaSet.getType,
+      javaSet,
+      workset.javaSet,
+      solutionKeys,
+      maxIterations)
+    // Forward the caller's flag before the step function is applied.
+    iteration.setSolutionSetUnManaged(solutionSetUnManaged)
+
+    val currentSolution = wrap(iteration.getSolutionSet)
+    val currentWorkset = wrap(iteration.getWorkset)
+    val (solutionDelta, nextWorkset) = stepFunction(currentSolution, currentWorkset)
+    wrap(iteration.closeWith(solutionDelta.javaSet, nextWorkset.javaSet))
+  }
+
   // -------------------------------------------------------------------------------------------
   //  Custom Operators
   // -------------------------------------------------------------------------------------------