You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/06/12 15:33:06 UTC

[1/4] flink git commit: [FLINK-2206] Fix incorrect counts of finished, canceled, and failed jobs in webinterface

Repository: flink
Updated Branches:
  refs/heads/release-0.9 ecfde6dd9 -> f5f0709c9


[FLINK-2206] Fix incorrect counts of finished, canceled, and failed jobs in webinterface

This closes #826


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e513be72
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e513be72
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e513be72

Branch: refs/heads/release-0.9
Commit: e513be72a486b4f2e13c617eb6d4d08c03503ae7
Parents: ecfde6d
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Jun 12 01:45:03 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Jun 12 14:26:47 2015 +0200

----------------------------------------------------------------------
 .../jobmanager/web/JobManagerInfoServlet.java   | 31 +++++++++++++++++
 .../js/jobmanagerFrontend.js                    | 36 +++++++++-----------
 .../runtime/jobmanager/MemoryArchivist.scala    | 17 +++++++++
 .../runtime/messages/ArchiveMessages.scala      | 11 +++++-
 4 files changed, 75 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e513be72/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
index 6d58306..3fc3c82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobManagerInfoServlet.java
@@ -66,6 +66,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.StringUtils;
 import org.eclipse.jetty.io.EofException;
 
+import scala.Tuple3;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -117,6 +118,20 @@ public class JobManagerInfoServlet extends HttpServlet {
 					writeJsonForArchive(resp.getWriter(), archivedJobs);
 				}
 			}
+			else if("jobcounts".equals(req.getParameter("get"))) {
+				response = Patterns.ask(archive, ArchiveMessages.getRequestJobCounts(),
+						new Timeout(timeout));
+
+				result = Await.result(response, timeout);
+
+				if(!(result instanceof Tuple3)) {
+					throw new RuntimeException("RequestJobCounts requires a response of type " +
+							"Tuple3. Instead the response is of type " + result.getClass() +
+							".");
+				} else {
+					writeJsonForJobCounts(resp.getWriter(), (Tuple3)result);
+				}
+			}
 			else if("job".equals(req.getParameter("get"))) {
 				String jobId = req.getParameter("job");
 
@@ -341,6 +356,22 @@ public class JobManagerInfoServlet extends HttpServlet {
 	}
 
 	/**
+	 * Writes Json with the job counts
+	 *
+	 * @param wrt
+	 * @param jobCounts Tuple3 holding the finished, canceled, and failed job counts (in that order)
+	 */
+	private void writeJsonForJobCounts(PrintWriter wrt, Tuple3<Integer, Integer, Integer> jobCounts) {
+
+		wrt.write("{");
+		wrt.write("\"finished\": " + jobCounts._1() + ",");
+		wrt.write("\"canceled\": " + jobCounts._2() + ",");
+		wrt.write("\"failed\": "   + jobCounts._3());
+		wrt.write("}");
+
+	}
+
+	/**
 	 * Writes infos about archived job in Json format, including groupvertices and groupverticetimes
 	 *
 	 * @param wrt

http://git-wip-us.apache.org/repos/asf/flink/blob/e513be72/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js b/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js
index 92f6979..63d287c 100644
--- a/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js
+++ b/flink-runtime/src/main/resources/web-docs-infoserver/js/jobmanagerFrontend.js
@@ -81,6 +81,22 @@ function poll(jobId) {
 })();
 
 /*
+ * Polls the job execution counts on page load and every 2 seconds
+ */
+(function pollJobCounts() {
+	$.ajax({ url : "jobsInfo?get=jobcounts", cache: false, type : "GET",
+	    success : function(json) {
+
+		$("#jobs-finished").html(json.finished);
+		$("#jobs-canceled").html(json.canceled);
+		$("#jobs-failed").html(json.failed);
+
+	    }, dataType : "json",
+	});
+	setTimeout(pollJobCounts, 2000);
+})();
+
+/*
  * Polls the number of taskmanagers on page load
  */
 (function pollTaskmanagers() {
@@ -418,20 +434,12 @@ function updateTable(json) {
 	}
 }
 
-var archive_finished = 0;
-var archive_failed = 0;
-var archive_canceled = 0;
-
 /*
  * Creates job history table
  */
 function fillTableArchive(table, json) {
 	$(table).html("");
-	
-	$("#jobs-finished").html(archive_finished);
-	$("#jobs-failed").html(archive_failed);
-	$("#jobs-canceled").html(archive_canceled);
-	
+
 	$.each(json, function(i, job) {
 		_fillTableArchive(table, job, false)
 	});
@@ -459,14 +467,4 @@ function _fillTableArchive(table, job, prepend) {
 						+ job.jobname + " ("
 						+ formattedTimeFromTimestamp(parseInt(job.time))
 						+ ")</a></li>");
-	if (job.status == "FINISHED")
-		archive_finished++;
-	if (job.status == "FAILED")
-		archive_failed++;
-	if (job.status == "CANCELED")
-		archive_canceled++;
-	
-	$("#jobs-finished").html(archive_finished);
-	$("#jobs-failed").html(archive_failed);
-	$("#jobs-canceled").html(archive_canceled);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e513be72/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 62ea435..54d2f2f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager
 
 import akka.actor.Actor
 import org.apache.flink.api.common.JobID
+import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.executiongraph.ExecutionGraph
 import org.apache.flink.runtime.messages.ArchiveMessages._
@@ -45,6 +46,8 @@ import scala.collection.mutable
  *  then a [[CurrentJobStatus]] message with the last state is returned to the sender, otherwise
  *  a [[JobNotFound]] message is returned
  *
+ *  - [[RequestJobCounts]] returns the number of finished, canceled, and failed jobs as a Tuple3
+ *
  * @param max_entries Maximum number of stored Flink jobs
  */
 class MemoryArchivist(private val max_entries: Int)
@@ -57,12 +60,23 @@ class MemoryArchivist(private val max_entries: Int)
    */
   val graphs = mutable.LinkedHashMap[JobID, ExecutionGraph]()
 
+  /* Counters for finished, canceled, and failed jobs */
+  var finishedCnt: Int = 0
+  var canceledCnt: Int = 0
+  var failedCnt: Int = 0
+
   override def receiveWithLogMessages: Receive = {
     
     /* Receive Execution Graph to archive */
     case ArchiveExecutionGraph(jobID, graph) => 
       // wrap graph inside a soft reference
       graphs.update(jobID, graph)
+      // update job counters
+      graph.getState match {
+        case JobStatus.FINISHED => finishedCnt += 1
+        case JobStatus.CANCELED => canceledCnt += 1
+        case JobStatus.FAILED => failedCnt += 1
+      }
       trimHistory()
 
     case RequestArchivedJob(jobID: JobID) =>
@@ -83,6 +97,9 @@ class MemoryArchivist(private val max_entries: Int)
         case Some(graph) => sender ! CurrentJobStatus(jobID, graph.getState)
         case None => sender ! JobNotFound(jobID)
       }
+
+    case RequestJobCounts =>
+      sender ! (finishedCnt, canceledCnt, failedCnt)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e513be72/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
index e9e7dec..c4e3f3e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
@@ -34,6 +34,11 @@ object ArchiveMessages {
   case object RequestArchivedJobs
 
   /**
+   * Requests the number of finished, canceled, and failed jobs
+   */
+  case object RequestJobCounts
+
+  /**
    * Reqeuest a specific ExecutionGraph by JobID. The response is [[RequestArchivedJob]]
    * @param jobID
    */
@@ -56,7 +61,7 @@ object ArchiveMessages {
       jobs.asJavaCollection
     }
   }
-  
+
   // --------------------------------------------------------------------------
   // Utility methods to allow simpler case object access from Java
   // --------------------------------------------------------------------------
@@ -64,4 +69,8 @@ object ArchiveMessages {
   def getRequestArchivedJobs : AnyRef = {
     RequestArchivedJobs
   }
+
+  def getRequestJobCounts : AnyRef = {
+    RequestJobCounts
+  }
 }


[4/4] flink git commit: [FLINK-2194] [type extractor] Excludes Writable type from WritableTypeInformation to be treated as an interface

Posted by fh...@apache.org.
[FLINK-2194] [type extractor] Excludes Writable type from WritableTypeInformation to be treated as an interface

This closes #814.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5f0709c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5f0709c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5f0709c

Branch: refs/heads/release-0.9
Commit: f5f0709c9ba090cd1321dec761a27db556082630
Parents: 40e2df5
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 9 16:11:05 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Jun 12 14:28:52 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/typeutils/TypeExtractor.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f5f0709c/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 71d0cee..41644f9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -1152,7 +1152,7 @@ public class TypeExtractor {
 		}
 		
 		// check for writable types
-		if(Writable.class.isAssignableFrom(clazz)) {
+		if(Writable.class.isAssignableFrom(clazz) && !Writable.class.equals(clazz)) {
 			return (TypeInformation<OUT>) WritableTypeInfo.getWritableTypeInfo((Class<? extends Writable>) clazz);
 		}
 		


[2/4] flink git commit: [FLINK-2207] Fix TableAPI conversion documenation and further renamings for consistency.

Posted by fh...@apache.org.
[FLINK-2207] Fix TableAPI conversion documenation and further renamings for consistency.

This closes #829


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af0fee51
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af0fee51
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af0fee51

Branch: refs/heads/release-0.9
Commit: af0fee512bde4a5dc5c08a3cc17da788a06cd113
Parents: e513be7
Author: Fabian Hueske <fh...@apache.org>
Authored: Fri Jun 12 11:36:03 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Jun 12 14:27:03 2015 +0200

----------------------------------------------------------------------
 docs/libs/table.md                                    |  8 ++++----
 .../flink/api/scala/table/TableConversions.scala      |  4 ++--
 .../main/scala/org/apache/flink/api/table/Table.scala |  2 +-
 .../apache/flink/examples/scala/PageRankTable.scala   |  2 +-
 .../flink/examples/scala/StreamingTableFilter.scala   |  2 +-
 .../flink/api/scala/table/test/FilterITCase.scala     |  6 +++---
 .../flink/api/scala/table/test/JoinITCase.scala       | 14 +++++++-------
 7 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/docs/libs/table.md
----------------------------------------------------------------------
diff --git a/docs/libs/table.md b/docs/libs/table.md
index bcd2cb1..829c9cf 100644
--- a/docs/libs/table.md
+++ b/docs/libs/table.md
@@ -52,7 +52,7 @@ import org.apache.flink.api.scala.table._
 case class WC(word: String, count: Int)
 val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
 val expr = input.toTable
-val result = expr.groupBy('word).select('word, 'count.sum as 'count).toSet[WC]
+val result = expr.groupBy('word).select('word, 'count.sum as 'count).toDataSet[WC]
 {% endhighlight %}
 
 The expression DSL uses Scala symbols to refer to field names and we use code generation to
@@ -69,7 +69,7 @@ case class MyResult(a: String, d: Int)
 
 val input1 = env.fromElements(...).toTable('a, 'b)
 val input2 = env.fromElements(...).toTable('c, 'd)
-val joined = input1.join(input2).where("b = a && d > 42").select("a, d").toSet[MyResult]
+val joined = input1.join(input2).where("b = a && d > 42").select("a, d").toDataSet[MyResult]
 {% endhighlight %}
 
 Notice, how a DataSet can be converted to a Table by using `as` and specifying new
@@ -108,14 +108,14 @@ DataSet<WC> input = env.fromElements(
         new WC("Ciao", 1),
         new WC("Hello", 1));
 
-Table table = tableEnv.toTable(input);
+Table table = tableEnv.fromDataSet(input);
 
 Table filtered = table
         .groupBy("word")
         .select("word.count as count, word")
         .filter("count = 2");
 
-DataSet<WC> result = tableEnv.toSet(filtered, WC.class);
+DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class);
 {% endhighlight %}
 
 When using Java, the embedded DSL for specifying expressions cannot be used. Only String expressions

http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
index b9c0a5e..4f2172e 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableConversions.scala
@@ -33,14 +33,14 @@ class TableConversions(table: Table) {
   /**
    * Converts the [[Table]] to a [[DataSet]].
    */
-  def toSet[T: TypeInformation]: DataSet[T] = {
+  def toDataSet[T: TypeInformation]: DataSet[T] = {
      new ScalaBatchTranslator().translate[T](table.operation)
   }
 
   /**
    * Converts the [[Table]] to a [[DataStream]].
    */
-  def toStream[T: TypeInformation]: DataStream[T] = {
+  def toDataStream[T: TypeInformation]: DataStream[T] = {
     new ScalaStreamingTranslator().translate[T](table.operation)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
index 83d5239..fdb125b 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
@@ -39,7 +39,7 @@ import org.apache.flink.api.table.plan._
  *   val table = set.toTable('a, 'b)
  *   ...
  *   val table2 = ...
- *   val set = table2.toSet[MyType]
+ *   val set = table2.toDataSet[MyType]
  * }}}
  */
 case class Table(private[flink] val operation: PlanNode) {

http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
index 7a26e0e..dda6265 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
@@ -101,7 +101,7 @@ object PageRankTable {
         val newRanks = currentRanks.toTable
           // distribute ranks to target pages
           .join(adjacencyLists).where('pageId === 'sourceId)
-          .select('rank, 'targetIds).toSet[RankOutput]
+          .select('rank, 'targetIds).toDataSet[RankOutput]
           .flatMap {
             (in, out: Collector[(Long, Double)]) =>
               val targets = in.targetIds

http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
index 4aa5653..63dddc9 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
@@ -42,7 +42,7 @@ object StreamingTableFilter {
     val cars = genCarStream().toTable
       .filter('carId === 0)
       .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
-      .toStream[CarEvent]
+      .toDataStream[CarEvent]
 
     cars.print()
 

http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index bc51a7e..75cd728 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -61,7 +61,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
     val filterDs = ds.filter( Literal(false) )
 
-    filterDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = "\n"
   }
@@ -76,7 +76,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
     val filterDs = ds.filter( Literal(true) )
 
-    filterDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
       "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
@@ -109,7 +109,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod
 
     val filterDs = ds.filter( 'a % 2 === 0 )
 
-    filterDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    filterDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
       "Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +

http://git-wip-us.apache.org/repos/asf/flink/blob/af0fee51/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
index b3baa56..8c3d1ca 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/JoinITCase.scala
@@ -57,7 +57,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinDs = ds1.join(ds2).where('b === 'e).select('c, 'g)
 
-    joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
   }
@@ -70,7 +70,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinDs = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
 
-    joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = "Hi,Hallo\n"
   }
@@ -83,7 +83,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinDs = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
 
-    joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
       "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
@@ -97,7 +97,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinDs = ds1.join(ds2).where('foo === 'e).select('c, 'g)
 
-    joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = ""
   }
@@ -110,7 +110,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinDs = ds1.join(ds2).where('a === 'g).select('c, 'g)
 
-    joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = ""
   }
@@ -123,7 +123,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinDs = ds1.join(ds2).where('a === 'd).select('c, 'g)
 
-    joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = ""
   }
@@ -136,7 +136,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
 
     val joinDs = ds1.join(ds2).where('a === 'd).select('g.count)
 
-    joinDs.toSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
+    joinDs.toDataSet[Row].writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
     env.execute()
     expected = "6"
   }


[3/4] flink git commit: [FLINK-2072] [ml] Adds quickstart guide

Posted by fh...@apache.org.
[FLINK-2072] [ml] Adds quickstart guide

This closes #792.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40e2df5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40e2df5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40e2df5a

Branch: refs/heads/release-0.9
Commit: 40e2df5acf9385cc3c6e3a947b4bf6cd2bd375b3
Parents: af0fee5
Author: Theodore Vasiloudis <tv...@sics.se>
Authored: Fri Jun 5 11:09:11 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Jun 12 14:27:29 2015 +0200

----------------------------------------------------------------------
 docs/libs/ml/contribution_guide.md |  10 +-
 docs/libs/ml/index.md              |  27 ++--
 docs/libs/ml/quickstart.md         | 216 +++++++++++++++++++++++++++++++-
 3 files changed, 235 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40e2df5a/docs/libs/ml/contribution_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/contribution_guide.md b/docs/libs/ml/contribution_guide.md
index 89f05c0..f0754cb 100644
--- a/docs/libs/ml/contribution_guide.md
+++ b/docs/libs/ml/contribution_guide.md
@@ -36,7 +36,7 @@ Everything from this guide also applies to FlinkML.
 
 ## Pick a Topic
 
-If you are looking for some new ideas, then you should check out the list of [unresolved issues on JIRA](https://issues.apache.org/jira/issues/?jql=component%20%3D%20%22Machine%20Learning%20Library%22%20AND%20project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20priority%20DESC).
+If you are looking for some new ideas, you should first look into our [roadmap](vision_roadmap.html#Roadmap) and then check out the list of [unresolved issues on JIRA](https://issues.apache.org/jira/issues/?jql=component%20%3D%20%22Machine%20Learning%20Library%22%20AND%20project%20%3D%20FLINK%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20priority%20DESC).
 Once you decide to contribute to one of these issues, you should take ownership of it and track your progress with this issue.
 That way, the other contributors know the state of the different issues and redundant work is avoided.
 
@@ -61,7 +61,7 @@ Thus, an integration test could look the following:
 {% highlight scala %}
 class ExampleITSuite extends FlatSpec with FlinkTestBase {
   behavior of "An example algorithm"
-  
+
   it should "do something" in {
     ...
   }
@@ -81,12 +81,12 @@ Every new algorithm is described by a single markdown file.
 This file should contain at least the following points:
 
 1. What does the algorithm do
-2. How does the algorithm work (or reference to description) 
+2. How does the algorithm work (or reference to description)
 3. Parameter description with default values
 4. Code snippet showing how the algorithm is used
 
 In order to use latex syntax in the markdown file, you have to include `mathjax: include` in the YAML front matter.
- 
+
 {% highlight java %}
 ---
 mathjax: include
@@ -103,4 +103,4 @@ See `docs/_include/latex_commands.html` for the complete list of predefined late
 ## Contributing
 
 Once you have implemented the algorithm with adequate test coverage and added documentation, you are ready to open a pull request.
-Details of how to open a pull request can be found [here](http://flink.apache.org/how-to-contribute.html#contributing-code--documentation). 
+Details of how to open a pull request can be found [here](http://flink.apache.org/how-to-contribute.html#contributing-code--documentation).

http://git-wip-us.apache.org/repos/asf/flink/blob/40e2df5a/docs/libs/ml/index.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/index.md b/docs/libs/ml/index.md
index de9137d..9ff7a4b 100644
--- a/docs/libs/ml/index.md
+++ b/docs/libs/ml/index.md
@@ -21,9 +21,9 @@ under the License.
 -->
 
 FlinkML is the Machine Learning (ML) library for Flink. It is a new effort in the Flink community,
-with a growing list of algorithms and contributors. With FlinkML we aim to provide 
-scalable ML algorithms, an intuitive API, and tools that help minimize glue code in end-to-end ML 
-systems. You can see more details about our goals and where the library is headed in our [vision 
+with a growing list of algorithms and contributors. With FlinkML we aim to provide
+scalable ML algorithms, an intuitive API, and tools that help minimize glue code in end-to-end ML
+systems. You can see more details about our goals and where the library is headed in our [vision
 and roadmap here](vision_roadmap.html).
 
 * This will be replaced by the TOC
@@ -55,10 +55,13 @@ FlinkML currently supports the following algorithms:
 
 ## Getting Started
 
-First, you have to [set up a Flink program](http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#linking-with-flink).
-Next, you have to add the FlinkML dependency to the `pom.xml` of your project.  
+You can check out our [quickstart guide](quickstart.html) for a comprehensive getting started
+example.
 
-{% highlight bash %}
+If you want to jump right in, you have to [set up a Flink program](http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#linking-with-flink).
+Next, you have to add the FlinkML dependency to the `pom.xml` of your project.
+
+{% highlight xml %}
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-ml</artifactId>
@@ -85,12 +88,11 @@ mlr.fit(trainingData, parameters)
 val predictions: DataSet[LabeledVector] = mlr.predict(testingData)
 {% endhighlight %}
 
-For a more comprehensive guide, please check out our [quickstart guide](quickstart.html)
-
 ## Pipelines
 
 A key concept of FlinkML is its [scikit-learn](http://scikit-learn.org) inspired pipelining mechanism.
 It allows you to quickly build complex data analysis pipelines how they appear in every data scientist's daily work.
+An in-depth description of FlinkML's pipelines and their internal workings can be found [here](pipelines.html).
 
 The following example code shows how easy it is to set up an analysis pipeline with FlinkML.
 
@@ -110,13 +112,14 @@ pipeline.fit(trainingData)
 
 // Calculate predictions
 val predictions: DataSet[LabeledVector] = pipeline.predict(testingData)
-{% endhighlight %} 
+{% endhighlight %}
 
 One can chain a `Transformer` to another `Transformer` or a set of chained `Transformers` by calling the method `chainTransformer`.
-If one wants to chain a `Predictor` to a `Transformer` or a set of chained `Transformers`, one has to call the method `chainPredictor`. 
-An in-depth description of FlinkML's pipelines and their internal workings can be found [here](pipelines.html).
+If one wants to chain a `Predictor` to a `Transformer` or a set of chained `Transformers`, one has to call the method `chainPredictor`.
+
 
 ## How to contribute
 
 The Flink community welcomes all contributors who want to get involved in the development of Flink and its libraries.
-In order to get quickly started with contributing to FlinkML, please read first the official [contribution guide]({{site.baseurl}}/libs/ml/contribution_guide.html).
\ No newline at end of file
+In order to get quickly started with contributing to FlinkML, please read our official
+[contribution guide]({{site.baseurl}}/libs/ml/contribution_guide.html).

http://git-wip-us.apache.org/repos/asf/flink/blob/40e2df5a/docs/libs/ml/quickstart.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/quickstart.md b/docs/libs/ml/quickstart.md
index b8501f8..f5d7451 100644
--- a/docs/libs/ml/quickstart.md
+++ b/docs/libs/ml/quickstart.md
@@ -1,4 +1,5 @@
 ---
+mathjax: include
 htmlTitle: FlinkML - Quickstart Guide
 title: <a href="../ml">FlinkML</a> - Quickstart Guide
 ---
@@ -24,4 +25,217 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-Coming soon.
+## Introduction
+
+FlinkML is designed to make learning from your data a straight-forward process, abstracting away
+the complexities that usually come with big data learning tasks. In this
+quick-start guide we will show just how easy it is to solve a simple supervised learning problem
+using FlinkML. But first some basics; feel free to skip the next few lines if you're already
+familiar with Machine Learning (ML).
+
+As defined by Murphy [[1]](#murphy), ML deals with detecting patterns in data, and using those
+learned patterns to make predictions about the future. We can categorize most ML algorithms into
+two major categories: Supervised and Unsupervised Learning.
+
+* **Supervised Learning** deals with learning a function (mapping) from a set of inputs
+(features) to a set of outputs. The learning is done using a *training set* of (input,
+output) pairs that we use to approximate the mapping function. Supervised learning problems are
+further divided into classification and regression problems. In classification problems we try to
+predict the *class* that an example belongs to, for example whether a user is going to click on
+an ad or not. Regression problems, on the other hand, are about predicting (real) numerical
+values, often called the dependent variable, for example what the temperature will be tomorrow.
+
+* **Unsupervised Learning** deals with discovering patterns and regularities in the data. An example
+of this would be *clustering*, where we try to discover groupings of the data from the
+descriptive features. Unsupervised learning can also be used for feature selection, for example
+through [principal components analysis](https://en.wikipedia.org/wiki/Principal_component_analysis).
+
+## Linking with FlinkML
+
+In order to use FlinkML in your project, first you have to
+[set up a Flink program](http://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#linking-with-flink).
+Next, you have to add the FlinkML dependency to the `pom.xml` of your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+## Loading data
+
+To load data to be used with FlinkML we can use the ETL capabilities of Flink, or specialized
+functions for formatted data, such as the LibSVM format. For supervised learning problems it is
+common to use the `LabeledVector` class to represent the `(label, features)` examples. A `LabeledVector`
+object will have a FlinkML `Vector` member representing the features of the example and a `Double`
+member which represents the label, which could be the class in a classification problem, or the dependent
+variable for a regression problem.
+
+As an example, we can use Haberman's Survival Data Set, which you can
+[download from the UCI ML repository](http://archive.ics.uci.edu/ml/machine-learning-databases/haberman/haberman.data).
+This dataset *"contains cases from a study conducted on the survival of patients who had undergone
+surgery for breast cancer"*. The data comes in a comma-separated file, where the first 3 columns
+are the features and the last (4th) column is the class, which indicates whether the patient
+survived 5 years or longer (label 1), or died within 5 years (label 2). You can check the [UCI
+page](https://archive.ics.uci.edu/ml/datasets/Haberman%27s+Survival) for more information on the data.
+
+We can load the data as a `DataSet[(String, String, String, String)]` first:
+
+{% highlight scala %}
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val survival = env.readCsvFile[(String, String, String, String)]("/path/to/haberman.data")
+
+{% endhighlight %}
+
+We can now transform the data into a `DataSet[LabeledVector]`. This will allow us to use the
+dataset with the FlinkML classification algorithms. We know that the 4th element of the dataset
+is the class label, and the rest are features, so we can build `LabeledVector` elements like this:
+
+{% highlight scala %}
+
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.DenseVector
+
+val survivalLV = survival
+  .map{tuple =>
+    val list = tuple.productIterator.toList
+    val numList = list.map(_.asInstanceOf[String].toDouble)
+    LabeledVector(numList(3), DenseVector(numList.take(3).toArray))
+  }
+
+{% endhighlight %}
+
+We can then use this data to train a learner. We will however use another dataset to exemplify
+building a learner; that will allow us to show how we can import other dataset formats.
+
+**LibSVM files**
+
+A common format for ML datasets is the LibSVM format and a number of datasets using that format can be
+found [on the LibSVM datasets website](http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/). FlinkML provides utilities for loading
+datasets using the LibSVM format through the `readLibSVM` function available through the `MLUtils`
+object.
+You can also save datasets in the LibSVM format using the `writeLibSVM` function.
+Let's import the svmguide1 dataset. You can download the
+[training set here](http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/svmguide1)
+and the [test set here](http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary/svmguide1.t).
+This is an astroparticle binary classification dataset, used by Hsu et al. [[3]](#hsu) in their 
+practical Support Vector Machine (SVM) guide. It contains 4 numerical features, and the class label.
+
+We can then simply import the dataset using:
+
+{% highlight scala %}
+
+import org.apache.flink.ml.MLUtils
+
+val astroTrain: DataSet[LabeledVector] = MLUtils.readLibSVM("/path/to/svmguide1")
+val astroTest: DataSet[LabeledVector] = MLUtils.readLibSVM("/path/to/svmguide1.t")
+
+{% endhighlight %}
+
+This gives us two `DataSet[LabeledVector]` objects that we will use in the following section to
+create a classifier.
+
+## Classification
+
+Once we have imported the dataset we can train a `Predictor` such as a linear SVM classifier.
+We can set a number of parameters for the classifier. Here we set the `Blocks` parameter,
+which is used by the underlying CoCoA algorithm [[2]](#jaggi) to split the input. The
+regularization parameter determines the amount of $l_2$ regularization applied, which is used
+to avoid overfitting. The step size determines the contribution of the weight vector updates to
+the next weight vector value. This parameter sets the initial step size.
+
+{% highlight scala %}
+
+import org.apache.flink.ml.classification.SVM
+
+val svm = SVM()
+  .setBlocks(env.getParallelism)
+  .setIterations(100)
+  .setRegularization(0.001)
+  .setStepsize(0.1)
+  .setSeed(42)
+
+svm.fit(astroTrain)
+
+{% endhighlight %}
+
+We can now make predictions on the test set.
+
+{% highlight scala %}
+
+val predictionPairs = svm.predict(astroTest)
+
+{% endhighlight %}
+
+Next we will see how we can pre-process our data, and use the ML pipelines capabilities of FlinkML.
+
+## Data pre-processing and pipelines
+
+A pre-processing step that is often encouraged [[3]](#hsu) when using SVM classification is scaling
+the input features to the [0, 1] range, in order to avoid features with extreme values
+dominating the rest.
+FlinkML has a number of `Transformers` such as `MinMaxScaler` that are used to pre-process data,
+and a key feature is the ability to chain `Transformers` and `Predictors` together. This allows
+us to run the same pipeline of transformations and make predictions on the train and test data in
+a straight-forward and type-safe manner. You can read more on the pipeline system of FlinkML
+[in the pipelines documentation](pipelines.html).
+
+Let us first create a normalizing transformer for the features in our dataset, and chain it to a
+new SVM classifier.
+
+{% highlight scala %}
+
+import org.apache.flink.ml.preprocessing.MinMaxScaler
+
+val scaler = MinMaxScaler()
+
+val scaledSVM = scaler.chainPredictor(svm)
+
+{% endhighlight %}
+
+We can now use our newly created pipeline to make predictions on the test set.
+First we call fit again, to train the scaler and the SVM classifier.
+The data of the test set will then be automatically scaled before being passed on to the SVM to
+make predictions.
+
+{% highlight scala %}
+
+scaledSVM.fit(astroTrain)
+
+val predictionPairsScaled: DataSet[(Double, Double)] = scaledSVM.predict(astroTest)
+
+{% endhighlight %}
+
+The scaled inputs should give us better prediction performance.
+The result of the prediction on `LabeledVector`s is a data set of tuples where the first entry denotes the true label value and the second entry is the predicted label value.
+
+## Where to go from here
+
+This quickstart guide can act as an introduction to the basic concepts of FlinkML, but there's a lot
+more you can do.
+We recommend going through the [FlinkML documentation](index.html), and trying out the different
+algorithms.
+A very good way to get started is to play around with interesting datasets from the UCI ML
+repository and the LibSVM datasets.
+Tackling an interesting problem from a website like [Kaggle](https://www.kaggle.com) or
+[DrivenData](http://www.drivendata.org/) is also a great way to learn by competing with other
+data scientists.
+If you would like to contribute some new algorithms, take a look at our
+[contribution guide](contribution_guide.html).
+
+**References**
+
+<a name="murphy"></a>[1] Murphy, Kevin P. *Machine learning: a probabilistic perspective.* MIT 
+press, 2012.
+
+<a name="jaggi"></a>[2] Jaggi, Martin, et al. *Communication-efficient distributed dual 
+coordinate ascent.* Advances in Neural Information Processing Systems. 2014.
+
+<a name="hsu"></a>[3] Hsu, Chih-Wei, Chih-Chung Chang, and Chih-Jen Lin.
+ *A practical guide to support vector classification.* 2003.