You are viewing a plain text version of this content. The canonical version is available in the original mailing-list archive.
Posted to commits@flink.apache.org by al...@apache.org on 2014/11/27 16:02:42 UTC

[1/3] incubator-flink git commit: [scala] Add equals() to TraversableTypeInfo

Repository: incubator-flink
Updated Branches:
  refs/heads/master f66892d46 -> 1a7a50f82


[scala] Add equals() to TraversableTypeInfo


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1a7a50f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1a7a50f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1a7a50f8

Branch: refs/heads/master
Commit: 1a7a50f82f8a1b62a037d765bc6ea4b288cf292b
Parents: 1dd8b8d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Nov 27 15:07:30 2014 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 27 16:01:39 2014 +0100

----------------------------------------------------------------------
 .../api/scala/typeutils/TraversableTypeInfo.scala      | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a7a50f8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
index 6ff8c6e..f5ab3ba 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
@@ -27,7 +27,7 @@ import scala.collection.generic.CanBuildFrom
  */
 abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
     clazz: Class[T],
-    elementTypeInfo: TypeInformation[E])
+    val elementTypeInfo: TypeInformation[E])
   extends TypeInformation[T] {
 
   override def isBasicType: Boolean = false
@@ -39,5 +39,14 @@ abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
 
   def createSerializer(): TypeSerializer[T]
 
-  override def toString = s"Collection[$elementTypeInfo]"
+  override def equals(other: Any): Boolean = {
+    if (other.isInstanceOf[TraversableTypeInfo[_, _]]) {
+      val otherTrav = other.asInstanceOf[TraversableTypeInfo[_, _]]
+      otherTrav.getTypeClass == getTypeClass && otherTrav.elementTypeInfo == elementTypeInfo
+    } else {
+      false
+    }
+  }
+
+  override def toString = s"$clazz[$elementTypeInfo]"
 }


[2/3] incubator-flink git commit: Rewrite TPC-H Q3 to use Long instead of Integer

Posted by al...@apache.org.
Rewrite TPC-H Q3 to use Long instead of Integer

Both the Java and Scala versions are fixed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1dd8b8d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1dd8b8d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1dd8b8d7

Branch: refs/heads/master
Commit: 1dd8b8d7addce8305389f95c45d11f73fe621d01
Parents: 1a91132
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Nov 17 09:31:10 2014 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 27 16:01:39 2014 +0100

----------------------------------------------------------------------
 .../examples/java/relational/TPCHQuery3.java    | 28 ++++++++++----------
 .../examples/scala/relational/TPCHQuery3.scala  | 10 +++----
 2 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1dd8b8d7/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
index c10147c..46161b4 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
@@ -174,47 +174,47 @@ public class TPCHQuery3 {
 	//     DATA TYPES
 	// *************************************************************************
 	
-	public static class Lineitem extends Tuple4<Integer, Double, Double, String> {
+	public static class Lineitem extends Tuple4<Long, Double, Double, String> {
 
-		public Integer getOrderkey() { return this.f0; }
+		public Long getOrderkey() { return this.f0; }
 		public Double getDiscount() { return this.f2; }
 		public Double getExtendedprice() { return this.f1; }
 		public String getShipdate() { return this.f3; }
 	}
 
-	public static class Customer extends Tuple2<Integer, String> {
+	public static class Customer extends Tuple2<Long, String> {
 		
-		public Integer getCustKey() { return this.f0; }
+		public Long getCustKey() { return this.f0; }
 		public String getMktsegment() { return this.f1; }
 	}
 
-	public static class Order extends Tuple4<Integer, Integer, String, Integer> {
+	public static class Order extends Tuple4<Long, Long, String, Long> {
 		
-		public Integer getOrderKey() { return this.f0; }
-		public Integer getCustKey() { return this.f1; }
+		public Long getOrderKey() { return this.f0; }
+		public Long getCustKey() { return this.f1; }
 		public String getOrderdate() { return this.f2; }
-		public Integer getShippriority() { return this.f3; }
+		public Long getShippriority() { return this.f3; }
 	}
 
-	public static class ShippingPriorityItem extends Tuple4<Integer, Double, String, Integer> {
+	public static class ShippingPriorityItem extends Tuple4<Long, Double, String, Long> {
 
 		public ShippingPriorityItem() { }
 
-		public ShippingPriorityItem(Integer o_orderkey, Double revenue,
-				String o_orderdate, Integer o_shippriority) {
+		public ShippingPriorityItem(Long o_orderkey, Double revenue,
+				String o_orderdate, Long o_shippriority) {
 			this.f0 = o_orderkey;
 			this.f1 = revenue;
 			this.f2 = o_orderdate;
 			this.f3 = o_shippriority;
 		}
 		
-		public Integer getOrderkey() { return this.f0; }
-		public void setOrderkey(Integer orderkey) { this.f0 = orderkey; }
+		public Long getOrderkey() { return this.f0; }
+		public void setOrderkey(Long orderkey) { this.f0 = orderkey; }
 		public Double getRevenue() { return this.f1; }
 		public void setRevenue(Double revenue) { this.f1 = revenue; }
 		
 		public String getOrderdate() { return this.f2; }
-		public Integer getShippriority() { return this.f3; }
+		public Long getShippriority() { return this.f3; }
 	}
 	
 	// *************************************************************************

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1dd8b8d7/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
index 6cea953..2229060 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.examples.scala.relational
 
 import org.apache.flink.api.scala._
-import org.apache.flink.util.Collector
+import org.apache.flink.core.fs.FileSystem.WriteMode
 
 import org.apache.flink.api.java.aggregation.Aggregations
 
@@ -117,10 +117,10 @@ object TPCHQuery3 {
   //     USER DATA TYPES
   // *************************************************************************
   
-  case class Lineitem(orderId: Integer, extdPrice: Double, discount: Double, shipDate: String)
-  case class Customer(custId: Integer, mktSegment: String)
-  case class Order(orderId: Integer, custId: Integer, orderDate: String, shipPrio: Integer)
-  case class ShippedItem(orderId: Integer, revenue: Double, orderDate: String, shipPrio: Integer)
+  case class Lineitem(orderId: Long, extdPrice: Double, discount: Double, shipDate: String)
+  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
+  case class Customer(custId: Long, mktSegment: String)
+  case class ShippedItem(orderId: Long, revenue: Double, orderDate: String, shipPrio: Long)
 
   // *************************************************************************
   //     UTIL METHODS


[3/3] incubator-flink git commit: [scala] Fix slow building of adjacency list in Scala PageRank

Posted by al...@apache.org.
[scala] Fix slow building of adjacency list in Scala PageRank


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1a911320
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1a911320
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1a911320

Branch: refs/heads/master
Commit: 1a911320287b0e9c96e23f38586f421afd0c5eb2
Parents: f66892d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Nov 27 15:15:10 2014 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 27 16:01:39 2014 +0100

----------------------------------------------------------------------
 .../examples/scala/graph/PageRankBasic.scala    | 25 ++++++++++++--------
 1 file changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a911320/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
index e032bee..8eeef70 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
@@ -17,12 +17,17 @@
  */
 package org.apache.flink.examples.scala.graph
 
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.GroupReduceFunction
 import org.apache.flink.api.scala._
 import org.apache.flink.examples.java.graph.util.PageRankData
 import org.apache.flink.api.java.aggregation.Aggregations.SUM
 
 import org.apache.flink.util.Collector
 
+import scala.collection.JavaConverters._
+
 /**
  * A basic implementation of the Page Rank algorithm using a bulk iteration.
  * 
@@ -83,12 +88,13 @@ object PageRankBasic {
 
     // build adjacency list from link input
     val adjacencyLists = links
-      // initialize lists
-      .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
-      // concatenate lists
-      .groupBy("sourceId").reduce {
-      (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
-      }
+      .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {
+        override def reduce(values: Iterable[Link], out: Collector[AdjacencyList]): Unit = {
+          var outputId = -1L
+          val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
+          out.collect(new AdjacencyList(outputId, outputList.toArray))
+        }
+      })
 
     // start iteration
     val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
@@ -97,9 +103,9 @@ object PageRankBasic {
           // distribute ranks to target pages
           .join(adjacencyLists).where("pageId").equalTo("sourceId") {
             (page, adjacent, out: Collector[Page]) =>
-            for (targetId <- adjacent.targetIds) {
-              out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
-            }
+              val targets = adjacent.targetIds
+              val len = targets.length
+              adjacent.targetIds foreach { t => out.collect(Page(t, page.rank /len )) }
           }
           // collect ranks and sum them up
           .groupBy("pageId").aggregate(SUM, "rank")
@@ -114,7 +120,6 @@ object PageRankBasic {
             // check for significant update
             if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
         }
-
         (newRanks, termination)
     }