You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2017/01/27 12:51:55 UTC
incubator-mrql git commit: [MRQL-101] Fix memory materialization of
lazy bags
Repository: incubator-mrql
Updated Branches:
refs/heads/master c5d5028bd -> 00581f10d
[MRQL-101] Fix memory materialization of lazy bags
Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/00581f10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/00581f10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/00581f10
Branch: refs/heads/master
Commit: 00581f10dfd1ffd15ba71f9ff785dabbaf98d9e3
Parents: c5d5028
Author: fegaras <fe...@cse.uta.edu>
Authored: Thu Jan 26 13:45:09 2017 -0600
Committer: fegaras <fe...@cse.uta.edu>
Committed: Thu Jan 26 13:45:09 2017 -0600
----------------------------------------------------------------------
conf/mrql-env.sh | 6 +-
.../main/java/org/apache/mrql/Interpreter.gen | 5 +-
.../java/org/apache/mrql/Materialization.gen | 155 +++++++++----------
.../main/java/org/apache/mrql/TypeInference.gen | 1 +
pom.xml | 2 +-
5 files changed, 79 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/00581f10/conf/mrql-env.sh
----------------------------------------------------------------------
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index 3da0168..332ca89 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -82,7 +82,7 @@ BSP_SPLIT_INPUT=
# Spark versions 0.8.1, 0.9.0, and 0.9.1 are supported by MRQL 0.9.0 only.
# You may use the prebuilt Spark packages bin-hadoop1 or bin-hadoop2 (Yarn)
# Tested in local, standalone deploy, and Yarn modes
-SPARK_HOME=${HOME}/spark
+SPARK_HOME=${HOME}/spark-1.6.2-bin-hadoop2.6
# URI of the Spark master node:
# to run Spark on Standalone Mode, set it to spark://`hostname`:7077
# to run Spark on a YARN cluster, set it to "yarn-client"
@@ -99,9 +99,9 @@ SPARK_EXECUTOR_MEMORY=1G
# Optional: Flink configuration. Supports versions 1.0.2 and 1.0.3
-FLINK_VERSION=1.1.2
+FLINK_VERSION=1.0.2
# Flink installation directory
-FLINK_HOME=${HOME}/ap/flink-${FLINK_VERSION}
+FLINK_HOME=${HOME}/flink-${FLINK_VERSION}
# number of slots per TaskManager (typically, the number of cores per node)
FLINK_SLOTS=4
# memory per TaskManager
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/00581f10/core/src/main/java/org/apache/mrql/Interpreter.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Interpreter.gen b/core/src/main/java/org/apache/mrql/Interpreter.gen
index c138c47..f934abe 100644
--- a/core/src/main/java/org/apache/mrql/Interpreter.gen
+++ b/core/src/main/java/org/apache/mrql/Interpreter.gen
@@ -939,12 +939,13 @@ public class Interpreter extends TypeInference {
ne = Simplification.simplify_all(ne);
if (Streaming.is_streaming(ne) && !Config.incremental)
ne = Streaming.streamify(ne);
+ ne = Materialization.materialize_terms(ne);
Tree plan = PlanGeneration.makePlan(ne);
if (Config.bsp_mode) {
BSPTranslator.reset();
if (Config.trace)
System.out.println("Physical plan:\n"+plan.pretty(0));
- plan = Materialization.materialize_terms(BSPTranslator.constructBSPplan(plan));
+ plan = BSPTranslator.constructBSPplan(plan);
if (Config.trace)
System.out.println("BSP plan:\n"+plan.pretty(0));
else {
@@ -955,7 +956,7 @@ public class Interpreter extends TypeInference {
} else {
if (Config.hadoop_mode)
plan = PlanGeneration.physical_plan(plan);
- plan = Materialization.materialize_terms(AlgebraicOptimization.common_factoring(plan));
+ plan = AlgebraicOptimization.common_factoring(plan);
if (Config.trace)
System.out.println("Physical plan:\n"+plan.pretty(0));
else {
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/00581f10/core/src/main/java/org/apache/mrql/Materialization.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Materialization.gen b/core/src/main/java/org/apache/mrql/Materialization.gen
index c056e20..e644b83 100644
--- a/core/src/main/java/org/apache/mrql/Materialization.gen
+++ b/core/src/main/java/org/apache/mrql/Materialization.gen
@@ -21,24 +21,24 @@ import org.apache.mrql.gen.*;
import java.util.*;
-class Domains {
- public Trees domains;
- public Trees repeats;
- public Domains ( Trees d, Trees r ) { domains = d; repeats = r; }
-}
-
-
/** if the plan refers to a variable bound to a stream-based Bag and occurs in the code
* multiple times, embed code to materialize this Bag in memory
*/
final public class Materialization extends Translator {
- // is this a direct-access term? (not the results of a bulk operation)
+ // terms that access bulk terms directly
+ private static Trees access_terms = #[];
+ // access_terms that are repeated more than once
+ private static Trees repeat_terms = #[];
+
+ /** is this a direct-access term? (not the results of a bulk operation) */
private static boolean access_variable ( Tree e ) {
match e {
case nth(`x,_):
return access_variable(x);
- case union_value(`x):
+ case project(`x,_):
+ return access_variable(x);
+ case typed(union_value(`x),_):
return access_variable(x);
case index(`x,`n):
return access_variable(x);
@@ -49,90 +49,77 @@ final public class Materialization extends Translator {
return false;
}
- private static Domains new_domain ( Trees vars, Tree e, Domains d ) {
- if (!access_variable(e))
- return materialize(vars,e,d);
- Domains nd = new Domains(d.domains,d.repeats);
- if ((d.domains.member(e) || !free_variables(e,vars).is_empty())
- && !d.repeats.member(e))
- nd.repeats = nd.repeats.cons(e);
- nd.domains = nd.domains.cons(e);
- return nd;
+ /** check if the term e is a direct access term that extracts a collection */
+ private static boolean transient_term ( Tree e ) {
+ match e {
+ case materialize(_):
+ return false;
+ };
+ if (e.is_variable() || !access_variable(e))
+ return false;
+ match TypeInference.type_inference(e) {
+ case `T(_):
+ if (is_collection(T))
+ return true;
+ };
+ return false;
}
- private static Domains union ( Domains xd, Domains yd ) {
- Domains nd = new Domains(xd.domains,xd.repeats);
- for ( Tree y: yd.domains )
- if (!nd.domains.member(y))
- nd.domains = nd.domains.cons(y);
- for ( Tree y: yd.repeats )
- if (!nd.repeats.member(y))
- nd.repeats = nd.repeats.cons(y);
- return nd;
+ /** derive the access_terms and the repeat_terms */
+ private static void find_repeated_terms ( Tree e ) {
+ if (transient_term(e))
+ if (access_terms.member(e)) {
+ if (!repeat_terms.member(e))
+ repeat_terms = repeat_terms.cons(e);
+ } else access_terms = access_terms.cons(e);
+ else match e {
+ case type(_): ;
+ case materialize(_): ;
+ case `f(...as):
+ for ( Tree a: as)
+ find_repeated_terms(a);
+ }
}
- final static int unionM = ClassImporter.find_method_number("plus",#[bag(any),bag(any)]);
-
- private static Domains materialize ( Trees vars, Tree e, Domains d ) {
- match e {
- case lambda(`v,`b):
- return materialize(#[`v],b,d);
- case cmap(lambda(`v,`b),`s):
- return materialize(#[`v],b,new_domain(vars,s,d));
- case map(lambda(`v,`b),`s):
- return materialize(#[`v],b,new_domain(vars,s,d));
- case filter(lambda(`v1,`b1),lambda(`v2,`b2),`s):
- return materialize(#[`v1],b1,materialize(#[`v2],b2,new_domain(vars,s,d)));
- case aggregate(lambda(`v,`b),`z,`s):
- return materialize(#[`v],b,new_domain(vars,s,d));
- case groupBy(`s):
- return new_domain(vars,s,d);
- case orderBy(`s):
- return new_domain(vars,s,d);
- case mapReduce(lambda(`mv,`m),lambda(`v,`b),`s,`o):
- return materialize(#[`v],b,materialize(#[`mv],m,
- new_domain(vars,s,d)));
- case mapReduce2(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`v,`b),`x,`y,`o):
- return materialize(#[`v],b,materialize(#[`v1],b1,materialize(#[`v2],b2,
- new_domain(vars,x,new_domain(vars,y,d)))));
- case join(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`v,`b),`x,`y):
- return materialize(#[`v],b,materialize(#[`v1],b1,materialize(#[`v2],b2,
- new_domain(vars,x,new_domain(vars,y,d)))));
- case crossProduct(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`v,`b),`x,`y):
- return materialize(#[`v],b,materialize(#[`v1],b1,materialize(#[`v2],b2,
- new_domain(vars,x,new_domain(vars,y,d)))));
- case let(`v,`x,`y):
- Domains nd = materialize(vars.cons(v),y,materialize(vars,x,d));
- Trees zs = #[];
- for ( Tree z: nd.repeats )
- if (!v.equals(z))
- zs = zs.cons(z);
- nd.repeats = zs;
- return nd;
- case if(`p,`x,`y):
- Domains nd = materialize(vars,p,d);
- return union(materialize(vars,x,nd),
- materialize(vars,y,nd));
- case callM(union,_,`x,`y):
- return new_domain(vars,x,new_domain(vars,y,d));
- case callM(_,`k,`x,`y):
- if (((LongLeaf)k).value() != unionM)
+ /** if the term e is a bag processing operation that accesses a term
+ in access_terms and it is inside the functional of another
+ bag processing operation, it is a repeated_term */
+ private static void repeated_terms ( Tree e, int level ) {
+ if (access_terms.member(e) && level > 0) {
+ if (!repeat_terms.member(e))
+ repeat_terms = repeat_terms.cons(e);
+ } else match e {
+ case `f(lambda(`v,`b),`x,...):
+ if (! #[cmap,map,filter,aggregate].member(#<`f>))
+ fail;
+ repeated_terms(b,level+1); // the map function is repeated
+ repeated_terms(x,level);
+ case mapReduce(lambda(`v1,`b1),lambda(`v2,`b2),`x,...):
+ repeated_terms(b1,level+1); // the map function is repeated
+ repeated_terms(b2,level+1); // the reduce function is repeated
+ repeated_terms(x,level);
+ case `f(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`v3,`b3),`x,`y,...):
+ if (! #[join,mapReduce2,crossProduct].member(#<`f>))
fail;
- return new_domain(vars,x,new_domain(vars,y,d));
- case callM(join_key,_,`x,`y):
- return new_domain(vars,x,new_domain(vars,y,d));
+ repeated_terms(b1,level+1); // the left key function is repeated
+ repeated_terms(b2,level+1); // the right key function is repeated
+ repeated_terms(b3,level+1); // the reduce function is repeated
+ repeated_terms(x,level);
+ repeated_terms(y,level);
case `f(...as):
- Domains nd = new Domains(d.domains,d.repeats);
- for ( Tree a: as )
- nd = materialize(vars,a,nd);
- return nd;
- };
- return d;
+ for ( Tree a: as)
+ repeated_terms(a,level);
+ }
}
+ /** if a direct-access term that constructs a lazy bag (an iterator)
+ is used more than once, materialize it in memory */
public static Tree materialize_terms ( Tree e ) {
- Domains d = materialize(#[],e,new Domains(#[],#[]));
- for ( Tree x: d.repeats )
+ access_terms = #[];
+ repeat_terms = #[];
+ find_repeated_terms(e);
+ repeated_terms(e,0);
+ for ( Tree x: repeat_terms )
e = subst(x,#<materialize(`x)>,e);
return e;
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/00581f10/core/src/main/java/org/apache/mrql/TypeInference.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/TypeInference.gen b/core/src/main/java/org/apache/mrql/TypeInference.gen
index 60ca56c..bc17ca4 100644
--- a/core/src/main/java/org/apache/mrql/TypeInference.gen
+++ b/core/src/main/java/org/apache/mrql/TypeInference.gen
@@ -1448,6 +1448,7 @@ public class TypeInference extends Translator {
};
case callM(`f,_,...el):
return type_inference(#<call(`f,...el)>);
+ case materialize(`x): return type_inference2(x);
case true: return #<bool>;
case false: return #<bool>;
case null: return #<any>;
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/00581f10/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9ac5989..6c18bfe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,7 @@
<hama.version>0.7.1</hama.version>
<spark.version>1.6.2</spark.version>
<scala.version>2.10</scala.version>
- <flink.version>1.0.3</flink.version>
+ <flink.version>1.0.2</flink.version>
<storm.version>1.0.2</storm.version>
<skipTests>true</skipTests>
</properties>