You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2011/04/14 18:11:25 UTC

svn commit: r1092378 [1/3] - in /pig/trunk: ./ contrib/ contrib/penny/ contrib/penny/java/ contrib/penny/java/src/ contrib/penny/java/src/main/ contrib/penny/java/src/main/java/ contrib/penny/java/src/main/java/org/ contrib/penny/java/src/main/java/org...

Author: gates
Date: Thu Apr 14 16:11:22 2011
New Revision: 1092378

URL: http://svn.apache.org/viewvc?rev=1092378&view=rev
Log:
PIG-1959 Penny: a framework for workflow instrumentation

Added:
    pig/trunk/contrib/penny/
    pig/trunk/contrib/penny/java/
    pig/trunk/contrib/penny/java/build.xml
    pig/trunk/contrib/penny/java/src/
    pig/trunk/contrib/penny/java/src/main/
    pig/trunk/contrib/penny/java/src/main/java/
    pig/trunk/contrib/penny/java/src/main/java/org/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ClassWithArgs.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Communicator.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Coordinator.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Location.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/LogicalLocation.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/MonitorAgent.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/NoSuchLocationException.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ParsedPigScript.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PennyServer.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PhysicalLocation.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTCoordinator.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTInjectTaintMonitorAgent.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTMatchTaintMonitorAgent.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTPropagateTaintMonitorAgent.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/Main.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CICoordinator.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CIMonitorAgent.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/Main.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/dh/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/dh/DHCoordinator.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/dh/DHMonitorAgent.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/dh/Main.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ds/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ds/DSCoordinator.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ds/DSMonitorAgent.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ds/Main.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ft/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ft/FTCoordinator.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ft/FTMonitorAgent.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ft/Main.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/gl/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/gl/GLCoordinator.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/gl/GLMonitorAgent1.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/gl/GLMonitorAgent2.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/gl/GoldenLogic.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/gl/Main.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/gl/goldenLogicClasses/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/gl/goldenLogicClasses/FlattenLinksGL.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/la/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/la/LACoordinator.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/la/LAMonitorAgent.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/la/Main.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/lp/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/lp/LPCoordinator.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/lp/LPMonitorAgent.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/lp/Main.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/nop/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/nop/Main.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/nop/NOPCoordinator.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/nop/NOPMonitorAgent.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/op/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/op/Main.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/op/OPCoordinator.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/op/OPMonitorAgent.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ri/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ri/Main.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ri/RICoordinator.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ri/RIMonitorAgent.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ti/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ti/Main.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ti/TICoordinator.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ti/TIMonitorAgent.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/tr/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/tr/Main.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/tr/TRCoordinator.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/tr/TRMonitorAgent.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/comm/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/comm/AsyncMessageReceiptCallback.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/comm/Message.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/comm/MessageReceiptCallback.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/comm/SyncCallResult.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/harnesses/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/harnesses/CoordinatorHarness.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/harnesses/MessagingClient.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/harnesses/MessagingServer.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/harnesses/MonitorAgentHarness.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/pig/
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/pig/MonitorAgentUDF.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/pig/MonitorAgentUDFArgs.java
    pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/pig/PigLauncher.java
    pig/trunk/contrib/penny/java/src/test/
    pig/trunk/contrib/penny/java/src/test/java/
    pig/trunk/contrib/penny/java/src/test/java/org/
    pig/trunk/contrib/penny/java/src/test/java/org/apache/
    pig/trunk/contrib/penny/java/src/test/java/org/apache/pig/
    pig/trunk/contrib/penny/java/src/test/java/org/apache/pig/penny/
    pig/trunk/contrib/penny/java/src/test/java/org/apache/pig/penny/test/
    pig/trunk/contrib/penny/java/src/test/java/org/apache/pig/penny/test/ComTest.java
    pig/trunk/contrib/penny/java/src/test/java/org/apache/pig/penny/test/PennyAgentTest.java
Modified:
    pig/trunk/contrib/CHANGES.txt
    pig/trunk/ivy.xml
    pig/trunk/ivy/ivysettings.xml
    pig/trunk/ivy/libraries.properties
    pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java

Modified: pig/trunk/contrib/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/CHANGES.txt?rev=1092378&r1=1092377&r2=1092378&view=diff
==============================================================================
--- pig/trunk/contrib/CHANGES.txt (original)
+++ pig/trunk/contrib/CHANGES.txt Thu Apr 14 16:11:22 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1959 Penny: a framework for workflow instrumentation (olston, breed via gates)
+
 PIG-1924 CSV Loader/Store that handles newlines in fields, and other Excel CSV
 features. (paepcke via gates)
 

Added: pig/trunk/contrib/penny/java/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/build.xml?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/build.xml (added)
+++ pig/trunk/contrib/penny/java/build.xml Thu Apr 14 16:11:22 2011
@@ -0,0 +1,139 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project basedir="." default="jar" name="penny">
+    <!-- javac properties -->
+    <property name="javac.debug" value="on" />
+    <property name="javac.level" value="source,lines,vars"/>
+    <property name="javac.optimize" value="on" />
+    <property name="javac.deprecation" value="off" />
+    <property name="javac.version" value="1.6" />
+    <property name="javac.args" value="" />
+    <!-- TODO we should use warning...   <property name="javac.args.warnings" value="-Xlint:unchecked" /> -->
+    <property name="javac.args.warnings" value="" />
+
+    <!-- build properties -->
+    <property name="build.dir" value="${basedir}/build" />
+    <property name="build.classes" value="${build.dir}/classes" />
+    <property name="build.docs" value="${build.dir}/docs" />
+    <property name="build.javadoc" value="${build.docs}/api" />
+    <property name="pigjar" value="../../../pig.jar" />
+    <property name="pigjar-withouthadoop" value="../../../pig-withouthadoop.jar" />
+    <property name="pigtest" value="../../../build/test/classes" />
+    <property name="pennyjar" value="penny.jar" />
+    <property name="src.dir" value="src/main/java/org/apache/pig/penny" />
+    <property name="netty.jar" value="../../../build/ivy/lib/Pig/netty-3.2.2.Final.jar"/>
+
+    <!-- jar properties -->
+    <property name=".javadoc" value="${build.docs}/api" />
+    
+    <!-- test properties -->
+    <property name="test.build.dir" value="${build.dir}/test" />
+    <property name="test.classes" value="${test.build.dir}/classes" />
+    <property name="test.logs" value="${test.build.dir}/logs" />
+    <property name="test.timeout" value="900000" />
+    <property name="test.junit.output.format" value="plain" />
+    <property name="test.src.dir" value="src/test/java" />
+    <property name="junit.hadoop.conf" value="${user.home}/pigtest/conf/"/>
+
+    <path id="penny.classpath">
+        <pathelement location="${build.classes}"/>
+        <pathelement location="${pigjar}"/>
+        <pathelement location="${pigjar-withouthadoop}"/>
+        <pathelement location="${pigtest}"/>
+        <fileset dir="../../../build/ivy/lib">
+            <include name="**/*.jar"/>
+        </fileset>
+    </path>
+
+    <path id="test.classpath">
+        <pathelement location="${build.classes}"/>
+        <pathelement location="${pigjar}"/>
+        <pathelement location="${pigjar-withouthadoop}"/>
+        <pathelement location="${test.classes}"/>
+        <pathelement location="${test.src.dir}"/>
+        <pathelement location="${junit.hadoop.conf}" />
+        <pathelement location="${pigtest}"/>
+        <fileset dir="../../../build/ivy/lib">
+            <include name="**/*.jar"/>
+        </fileset>
+    </path>
+
+    <target name="init">
+        <mkdir dir="${build.dir}"/>
+        <mkdir dir="${build.classes}"/>
+        <mkdir dir="${test.build.dir}"/>
+        <mkdir dir="${test.classes}"/>
+        <mkdir dir="${build.javadoc}"/>
+    </target>
+    <target name="clean">
+        <delete dir="build"/>
+    </target>
+    <target depends="init" name="compile" description="compile all of the class files">
+        <echo> *** Compiling Penny ***</echo>
+        <javac srcdir="${src.dir}" debug="${javac.debug}" debuglevel="${javac.level}" destdir="${build.classes}" source="${javac.version}"
+        target="${javac.version}" optimize="${javac.optimize}" deprecation="${javac.deprecation}">
+            <compilerarg line="${javac.args} ${javac.args.warnings}" />
+            <classpath refid="penny.classpath"/>
+        </javac>
+    </target>
+    <target depends="init,compile" name="jar" description="create the jar files">
+        <echo> *** Creating penny.jar ***</echo>
+      <jar destfile="${pennyjar}">
+        <fileset dir="build/classes"/>
+      </jar>
+    </target>
+    <target depends="compile" name="compile-test">
+        <echo> *** Compiling Penny tests ***</echo>
+        <javac srcdir="${test.src.dir}" debug="${javac.debug}" debuglevel="${javac.level}" destdir="${test.classes}" source="${javac.version}" target="${javac.version}">
+            <compilerarg line="${javac.args} ${javac.args.warnings}"/>
+            <classpath refid="test.classpath"/>
+        </javac>
+    </target>
+    <target depends="compile-test,jar" name="test">
+        <echo> *** Running Penny tests ***</echo>
+        <delete dir="${test.logs}"/>
+        <mkdir dir="${test.logs}"/>
+        <junit printsummary="yes" haltonfailure="no" fork="yes" maxmemory="512m" dir="${basedir}" timeout="${test.timeout}" errorProperty="tests.failed" failureProperty="tests.failed">        
+            <sysproperty key="hadoop.log.dir" value="${test.logs}"/>
+            <classpath refid="test.classpath"/>
+            <formatter type="${test.junit.output.format}" />
+            <batchtest fork="yes" todir="${test.logs}" unless="testcase">
+                <fileset dir="${test.src.dir}">
+                    <include name="**/*Test*.java" />
+                </fileset>
+            </batchtest>
+            <batchtest fork="yes" todir="${test.logs}" if="testcase">
+                <fileset dir="${test.src.dir}" includes="**/${testcase}.java"/>
+            </batchtest>
+        </junit>
+        <fail if="tests.failed">Tests failed!</fail>
+    </target>
+    <!-- Build API docs for every Penny source file. One fileset over ${src.dir} replaces
+         filesets for evaluation/storage/filtering/grouping/comparison, which do not exist
+         under ${src.dir}; output goes to ${build.javadoc}, the directory init creates. -->
+    <target depends="init" name="javadoc"
+            description="build javadoc for all of the packages">
+        <echo> *** Creating Javadocs ***</echo>
+      <javadoc destdir="${build.javadoc}"
+               author="true">
+        <classpath refid="penny.classpath"/>
+        <fileset dir="${src.dir}" includes="**/*.java"/>
+      </javadoc>
+    </target>
+    
+</project>

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ClassWithArgs.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ClassWithArgs.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ClassWithArgs.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ClassWithArgs.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+import java.io.Serializable;
+
+public class ClassWithArgs {
+    /** Holds a class together with the Serializable arguments supplied alongside it. */
+    private final Class theClass;
+    private final Serializable[] args;
+    /** Stores both references as given; the varargs array is kept without copying. */
+    public ClassWithArgs(Class theClass, Serializable ... args) {
+        this.theClass = theClass;
+        this.args = args;
+    }
+    /** Returns the class passed to the constructor. */
+    public Class theClass() {
+        return theClass;
+    }
+    /** Returns the stored argument array itself (not a copy). */
+    public Serializable[] args() {
+        return args;
+    }
+
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Communicator.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Communicator.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Communicator.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Communicator.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+
+public abstract class Communicator {
+    // Messaging operations: to the coordinator, to upstream/downstream neighbors, and to agents at a logical location.
+    /**
+     * Find out my (physical) location.
+     */
+    public abstract Location myLocation();
+    
+    /**
+     * Send a message to the coordinator, asynchronously.
+     */
+    public abstract void sendToCoordinator(Tuple message);
+    
+    /**
+     * Send a message to immediate downstream neighbor(s), synchronously.
+     * If downstream neighbor(s) span a task boundary, all instances will receive it; otherwise only same-task instances will receive it.
+     * If there is no downstream neighbor, an exception will be thrown.
+     */
+    public abstract void sendDownstream(Tuple message) throws NoSuchLocationException;
+    
+    /**
+     * Send a message to immediate upstream neighbor(s), synchronously.
+     * If upstream neighbor(s) are non-existent or span a task boundary, an exception will be thrown.
+     */
+    public abstract void sendUpstream(Tuple message) throws NoSuchLocationException;
+    
+    /**
+     * Send a message to current/future instances of a given logical location.
+     * Instances that have already terminated will not receive the message (obviously).
+     * Instances that are currently executing will receive it asynchronously (or perhaps not at all, if they terminate before the message arrives).
+     * Instances that have not yet started will receive the message prior to beginning processing of tuples.
+     */
+    public abstract void sendToAgents(LogicalLocation destination, Tuple message) throws NoSuchLocationException;
+    
+    /** Convenience overload: packs the items into a tuple, in order, and calls sendToCoordinator(Tuple). */
+    public void sendToCoordinator(Object ... message) {
+        sendToCoordinator(makeTuple(message));
+    }
+    /** Convenience overload: packs the items into a tuple, in order, and calls sendDownstream(Tuple). */
+    public void sendDownstream(Object ... message) throws NoSuchLocationException {
+        sendDownstream(makeTuple(message));
+    }
+    /** Convenience overload: packs the items into a tuple, in order, and calls sendUpstream(Tuple). */
+    public void sendUpstream(Object ... message) throws NoSuchLocationException {
+        sendUpstream(makeTuple(message));
+    }
+    /** Convenience overload: packs the items into a tuple, in order, and calls sendToAgents(LogicalLocation, Tuple). */
+    public void sendToAgents(LogicalLocation destination, Object ... message) throws NoSuchLocationException {
+        sendToAgents(destination, makeTuple(message));
+    }
+    /** Builds a new DefaultTuple and appends each array element in index order. */
+    private static Tuple makeTuple(Object[] items) {
+        Tuple t = new DefaultTuple();
+        for (int i = 0; i < items.length; i++) {
+            t.append(items[i]);
+        }
+        return t;
+    }
+    
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Coordinator.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Coordinator.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Coordinator.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Coordinator.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+import java.io.Serializable;
+
+import org.apache.pig.data.Tuple;
+
+public abstract class Coordinator {
+    
+    // ADDED 10-19-10 BY CHRIS:
+    // Communicator handed in through initialize(); remains null until that call is made.
+    private Communicator communicator;
+    /** Stores the communicator that communicator() later returns. */
+    public void initialize(Communicator communicator) {
+        this.communicator = communicator;
+    }
+    /** Returns the communicator set by initialize(), or null if initialize() has not been called (no check is made). */
+    protected Communicator communicator() {
+        return communicator;
+    }
+    
+    
+    
+    ///// Abstract methods that app-writer implements:
+    
+    /**
+     * Initialize, using any arguments passed from higher layer.
+     */
+    public abstract void init(Serializable[] args);
+        
+    /**
+     * Process an incoming (synchronous or asynchronous) message.
+     */
+    public abstract void receiveMessage(Location source, Tuple message);
+
+    /**
+     * The data flow has completed and all messages have been delivered. Finish processing.
+     * 
+     * @return        final output to pass back to application
+     */
+    public abstract Object finish();
+
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Location.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Location.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Location.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Location.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+import java.io.Serializable;
+
+public interface Location extends Serializable {
+    // A serializable location: always has a logical id, and may additionally carry a physical id.
+    public LogicalLocation asLogical();            // return the logical-only part of this location (i.e. strip off physical id, if it has one)
+    public boolean isLogicalOnly();                // is this a logical-only location (i.e. no physical id)?
+    public String logId();                        // return logical id
+    
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/LogicalLocation.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/LogicalLocation.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/LogicalLocation.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/LogicalLocation.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+public class LogicalLocation implements Location {
+    /** Logical id of this location; assigned once in the constructor. */
+    private final String id;
+    /** Creates a logical location with the given id (stored as-is, no validation). */
+    public LogicalLocation(String id) {
+        this.id = id;
+    }
+    /** This location is already logical-only, so the same instance is returned. */
+    @Override
+    public LogicalLocation asLogical() {
+        return this;
+    }
+    /** Always true for this class. */
+    @Override
+    public boolean isLogicalOnly() {
+        return true;
+    }
+    /** Returns the id given to the constructor. */
+    @Override
+    public String logId() {
+        return id;
+    }
+    /** Equal only to another LogicalLocation whose id equals this one's; assumes ids are non-null. */
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof LogicalLocation)) return false;
+        LogicalLocation llOther = (LogicalLocation) other;        
+        return (llOther.id.equals(this.id));
+    }
+    /** Hash of the id, so equal locations share a hash code. */
+    @Override
+    public int hashCode() {
+        return id.hashCode();
+    }
+    /** Renders as "Loc[" followed by the id and "]". */
+    @Override
+    public String toString() {
+        return "Loc[" + id + "]";
+    }
+
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/MonitorAgent.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/MonitorAgent.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/MonitorAgent.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/MonitorAgent.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+
+public abstract class MonitorAgent {
+    
+    ///// tag constants
+    public static final Set<String> NO_TAGS = new HashSet<String>();    // empty set: observeTuple() result meaning "pass through, no tags"
+    public static final Set<String> FILTER_OUT = null;                  // null: observeTuple() result meaning "drop the tuple"
+    
+    // Communicator handed in through initialize(); null until then.
+    private Communicator communicator;
+    /** Stores the communicator that communicator() later returns; final, so subclasses cannot override it. */
+    public final void initialize(Communicator communicator) {
+        this.communicator = communicator;
+    }
+    /** Returns the communicator; throws IllegalStateException if initialize() has not supplied one yet. */
+    protected Communicator communicator() {
+        if (communicator == null) {
+            throw new IllegalStateException("Agent must be initialized before getting communicator");
+        }
+        return communicator;
+    }
+    
+    
+    ///// Abstract methods that app-writer implements:
+    
+    /**
+     * Furnish set of fields to monitor. (Null means monitor all fields ('*').)
+     */
+    public abstract Set<Integer> furnishFieldsToMonitor();
+    
+    /**
+     * Initialize, using any arguments passed from higher layer.
+     */
+    public abstract void init(Serializable[] args);
+    
+    /**
+     * Observe a tuple (record) that passes through the monitoring point.
+     * 
+     * @param t                the tuple
+     * @param tags            t's tags
+     * @return                FILTER_OUT to remove the tuple from the data stream; NO_TAGS to let it pass through and not give it any tags; a set of tags to let it pass through and assign those tags
+     */
+    public abstract Set<String> observeTuple(Tuple t, Set<String> tags) throws ExecException;
+    
+    /**
+     * Process an incoming (synchronous or asynchronous) message.
+     */
+    public abstract void receiveMessage(Location source, Tuple message);
+    
+    /**
+     * No more tuples are going to pass through the monitoring point. Finish any ongoing processing.
+     */
+    public abstract void finish();
+
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/NoSuchLocationException.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/NoSuchLocationException.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/NoSuchLocationException.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/NoSuchLocationException.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+public class NoSuchLocationException extends Exception {
+    // Checked exception; presumably signals a reference to an unknown Penny location -- confirm against the throwing code.
+    public NoSuchLocationException(String msg) {
+        super(msg);
+    }
+    // Same as above, additionally recording the exception that caused this one.
+    public NoSuchLocationException(String msg, Exception cause) {
+        super(msg, cause);
+    }
+
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ParsedPigScript.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ParsedPigScript.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ParsedPigScript.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ParsedPigScript.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.penny.impl.harnesses.CoordinatorHarness;
+import org.apache.pig.penny.impl.pig.MonitorAgentUDF;
+import org.apache.pig.penny.impl.pig.MonitorAgentUDFArgs;
+import org.apache.pig.penny.impl.pig.PigLauncher;
+import org.apache.pig.tools.ToolsPigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+
+public class ParsedPigScript {
+    
+    private final ToolsPigServer pigServer;
+    private final String pigScript;                                            // user's raw pig script
+    private final LogicalPlan queryPlan;                                    // pig's logical query plan object
+    private Map<String, LogicalRelationalOperator> aliasOperatorMap;        // the pig LogicalRelationalOperator corresponding to each alias
+
+  // Ibis change : start
+  // the pig LOStore corresponding to each alias which is actually a 'LOStore'
+  private Map<String, LOStore> storeMap;
+  // Ibis change : end
+
+    private final Random rand = new Random(System.currentTimeMillis());
+
+  public ParsedPigScript(PigContext pigContext, String pigScriptFilename) throws IOException {
+        this.pigServer = new ToolsPigServer(pigContext);
+        this.pigScript = pigScriptFilename;
+        pigServer.registerNoRun(pigScript, new HashMap<String, String>(), new LinkedList<String>());
+        this.queryPlan = pigServer.getPlans().lp;
+        /* debug: */ //System.out.println("ORIGINAL PLAN: " + queryPlan);
+        aliasOperatorMap = new HashMap<String, LogicalRelationalOperator>();
+    // Ibis change : start
+    this.storeMap = new HashMap<String, LOStore>();
+    // Ibis change : end
+        populateAliasOperatorMap(queryPlan, aliasOperatorMap, storeMap);
+    }
+
+  // Ibis change : start
+    private static void populateAliasOperatorMap(LogicalPlan queryPlan, Map<String, LogicalRelationalOperator> map, Map<String, LOStore> storeMap) {
+  // Ibis change : end
+        for (Iterator<Operator> it = queryPlan.getOperators(); it.hasNext(); ) {
+            LogicalRelationalOperator op = (LogicalRelationalOperator) it.next();
+            if (!(op instanceof LOStore)) {
+                String alias = op.getAlias();
+                if (alias != null) {
+                    if (map.containsKey(alias)) throw new RuntimeException("Cannot use Penny with scripts that re-use aliases. This script re-uses alias " + alias + ".");
+                    map.put(alias, op);
+                }
+            }
+      // Ibis change : start
+      else if (null != storeMap) {
+        addStoreOperator(storeMap, (LOStore) op);
+      }
+      // Ibis change : end
+        }
+    }
+
+    private Object trace(ClassWithArgs coordinatorClass, Map<String, ClassWithArgs> monitorClasses, String truncateAtAlias, boolean outputToTemp) throws Exception {
+        List<String> logicalIds = new ArrayList<String>(monitorClasses.keySet());
+        Collections.sort(logicalIds);
+        
+        LogicalPlan instrumentedQueryPlan = createInstrumentedPlan(monitorClasses, new InetSocketAddress(InetAddress.getLocalHost(), CoordinatorHarness.MASTER_LISTEN_PORT), logicalIds, truncateAtAlias, outputToTemp);
+        /* debug: */ //System.out.println("INSTRUMENTED PLAN: " + instrumentedQueryPlan);
+        return PigLauncher.launch(pigServer.getPigContext(), instrumentedQueryPlan, coordinatorClass);
+    }
+    
+    public Object trace(ClassWithArgs coordinatorClass, Map<String, ClassWithArgs> monitorClasses, boolean outputToTemp) throws Exception {
+        return trace(coordinatorClass, monitorClasses, null, outputToTemp);
+    }
+    
+    public Object truncateAndTrace(ClassWithArgs coordinatorClass, Map<String, ClassWithArgs> monitorClasses, String truncateAtAlias) throws Exception {
+        return trace(coordinatorClass, monitorClasses, truncateAtAlias, true);
+    }
+        
+    public Object truncateAndTrace(Class coordinatorClass, Map<String, ClassWithArgs> monitorClasses, String truncateAtAlias) throws Exception {
+        return truncateAndTrace(new ClassWithArgs(coordinatorClass), monitorClasses, truncateAtAlias);
+    }
+
+    public Object trace(ClassWithArgs coordinatorClass, Map<String, ClassWithArgs> monitorClasses) throws Exception {
+        return trace(coordinatorClass, monitorClasses, false);
+    }
+    
+    public Object trace(Class coordinatorClass, Map<String, ClassWithArgs> monitorClasses, boolean outputToTemp) throws Exception {
+        return trace(new ClassWithArgs(coordinatorClass), monitorClasses, outputToTemp);
+    }
+
+    public Object trace(Class coordinatorClass, Map<String, ClassWithArgs> monitorClasses) throws Exception {
+        return trace(coordinatorClass, monitorClasses, false);
+    }
+
+
+  // Ibis change : start
+  private static void addStoreOperator(Map<String, LOStore> storeMap, LOStore store) {
+
+    String alias = ((LogicalRelationalOperator)store.getPlan().getPredecessors(store).get(0)).getAlias();
+
+    if (storeMap.containsKey(alias)) throw new RuntimeException("Cannot use Penny with scripts that re-use aliases. This script re-uses alias " + alias + " in store.");
+    storeMap.put(alias, store);
+  }
+
+  /**
+   * Get all the aliases which are 'stored'.
+   * TODO: Change from storeMap.keySet() to Collections.unmodifiableSet(storeMap.keySet()) so that consumers can't modify it. But I don't know enough about Penny to be sure whether it is required ...
+   *
+   * @return
+   */
+  public Collection<String> getStoreAliases(){
+    return storeMap.keySet();
+  }
+  // Ibis change : end
+        /*
+     * Get all aliases used in this script.
+     */
+    public Collection<String> aliases() {
+        return aliasOperatorMap.keySet();
+    }
+    
+    public String operator(String alias) {
+        return aliasOperatorMap.get(alias).getName();
+    }
+    
+    public int arity(String alias) throws Exception {
+        return aliasOperatorMap.get(alias).getSchema().size();
+    }
+    
+  // Ibis change : start
+
+  /**
+   * Get the actual pig LogicalRelationalOperator corresponding to an alias.
+   * Note that Penny does not support reuse of aliases !
+   *
+   * @param alias
+   * @return
+   */
+  public LogicalRelationalOperator getLogicalRelationalOperator(String alias){
+    return aliasOperatorMap.get(alias);
+  }
+
+  /**
+   * Get the actual pig LOStore corresponding to an alias which is getting stored.
+   * Note that Penny does not support reuse of aliases (even for store) !
+   *
+   * @param alias
+   * @return
+   */
+  public LOStore getStoreOperator(String alias){
+    return storeMap.get(alias);
+  }
+  // Ibis change : end
+
+    public Map<Integer, List<Integer>> opKeys(String alias) throws Exception {
+        LogicalRelationalOperator op = aliasOperatorMap.get(alias);
+        
+        if (op instanceof LOCogroup) {
+            Map<Integer, List<Integer>> opKeys = new HashMap<Integer, List<Integer>>();
+            MultiMap<Integer, LogicalExpressionPlan> expressionPlans = ((LOCogroup) op).getExpressionPlans();
+            for (int pos : expressionPlans.keySet()) {
+                List<Integer> onePosKeys = new LinkedList<Integer>();
+                for (LogicalExpressionPlan plan : expressionPlans.get(pos)) {
+                    if (plan.size() == 1) {
+                        Operator groupExpr = plan.getSources().get(0);
+                        if (groupExpr instanceof ConstantExpression) {
+                            // "group all" or "group by <const>", so don't append any key col
+                        } else if (groupExpr instanceof ProjectExpression) {
+                            onePosKeys.add(((ProjectExpression) groupExpr).getColNum());
+                        } else {
+                            throw new Exception("Penny tagging does not support complex expressions in group/cogroup.");
+                        }
+                    } else {
+                        throw new Exception("Penny tagging does not support complex expressions in group/cogroup.");
+                    }
+                }
+                opKeys.put(pos, onePosKeys);
+            }
+            return opKeys;
+        } else {
+            throw new Exception("Operator keys not defined for non-(co)group operator.");
+        }
+    }
+    
+    /**
+     * Get aliases that this alias depends on directly (for a load, will be an empty list).
+     */
+    public List<String> inEdges(String alias) {
+        List<Operator> inOps = queryPlan.getPredecessors(aliasOperatorMap.get(alias));
+        if (inOps == null) inOps = new LinkedList<Operator>();
+        List<String> inAliases = new ArrayList<String>(inOps.size());
+        for (Operator inOp : inOps) inAliases.add(((LogicalRelationalOperator) inOp).getAlias());
+        return inAliases;
+    }
+    
+    /**
+     * Get the alias that this alias feeds into directly.
+     */
+    public String outEdge(String alias) {
+        List<Operator> outOps = queryPlan.getSuccessors(aliasOperatorMap.get(alias));
+        if (outOps == null) outOps = new LinkedList<Operator>();
+        if (outOps.size() > 1) throw new RuntimeException("Expecting at most one outgoing edge.");
+        if (outOps.size() == 0) return null;
+        if (outOps.get(0).getName().equals("LOStore")) return null;
+        else return ((LogicalRelationalOperator) outOps.get(0)).getAlias();
+    }
+    
+    /**
+     * Does this script have a linear chain structure (i.e. no joins, splits, etc.)?
+     */
+    public boolean isChain() {
+        for (String alias : aliases()) {
+      // Ibis change : start
+            if (operator(alias).equals("LOSplit")) return false;
+      // Ibis change : end
+            if (inEdges(alias).size() > 1) return false;
+        }
+        return true;
+    }
+    
+    /**
+     * Does this alias start a new map or reduce task stage? True for load, group, cogroup and join operators.
+     */
+    public boolean isTaskEntryPoint(String alias) {
+        final String opName = operator(alias);
+        if (opName == null) return false;
+    // Ibis change : start
+        if (opName.equals("LOLoad") || opName.equals("LOGroup")) return true;
+        return opName.equals("LOCogroup") || opName.equals("LOJoin");
+    // Ibis change : end
+    }
+    
+    /**
+     * Does this alias end a map or reduce task stage? NOTE(review): stores are not in aliasOperatorMap, so the "LOStore" test looks unreachable.
+     */
+    public boolean isTaskExitPoint(String alias) {
+        final String opName = operator(alias);
+    // Ibis change : start
+        if (opName.equals("LOStore") || opName.equals("LOGroup")) return true;
+        return opName.equals("LOCogroup") || opName.equals("LOJoin");
+    // Ibis change : end
+    }
+    
+    /** Removes everything downstream of the operator aliased alias and stores that operator's output in a temp file; rejects unknown aliases. */
+    private void truncate(LogicalPlan plan, String alias) throws FrontendException {
+        Operator truncationPoint = null;        // first operator in the plan copy that carries the requested alias
+        for (Iterator<Operator> it = plan.getOperators(); it.hasNext(); ) {
+            LogicalRelationalOperator op = (LogicalRelationalOperator) it.next();
+            if (alias.equals(op.getAlias())) {
+                truncationPoint = op;
+                break;
+            }
+        }
+        if (truncationPoint == null) throw new IllegalArgumentException("Illegal request to truncate at unknown pig script alias " + alias);
+        // find all operators to remove from plan (closure starting at truncation point)
+        List<Operator> leads = new LinkedList<Operator>();
+        leads.add(truncationPoint);
+        Set<Operator> toRemove = new HashSet<Operator>();
+        while (!leads.isEmpty()) {
+            Operator op = leads.remove(0);
+            Collection<Operator> successors = plan.getSuccessors(op);
+            if (successors == null) successors = new LinkedList<Operator>();
+            toRemove.addAll(successors);
+            leads.addAll(successors);
+        }
+
+        // disconnect operators (iterate over a copy, since disconnecting edits the predecessor list)
+        for (Operator op : toRemove) {
+            for (Operator pred : new LinkedList<Operator>(plan.getPredecessors(op))) {
+                plan.disconnect(pred, op);
+            }
+        }
+
+        // remove operators
+        for (Operator op : toRemove) {
+            plan.remove(op);
+        }
+
+        // add "store" operator to store into temp file
+        LogicalRelationalOperator store = new LOStore(plan, new FileSpec(getTempFilename(), new FuncSpec("PigStorage")));
+        plan.add(store);
+        plan.connect(truncationPoint, store);
+    }
+    
+    private LogicalPlan createInstrumentedPlan(Map<String, ClassWithArgs> monitorClasses, InetSocketAddress masterAddr, List<String> logicalIds, String truncateAtAlias, boolean outputToTemp) throws Exception {
+        // sanity check
+        Set<String> aliases = new HashSet<String>(aliases());
+        for (String alias : monitorClasses.keySet()) {
+            if (!aliases.contains(alias)) throw new IllegalArgumentException("Illegal request to monitor pig script alias " + alias);
+        }
+        
+        // first, make a fresh copy of the query plan so we can mutate (i.e. instrument) it
+        // since LogicalPlan does not offer a deepCopy() method, we'll do this by re-parsing the original script
+        ToolsPigServer newPigServer = new ToolsPigServer(pigServer.getPigContext());
+        newPigServer.registerNoRun(pigScript, new HashMap<String, String>(), new LinkedList<String>());
+        LogicalPlan instrumentedQueryPlan = newPigServer.getPlans().lp;
+        if (truncateAtAlias != null) truncate(instrumentedQueryPlan, truncateAtAlias);
+        Map<String, LogicalRelationalOperator> cloneAliasOperatorMap = new HashMap<String, LogicalRelationalOperator>();
+    // Ibis change : start
+        populateAliasOperatorMap(instrumentedQueryPlan, cloneAliasOperatorMap, null);
+    // Ibis change : end
+
+        for (String alias : monitorClasses.keySet()) {
+            addAgentLogicalRelationalOperator(instrumentedQueryPlan, cloneAliasOperatorMap, alias, monitorClasses.get(alias), masterAddr, logicalIds, withinTaskUpstreamNeighbors(alias), withinTaskDownstreamNeighbors(alias), crossTaskDownstreamNeighbors(alias), incomingCrossTaskKeyFields(alias), outgoingCrossTaskKeyFields(alias));
+        }
+        
+        if (outputToTemp) {
+            // convert all store operators to go to temp file
+            Collection<LogicalRelationalOperator> ops = new LinkedList<LogicalRelationalOperator>();
+            for (Iterator<Operator> it = instrumentedQueryPlan.getOperators(); it.hasNext(); ) {
+                ops.add((LogicalRelationalOperator) it.next());
+            }
+            for (LogicalRelationalOperator op : ops) {
+                if (op instanceof LOStore) {
+                    FuncSpec funcSpec = ((LOStore) op).getOutputSpec().getFuncSpec();
+                    instrumentedQueryPlan.replace(op, new LOStore(instrumentedQueryPlan, new FileSpec(getTempFilename(), funcSpec)));
+                }
+            }
+        }
+        
+        return instrumentedQueryPlan;
+    }
+    
+    private Set<String> withinTaskUpstreamNeighbors(String alias) {
+        if (!isCrossTaskLogicalRelationalOperator(operator(alias))) return new HashSet<String>(inEdges(alias));
+        else return new HashSet<String>();
+    }
+    
+    private Set<String> withinTaskDownstreamNeighbors(String alias) {
+        Set<String> neighbors = new HashSet<String>();
+        for (String otherAlias : aliases()) {
+            if (inEdges(otherAlias).contains(alias) && !isCrossTaskLogicalRelationalOperator(operator(otherAlias))) {
+                neighbors.add(otherAlias);
+            }
+        }
+        return neighbors;
+    }
+    
+    private Set<String> crossTaskDownstreamNeighbors(String alias) {
+        Set<String> neighbors = new HashSet<String>();
+        for (String otherAlias : aliases()) {
+            if (inEdges(otherAlias).contains(alias) && isCrossTaskLogicalRelationalOperator(operator(otherAlias))) {
+                neighbors.add(otherAlias);
+            }
+        }
+        return neighbors;
+    }
+    
+    private Map<String, List<Integer>> incomingCrossTaskKeyFields(String alias) throws Exception {        // one field list per input alias; null if alias is not a cross-task operator
+        if (isCrossTaskLogicalRelationalOperator(operator(alias))) {
+            Map<String, List<Integer>> result = new HashMap<String, List<Integer>>();
+            int pos = 0;        // index of originAlias among alias's inputs, in inEdges() order
+            for (String originAlias : inEdges(alias)) {
+                List<Integer> oneResult = new ArrayList<Integer>();
+                if (operator(alias).equals("LOCogroup")) {
+                    int numKeys = opKeys(alias).get(pos).size();
+                    if (numKeys > 1) {        // (co)group with multiple group keys
+                        throw new Exception("Penny tagging does not support complex expressions in group/cogroup.");
+                    } else if (numKeys == 1) {                                        // (co)group with single group key
+                        oneResult.add(0);
+                    } else {            // "group all" or "group <const>"
+                        // leave empty
+                    }
+                } else if (operator(alias).equals("LOSort")) {
+                    oneResult.addAll(generateFieldList(0, arity(alias)));                                // for sort, use all fields
+                } else if (operator(alias).equals("LOJoin")) {
+                    if (pos == 0) oneResult.addAll(generateFieldList(0, arity(originAlias)));            // for join, use all fields from one input
+                    else oneResult.addAll(generateFieldList(arity(originAlias), arity(alias)));        // NOTE(review): range starts at this input's own arity, not the combined arity of the inputs before it -- confirm for inputs of different width
+                } else {
+                    throw new RuntimeException("Unrecognized operator: " + operator(alias));        // reached for LOCross, which isCrossTaskLogicalRelationalOperator() accepts
+                }
+                result.put(originAlias, oneResult);
+                pos++;
+            }
+            return result;
+        } else {
+            return null;
+        }
+    }
+    
+    /** Key fields alias hands to its cross-task successor, or null if it has none; throws for successors other than cogroup/join/sort (e.g. LOCross). */
+    private List<Integer> outgoingCrossTaskKeyFields(String alias) throws Exception {
+        String dstAlias = outEdge(alias);
+        // outEdge() is null when alias has no successor or feeds a store, so bail out before any lookup keyed on dstAlias
+        if (dstAlias == null || !isCrossTaskLogicalRelationalOperator(operator(dstAlias))) return null;
+        String dstOp = operator(dstAlias);
+        if (dstOp.equals("LOCogroup")) {
+            int pos = inEdges(dstAlias).indexOf(alias);        // which input of the cogroup alias is
+            return opKeys(dstAlias).get(pos);
+        } else if (dstOp.equals("LOJoin") || dstOp.equals("LOSort")) {
+            return generateFieldList(0, arity(alias));        // join/sort: every field of alias
+        } else {
+            throw new RuntimeException("Unrecognized operator: " + dstOp);
+        }
+    }
+    
+    private static List<Integer> generateFieldList(int a, int b) {        // returns [a, a+1, ..., b-1]; empty when a >= b
+        List<Integer> l = new ArrayList<Integer>();
+        for (int i = a; i < b; i++) l.add(i);        // b itself is excluded
+        return l;
+    }
+    
+    private static boolean isCrossTaskLogicalRelationalOperator(String opName) {
+        return (opName.equals("LOCogroup") || opName.equals("LOCross") || opName.equals("LOJoin") || opName.equals("LOSort"));
+    }
+    
+    private static void addAgentLogicalRelationalOperator(LogicalPlan queryPlan, Map<String, LogicalRelationalOperator> aliasOperatorMap, String alias, ClassWithArgs monitorClass, InetSocketAddress masterAddr, List<String> logicalIds, Set<String> withinTaskUpstreamNeighbors, Set<String> withinTaskDownstreamNeighbors, Set<String> crossTaskDownstreamNeighbors, Map<String, List<Integer>> incomingCrossTaskKeyFields, List<Integer> outgoingCrossTaskKeyFields) throws InstantiationException, IllegalAccessException, IOException {
+        // Inserts a FOREACH running MonitorAgentUDF right after the operator for alias (the UDF is called monitorFuncName = "monitor_" + alias in the sketch below).
+        MonitorAgentUDFArgs args = new MonitorAgentUDFArgs(alias, monitorClass, masterAddr, logicalIds, withinTaskUpstreamNeighbors, withinTaskDownstreamNeighbors, crossTaskDownstreamNeighbors, incomingCrossTaskKeyFields, outgoingCrossTaskKeyFields);
+        // args reaches the UDF as a serialized constructor-argument string (see funcSpec below)
+        // create pig LogicalRelationalOperator equivalent to this syntax:
+        // "DEFINE " + monitorFuncName + " " + MonitorAgentUDF.class.getCanonicalName() + "(\'" + ObjectSerializer.serialize(args) + "\');
+        // alias + " = FOREACH " + alias + " GENERATE FLATTEN(" + monitorFuncName + "(" + monitorFieldList(monitorClass.theClass()) + "));
+        // NOTE: unlike the sketch above, the code below always passes "*" (column -1) to the UDF; monitorFieldList() is not called here
+        FuncSpec funcSpec = new FuncSpec(MonitorAgentUDF.class.getCanonicalName(), ObjectSerializer.serialize(args));
+        // outer operator: a FOREACH whose nested plan is "inner load of * -> generate with flatten"
+        LOForEach agentOp = new LOForEach(queryPlan);
+        LogicalPlan innerPlan = new LogicalPlan();
+        agentOp.setInnerPlan(innerPlan);
+        // exps is handed to the generate now and filled in further down
+        boolean[] flatten = { true };
+        List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>(1);
+        LogicalRelationalOperator innerLoad = new LOInnerLoad(innerPlan, agentOp, -1);        // -1 in the 3rd arg. means "*"
+        LogicalRelationalOperator innerGenerate = new LOGenerate(innerPlan, exps, flatten);
+        innerPlan.add(innerLoad);
+        innerPlan.add(innerGenerate);
+        innerPlan.connect(innerLoad, innerGenerate);
+        // expression plan for the generate: the UDF connected to a "*" projection
+        LogicalExpressionPlan lep = new LogicalExpressionPlan();
+        LogicalExpression proj = new ProjectExpression(lep, 0, -1, innerGenerate);    // -1 in the 3rd arg. means "*"
+        LogicalExpression func = new UserFuncExpression(lep, funcSpec);
+        lep.add(proj);
+        lep.add(func);
+        lep.connect(func, proj);
+        exps.add(lep);
+        // splice the FOREACH between the monitored operator and its successor
+        LogicalRelationalOperator opBefore = aliasOperatorMap.get(alias);
+        LogicalRelationalOperator opAfter = (LogicalRelationalOperator) queryPlan.getSuccessors(opBefore).get(0);        // NOTE(review): only the first successor is rewired; fails if the operator has no successor
+        queryPlan.add(agentOp);
+        queryPlan.insertBetween(opBefore, agentOp, opAfter);
+    }
+    
+    private String monitorFieldList(Class monitorClass) throws InstantiationException, IllegalAccessException {
+        // a null field set from the agent means "monitor every field"
+        final MonitorAgent agent = (MonitorAgent) monitorClass.newInstance();
+        final Set<Integer> monitored = agent.furnishFieldsToMonitor();
+        if (monitored == null) return "*";
+        // otherwise render positional references in iteration order: "$0, $3, ..."
+        final StringBuilder refs = new StringBuilder();
+        String separator = "";
+        for (Integer field : monitored) {
+            refs.append(separator).append("$" + field);
+            separator = ", ";
+        }
+        return refs.toString();
+    }
+    
+    private String getTempFilename() {        // random name under /tmp; NOTE(review): nothing visible here guards against name collisions
+        return ("/tmp/t_" + rand.nextInt(Integer.MAX_VALUE));        // bounded nextInt is never negative, unlike Math.abs(rand.nextInt()) at Integer.MIN_VALUE
+    }
+    
+    @Override
+    public String toString() {
+        return queryPlan.toString();
+    }
+
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PennyServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PennyServer.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PennyServer.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PennyServer.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+
+public class PennyServer {
+    // Parses pig scripts against a PigContext built from the static execType/properties settings below.
+    private final PigContext pigContext;
+    private static ExecType execType = ExecType.MAPREDUCE;        // used by every PennyServer constructed afterwards
+    private static Properties properties;                        // stays null unless setProperties() is called
+    public static void setExecType(ExecType execType) {
+        PennyServer.execType = execType;
+    }
+    public static void setProperties(Properties properties) {
+        PennyServer.properties = properties;
+    }
+    // Builds this server's PigContext from the static settings as they are at construction time.
+    public PennyServer() throws ExecException {
+        pigContext = new PigContext(execType, properties);
+    }
+    // Creates a ParsedPigScript for the given script file, sharing this server's PigContext.
+    public ParsedPigScript parse(String pigScriptFilename) throws IOException {
+        return new ParsedPigScript(pigContext, pigScriptFilename);
+    }
+    // The PigContext created by the constructor.
+    public PigContext getPigContext() {
+        return pigContext;
+    }
+
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PhysicalLocation.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PhysicalLocation.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PhysicalLocation.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PhysicalLocation.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+
+public class PhysicalLocation implements Location {
+    // A logical location paired with an integer physical id; instances compare by value (see equals/hashCode).
+    private final LogicalLocation logLoc;
+    private final int physId;
+    // Shared sentinel for the coordinator: empty logical id and physical id -1.
+    private static final PhysicalLocation coordinatorLocation = new PhysicalLocation("", -1);
+    public static PhysicalLocation coordinatorLocation() {
+        return coordinatorLocation;
+    }
+    
+    public PhysicalLocation(LogicalLocation logLoc, int physId) {
+        this.logLoc = logLoc;
+        this.physId = physId;
+    }
+    // Convenience constructor: wraps logId in a new LogicalLocation.
+    public PhysicalLocation(String logId, int physId) {
+        this(new LogicalLocation(logId), physId);
+    }
+
+    @Override
+    public LogicalLocation asLogical() {
+        return logLoc;
+    }
+
+    @Override
+    public boolean isLogicalOnly() {
+        return false;
+    }
+    
+    @Override
+    public String logId() {
+        return logLoc.logId();
+    }
+
+    public int physId() {
+        return physId;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (!(other instanceof PhysicalLocation)) return false;
+        PhysicalLocation plOther = (PhysicalLocation) other;
+        return (plOther.logLoc.equals(this.logLoc) && (plOther.physId == this.physId));        // equal iff both the logical location and the physical id match
+    }
+    
+    @Override
+    public int hashCode() {
+        return logLoc.hashCode() + physId;
+    }
+    
+    @Override
+    public String toString() {
+        if (this.equals(coordinatorLocation())) return "Loc[COORD]";        // any location equal to the sentinel prints as the coordinator
+        else return "Loc[" + logId() + ";" + physId + "]";
+    }
+
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTCoordinator.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTCoordinator.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTCoordinator.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTCoordinator.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny.apps.bt;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.Coordinator;
+import org.apache.pig.penny.Location;
+
+
+
+/**
+ * Coordinator of the backward tracing app.
+ *
+ * It receives two kinds of messages. A message whose first field is the
+ * string "matched_tags" carries, in its second field, a tuple of tags to be
+ * remembered as matched. Any other message carries a data tuple in its first
+ * field and, in its second field, the tuple of tags attached to it.
+ * {@link #finish()} returns every data tuple recorded under a matched tag.
+ */
+public class BTCoordinator extends Coordinator {
+
+    private Set<String> matchedTags = new HashSet<String>();
+    private Map<String, Collection<Tuple>> taggedTuples = new HashMap<String, Collection<Tuple>>();
+
+    /**
+     * Collects the tuples recorded under each matched tag.
+     *
+     * @return a {@code Collection<Tuple>} (possibly empty) of traced tuples
+     */
+    @Override
+    public Object finish() {
+        Collection<Tuple> traceResults = new LinkedList<Tuple>();
+        for (String tag : matchedTags) {
+            // A tag can be reported as matched without any tuple having been
+            // recorded under it; skip it instead of passing null to addAll().
+            Collection<Tuple> tuplesForTag = taggedTuples.get(tag);
+            if (tuplesForTag != null) {
+                traceResults.addAll(tuplesForTag);
+            }
+        }
+        return traceResults;
+    }
+
+    /** No initialization arguments are used. */
+    @Override
+    public void init(Serializable[] args) {
+    }
+
+    /**
+     * Records either a set of matched tags or a tagged data tuple, depending
+     * on the first field of {@code message}.
+     *
+     * @param source  sender of the message (not used)
+     * @param message two-field tuple in one of the formats described on the class
+     * @throws RuntimeException wrapping any {@link ExecException} raised while
+     *         reading the message fields
+     */
+    @Override
+    public void receiveMessage(Location source, Tuple message) {
+        try {
+            Object firstField = message.get(0);
+            Tuple tagsAsTuple = (Tuple) message.get(1);
+            // Constant-first comparison stays safe if the first field is null.
+            if ("matched_tags".equals(firstField)) {
+                for (int i = 0; i < tagsAsTuple.size(); i++) {
+                    matchedTags.add((String) tagsAsTuple.get(i));
+                }
+            } else {
+                Tuple t = (Tuple) firstField;
+                for (int i = 0; i < tagsAsTuple.size(); i++) {
+                    String tag = (String) tagsAsTuple.get(i);
+                    // Single lookup; create the bucket the first time a tag is seen.
+                    Collection<Tuple> tuplesForTag = taggedTuples.get(tag);
+                    if (tuplesForTag == null) {
+                        tuplesForTag = new LinkedList<Tuple>();
+                        taggedTuples.put(tag, tuplesForTag);
+                    }
+                    tuplesForTag.add(t);
+                }
+            }
+        } catch (ExecException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTInjectTaintMonitorAgent.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTInjectTaintMonitorAgent.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTInjectTaintMonitorAgent.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTInjectTaintMonitorAgent.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny.apps.bt;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.Location;
+import org.apache.pig.penny.MonitorAgent;
+
+
+
+/**
+ * Monitor agent of the backward tracing app that attaches a fresh tag to
+ * every tuple it observes and reports the (tuple, tag) pair to the
+ * coordinator.
+ */
+public class BTInjectTaintMonitorAgent extends MonitorAgent {
+
+    // Number of tags handed out so far; used as the suffix of the next tag.
+    private int counter = 0;
+
+    /** Nothing to do at shutdown. */
+    @Override
+    public void finish() {
+    }
+
+    /** Returns {@code null}; no specific field set is named. */
+    @Override
+    public Set<Integer> furnishFieldsToMonitor() {
+        return null;
+    }
+
+    /** No initialization arguments are used. */
+    @Override
+    public void init(Serializable[] args) {
+    }
+
+    /**
+     * Tags the observed tuple and reports it to the coordinator.
+     *
+     * For now every tuple is treated as a candidate; a later version may
+     * restrict the candidate set using some user-supplied predicate.
+     *
+     * @param t    the observed tuple
+     * @param tags tags already attached to the tuple (not used)
+     * @return a set holding only the newly created tag
+     */
+    @Override
+    public Set<String> observeTuple(Tuple t, Set<String> tags) throws ExecException {
+        // The tag is this agent's location followed by a running counter.
+        final String freshTag = communicator().myLocation() + "_" + counter;
+        counter++;
+
+        Tuple tagCarrier = new DefaultTuple();
+        tagCarrier.append(freshTag);
+        communicator().sendToCoordinator(t, tagCarrier);
+
+        Set<String> injectedTags = new HashSet<String>();
+        injectedTags.add(freshTag);
+        return injectedTags;
+    }
+
+    /** Incoming messages are ignored. */
+    @Override
+    public void receiveMessage(Location source, Tuple message) {
+    }
+
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTMatchTaintMonitorAgent.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTMatchTaintMonitorAgent.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTMatchTaintMonitorAgent.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTMatchTaintMonitorAgent.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny.apps.bt;
+
+import java.io.Serializable;
+import java.util.Set;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.Location;
+import org.apache.pig.penny.MonitorAgent;
+
+
+
+public class BTMatchTaintMonitorAgent extends MonitorAgent {
+
+    private String traceTuple;
+    
+    @Override
+    public void finish() {
+    }
+
+    @Override
+    public Set<Integer> furnishFieldsToMonitor() {
+        return null;
+    }
+
+    @Override
+    public void init(Serializable[] args) {
+        this.traceTuple = (String) args[0];
+    }
+
+    @Override
+    public Set<String> observeTuple(Tuple t, Set<String> tags) throws ExecException {
+        if (t.toString().equals(traceTuple)) {
+            Tuple tagsAsTuple = new DefaultTuple();
+            for (String tag : tags) tagsAsTuple.append(tag);
+            communicator().sendToCoordinator("matched_tags", tagsAsTuple);
+        }
+        return FILTER_OUT;
+    }
+
+    @Override
+    public void receiveMessage(Location source, Tuple message) {
+    }
+
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTPropagateTaintMonitorAgent.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTPropagateTaintMonitorAgent.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTPropagateTaintMonitorAgent.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTPropagateTaintMonitorAgent.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny.apps.bt;
+
+import java.io.Serializable;
+import java.util.Set;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.Location;
+import org.apache.pig.penny.MonitorAgent;
+
+
+
+/**
+ * Monitor agent of the backward tracing app that passes tags through
+ * unchanged and reports every tagged tuple, together with its tags, to the
+ * coordinator.
+ */
+public class BTPropagateTaintMonitorAgent extends MonitorAgent {
+
+    /** Nothing to do at shutdown. */
+    @Override
+    public void finish() {
+    }
+
+    /** Returns {@code null}; no specific field set is named. */
+    @Override
+    public Set<Integer> furnishFieldsToMonitor() {
+        return null;
+    }
+
+    /** No initialization arguments are used. */
+    @Override
+    public void init(Serializable[] args) {
+    }
+
+    /**
+     * Reports a tagged tuple to the coordinator; untagged tuples are not
+     * reported.
+     *
+     * @param t    the observed tuple
+     * @param tags tags attached to the tuple
+     * @return the same {@code tags} set that was passed in
+     */
+    @Override
+    public Set<String> observeTuple(Tuple t, Set<String> tags) throws ExecException {
+        if (tags.isEmpty()) {
+            return tags;
+        }
+
+        Tuple carried = new DefaultTuple();
+        for (String tag : tags) {
+            carried.append(tag);
+        }
+        communicator().sendToCoordinator(t, carried);
+        return tags;
+    }
+
+    /** Incoming messages are ignored. */
+    @Override
+    public void receiveMessage(Location source, Tuple message) {
+    }
+
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/Main.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/Main.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/Main.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny.apps.bt;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.ClassWithArgs;
+import org.apache.pig.penny.ParsedPigScript;
+import org.apache.pig.penny.PennyServer;
+
+
+
+/**
+ * Backward tracing app.
+ *
+ * Usage: {@code Main <pig script file> <alias to trace> <tuple to trace>}.
+ * Load aliases get an agent that injects tags, the traced alias gets an agent
+ * that matches the given tuple, and every other alias gets an agent that
+ * propagates tags. The result returned by the trace is printed.
+ */
+public class Main {
+
+    /**
+     * Runs the backward trace.
+     *
+     * @param args {@code args[0]} is the Pig script file name, {@code args[1]}
+     *             the script alias whose tuple is traced, {@code args[2]} the
+     *             tuple to trace (has to match tuple.toString() exactly)
+     * @throws IllegalArgumentException if fewer than three arguments are given,
+     *         or if the alias is unknown or is a load alias
+     */
+    public static void main(String[] args) throws Exception {
+        if (args == null || args.length < 3) {
+            throw new IllegalArgumentException(
+                    "Usage: Main <pig script file> <alias to trace> <tuple to trace>");
+        }
+
+        PennyServer pennyServer = new PennyServer();
+        String pigScriptFilename = args[0];
+        ParsedPigScript parsedPigScript = pennyServer.parse(pigScriptFilename);
+
+        String traceAlias = args[1];                            // script alias of tuple to get trace of
+        String traceTuple = args[2];                            // tuple to trace (has to match tuple.toString() exactly)
+
+        // The traced alias must exist and must NOT be a load. Load aliases are
+        // always given the inject agent in the loop below, so a load alias
+        // could never receive the match agent and the trace would always come
+        // back empty. (The previous check accepted only load aliases.)
+        if (!parsedPigScript.aliases().contains(traceAlias)
+                || parsedPigScript.operator(traceAlias).equals("LOLoad")) {
+            throw new IllegalArgumentException("Invalid alias.");
+        }
+
+        Map<String, ClassWithArgs> monitorClasses = new HashMap<String, ClassWithArgs>();
+        for (String alias : parsedPigScript.aliases()) {
+            if (parsedPigScript.operator(alias).equals("LOLoad")) {
+                monitorClasses.put(alias, new ClassWithArgs(BTInjectTaintMonitorAgent.class));
+            } else if (alias.equals(traceAlias)) {
+                monitorClasses.put(alias, new ClassWithArgs(BTMatchTaintMonitorAgent.class, traceTuple));
+            } else {
+                monitorClasses.put(alias, new ClassWithArgs(BTPropagateTaintMonitorAgent.class));
+            }
+        }
+
+        // BTCoordinator.finish() returns a Collection<Tuple>; trace() hands it back untyped.
+        @SuppressWarnings("unchecked")
+        Collection<Tuple> traceResults = (Collection<Tuple>) parsedPigScript.trace(BTCoordinator.class, monitorClasses);
+        System.out.println("*** TRACE RESULTS:" + traceResults);
+    }
+
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CICoordinator.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CICoordinator.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CICoordinator.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CICoordinator.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny.apps.ci;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.Coordinator;
+import org.apache.pig.penny.Location;
+import org.apache.pig.penny.PhysicalLocation;
+
+
+
+/**
+ * Coordinator of the crash investigator app. It remembers, for each sending
+ * location, the most recent message received from it.
+ */
+public class CICoordinator extends Coordinator {
+
+    private Map<PhysicalLocation, Tuple> latestTuples = new HashMap<PhysicalLocation, Tuple>();        // latest tuple from each monitor agent
+
+    /** No initialization arguments are used. */
+    @Override
+    public void init(Serializable[] args) {
+    }
+
+    /**
+     * @return the {@code Map<PhysicalLocation, Tuple>} holding the latest
+     *         message received from each location
+     */
+    @Override
+    public Object finish() {
+        return latestTuples;            // return latest tuples
+    }
+
+    /**
+     * Stores {@code message} as the latest one from {@code source},
+     * replacing any earlier message from the same location.
+     *
+     * @param source  sender of the message; cast to {@link PhysicalLocation}
+     * @param message the message to remember
+     */
+    @Override
+    public void receiveMessage(Location source, Tuple message) {
+        // keep track of latest tuple from each monitor agent
+        latestTuples.put((PhysicalLocation) source, message);
+    }
+
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CIMonitorAgent.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CIMonitorAgent.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CIMonitorAgent.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CIMonitorAgent.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny.apps.ci;
+
+import java.io.Serializable;
+import java.util.Set;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.Location;
+import org.apache.pig.penny.MonitorAgent;
+
+
+
+/**
+ * Monitor agent of the crash investigator app. It counts the tuples it
+ * observes and, for a sampled subset of them, sends the tuple number and the
+ * tuple to the coordinator.
+ */
+public class CIMonitorAgent extends MonitorAgent {
+
+    private int firstToMonitor;                // first tuple # to monitor
+    private int skipSize;                    // # of tuples to skip between monitoring
+    private int tupleNum = 0;                // # of tuples seen so far
+
+    @Override
+    public Set<Integer> furnishFieldsToMonitor() {
+        return null;        // monitor all fields
+    }
+
+    /**
+     * Reads the sampling parameters.
+     *
+     * @param args {@code args[0]} is the first tuple number to monitor and
+     *             {@code args[1]} the skip size, both as {@code Integer}
+     * @throws IllegalArgumentException if fewer than two arguments are given
+     *         or the skip size is zero
+     */
+    @Override
+    public void init(Serializable[] args) {
+        if (args == null || args.length < 2) {
+            throw new IllegalArgumentException(
+                    "CIMonitorAgent expects two arguments: firstToMonitor and skipSize");
+        }
+        firstToMonitor = (Integer) args[0];
+        skipSize = (Integer) args[1];
+        // Fail here rather than with an ArithmeticException from '%' on the
+        // first observed tuple.
+        if (skipSize == 0) {
+            throw new IllegalArgumentException("skipSize must not be zero");
+        }
+    }
+
+    /**
+     * Counts the tuple and reports it when its number is at least
+     * {@code firstToMonitor} and a multiple of {@code skipSize}.
+     *
+     * @param t    the observed tuple
+     * @param tags tags attached to the tuple (not used)
+     * @return always {@code NO_TAGS}
+     */
+    @Override
+    public Set<String> observeTuple(Tuple t, Set<String> tags) {
+        if ((tupleNum >= firstToMonitor) && (tupleNum % skipSize == 0)) {
+            communicator().sendToCoordinator(tupleNum, t);
+        }
+        tupleNum++;
+        return NO_TAGS;
+    }
+
+    /** Incoming messages are ignored. */
+    @Override
+    public void receiveMessage(Location source, Tuple message) {
+    }
+
+    /** Nothing to do at shutdown. */
+    @Override
+    public void finish() {
+    }
+
+}

Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/Main.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/Main.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/Main.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny.apps.ci;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.ClassWithArgs;
+import org.apache.pig.penny.ParsedPigScript;
+import org.apache.pig.penny.PennyServer;
+import org.apache.pig.penny.PhysicalLocation;
+
+
+
+/**
+ * Crash investigator app.
+ */
+public class Main {
+    
+    public static void main(String[] args) throws Exception {
+        
+        System.out.println("***** CRASH INVESTIGATOR STARTING ...");
+        
+        PennyServer pennyServer = new PennyServer();
+        String pigScriptFilename = args[0];
+        ParsedPigScript parsedPigScript = pennyServer.parse(pigScriptFilename);
+
+        Map<String, Integer> lowerBounds = new HashMap<String, Integer>();            // lower bound on tuple num for each logical location
+        Map<PhysicalLocation, Tuple> latestTuples = null;
+        for (int skipSize = 100; skipSize > 0; skipSize /= 10) {
+            System.out.println("***** TRYING SCRIPT ...");
+            
+            Map<String, ClassWithArgs> monitorClasses = new HashMap<String, ClassWithArgs>();
+            for (String alias : parsedPigScript.aliases()) {
+                int lowerBound = (lowerBounds.containsKey(alias))? lowerBounds.get(alias) : 0;
+                monitorClasses.put(alias, new ClassWithArgs(CIMonitorAgent.class, lowerBound, skipSize));
+            }
+            latestTuples = (Map<PhysicalLocation, Tuple>) parsedPigScript.trace(CICoordinator.class, monitorClasses);
+            
+            // update lower bounds
+            lowerBounds.clear();
+            for (Entry<PhysicalLocation, Tuple> entry : latestTuples.entrySet()) {
+                String alias = entry.getKey().logId();
+                int tupleNum = (Integer) entry.getValue().get(0);
+                if (lowerBounds.containsKey(alias)) lowerBounds.put(alias, Math.min(lowerBounds.get(alias), tupleNum));
+                else lowerBounds.put(alias, tupleNum);
+            }
+        }
+        
+        Map<String, Set<Tuple>> crashCulprits = new HashMap<String, Set<Tuple>>();
+        for (Entry<PhysicalLocation, Tuple> entry : latestTuples.entrySet()) {
+            String alias = entry.getKey().logId();
+            Tuple t = (Tuple) entry.getValue().get(1);
+            if (!crashCulprits.containsKey(alias)) crashCulprits.put(alias, new HashSet<Tuple>());
+            crashCulprits.get(alias).add(t);
+        }
+        
+        System.out.println("\n\n***** CRASH SUSPECTS:");
+        for (String alias : parsedPigScript.aliases()) {
+            if (crashCulprits.containsKey(alias)) {
+                System.out.println("\nAlias " + alias + ":");
+                for (Tuple t : crashCulprits.get(alias)) {
+                    System.out.println("\t" + t);
+                }
+            }
+        }
+    }
+
+}