You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2016/08/04 15:58:27 UTC

incubator-mrql git commit: [MRQL-93] Support query evaluation tracing and how-provenance

Repository: incubator-mrql
Updated Branches:
  refs/heads/master c2bd1b818 -> ba7521e09


[MRQL-93] Support query evaluation tracing and how-provenance


Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/ba7521e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/ba7521e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/ba7521e0

Branch: refs/heads/master
Commit: ba7521e09329df01009c96ce0605725592d0f1fd
Parents: c2bd1b8
Author: Leonidas Fegaras <fe...@cse.uta.edu>
Authored: Wed Aug 3 11:29:58 2016 -0500
Committer: Leonidas Fegaras <fe...@cse.uta.edu>
Committed: Wed Aug 3 11:29:58 2016 -0500

----------------------------------------------------------------------
 conf/mrql-env.sh                                |   8 +-
 .../org/apache/mrql/AlgebraicOptimization.gen   |   2 +
 core/src/main/java/org/apache/mrql/Compiler.gen |   2 +
 core/src/main/java/org/apache/mrql/Config.java  |   7 +
 .../main/java/org/apache/mrql/Interpreter.gen   |  30 ++
 .../java/org/apache/mrql/PlanGeneration.gen     |   5 +
 core/src/main/java/org/apache/mrql/Printer.gen  |   2 +
 .../main/java/org/apache/mrql/Provenance.gen    | 378 +++++++++++++++++++
 .../src/main/java/org/apache/mrql/Streaming.gen |  22 +-
 core/src/main/java/org/apache/mrql/TopLevel.gen |  19 +
 .../main/java/org/apache/mrql/Translator.gen    |   2 +-
 .../main/java/org/apache/mrql/TypeInference.gen |  13 +
 core/src/main/java/org/apache/mrql/mrql.cgen    |  11 +-
 core/src/main/java/org/apache/mrql/mrql.lex     |   2 +
 .../java/org/apache/mrql/FlinkEvaluator.gen     |   3 +
 15 files changed, 494 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ba7521e0/conf/mrql-env.sh
----------------------------------------------------------------------
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index 6462949..ed891e1 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -34,9 +34,13 @@
 
 
 # Required: The Java installation directory
-if [ ! -f ${JAVA_HOME}) ]; then
+if [ -z ${JAVA_HOME} ]; then
    export JAVA_HOME=/usr/lib/jvm/java-8-oracle
 fi
+if [ ! -e ${JAVA_HOME} ]; then
+    echo "*** Non-existent JAVA_HOME"
+    exit -1
+fi
 
 # Required: The CUP parser library
 # You can download it from http://www2.cs.tum.edu/projects/cup/
@@ -49,7 +53,7 @@ JLINE_JAR=${HOME}/.m2/repository/jline/jline/1.0/jline-1.0.jar
 
 # Hadoop configuration. Supports versions 1.x and 2.x (YARN)
 # The Hadoop installation directory
-if [ ! -f ${HADOOP_HOME}) ]; then
+if [ -z ${HADOOP_HOME} ]; then
     HADOOP_VERSION=2.7.1
     HADOOP_HOME=${HOME}/hadoop-${HADOOP_VERSION}
 fi

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ba7521e0/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen b/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen
index 7f1fbcc..0421e43 100644
--- a/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen
+++ b/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen
@@ -184,6 +184,8 @@ public class AlgebraicOptimization extends Simplification {
                     };
             };
             fail
+        case provenance(`x,...s):
+            return #<provenance(`(translate(x)),...s)>;
         case `f(...al):
             Trees bl = #[];
             for ( Tree a: al )

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ba7521e0/core/src/main/java/org/apache/mrql/Compiler.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Compiler.gen b/core/src/main/java/org/apache/mrql/Compiler.gen
index 0c2ba3a..5dbaf24 100644
--- a/core/src/main/java/org/apache/mrql/Compiler.gen
+++ b/core/src/main/java/org/apache/mrql/Compiler.gen
@@ -421,6 +421,8 @@ final public class Compiler extends Translator {
             for ( Tree x: el )
                 ret += ","+compileE(x);
             return ret+"))";
+        case Lineage(...el):
+            return compileE(#<tuple(...el)>);
         case tagged_union(`n,`u):
             return "(new Union((byte)"+((LongLeaf)n).value()+","+compileE(u)+"))";
         case union_value(`x):

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ba7521e0/core/src/main/java/org/apache/mrql/Config.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Config.java b/core/src/main/java/org/apache/mrql/Config.java
index 53db404..19069f3 100644
--- a/core/src/main/java/org/apache/mrql/Config.java
+++ b/core/src/main/java/org/apache/mrql/Config.java
@@ -89,6 +89,9 @@ final public class Config {
     public static int stream_tries = 100;
     // if true and stream_window > 0, then incremental streaming
     public static boolean incremental = false;
+    // if true, generate provenance tracing
+    public static boolean lineage = false;
+    public static boolean debug = false;
 
     /** store the configuration parameters */
     public static void write ( Configuration conf ) {
@@ -121,6 +124,8 @@ final public class Config {
         conf.setBoolean("mrql.info",info);
         conf.setInt("mrql.stream.window",stream_window);
         conf.setBoolean("mrql.incremental",incremental);
+        conf.setBoolean("mrql.lineage",lineage);
+        conf.setBoolean("mrql.debug",debug);
     }
 
     /** load the configuration parameters */
@@ -154,6 +159,8 @@ final public class Config {
         info = conf.getBoolean("mrql.info",info);
         stream_window = conf.getInt("mrql.stream.window",stream_window);
         incremental = conf.getBoolean("mrql.incremental",incremental);
+        lineage = conf.getBoolean("mrql.lineage",lineage);
+        debug = conf.getBoolean("mrql.debug",debug);
     }
 
     public static ArrayList<String> extra_args = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ba7521e0/core/src/main/java/org/apache/mrql/Interpreter.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Interpreter.gen b/core/src/main/java/org/apache/mrql/Interpreter.gen
index 600f5aa..e7dac63 100644
--- a/core/src/main/java/org/apache/mrql/Interpreter.gen
+++ b/core/src/main/java/org/apache/mrql/Interpreter.gen
@@ -613,6 +613,26 @@ public class Interpreter extends TypeInference {
                     }
                 });
             return new Bag();
+        case provenance(`x,`tp,...s):
+            MRData value = evalE(x,env);
+            Provenance.display(value,tp,#[...s]);
+            return new Tuple(0);
+        case Provenance(`x,`tp,...s):
+            MRData value = (Config.hadoop_mode)
+                ? new MR_dataset(Evaluator.evaluator.eval(x,env,"-"))
+                : evalS(x,env);
+            Provenance.display(value,tp,#[...s]);
+            return new Tuple(0);
+        case Lineage(...as):
+            return evalEE(#<tuple(...as)>,env);
+        case Let(`v,`x,`y):
+            if (Config.hadoop_mode)
+                return evalEE(y,new Environment(v.toString(),
+                                                new MR_dataset(Evaluator.evaluator.eval(x,env,"-")),
+                                                env));
+            Bag b = evalS(x,env);
+            b.materialize();
+            return evalEE(y,new Environment(v.toString(),b,env));
         case _:
             try {
                 if (Config.hadoop_mode)
@@ -889,6 +909,15 @@ public class Interpreter extends TypeInference {
                 System.out.println("Algebraic type: "+print_type(pt));
             if (Config.stream_window > 0 && Config.incremental)
                 ne = Streaming.generate_incremental_code(ne);
+            else if (Config.lineage) {
+                ne = Provenance.embed_provenance(ne,false);
+                if (Config.trace)
+                    System.out.println("After provenance injection:\n"+ne.pretty(0));
+            } else if (Config.debug) {
+                ne = Provenance.embed_provenance(ne,true);
+                if (Config.trace)
+                    System.out.println("After provenance injection:\n"+ne.pretty(0));
+            };
             ne = AlgebraicOptimization.translate_all(ne);
             if (Config.trace)
                 System.out.println("Translated expression:\n"+ne.pretty(0));
@@ -941,3 +970,4 @@ public class Interpreter extends TypeInference {
         }
     }
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ba7521e0/core/src/main/java/org/apache/mrql/PlanGeneration.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/PlanGeneration.gen b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
index a42380c..fdb148f 100644
--- a/core/src/main/java/org/apache/mrql/PlanGeneration.gen
+++ b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
@@ -744,6 +744,11 @@ final public class PlanGeneration extends AlgebraicOptimization {
                };
            body = makePlan(body);
            return #<function(tuple(...params),`outp,`body)>;
+       case provenance(`x,...s):
+           Tree px = makePlan(x);
+           if (is_dataset_expr(x))
+               return #<Provenance(`px,...s)>;
+           else return #<provenance(`px,...s)>;
        case `f(...al):
            Trees bl = #[];
            for ( Tree a: al )

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ba7521e0/core/src/main/java/org/apache/mrql/Printer.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Printer.gen b/core/src/main/java/org/apache/mrql/Printer.gen
index 03fc28e..ee651c2 100644
--- a/core/src/main/java/org/apache/mrql/Printer.gen
+++ b/core/src/main/java/org/apache/mrql/Printer.gen
@@ -381,6 +381,8 @@ public class Printer {
         case Merge(`x,`y):
             return "Merge:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
                    +tab(n+3)+"right: "+print_plan(y,n+10,true);
+        case Provenance(`x,...):
+            return print_plan(x,n,pv);
         case BSP(_,_,_,_,...ds):
             String ret = "BSP:\n";
             for ( Tree d: ds )

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ba7521e0/core/src/main/java/org/apache/mrql/Provenance.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Provenance.gen b/core/src/main/java/org/apache/mrql/Provenance.gen
new file mode 100644
index 0000000..513d629
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Provenance.gen
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.Iterator;
+import org.apache.mrql.gen.*;
+
+
+/** Embeds provenance information into queries for debugging and tracing */
+public class Provenance extends Streaming {
+    // set it to true for fine-grained provenance
+    static boolean fine_grain = false;
+
+    /**
+     * Move the provenance of a keyed element next to its value:
+     * every element ((k,v),p) of the bag becomes (k,(v,p)).
+     * @param e a term that yields a bag of ((k,v),p) elements
+     * @return a cmap over e that emits (k,(v,p)) for each element
+     */
+    private static Tree flip ( Tree e ) {
+        Tree elem = new_var();
+        Tree key = #<nth(nth(`elem,0),0)>;
+        Tree val = #<nth(nth(`elem,0),1)>;
+        Tree p = #<nth(`elem,1)>;
+        return #<cmap(lambda(`elem,bag(tuple(`key,tuple(`val,`p)))),`e)>;
+    }
+
+    /**
+     * Move the provenance of a pair onto its first component:
+     * every element ((v,b),p) of the bag becomes ((v,p),b).
+     * @param e a term that yields a bag of ((v,b),p) elements
+     * @return a cmap over e that emits ((v,p),b) for each element
+     */
+    private static Tree flipr ( Tree e ) {
+        Tree elem = new_var();
+        Tree v = #<nth(nth(`elem,0),0)>;
+        Tree b = #<nth(nth(`elem,0),1)>;
+        Tree p = #<nth(`elem,1)>;
+        return #<cmap(lambda(`elem,bag(tuple(tuple(`v,`p),`b))),`e)>;
+    }
+
+    /** Keep only the values: map a bag of (v,p) pairs to the bag of their v components */
+    private static Tree first ( Tree e ) {
+        Tree pair = new_var();
+        Tree value = #<nth(`pair,0)>;
+        return #<cmap(lambda(`pair,bag(`value)),`e)>;
+    }
+
+    /** Keep only the provenance: map a bag of (v,p) pairs to the bag of their p components */
+    private static Tree second ( Tree e ) {
+        Tree pair = new_var();
+        Tree provenance = #<nth(`pair,1)>;
+        return #<cmap(lambda(`pair,bag(`provenance)),`e)>;
+    }
+
+    /** The nodes of the query AST that carry a provenance annotation.
+     *  prov() appends to this list and stores the position of the node in the
+     *  generated Lineage term; display() replaces the list with the nodes that
+     *  were saved in the provenance(...) term being printed. */
+    private static Trees exprs = #[];
+
+    /**
+     * Build the term that pairs a value with its lineage. As a side effect,
+     * expr is appended to exprs and its position there is recorded in the
+     * Lineage term.
+     * @param expr the AST that corresponds to this value
+     * @param value the term that computes the value
+     * @param provenance the input provenances of this value
+     * @return let(x,value,tuple(x,Lineage(loc,x,...provenance))) for a fresh variable x
+     */
+    private static Tree prov ( Tree expr, Tree value, Trees provenance ) {
+        exprs = exprs.append(expr);
+        // expr was just appended, so it sits at the last position
+        int loc = exprs.length()-1;
+        Tree bound = new_var();
+        Tree lineage = #<Lineage(`loc,`bound,...provenance)>;
+        return #<let(`bound,`value,tuple(`bound,`lineage))>;
+    }
+
+    /** Overload of prov for a value that has exactly one input provenance */
+    private static Tree prov ( Tree expr, Tree value, Tree provenance ) {
+        Trees single = #[`provenance];
+        return prov(expr,value,single);
+    }
+
+    /** Replace the occurrences of a variable in a term after the variable has
+     *  been rebound to a value that carries provenance.
+     * @param var the variable to replace
+     * @param nvar the replacement used everywhere except inside the function
+     *             of a cmap in coarse-grained mode
+     * @param fvar the replacement used inside the function of a cmap in
+     *             coarse-grained mode
+     * @param e the term to rewrite
+     * @return the rewritten term
+     */
+    private static Tree lift_var ( Tree var, Tree nvar, Tree fvar, Tree e ) {
+        match e {
+        case cmap(`f,`x):
+            // in fine-grained mode a cmap is treated like any other node (next case)
+            if (fine_grain)
+                fail;
+            // don't lift the cmap function in coarse-grained provenance
+            Tree nf = subst(var,fvar,f);
+            Tree nx = lift_var(var,nvar,fvar,x);
+            return #<cmap(`nf,`nx)>;
+        case `f(...as):
+            // any other node: rewrite all children and rebuild the node
+            Trees bs = #[ ];
+            for ( Tree a: as )
+                bs = bs.append(lift_var(var,nvar,fvar,a));
+            return #<`f(...bs)>;
+        };
+        // e matched no case above (not a node with children): replace it if it is var
+        return (e.equals(var)) ? nvar : e;
+    }
+
+    /** Lift the expression e of type {t} to {(t,{provenance})}.
+     * Each case of the first match rewrites one kind of term so that the
+     * elements it produces are paired with their provenance. A term that is
+     * not handled there is dispatched on its inferred type: a collection is
+     * tagged element by element as a provenance leaf, anything else is passed
+     * to embedP.
+     * @param e the term to lift
+     * @return the lifted term
+     */
+    private static Tree embedB ( Tree e ) {
+        match e {
+        case repeat(lambda(`v,`u),`x,`n):
+            // the loop variable v is rebound to nv; inside the function of a
+            // coarse-grained cmap, lift_var uses nv with the provenance stripped
+            Tree nv = new_var();
+            Tree nn = new_var();
+            Tree ex = embedB(x);
+            Tree ef = lift_var(v,nv,#<cmap(lambda(`nn,bag(nth(`nn,0))),`nv)>,
+                               flipr(embedB(u)));
+            return #<repeat(lambda(`nv,`ef),`ex,`n)>;
+        case cmap(lambda(`v,`b),`x):
+            // coarse-grained only (fine-grained fails over to the next case):
+            // the body b is not lifted; every output nw of b gets the
+            // provenance nth(nv,1) of the input element
+            if (fine_grain)
+                fail;
+            Tree nv = new_var();
+            Tree nw = new_var();
+            Tree ex = embedB(x);
+            Tree nb = subst(v,#<nth(`nv,0)>,b);
+            return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(tuple(`nw,nth(`nv,1)))),
+                                          `nb)),
+                                 `ex)>;
+        case cmap(lambda(`v,`b),`x):
+            // fine-grained: the body is lifted too and every output gets a new
+            // provenance node whose input is the provenance computed by the body
+            Tree nv = new_var();
+            Tree nw = new_var();
+            // NOTE(review): y is not used in the rest of this case
+            Tree y = new_var();
+            Tree ex = embedB(x);
+            Tree ef = lift_var(v,nv,#<nth(`nv,0)>,embedB(b));
+            Tree p = prov(e,#<nth(`nw,0)>,#<nth(`nw,1)>);
+            return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(`p)),`ef)),`ex)>;
+        case groupBy(`x):
+            // flip keeps the key in place so that the grouping is unchanged;
+            // each group is then split into its values and their provenances
+            Tree nv = new_var();
+            Tree ex = flip(embedB(x));
+            Tree val = #<tuple(nth(`nv,0),`(first(#<nth(`nv,1)>)))>;
+            Tree p = prov(e,val,second(#<nth(`nv,1)>));
+            return #<cmap(lambda(`nv,bag(`p)),groupBy(`ex))>;
+        case orderBy(`x):
+            // same rewriting as groupBy
+            Tree nv = new_var();
+            Tree ex = flip(embedB(x));
+            Tree val = #<tuple(nth(`nv,0),`(first(#<nth(`nv,1)>)))>;
+            Tree p = prov(e,val,second(#<nth(`nv,1)>));
+            return #<cmap(lambda(`nv,bag(`p)),orderBy(`ex))>;
+        case coGroup(`x,`y):
+            // as groupBy, but with two inputs: the result has two input provenances
+            Tree nv = new_var();
+            Tree ex = flip(embedB(x));
+            Tree ey = flip(embedB(y));
+            Tree val = #<tuple(nth(`nv,0),tuple(`(first(#<nth(nth(`nv,1),0)>)),
+                                                `(first(#<nth(nth(`nv,1),1)>))))>;
+            Tree p = prov(e,val,#[`(second(#<nth(nth(`nv,1),0)>)),
+                                  `(second(#<nth(nth(`nv,1),1)>))]);
+            return #<cmap(lambda(`nv,bag(`p)),coGroup(`ex,`ey))>;
+        case call(source,...):
+            // a data source: each element is a provenance leaf (no inputs)
+            Tree nv = new_var();
+            Tree p = prov(e,nv,#[ ]);
+            return #<cmap(lambda(`nv,bag(`p)),`e)>;
+        case bag(...as):
+            // a bag construction: lift every element with embedP
+            Trees es = #[ ];
+            for ( Tree a: as )
+                es = es.append(embedP(a));
+            return #<bag(...es)>;
+        case nth(`x,`n):
+            // x is lifted with embedP; every element of the selected component
+            // gets a provenance node whose input is the provenance of x
+            Tree nv = new_var();
+            Tree nw = new_var();
+            Tree ex = embedP(x);
+            Tree p = prov(e,nv,#<nth(`nw,1)>);
+            return #<let(`nw,`ex,cmap(lambda(`nv,bag(`p)),
+                                      nth(nth(`nw,0),`n)))>;
+        case project(`x,`a):
+            // same rewriting as nth, for a record attribute
+            Tree nv = new_var();
+            Tree nw = new_var();
+            Tree ex = embedP(x);
+            Tree p = prov(e,nv,#<nth(`nw,1)>);
+            return #<let(`nw,`ex,cmap(lambda(`nv,bag(`p)),
+                                      project(nth(`nw,0),`a)))>;
+        case if(`pred,`x,`y):
+            // only the value nth(nth(nv,0),0) of the lifted predicate is used;
+            // the result is the lifted branch that was selected
+            Tree nv = new_var();
+            Tree ep = embedP(pred);
+            Tree ex = embedB(x);
+            Tree ey = embedB(y);
+            return #<let(`nv,tuple(`ep,`ex,`ey),
+                         if(nth(nth(`nv,0),0),nth(`nv,1),nth(`nv,2)))>;
+        case `v:
+            // repeat variables and variables without a global binding are
+            // returned unchanged; a globally bound variable is tagged element
+            // by element as a provenance leaf; a non-variable falls out of this
+            // match and is handled below
+            if (v.is_variable())
+                if (Interpreter.repeat_variables.member(v))
+                    return v;
+                else if (Interpreter.lookup_global_binding(v.toString()) != null) {
+                    Tree nv = new_var();
+                    Tree p = prov(v,nv,#[ ]);
+                    return #<cmap(lambda(`nv,bag(`p)),`v)>;
+                } else return v;
+        };
+        match TypeInference.type_inference(e) {
+        case `T(_):
+            // any other collection-typed term: each element is a provenance leaf
+            if (!is_collection(T))
+                fail;
+            Tree nv = new_var();
+            Tree p = prov(e,nv,#[]);
+            return #<cmap(lambda(`nv,bag(`p)),`e)>;
+        };
+        // not a collection: lift it as a single value
+        return embedP(e);
+    }
+
+    /** Lift the expression e of type t to (t,provenance).
+     * Each case of the first match rewrites one kind of term into a term that
+     * yields a (value,provenance) pair. A term that is not handled there is
+     * dispatched on its inferred type: a collection is lifted with embedB and
+     * then split into its values and their provenances, anything else becomes
+     * a provenance leaf.
+     * @param e the term to lift
+     * @return the lifted term
+     */
+    public static Tree embedP ( Tree e ) {
+        match e {
+        case reduce(`m,`x):
+            // aggregate the values of the lifted bag; the input provenance of
+            // the result is the bag of the provenances of the elements
+            Tree nv = new_var();
+            Tree ex = embedB(x);
+            Tree p = prov(e,#<reduce(`m,`(first(nv)))>,second(nv));
+            return #<Let(`nv,`ex,`p)>;
+        case tuple(...as):
+            // lift every component; nv is bound to the tuple of the lifted
+            // components, so component i has value nth(nth(nv,i),0) and
+            // provenance nth(nth(nv,i),1)
+            Tree nv = new_var();
+            Trees es = #[ ];
+            Trees vs = #[ ];
+            Trees ps = #[ ];
+            int i = 0;
+            for ( Tree a: as ) {
+                es = es.append(embedP(a));
+                vs = vs.append(#<nth(nth(`nv,`i),0)>);
+                ps = ps.append(#<nth(nth(`nv,`i),1)>);
+                i++;
+            };
+            Tree p = prov(e,#<tuple(...vs)>,ps);
+            return #<let(`nv,tuple(...es),`p)>;
+        case record(...as):
+            // same as tuple, over the attributes of the form bind(v,b);
+            // the inner match has no case for any other form of attribute
+            Tree nv = new_var();
+            Trees es = #[ ];
+            Trees vs = #[ ];
+            Trees ps = #[ ];
+            int i = 0;
+            for ( Tree a: as )
+                match a {
+                case bind(`v,`b):
+                    es = es.append(embedP(b));
+                    vs = vs.append(#<bind(`v,nth(nth(`nv,`i),0))>);
+                    ps = ps.append(#<nth(nth(`nv,`i),1)>);
+                    i++;
+                };
+            Tree p = prov(e,#<record(...vs)>,ps);
+            return #<let(`nv,tuple(...es),`p)>;
+        case call(`f,...as):
+            // same as tuple, over the arguments of the call
+            Tree nv = new_var();
+            Trees es = #[ ];
+            Trees vs = #[ ];
+            Trees ps = #[ ];
+            int i = 0;
+            for ( Tree a: as ) {
+                es = es.append(embedP(a));
+                vs = vs.append(#<nth(nth(`nv,`i),0)>);
+                ps = ps.append(#<nth(nth(`nv,`i),1)>);
+                i++;
+            };
+            Tree p = prov(e,#<call(`f,...vs)>,ps);
+            return #<let(`nv,tuple(...es),`p)>;
+        case nth(`x,`n):
+            // select from the value of the lifted x; the input provenance is
+            // the provenance of x
+            Tree nv = new_var();
+            Tree ex = embedP(x);
+            Tree p = prov(e,#<nth(nth(`nv,0),`n)>,#<nth(`nv,1)>);
+            return #<let(`nv,`ex,`p)>;
+        case project(`x,`a):
+            // same rewriting as nth, for a record attribute
+            Tree nv = new_var();
+            Tree ex = embedP(x);
+            Tree p = prov(e,#<project(nth(`nv,0),`a)>,#<nth(`nv,1)>);
+            return #<let(`nv,`ex,`p)>;
+        case if(`pred,`x,`y):
+            // the provenances of the predicate and of both branches are all
+            // recorded as inputs
+            Tree nv = new_var();
+            Tree ep = embedP(pred);
+            Tree ex = embedP(x);
+            Tree ey = embedP(y);
+            Tree p = prov(e,#<if(nth(nth(`nv,0),0),nth(nth(`nv,1),0),nth(nth(`nv,2),0))>,
+                          #[nth(nth(`nv,0),1),nth(nth(`nv,1),1),nth(nth(`nv,2),1)]);
+            return #<let(`nv,tuple(`ep,`ex,`ey),`p)>;
+        case typed(`u,_):
+            // the type annotation is dropped
+            return embedP(u);
+        case index(`x,`n):
+            // x is lifted with embedB; no provenance node is created for the
+            // index operation itself
+            Tree ex = embedB(x);
+            return #<index(`ex,`n)>;
+        case true: return prov(e,e,#[ ]);
+        case false: return prov(e,e,#[ ]);
+        case `v:
+            // a globally bound variable is a provenance leaf; any other
+            // variable is returned unchanged; a non-variable falls out of this
+            // match and is handled below
+            if (v.is_variable())
+                if (Interpreter.lookup_global_binding(v.toString()) != null)
+                    return prov(e,e,#[ ]);
+                else return v;
+        };
+        match TypeInference.type_inference(e) {
+        case `T(_):
+            // a collection-typed term: lift it with embedB, then use its values
+            // as the value and its element provenances as the input provenance
+            if (!is_collection(T))
+                fail;
+            Tree nv = new_var();
+            Tree ex = embedB(e);
+            Tree p = prov(e,first(nv),second(nv));
+            return #<Let(`nv,`ex,`p)>;
+        };
+        // anything else is a provenance leaf
+        return prov(e,e,#[ ]);
+    }
+
+    /** Lift the expression e to an expression with provenance annotations.
+     * Sets the granularity, empties the list of traced AST nodes, normalizes
+     * the query, lifts it with embedB (collection type) or embedP (any other
+     * type), converts the result with convert_to_algebra, and wraps it in a
+     * provenance(...) term.
+     * @param e the query term
+     * @param fine_grained the value assigned to fine_grain for this translation
+     * @return provenance(ne,tp,...exprs), where ne is the lifted query, tp is
+     *         the type inferred for the normalized query before lifting, and
+     *         exprs are the AST nodes recorded by prov()
+     */
+    public static Tree embed_provenance ( Tree e, boolean fine_grained ) {
+        fine_grain = fine_grained;
+        exprs = #[ ];
+        Tree ne = SimplifyTerm(normalize_term(e));
+        Tree tp = TypeInference.type_inference(ne);
+        ne = SimplifyTerm(embed_missing_cmaps(ne));
+        // result discarded; NOTE(review): presumably called for its effect on
+        // the type environment -- confirm
+        TypeInference.type_inference(ne);
+        // NOTE(review): the dispatch uses the type of the original e, not of ne
+        match TypeInference.type_inference(e) {
+        case `T(_):
+            if (!is_collection(T))
+                fail;
+            ne = SimplifyTerm(embedB(ne));
+        case _: ne = SimplifyTerm(embedP(ne));
+        };
+        ne = SimplifyTerm(convert_to_algebra(ne));
+        // result discarded, as above
+        TypeInference.type_inference(ne);
+        return #<provenance(`ne,`tp,...exprs)>;
+    }
+
+    /** Linear search: is there an element of the bag s that equals x? */
+    private static boolean member ( Bag s, MRData x ) {
+        Iterator<MRData> elems = s.iterator();
+        while (elems.hasNext())
+            if (elems.next().equals(x))
+                return true;
+        return false;
+    }
+
+    /** Collect the provenance leaves (the data sources that contribute to the output) into a bag.
+     * The result contains no duplicates (checked with member) and is cut off
+     * once it reaches Config.max_bag_size_print elements.
+     * @param value a lineage value: a tuple, a bag of lineage values, or anything else
+     * @return the bag of the collected source values; empty if value is
+     *         neither a Tuple nor a Bag
+     */
+    private static Bag collect_lineage ( MRData value ) {
+        if (value instanceof Tuple) {
+            Tuple p = ((Tuple)value);
+            // a 2-component tuple whose first component is the position in exprs
+            // of a call(source,...) node is a leaf: its second component is the
+            // source value
+            if (p.size() == 2)
+                match exprs.nth(((MR_int)p.get(0)).get()) {
+                case call(source,...):
+                    return new Bag(p.get(1));
+                };
+            // otherwise merge the lineage of the components from position 2 on
+            Bag s = new Bag();
+            for ( int i = 2; i < p.size() && s.size() < Config.max_bag_size_print; i++ )
+                for ( MRData e: collect_lineage(p.get(i)) )
+                    if (!member(s,e))
+                        s.add(e);
+            return s;
+        } else if (value instanceof Bag) {
+            // a bag of lineage values: merge the lineage of every element
+            Bag s = new Bag();
+            ((Bag)value).materialize();
+            for ( MRData e: (Bag)value )
+                for ( MRData x: collect_lineage(e) )
+                    if (s.size() < Config.max_bag_size_print && !member(s,x))
+                        s.add(x);
+            return s;
+        } else return new Bag();
+    }
+
+    /** Print the query result along with the data sources that contribute to it.
+     * For every (value,lineage) pair, prints the value followed by a line
+     * " <- " with the bag returned by collect_lineage for its lineage.
+     * @param value the result of a query lifted by embed_provenance
+     * @param tp the type of the query result without the provenance annotations
+     * @param prov_exprs the AST nodes referenced by the lineage; stored in exprs
+     */
+    public static void display ( MRData value, Tree tp, Trees prov_exprs ) {
+        exprs = prov_exprs;
+        match tp {
+        case `T(`etp):
+            // a collection: every element is a (value,lineage) pair
+            if (!is_collection(T))
+                fail;
+            if (value instanceof Bag)
+                for ( MRData e: (Bag)value ) {
+                    System.out.println(Printer.print(((Tuple)e).get(0),etp));
+                    System.out.println(" <- "+collect_lineage(((Tuple)e).get(1)));
+                } else if (value instanceof MR_dataset)
+                // only the elements returned by take(Config.max_bag_size_print) are printed
+                for ( MRData e: ((MR_dataset)value).dataset().take(Config.max_bag_size_print) ) {
+                    System.out.println(Printer.print(((Tuple)e).get(0),etp));
+                    System.out.println(" <- "+collect_lineage(((Tuple)e).get(1)));
+                }
+        case _:
+            // not a collection: value is a single (value,lineage) pair
+            System.out.println(Printer.print(((Tuple)value).get(0),tp));
+            System.out.println(" <- "+collect_lineage(((Tuple)value).get(1)));
+        }
+    }
+
+    /**
+     * Strip the provenance annotations from a query result.
+     * @param value a (value,provenance) tuple, or a Bag of such tuples
+     * @return component 0 of the tuple; for a Bag, a new Bag built over the
+     *         iterator of the input that yields component 0 of each element
+     */
+    public static MRData getValue ( MRData value ) {
+        if (!(value instanceof Bag))
+            return ((Tuple)value).get(0);
+        final Iterator<MRData> elems = ((Bag)value).iterator();
+        return new Bag(new BagIterator() {
+                public boolean hasNext() { return elems.hasNext(); }
+                public MRData next () { return ((Tuple)elems.next()).get(0); }
+            });
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ba7521e0/core/src/main/java/org/apache/mrql/Streaming.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Streaming.gen b/core/src/main/java/org/apache/mrql/Streaming.gen
index f54cad8..53034a9 100644
--- a/core/src/main/java/org/apache/mrql/Streaming.gen
+++ b/core/src/main/java/org/apache/mrql/Streaming.gen
@@ -20,7 +20,7 @@ package org.apache.mrql;
 import org.apache.mrql.gen.*;
 
 /** Generates code for streaming queries */
-final public class Streaming extends AlgebraicOptimization {
+public class Streaming extends AlgebraicOptimization {
     private static int istate = 0; 
     // set it to true to debug the monoid inference
     private static boolean inference_tracing = false;
@@ -846,7 +846,7 @@ final public class Streaming extends AlgebraicOptimization {
     }
 
     /** Convert coGroups back to joins and reduces to aggregations */
-    private static Tree convert_to_algebra ( Tree e ) {
+    static Tree convert_to_algebra ( Tree e ) {
         match e {
         case coGroup(`x,`y):
             Tree v = new_var();
@@ -1129,7 +1129,7 @@ final public class Streaming extends AlgebraicOptimization {
     }
 
     /** Convert joins to coGroups, plus other transformations */
-    private static Tree normalize_term ( Tree e ) {
+    static Tree normalize_term ( Tree e ) {
         match e {
         case join(`kx,`ky,`r,cmap(`f,`x),cmap(`g,`y)):
             if (!x.equals(y))
@@ -1254,18 +1254,18 @@ final public class Streaming extends AlgebraicOptimization {
     }
 
     /** Embed missing cmaps */
-    private static Tree embed_missing_cmaps ( Tree e ) {
+    static Tree embed_missing_cmaps ( Tree e ) {
         match e {
         case project(`x,`a):
             match TypeInference.type_inference(x) {
             case `T(`tp):
                 if (!is_collection(T))
-                    fail;
+                    return #<project(`(embed_missing_cmaps(x)),`a)>;
                 Tree v = new_var();
                 type_env.insert(v.toString(),tp);
                 return embed_missing_cmaps(#<cmap(lambda(`v,bag(project(`v,`a))),`x)>);
             };
-            fail
+            return #<project(`(embed_missing_cmaps(x)),`a)>;
         case nth(`x,`n):
             match TypeInference.type_inference(x) {
             case `T(`tp):
@@ -1335,6 +1335,16 @@ final public class Streaming extends AlgebraicOptimization {
                     if (v.equals(a))
                         return simplify_term(u);
                 };
+        case nth(let(`v,`w,tuple(...al)),`n):
+            if (!n.is_long())
+                fail;
+            int i = (int)n.longValue();
+            if (i >= 0 && i < al.length())
+                return #<let(`v,`w,`(simplify_term(al.nth(i))))>;
+        case let(`v,`w,`x):
+            if (occurences(v,x) <= 1)
+                return subst_var(v,w,x);
+            fail
         case `f(...as):
             Trees bs = #[ ];
             for ( Tree a: as )

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ba7521e0/core/src/main/java/org/apache/mrql/TopLevel.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/TopLevel.gen b/core/src/main/java/org/apache/mrql/TopLevel.gen
index 9c5ecfa..769099a 100644
--- a/core/src/main/java/org/apache/mrql/TopLevel.gen
+++ b/core/src/main/java/org/apache/mrql/TopLevel.gen
@@ -377,6 +377,25 @@ final public class TopLevel extends Interpreter {
                         }
                     });
             };
+        case lineage(`e):
+            Config.lineage = true;
+            boolean quiet = Config.quiet_execution;
+            Config.quiet_execution = true;
+            long t = System.currentTimeMillis();
+            if (expression(e,false) != null && !Config.quiet_execution)
+                System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
+            Config.lineage = false;
+            Config.quiet_execution = quiet;
+        case debug(`path,`e):
+            Config.debug = true;
+            boolean quiet = Config.quiet_execution;
+            Config.quiet_execution = true;
+            long t = System.currentTimeMillis();
+            dump(path.stringValue(),e);
+            if (!Config.quiet_execution)
+                System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
+            Config.debug = false;
+            Config.quiet_execution = quiet;
         case dump(`s,`e):
             long t = System.currentTimeMillis();
             dump(s.stringValue(),e);

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ba7521e0/core/src/main/java/org/apache/mrql/Translator.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Translator.gen b/core/src/main/java/org/apache/mrql/Translator.gen
index 2d9d2fe..01db2f3 100644
--- a/core/src/main/java/org/apache/mrql/Translator.gen
+++ b/core/src/main/java/org/apache/mrql/Translator.gen
@@ -179,7 +179,7 @@ public class Translator extends Printer {
         = #[MapReduce,MapAggregateReduce,MapCombineReduce,FroupByJoin,Aggregate,
             MapReduce2,MapCombineReduce2,MapAggregateReduce2,MapJoin,MapAggregateJoin,
             CrossProduct,CrossAggregateProduct,cMap,AggregateMap,BSP,GroupByJoin,
-            OuterMerge,RightOuterMerge];
+            OuterMerge,RightOuterMerge,Provenance];
 
     static Trees algebraic_operators
         = #[mapReduce,mapReduce2,cmap,join,groupBy,orderBy,aggregate,map,filter,repeat,closure];

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ba7521e0/core/src/main/java/org/apache/mrql/TypeInference.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/TypeInference.gen b/core/src/main/java/org/apache/mrql/TypeInference.gen
index f6769f4..60ca56c 100644
--- a/core/src/main/java/org/apache/mrql/TypeInference.gen
+++ b/core/src/main/java/org/apache/mrql/TypeInference.gen
@@ -1199,6 +1199,19 @@ public class TypeInference extends Translator {
             };
         case cstep(`x):  // used in QueryPlan for closure
             return type_inference(x);
+        case provenance(`x,...exprs):   // used in Provenance generation
+            match type_inference(x) {
+            case `S(tuple(`tp,_)):
+                if (!is_collection(S))
+                    fail;
+                return #<`S(`tp)>;
+            case tuple(`tp,_):
+                return tp;
+            };
+            type_error(e,"Wrong provenance injection");
+        case Lineage(...as):   // used in Provenance generation
+            type_inference(#<tuple(...as)>);
+            return #<lineage>;
         case outerMerge(lambda(`v,`b),`s,`d):
             Tree ts = type_inference(s);
             if (unify(type_inference(d),ts) == null)

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ba7521e0/core/src/main/java/org/apache/mrql/mrql.cgen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/mrql.cgen b/core/src/main/java/org/apache/mrql/mrql.cgen
index 87245a0..6e09902 100644
--- a/core/src/main/java/org/apache/mrql/mrql.cgen
+++ b/core/src/main/java/org/apache/mrql/mrql.cgen
@@ -34,7 +34,8 @@ parser code {:
         sym.SOME, sym.ALL, sym.GTR, sym.SEP, sym.STORE, sym.DUMP, sym.TYPE, sym.DATA, sym.REPEAT,
         sym.STEP, sym.LIMIT, sym.LET, sym.ATSYM, sym.EXCLAMATION,
         sym.Variable, sym.Integer, sym.Double, sym.String, sym.Decimal,
-        sym.START_TEMPLATE, sym.END_TEMPLATE, sym.TEXT, sym.TRACE, sym.INCR
+        sym.START_TEMPLATE, sym.END_TEMPLATE, sym.TEXT, sym.TRACE, sym.INCR,
+        sym.LINEAGE, sym.DEBUG
     };
 
     static String[] token_names = {
@@ -49,7 +50,8 @@ parser code {:
         "some", "all", ">", "|", "store", "dump", "type", "data", "repeat",
         "step", "limit", "let", "@", "!",
         "Variable", "Integer", "Double", "String", "Decimal",
-        "[|", "|]", "Text", "trace", "incr"
+        "[|", "|]", "Text", "trace", "incr",
+        "lineage", "debug"
     };
 
     public static String print ( Symbol s ) {
@@ -98,7 +100,8 @@ terminal IF, THEN, ELSE, SELECT, FROM, HAVING, LB, RB, LP, RP, LSB, RSB, LDOT, S
          UNION, INTERSECT, EXCEPT, EXISTS, IN, COMMA, DOT, COLON, ASSIGN, SEMI, WHERE,
          ORDER, GROUP, BY, ASCENDING, DESCENDING, UMINUS, FUNCTION, DISTINCT, BSLASH,
          SOME, ALL, GTR, SEP, STORE, TYPE, DATA, CASE, ATSYM, XPATH, REPEAT, STEP, LIMIT,
-         LET, IMPORT, PARSER, AGGREGATION, INCLUDE, EXCLAMATION, MACRO, DUMP, TRACE, INCR;
+         LET, IMPORT, PARSER, AGGREGATION, INCLUDE, EXCLAMATION, MACRO, DUMP, TRACE, INCR,
+         LINEAGE, DEBUG;
 
 terminal String         Variable;
 terminal Long           Integer;
@@ -143,6 +146,8 @@ item            ::= expr:e                              {: RESULT = #<expression
                 |   STORE String:s FROM expr:e          {: RESULT = #<dump(`(new StringLeaf(s)),`e)>; :}
                 |   DUMP String:s FROM expr:e           {: RESULT = #<dump_text(`(new StringLeaf(s)),`e)>; :}
                 |   INCR expr:e                         {: RESULT = #<incr(`e)>; :}
+                |   LINEAGE expr:e                      {: RESULT = #<lineage(`e)>; :}
+                |   DEBUG String:s expr:e               {: RESULT = #<debug(`s,`e)>; :}
                 |   TYPE var:v EQ type:t                {: RESULT = #<typedef(`v,`t)>; :}
                 |   DATA var:v EQ data_binds:nl         {: RESULT = #<datadef(`v,union(...nl))>; :}
                 |   FUNCTION var:f LP

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ba7521e0/core/src/main/java/org/apache/mrql/mrql.lex
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/mrql.lex b/core/src/main/java/org/apache/mrql/mrql.lex
index 2f6cb54..868eb05 100644
--- a/core/src/main/java/org/apache/mrql/mrql.lex
+++ b/core/src/main/java/org/apache/mrql/mrql.lex
@@ -184,6 +184,8 @@ DOUBLE = [0-9]+([\.][0-9]+)?([eE][+-]?[0-9]+)?
 <YYINITIAL> "include"		{ return symbol(sym.INCLUDE); }
 <YYINITIAL> "aggregation"	{ return symbol(sym.AGGREGATION); }
 <YYINITIAL> "trace"	        { return symbol(sym.TRACE); }
+<YYINITIAL> "lineage"	        { return symbol(sym.LINEAGE); }
+<YYINITIAL> "debug"	        { return symbol(sym.DEBUG); }
 
 <YYINITIAL> {ID}		{ return symbol(sym.Variable,yytext()); }
 

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ba7521e0/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
index f66d422..61072a3 100644
--- a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
+++ b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
@@ -895,6 +895,9 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
                 if (x != null)
                     if (x instanceof MR_flink)
                         return ((MR_flink)x).flink();
+                    else if (x instanceof MR_dataset)
+                        return dataset(new Bag(((MR_dataset)x).dataset.take(Integer.MAX_VALUE)))
+			    .map(new restore_global_functions());
                     else new Error("Variable "+v+" is of type: "+x);
                 x = variable_lookup(v.toString(),global_env);
                 if (x != null)