You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/11/27 16:02:42 UTC
[1/3] incubator-flink git commit: [scala] Add equals() to TraversableTypeInfo
Repository: incubator-flink
Updated Branches:
refs/heads/master f66892d46 -> 1a7a50f82
[scala] Add equals() to TraversableTypeInfo
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1a7a50f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1a7a50f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1a7a50f8
Branch: refs/heads/master
Commit: 1a7a50f82f8a1b62a037d765bc6ea4b288cf292b
Parents: 1dd8b8d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Nov 27 15:07:30 2014 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 27 16:01:39 2014 +0100
----------------------------------------------------------------------
.../api/scala/typeutils/TraversableTypeInfo.scala | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a7a50f8/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
index 6ff8c6e..f5ab3ba 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableTypeInfo.scala
@@ -27,7 +27,7 @@ import scala.collection.generic.CanBuildFrom
*/
abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
clazz: Class[T],
- elementTypeInfo: TypeInformation[E])
+ val elementTypeInfo: TypeInformation[E])
extends TypeInformation[T] {
override def isBasicType: Boolean = false
@@ -39,5 +39,14 @@ abstract class TraversableTypeInfo[T <: TraversableOnce[E], E](
def createSerializer(): TypeSerializer[T]
- override def toString = s"Collection[$elementTypeInfo]"
+ override def equals(other: Any): Boolean = {
+ if (other.isInstanceOf[TraversableTypeInfo[_, _]]) {
+ val otherTrav = other.asInstanceOf[TraversableTypeInfo[_, _]]
+ otherTrav.getTypeClass == getTypeClass && otherTrav.elementTypeInfo == elementTypeInfo
+ } else {
+ false
+ }
+ }
+
+ override def toString = s"$clazz[$elementTypeInfo]"
}
[2/3] incubator-flink git commit: Rewrite TPC-H Q3 to use Long instead of Integer
Posted by al...@apache.org.
Rewrite TPC-H Q3 to use Long instead of Integer
Both Java and Scala version fixed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1dd8b8d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1dd8b8d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1dd8b8d7
Branch: refs/heads/master
Commit: 1dd8b8d7addce8305389f95c45d11f73fe621d01
Parents: 1a91132
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Nov 17 09:31:10 2014 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 27 16:01:39 2014 +0100
----------------------------------------------------------------------
.../examples/java/relational/TPCHQuery3.java | 28 ++++++++++----------
.../examples/scala/relational/TPCHQuery3.scala | 10 +++----
2 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1dd8b8d7/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
index c10147c..46161b4 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
@@ -174,47 +174,47 @@ public class TPCHQuery3 {
// DATA TYPES
// *************************************************************************
- public static class Lineitem extends Tuple4<Integer, Double, Double, String> {
+ public static class Lineitem extends Tuple4<Long, Double, Double, String> {
- public Integer getOrderkey() { return this.f0; }
+ public Long getOrderkey() { return this.f0; }
public Double getDiscount() { return this.f2; }
public Double getExtendedprice() { return this.f1; }
public String getShipdate() { return this.f3; }
}
- public static class Customer extends Tuple2<Integer, String> {
+ public static class Customer extends Tuple2<Long, String> {
- public Integer getCustKey() { return this.f0; }
+ public Long getCustKey() { return this.f0; }
public String getMktsegment() { return this.f1; }
}
- public static class Order extends Tuple4<Integer, Integer, String, Integer> {
+ public static class Order extends Tuple4<Long, Long, String, Long> {
- public Integer getOrderKey() { return this.f0; }
- public Integer getCustKey() { return this.f1; }
+ public Long getOrderKey() { return this.f0; }
+ public Long getCustKey() { return this.f1; }
public String getOrderdate() { return this.f2; }
- public Integer getShippriority() { return this.f3; }
+ public Long getShippriority() { return this.f3; }
}
- public static class ShippingPriorityItem extends Tuple4<Integer, Double, String, Integer> {
+ public static class ShippingPriorityItem extends Tuple4<Long, Double, String, Long> {
public ShippingPriorityItem() { }
- public ShippingPriorityItem(Integer o_orderkey, Double revenue,
- String o_orderdate, Integer o_shippriority) {
+ public ShippingPriorityItem(Long o_orderkey, Double revenue,
+ String o_orderdate, Long o_shippriority) {
this.f0 = o_orderkey;
this.f1 = revenue;
this.f2 = o_orderdate;
this.f3 = o_shippriority;
}
- public Integer getOrderkey() { return this.f0; }
- public void setOrderkey(Integer orderkey) { this.f0 = orderkey; }
+ public Long getOrderkey() { return this.f0; }
+ public void setOrderkey(Long orderkey) { this.f0 = orderkey; }
public Double getRevenue() { return this.f1; }
public void setRevenue(Double revenue) { this.f1 = revenue; }
public String getOrderdate() { return this.f2; }
- public Integer getShippriority() { return this.f3; }
+ public Long getShippriority() { return this.f3; }
}
// *************************************************************************
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1dd8b8d7/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
index 6cea953..2229060 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
@@ -18,7 +18,7 @@
package org.apache.flink.examples.scala.relational
import org.apache.flink.api.scala._
-import org.apache.flink.util.Collector
+import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.api.java.aggregation.Aggregations
@@ -117,10 +117,10 @@ object TPCHQuery3 {
// USER DATA TYPES
// *************************************************************************
- case class Lineitem(orderId: Integer, extdPrice: Double, discount: Double, shipDate: String)
- case class Customer(custId: Integer, mktSegment: String)
- case class Order(orderId: Integer, custId: Integer, orderDate: String, shipPrio: Integer)
- case class ShippedItem(orderId: Integer, revenue: Double, orderDate: String, shipPrio: Integer)
+ case class Lineitem(orderId: Long, extdPrice: Double, discount: Double, shipDate: String)
+ case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
+ case class Customer(custId: Long, mktSegment: String)
+ case class ShippedItem(orderId: Long, revenue: Double, orderDate: String, shipPrio: Long)
// *************************************************************************
// UTIL METHODS
[3/3] incubator-flink git commit: [scala] Fix slow building of adjacency list in Scala PageRank
Posted by al...@apache.org.
[scala] Fix slow building of adjacency list in Scala PageRank
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1a911320
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1a911320
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1a911320
Branch: refs/heads/master
Commit: 1a911320287b0e9c96e23f38586f421afd0c5eb2
Parents: f66892d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Nov 27 15:15:10 2014 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 27 16:01:39 2014 +0100
----------------------------------------------------------------------
.../examples/scala/graph/PageRankBasic.scala | 25 ++++++++++++--------
1 file changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a911320/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
index e032bee..8eeef70 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
@@ -17,12 +17,17 @@
*/
package org.apache.flink.examples.scala.graph
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.api.scala._
import org.apache.flink.examples.java.graph.util.PageRankData
import org.apache.flink.api.java.aggregation.Aggregations.SUM
import org.apache.flink.util.Collector
+import scala.collection.JavaConverters._
+
/**
* A basic implementation of the Page Rank algorithm using a bulk iteration.
*
@@ -83,12 +88,13 @@ object PageRankBasic {
// build adjacency list from link input
val adjacencyLists = links
- // initialize lists
- .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
- // concatenate lists
- .groupBy("sourceId").reduce {
- (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
- }
+ .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {
+ override def reduce(values: Iterable[Link], out: Collector[AdjacencyList]): Unit = {
+ var outputId = -1L
+ val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
+ out.collect(new AdjacencyList(outputId, outputList.toArray))
+ }
+ })
// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
@@ -97,9 +103,9 @@ object PageRankBasic {
// distribute ranks to target pages
.join(adjacencyLists).where("pageId").equalTo("sourceId") {
(page, adjacent, out: Collector[Page]) =>
- for (targetId <- adjacent.targetIds) {
- out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
- }
+ val targets = adjacent.targetIds
+ val len = targets.length
+ adjacent.targetIds foreach { t => out.collect(Page(t, page.rank /len )) }
}
// collect ranks and sum them up
.groupBy("pageId").aggregate(SUM, "rank")
@@ -114,7 +120,6 @@ object PageRankBasic {
// check for significant update
if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
}
-
(newRanks, termination)
}