You are viewing a plain text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by ga...@apache.org on 2011/04/14 18:11:25 UTC
svn commit: r1092378 [1/3] - in /pig/trunk: ./ contrib/ contrib/penny/
contrib/penny/java/ contrib/penny/java/src/ contrib/penny/java/src/main/
contrib/penny/java/src/main/java/ contrib/penny/java/src/main/java/org/
contrib/penny/java/src/main/java/org...
Author: gates
Date: Thu Apr 14 16:11:22 2011
New Revision: 1092378
URL: http://svn.apache.org/viewvc?rev=1092378&view=rev
Log:
PIG-1959 Penny: a framework for workflow instrumentation
Added:
pig/trunk/contrib/penny/
pig/trunk/contrib/penny/java/
pig/trunk/contrib/penny/java/build.xml
pig/trunk/contrib/penny/java/src/
pig/trunk/contrib/penny/java/src/main/
pig/trunk/contrib/penny/java/src/main/java/
pig/trunk/contrib/penny/java/src/main/java/org/
pig/trunk/contrib/penny/java/src/main/java/org/apache/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ClassWithArgs.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Communicator.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Coordinator.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Location.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/LogicalLocation.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/MonitorAgent.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/NoSuchLocationException.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ParsedPigScript.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PennyServer.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PhysicalLocation.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTCoordinator.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTInjectTaintMonitorAgent.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTMatchTaintMonitorAgent.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTPropagateTaintMonitorAgent.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/Main.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CICoordinator.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CIMonitorAgent.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/Main.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/dh/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/dh/DHCoordinator.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/dh/DHMonitorAgent.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/dh/Main.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ds/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ds/DSCoordinator.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ds/DSMonitorAgent.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ds/Main.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ft/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ft/FTCoordinator.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ft/FTMonitorAgent.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ft/Main.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/gl/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/gl/GLCoordinator.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/gl/GLMonitorAgent1.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/gl/GLMonitorAgent2.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/gl/GoldenLogic.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/gl/Main.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/gl/goldenLogicClasses/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/gl/goldenLogicClasses/FlattenLinksGL.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/la/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/la/LACoordinator.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/la/LAMonitorAgent.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/la/Main.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/lp/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/lp/LPCoordinator.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/lp/LPMonitorAgent.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/lp/Main.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/nop/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/nop/Main.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/nop/NOPCoordinator.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/nop/NOPMonitorAgent.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/op/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/op/Main.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/op/OPCoordinator.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/op/OPMonitorAgent.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ri/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ri/Main.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ri/RICoordinator.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ri/RIMonitorAgent.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ti/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ti/Main.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ti/TICoordinator.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ti/TIMonitorAgent.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/tr/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/tr/Main.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/tr/TRCoordinator.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/tr/TRMonitorAgent.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/comm/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/comm/AsyncMessageReceiptCallback.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/comm/Message.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/comm/MessageReceiptCallback.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/comm/SyncCallResult.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/harnesses/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/harnesses/CoordinatorHarness.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/harnesses/MessagingClient.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/harnesses/MessagingServer.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/harnesses/MonitorAgentHarness.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/pig/
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/pig/MonitorAgentUDF.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/pig/MonitorAgentUDFArgs.java
pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/impl/pig/PigLauncher.java
pig/trunk/contrib/penny/java/src/test/
pig/trunk/contrib/penny/java/src/test/java/
pig/trunk/contrib/penny/java/src/test/java/org/
pig/trunk/contrib/penny/java/src/test/java/org/apache/
pig/trunk/contrib/penny/java/src/test/java/org/apache/pig/
pig/trunk/contrib/penny/java/src/test/java/org/apache/pig/penny/
pig/trunk/contrib/penny/java/src/test/java/org/apache/pig/penny/test/
pig/trunk/contrib/penny/java/src/test/java/org/apache/pig/penny/test/ComTest.java
pig/trunk/contrib/penny/java/src/test/java/org/apache/pig/penny/test/PennyAgentTest.java
Modified:
pig/trunk/contrib/CHANGES.txt
pig/trunk/ivy.xml
pig/trunk/ivy/ivysettings.xml
pig/trunk/ivy/libraries.properties
pig/trunk/src/org/apache/pig/tools/ToolsPigServer.java
Modified: pig/trunk/contrib/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/CHANGES.txt?rev=1092378&r1=1092377&r2=1092378&view=diff
==============================================================================
--- pig/trunk/contrib/CHANGES.txt (original)
+++ pig/trunk/contrib/CHANGES.txt Thu Apr 14 16:11:22 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1959 Penny: a framework for workflow instrumentation (olston, breed via gates)
+
PIG-1924 CSV Loader/Store that handles newlines in fields, and other Excel CSV
features. (paepcke via gates)
Added: pig/trunk/contrib/penny/java/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/build.xml?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/build.xml (added)
+++ pig/trunk/contrib/penny/java/build.xml Thu Apr 14 16:11:22 2011
@@ -0,0 +1,138 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project basedir="." default="jar" name="penny">
+    <!-- javac properties -->
+    <property name="javac.debug" value="on" />
+    <property name="javac.level" value="source,lines,vars"/>
+    <property name="javac.optimize" value="on" />
+    <property name="javac.deprecation" value="off" />
+    <property name="javac.version" value="1.6" />
+    <property name="javac.args" value="" />
+    <!-- TODO we should use warning... <property name="javac.args.warnings" value="-Xlint:unchecked" /> -->
+    <property name="javac.args.warnings" value="" />
+
+    <!-- build properties -->
+    <property name="build.dir" value="${basedir}/build" />
+    <property name="build.classes" value="${build.dir}/classes" />
+    <property name="build.docs" value="${build.dir}/docs" />
+    <property name="build.javadoc" value="${build.docs}/api" />
+    <property name="pigjar" value="../../../pig.jar" />
+    <property name="pigjar-withouthadoop" value="../../../pig-withouthadoop.jar" />
+    <property name="pigtest" value="../../../build/test/classes" />
+    <property name="pennyjar" value="penny.jar" />
+    <property name="src.dir" value="src/main/java/org/apache/pig/penny" />
+    <property name="netty.jar" value="../../../build/ivy/lib/Pig/netty-3.2.2.Final.jar"/>
+
+    <!-- jar properties -->
+    <property name=".javadoc" value="${build.docs}/api" />
+
+    <!-- test properties -->
+    <property name="test.build.dir" value="${build.dir}/test" />
+    <property name="test.classes" value="${test.build.dir}/classes" />
+    <property name="test.logs" value="${test.build.dir}/logs" />
+    <property name="test.timeout" value="900000" />
+    <property name="test.junit.output.format" value="plain" />
+    <property name="test.src.dir" value="src/test/java" />
+    <property name="junit.hadoop.conf" value="${user.home}/pigtest/conf/"/>
+
+    <path id="penny.classpath">
+        <pathelement location="${build.classes}"/>
+        <pathelement location="${pigjar}"/>
+        <pathelement location="${pigjar-withouthadoop}"/>
+        <pathelement location="${pigtest}"/>
+        <fileset dir="../../../build/ivy/lib">
+            <include name="**/*.jar"/>
+        </fileset>
+    </path>
+
+    <path id="test.classpath">
+        <pathelement location="${build.classes}"/>
+        <pathelement location="${pigjar}"/>
+        <pathelement location="${pigjar-withouthadoop}"/>
+        <pathelement location="${test.classes}"/>
+        <pathelement location="${test.src.dir}"/>
+        <pathelement location="${junit.hadoop.conf}" />
+        <pathelement location="${pigtest}"/>
+        <fileset dir="../../../build/ivy/lib">
+            <include name="**/*.jar"/>
+        </fileset>
+    </path>
+
+    <target name="init">
+        <mkdir dir="${build.dir}"/>
+        <mkdir dir="${build.classes}"/>
+        <mkdir dir="${test.build.dir}"/>
+        <mkdir dir="${test.classes}"/>
+        <mkdir dir="${build.javadoc}"/>
+    </target>
+    <target name="clean">
+        <delete dir="${build.dir}"/>
+    </target>
+    <target depends="init" name="compile" description="compile all of the class files">
+        <echo> *** Compiling Penny ***</echo>
+        <javac srcdir="${src.dir}" debug="${javac.debug}" debuglevel="${javac.level}" destdir="${build.classes}" source="${javac.version}"
+            target="${javac.version}" optimize="${javac.optimize}" deprecation="${javac.deprecation}">
+            <compilerarg line="${javac.args} ${javac.args.warnings}" />
+            <classpath refid="penny.classpath"/>
+        </javac>
+    </target>
+    <target depends="init,compile" name="jar" description="create the jar files">
+        <echo> *** Creating penny.jar ***</echo>
+        <jar destfile="${pennyjar}">
+            <fileset dir="${build.classes}"/>
+        </jar>
+    </target>
+    <target depends="compile" name="compile-test">
+        <echo> *** Compiling Penny tests ***</echo>
+        <javac srcdir="${test.src.dir}" debug="${javac.debug}" debuglevel="${javac.level}" destdir="${test.classes}" source="${javac.version}" target="${javac.version}">
+            <compilerarg line="${javac.args} ${javac.args.warnings}"/>
+            <classpath refid="test.classpath"/>
+        </javac>
+    </target>
+    <target depends="compile-test,jar" name="test">
+        <echo> *** Running Penny tests ***</echo>
+        <delete dir="${test.logs}"/>
+        <mkdir dir="${test.logs}"/>
+        <junit printsummary="yes" haltonfailure="no" fork="yes" maxmemory="512m" dir="${basedir}" timeout="${test.timeout}" errorProperty="tests.failed" failureProperty="tests.failed">
+            <sysproperty key="hadoop.log.dir" value="${test.logs}"/>
+            <classpath refid="test.classpath"/>
+            <formatter type="${test.junit.output.format}" />
+            <batchtest fork="yes" todir="${test.logs}" unless="testcase">
+                <fileset dir="${test.src.dir}">
+                    <include name="**/*Test*.java" />
+                </fileset>
+            </batchtest>
+            <batchtest fork="yes" todir="${test.logs}" if="testcase">
+                <fileset dir="${test.src.dir}" includes="**/${testcase}.java"/>
+            </batchtest>
+        </junit>
+        <fail if="tests.failed">Tests failed!</fail>
+    </target>
+    <target depends="init" name="javadoc"
+        description="build javadoc for all of the packages">
+        <echo> *** Creating Javadocs ***</echo>
+        <!-- Document every source file under src.dir (core classes plus apps/ and impl/)
+             into build.javadoc, which the init target creates. -->
+        <javadoc destdir="${build.javadoc}"
+            author="true">
+            <classpath refid="penny.classpath"/>
+            <fileset dir="${src.dir}" includes="**/*.java"/>
+        </javadoc>
+    </target>
+
+</project>
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ClassWithArgs.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ClassWithArgs.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ClassWithArgs.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ClassWithArgs.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+import java.io.Serializable;
+
+/**
+ * Pairs a class with a list of serializable arguments. The arguments are presumably
+ * the ones later handed to the class's {@code init(Serializable[])} method (cf.
+ * {@link MonitorAgent#init} and {@link Coordinator#init}) -- confirm against callers.
+ */
+public class ClassWithArgs {
+
+    private final Class<?> theClass;
+    private final Serializable[] args;
+
+    /**
+     * @param theClass the class to record
+     * @param args     the arguments to record; the array is copied, so later changes made
+     *                 by the caller are not visible here ({@code null} stays {@code null})
+     */
+    public ClassWithArgs(Class<?> theClass, Serializable ... args) {
+        this.theClass = theClass;
+        this.args = copyOf(args);
+    }
+
+    /**
+     * @return the class supplied at construction. The raw return type is kept
+     *         for source compatibility with existing callers.
+     */
+    public Class theClass() {
+        return theClass;
+    }
+
+    /**
+     * @return a fresh copy of the arguments, so callers cannot modify this object's
+     *         state ({@code null} if constructed with a null array)
+     */
+    public Serializable[] args() {
+        return copyOf(args);
+    }
+
+    /** Null-tolerant shallow copy of an argument array. */
+    private static Serializable[] copyOf(Serializable[] source) {
+        return (source == null) ? null : source.clone();
+    }
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Communicator.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Communicator.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Communicator.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Communicator.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Messaging facade given to coordinators and monitor agents. Subclasses implement
+ * the abstract send methods; the varargs overloads here simply pack their
+ * arguments into a {@link Tuple} and delegate to them.
+ */
+public abstract class Communicator {
+
+    /**
+     * @return my own (physical) location
+     */
+    public abstract Location myLocation();
+
+    /**
+     * Sends a message to the coordinator, asynchronously.
+     */
+    public abstract void sendToCoordinator(Tuple message);
+
+    /**
+     * Sends a message to the immediate downstream neighbor(s), synchronously.
+     * If the downstream neighbor(s) span a task boundary, all instances receive it;
+     * otherwise only same-task instances receive it.
+     *
+     * @throws NoSuchLocationException if there is no downstream neighbor
+     */
+    public abstract void sendDownstream(Tuple message) throws NoSuchLocationException;
+
+    /**
+     * Sends a message to the immediate upstream neighbor(s), synchronously.
+     *
+     * @throws NoSuchLocationException if the upstream neighbor(s) do not exist or
+     *         span a task boundary
+     */
+    public abstract void sendUpstream(Tuple message) throws NoSuchLocationException;
+
+    /**
+     * Sends a message to the current and future instances of a logical location.
+     * Instances that have already terminated do not receive it; instances that are
+     * executing receive it asynchronously (or not at all, if they terminate first);
+     * instances that have not started receive it before processing any tuples.
+     */
+    public abstract void sendToAgents(LogicalLocation destination, Tuple message) throws NoSuchLocationException;
+
+    /** Varargs convenience for {@link #sendToCoordinator(Tuple)}. */
+    public void sendToCoordinator(Object ... message) {
+        Tuple packed = makeTuple(message);
+        sendToCoordinator(packed);
+    }
+
+    /** Varargs convenience for {@link #sendDownstream(Tuple)}. */
+    public void sendDownstream(Object ... message) throws NoSuchLocationException {
+        Tuple packed = makeTuple(message);
+        sendDownstream(packed);
+    }
+
+    /** Varargs convenience for {@link #sendUpstream(Tuple)}. */
+    public void sendUpstream(Object ... message) throws NoSuchLocationException {
+        Tuple packed = makeTuple(message);
+        sendUpstream(packed);
+    }
+
+    /** Varargs convenience for {@link #sendToAgents(LogicalLocation, Tuple)}. */
+    public void sendToAgents(LogicalLocation destination, Object ... message) throws NoSuchLocationException {
+        Tuple packed = makeTuple(message);
+        sendToAgents(destination, packed);
+    }
+
+    /** Builds a {@link DefaultTuple} holding the given items in order. */
+    private static Tuple makeTuple(Object[] items) {
+        Tuple result = new DefaultTuple();
+        for (Object item : items) {
+            result.append(item);
+        }
+        return result;
+    }
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Coordinator.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Coordinator.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Coordinator.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Coordinator.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+import java.io.Serializable;
+
+import org.apache.pig.data.Tuple;
+
+/**
+ * Base class for a Penny coordinator. App writers implement {@link #init},
+ * {@link #receiveMessage} and {@link #finish}.
+ */
+public abstract class Coordinator {
+
+    private Communicator communicator;
+
+    /**
+     * Supplies the communicator this coordinator uses to send messages.
+     */
+    public void initialize(Communicator communicator) {
+        this.communicator = communicator;
+    }
+
+    /**
+     * @return the communicator supplied to {@link #initialize(Communicator)}
+     * @throws IllegalStateException if {@link #initialize(Communicator)} has not been called yet
+     */
+    protected Communicator communicator() {
+        // Fail fast with a clear message (as MonitorAgent does) instead of handing
+        // back null and failing later with an uninformative NullPointerException.
+        if (communicator == null) {
+            throw new IllegalStateException("Coordinator must be initialized before getting communicator");
+        }
+        return communicator;
+    }
+
+    ///// Abstract methods that app-writer implements:
+
+    /**
+     * Initialize, using any arguments passed from higher layer.
+     */
+    public abstract void init(Serializable[] args);
+
+    /**
+     * Process an incoming (synchronous or asynchronous) message.
+     *
+     * @param source the location the message came from
+     * @param message the message payload
+     */
+    public abstract void receiveMessage(Location source, Tuple message);
+
+    /**
+     * The data flow has completed and all messages have been delivered. Finish processing.
+     *
+     * @return final output to pass back to application
+     */
+    public abstract Object finish();
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Location.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Location.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Location.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/Location.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+import java.io.Serializable;
+
+/**
+ * A location identified by a logical id and, optionally, a physical id.
+ */
+public interface Location extends Serializable {
+
+    /**
+     * @return the logical-only part of this location, i.e. this location with its
+     *         physical id (if it has one) stripped off
+     */
+    LogicalLocation asLogical();
+
+    /**
+     * @return whether this is a logical-only location, i.e. one with no physical id
+     */
+    boolean isLogicalOnly();
+
+    /**
+     * @return the logical id
+     */
+    String logId();
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/LogicalLocation.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/LogicalLocation.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/LogicalLocation.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/LogicalLocation.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+/**
+ * A logical-only {@link Location}: identified solely by its logical id, with no
+ * physical id. Equality and hashing are based on that id.
+ */
+public class LogicalLocation implements Location {
+
+    private final String id;
+
+    /**
+     * @param id the logical id; {@code null} is tolerated by equals/hashCode
+     */
+    public LogicalLocation(String id) {
+        this.id = id;
+    }
+
+    /** A logical location is already logical-only, so this returns itself. */
+    @Override
+    public LogicalLocation asLogical() {
+        return this;
+    }
+
+    @Override
+    public boolean isLogicalOnly() {
+        return true;
+    }
+
+    @Override
+    public String logId() {
+        return id;
+    }
+
+    /**
+     * Two logical locations are equal when their ids are equal (two null ids count as equal).
+     * NOTE(review): uses instanceof, so subclasses also compare by id only -- confirm this is
+     * intended if any Location implementation extends this class.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof LogicalLocation)) {
+            return false;
+        }
+        String otherId = ((LogicalLocation) other).id;
+        // Null-safe: the constructor does not reject a null id.
+        return (id == null) ? (otherId == null) : id.equals(otherId);
+    }
+
+    @Override
+    public int hashCode() {
+        return (id == null) ? 0 : id.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "Loc[" + id + "]";
+    }
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/MonitorAgent.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/MonitorAgent.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/MonitorAgent.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/MonitorAgent.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Base class for a Penny monitor agent, which observes the tuples passing
+ * through a monitoring point. App writers implement the abstract methods below.
+ */
+public abstract class MonitorAgent {
+
+    ///// tag constants
+
+    /**
+     * Return value of {@link #observeTuple} meaning "let the tuple pass, with no tags".
+     * The set is unmodifiable because this single instance is shared by every agent;
+     * an agent that wants to assign tags must return its own set.
+     */
+    public static final Set<String> NO_TAGS = java.util.Collections.unmodifiableSet(new HashSet<String>());
+
+    /** Return value of {@link #observeTuple} meaning "remove the tuple from the data stream". */
+    public static final Set<String> FILTER_OUT = null;
+
+    private Communicator communicator;
+
+    /**
+     * Supplies the communicator this agent uses to send messages.
+     */
+    public final void initialize(Communicator communicator) {
+        this.communicator = communicator;
+    }
+
+    /**
+     * @return the communicator supplied to {@link #initialize(Communicator)}
+     * @throws IllegalStateException if {@link #initialize(Communicator)} has not been called yet
+     */
+    protected Communicator communicator() {
+        if (communicator == null) {
+            throw new IllegalStateException("Agent must be initialized before getting communicator");
+        }
+        return communicator;
+    }
+
+    ///// Abstract methods that app-writer implements:
+
+    /**
+     * Furnish set of fields to monitor. (Null means monitor all fields ('*').)
+     */
+    public abstract Set<Integer> furnishFieldsToMonitor();
+
+    /**
+     * Initialize, using any arguments passed from higher layer.
+     */
+    public abstract void init(Serializable[] args);
+
+    /**
+     * Observe a tuple (record) that passes through the monitoring point.
+     *
+     * @param t the tuple
+     * @param tags t's tags
+     * @return FILTER_OUT to remove the tuple from the data stream; NO_TAGS to let it pass through
+     *         and not give it any tags; a set of tags to let it pass through and assign those tags
+     * @throws ExecException if the implementation fails (presumably while reading t's fields)
+     */
+    public abstract Set<String> observeTuple(Tuple t, Set<String> tags) throws ExecException;
+
+    /**
+     * Process an incoming (synchronous or asynchronous) message.
+     */
+    public abstract void receiveMessage(Location source, Tuple message);
+
+    /**
+     * No more tuples are going to pass through the monitoring point. Finish any ongoing processing.
+     */
+    public abstract void finish();
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/NoSuchLocationException.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/NoSuchLocationException.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/NoSuchLocationException.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/NoSuchLocationException.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+/**
+ * Thrown by the {@link Communicator} send methods when the requested location
+ * does not exist or cannot be addressed from the sender (see each method's docs).
+ */
+public class NoSuchLocationException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param msg description of the missing location
+     */
+    public NoSuchLocationException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * @param msg description of the missing location
+     * @param cause the underlying exception
+     */
+    public NoSuchLocationException(String msg, Exception cause) {
+        super(msg, cause);
+    }
+
+    /**
+     * Like {@link #NoSuchLocationException(String, Exception)} but accepts any
+     * {@link Throwable} (e.g. an {@link Error}) as the cause.
+     *
+     * @param msg description of the missing location
+     * @param cause the underlying throwable
+     */
+    public NoSuchLocationException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ParsedPigScript.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ParsedPigScript.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ParsedPigScript.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/ParsedPigScript.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.penny.impl.harnesses.CoordinatorHarness;
+import org.apache.pig.penny.impl.pig.MonitorAgentUDF;
+import org.apache.pig.penny.impl.pig.MonitorAgentUDFArgs;
+import org.apache.pig.penny.impl.pig.PigLauncher;
+import org.apache.pig.tools.ToolsPigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.ObjectSerializer;
+
+
+public class ParsedPigScript {
+
+ private final ToolsPigServer pigServer;
+ private final String pigScript; // user's raw pig script
+ private final LogicalPlan queryPlan; // pig's logical query plan object
+ private Map<String, LogicalRelationalOperator> aliasOperatorMap; // the pig LogicalRelationalOperator corresponding to each alias
+
+ // Ibis change : start
+ // the pig LOStore corresponding to each alias which is actually a 'LOStore'
+ private Map<String, LOStore> storeMap;
+ // Ibis change : end
+
+ private final Random rand = new Random(System.currentTimeMillis());
+
+ public ParsedPigScript(PigContext pigContext, String pigScriptFilename) throws IOException {
+ this.pigServer = new ToolsPigServer(pigContext);
+ this.pigScript = pigScriptFilename;
+ pigServer.registerNoRun(pigScript, new HashMap<String, String>(), new LinkedList<String>());
+ this.queryPlan = pigServer.getPlans().lp;
+ /* debug: */ //System.out.println("ORIGINAL PLAN: " + queryPlan);
+ aliasOperatorMap = new HashMap<String, LogicalRelationalOperator>();
+ // Ibis change : start
+ this.storeMap = new HashMap<String, LOStore>();
+ // Ibis change : end
+ populateAliasOperatorMap(queryPlan, aliasOperatorMap, storeMap);
+ }
+
+ // Ibis change : start
+ private static void populateAliasOperatorMap(LogicalPlan queryPlan, Map<String, LogicalRelationalOperator> map, Map<String, LOStore> storeMap) {
+ // Ibis change : end
+ for (Iterator<Operator> it = queryPlan.getOperators(); it.hasNext(); ) {
+ LogicalRelationalOperator op = (LogicalRelationalOperator) it.next();
+ if (!(op instanceof LOStore)) {
+ String alias = op.getAlias();
+ if (alias != null) {
+ if (map.containsKey(alias)) throw new RuntimeException("Cannot use Penny with scripts that re-use aliases. This script re-uses alias " + alias + ".");
+ map.put(alias, op);
+ }
+ }
+ // Ibis change : start
+ else if (null != storeMap) {
+ addStoreOperator(storeMap, (LOStore) op);
+ }
+ // Ibis change : end
+ }
+ }
+
+ private Object trace(ClassWithArgs coordinatorClass, Map<String, ClassWithArgs> monitorClasses, String truncateAtAlias, boolean outputToTemp) throws Exception {
+ List<String> logicalIds = new ArrayList<String>(monitorClasses.keySet());
+ Collections.sort(logicalIds);
+
+ LogicalPlan instrumentedQueryPlan = createInstrumentedPlan(monitorClasses, new InetSocketAddress(InetAddress.getLocalHost(), CoordinatorHarness.MASTER_LISTEN_PORT), logicalIds, truncateAtAlias, outputToTemp);
+ /* debug: */ //System.out.println("INSTRUMENTED PLAN: " + instrumentedQueryPlan);
+ return PigLauncher.launch(pigServer.getPigContext(), instrumentedQueryPlan, coordinatorClass);
+ }
+
+ public Object trace(ClassWithArgs coordinatorClass, Map<String, ClassWithArgs> monitorClasses, boolean outputToTemp) throws Exception {
+ return trace(coordinatorClass, monitorClasses, null, outputToTemp);
+ }
+
+ public Object truncateAndTrace(ClassWithArgs coordinatorClass, Map<String, ClassWithArgs> monitorClasses, String truncateAtAlias) throws Exception {
+ return trace(coordinatorClass, monitorClasses, truncateAtAlias, true);
+ }
+
+ public Object truncateAndTrace(Class coordinatorClass, Map<String, ClassWithArgs> monitorClasses, String truncateAtAlias) throws Exception {
+ return truncateAndTrace(new ClassWithArgs(coordinatorClass), monitorClasses, truncateAtAlias);
+ }
+
+ public Object trace(ClassWithArgs coordinatorClass, Map<String, ClassWithArgs> monitorClasses) throws Exception {
+ return trace(coordinatorClass, monitorClasses, false);
+ }
+
+ public Object trace(Class coordinatorClass, Map<String, ClassWithArgs> monitorClasses, boolean outputToTemp) throws Exception {
+ return trace(new ClassWithArgs(coordinatorClass), monitorClasses, outputToTemp);
+ }
+
+ public Object trace(Class coordinatorClass, Map<String, ClassWithArgs> monitorClasses) throws Exception {
+ return trace(coordinatorClass, monitorClasses, false);
+ }
+
+
+ // Ibis change : start
+ private static void addStoreOperator(Map<String, LOStore> storeMap, LOStore store) {
+
+ String alias = ((LogicalRelationalOperator)store.getPlan().getPredecessors(store).get(0)).getAlias();
+
+ if (storeMap.containsKey(alias)) throw new RuntimeException("Cannot use Penny with scripts that re-use aliases. This script re-uses alias " + alias + " in store.");
+ storeMap.put(alias, store);
+ }
+
+ /**
+ * Get all the aliases which are 'stored'.
+ * TODO: Change from storeMap.keySet() to Collections.unModifiableSet(storeMap.keySet()) so that consumers cant modify it. But I dont know enough of penny to be sure if it is required ...
+ *
+ * @return
+ */
+ public Collection<String> getStoreAliases(){
+ return storeMap.keySet();
+ }
+ // Ibis change : end
+ /*
+ * Get all aliases used in this script.
+ */
+ public Collection<String> aliases() {
+ return aliasOperatorMap.keySet();
+ }
+
+ public String operator(String alias) {
+ return aliasOperatorMap.get(alias).getName();
+ }
+
+ public int arity(String alias) throws Exception {
+ return aliasOperatorMap.get(alias).getSchema().size();
+ }
+
+ // Ibis change : start
+
+ /**
+ * Get the actual pig LogicalRelationalOperator corresponding to an alias.
+ * Note that Penny does not support reuse of aliases !
+ *
+ * @param alias
+ * @return
+ */
+ public LogicalRelationalOperator getLogicalRelationalOperator(String alias){
+ return aliasOperatorMap.get(alias);
+ }
+
+ /**
+ * Get the actual pig LOStore corresponding to an alias which is getting stored.
+ * Note that Penny does not support reuse of aliases (even for store) !
+ *
+ * @param alias
+ * @return
+ */
+ public LOStore getStoreOperator(String alias){
+ return storeMap.get(alias);
+ }
+ // Ibis change : end
+
+ public Map<Integer, List<Integer>> opKeys(String alias) throws Exception {
+ LogicalRelationalOperator op = aliasOperatorMap.get(alias);
+
+ if (op instanceof LOCogroup) {
+ Map<Integer, List<Integer>> opKeys = new HashMap<Integer, List<Integer>>();
+ MultiMap<Integer, LogicalExpressionPlan> expressionPlans = ((LOCogroup) op).getExpressionPlans();
+ for (int pos : expressionPlans.keySet()) {
+ List<Integer> onePosKeys = new LinkedList<Integer>();
+ for (LogicalExpressionPlan plan : expressionPlans.get(pos)) {
+ if (plan.size() == 1) {
+ Operator groupExpr = plan.getSources().get(0);
+ if (groupExpr instanceof ConstantExpression) {
+ // "group all" or "group by <const>", so don't append any key col
+ } else if (groupExpr instanceof ProjectExpression) {
+ onePosKeys.add(((ProjectExpression) groupExpr).getColNum());
+ } else {
+ throw new Exception("Penny tagging does not support complex expressions in group/cogroup.");
+ }
+ } else {
+ throw new Exception("Penny tagging does not support complex expressions in group/cogroup.");
+ }
+ }
+ opKeys.put(pos, onePosKeys);
+ }
+ return opKeys;
+ } else {
+ throw new Exception("Operator keys not defined for non-(co)group operator.");
+ }
+ }
+
+ /**
+ * Get aliases that this alias depends on directly (for a load, will be an empty list).
+ */
+ public List<String> inEdges(String alias) {
+ List<Operator> inOps = queryPlan.getPredecessors(aliasOperatorMap.get(alias));
+ if (inOps == null) inOps = new LinkedList<Operator>();
+ List<String> inAliases = new ArrayList<String>(inOps.size());
+ for (Operator inOp : inOps) inAliases.add(((LogicalRelationalOperator) inOp).getAlias());
+ return inAliases;
+ }
+
+ /**
+ * Get the alias that this alias feeds into directly.
+ */
+ public String outEdge(String alias) {
+ List<Operator> outOps = queryPlan.getSuccessors(aliasOperatorMap.get(alias));
+ if (outOps == null) outOps = new LinkedList<Operator>();
+ if (outOps.size() > 1) throw new RuntimeException("Expecting at most one outgoing edge.");
+ if (outOps.size() == 0) return null;
+ if (outOps.get(0).getName().equals("LOStore")) return null;
+ else return ((LogicalRelationalOperator) outOps.get(0)).getAlias();
+ }
+
+ /**
+ * Does this script have a linear chain structure (i.e. no joins, splits, etc.)?
+ */
+ public boolean isChain() {
+ for (String alias : aliases()) {
+ // Ibis change : start
+ if (operator(alias).equals("LOSplit")) return false;
+ // Ibis change : end
+ if (inEdges(alias).size() > 1) return false;
+ }
+ return true;
+ }
+
+ /**
+ * Is this alias an entry-point to a new map or reduce task stage?
+ */
+ public boolean isTaskEntryPoint(String alias) {
+ String LogicalRelationalOperator = operator(alias);
+ if (LogicalRelationalOperator == null) return false;
+ // Ibis change : start
+ return (LogicalRelationalOperator.equals("LOLoad") || LogicalRelationalOperator.equals("LOGroup") ||
+ LogicalRelationalOperator.equals("LOCogroup") || LogicalRelationalOperator.equals("LOJoin"));
+ // Ibis change : end
+ }
+
+ /**
+ * Is this alias an exit-point to for a map or reduce task stage?
+ */
+ public boolean isTaskExitPoint(String alias) {
+ String LogicalRelationalOperator = operator(alias);
+ // Ibis change : start
+ return (LogicalRelationalOperator.equals("LOStore") || LogicalRelationalOperator.equals("LOGroup") ||
+ LogicalRelationalOperator.equals("LOCogroup") || LogicalRelationalOperator.equals("LOJoin"));
+ // Ibis change : end
+ }
+
+ private void truncate(LogicalPlan plan, String alias) throws FrontendException {
+ // find truncation point in plan copy
+ Operator truncationPoint = null;
+ for (Iterator<Operator> it = plan.getOperators(); it.hasNext(); ) {
+ LogicalRelationalOperator op = (LogicalRelationalOperator) it.next();
+ if (alias.equals(op.getAlias())) {
+ truncationPoint = op;
+ break;
+ }
+ }
+
+ // find all operators to remove from plan (closure starting at truncation point)
+ List<Operator> leads = new LinkedList<Operator>();
+ leads.add(truncationPoint);
+ Set<Operator> toRemove = new HashSet<Operator>();
+ while (!leads.isEmpty()) {
+ Operator op = leads.remove(0);
+ Collection<Operator> successors = plan.getSuccessors(op);
+ if (successors == null) successors = new LinkedList<Operator>();
+ toRemove.addAll(successors);
+ leads.addAll(successors);
+ }
+
+ // disconnect operators
+ for (Operator op : toRemove) {
+ for (Operator pred : new LinkedList<Operator>(plan.getPredecessors(op))) {
+ plan.disconnect(pred, op);
+ }
+ }
+
+ // remove operators
+ for (Operator op : toRemove) {
+ plan.remove(op);
+ }
+
+ // add "store" operator to store into temp file
+ LogicalRelationalOperator store = new LOStore(plan, new FileSpec(getTempFilename(), new FuncSpec("PigStorage")));
+ plan.add(store);
+ plan.connect(truncationPoint, store);
+ }
+
+ private LogicalPlan createInstrumentedPlan(Map<String, ClassWithArgs> monitorClasses, InetSocketAddress masterAddr, List<String> logicalIds, String truncateAtAlias, boolean outputToTemp) throws Exception {
+ // sanity check
+ Set<String> aliases = new HashSet<String>(aliases());
+ for (String alias : monitorClasses.keySet()) {
+ if (!aliases.contains(alias)) throw new IllegalArgumentException("Illegal request to monitor pig script alias " + alias);
+ }
+
+ // first, make a fresh copy of the query plan so we can mutate (i.e. instrument) it
+ // since LogicalPlan does not offer a deepCopy() method, we'll do this by re-parsing the original script
+ ToolsPigServer newPigServer = new ToolsPigServer(pigServer.getPigContext());
+ newPigServer.registerNoRun(pigScript, new HashMap<String, String>(), new LinkedList<String>());
+ LogicalPlan instrumentedQueryPlan = newPigServer.getPlans().lp;
+ if (truncateAtAlias != null) truncate(instrumentedQueryPlan, truncateAtAlias);
+ Map<String, LogicalRelationalOperator> cloneAliasOperatorMap = new HashMap<String, LogicalRelationalOperator>();
+ // Ibis change : start
+ populateAliasOperatorMap(instrumentedQueryPlan, cloneAliasOperatorMap, null);
+ // Ibis change : end
+
+ for (String alias : monitorClasses.keySet()) {
+ addAgentLogicalRelationalOperator(instrumentedQueryPlan, cloneAliasOperatorMap, alias, monitorClasses.get(alias), masterAddr, logicalIds, withinTaskUpstreamNeighbors(alias), withinTaskDownstreamNeighbors(alias), crossTaskDownstreamNeighbors(alias), incomingCrossTaskKeyFields(alias), outgoingCrossTaskKeyFields(alias));
+ }
+
+ if (outputToTemp) {
+ // convert all store operators to go to temp file
+ Collection<LogicalRelationalOperator> ops = new LinkedList<LogicalRelationalOperator>();
+ for (Iterator<Operator> it = instrumentedQueryPlan.getOperators(); it.hasNext(); ) {
+ ops.add((LogicalRelationalOperator) it.next());
+ }
+ for (LogicalRelationalOperator op : ops) {
+ if (op instanceof LOStore) {
+ FuncSpec funcSpec = ((LOStore) op).getOutputSpec().getFuncSpec();
+ instrumentedQueryPlan.replace(op, new LOStore(instrumentedQueryPlan, new FileSpec(getTempFilename(), funcSpec)));
+ }
+ }
+ }
+
+ return instrumentedQueryPlan;
+ }
+
+ private Set<String> withinTaskUpstreamNeighbors(String alias) {
+ if (!isCrossTaskLogicalRelationalOperator(operator(alias))) return new HashSet<String>(inEdges(alias));
+ else return new HashSet<String>();
+ }
+
+ private Set<String> withinTaskDownstreamNeighbors(String alias) {
+ Set<String> neighbors = new HashSet<String>();
+ for (String otherAlias : aliases()) {
+ if (inEdges(otherAlias).contains(alias) && !isCrossTaskLogicalRelationalOperator(operator(otherAlias))) {
+ neighbors.add(otherAlias);
+ }
+ }
+ return neighbors;
+ }
+
+ private Set<String> crossTaskDownstreamNeighbors(String alias) {
+ Set<String> neighbors = new HashSet<String>();
+ for (String otherAlias : aliases()) {
+ if (inEdges(otherAlias).contains(alias) && isCrossTaskLogicalRelationalOperator(operator(otherAlias))) {
+ neighbors.add(otherAlias);
+ }
+ }
+ return neighbors;
+ }
+
+ private Map<String, List<Integer>> incomingCrossTaskKeyFields(String alias) throws Exception {
+ if (isCrossTaskLogicalRelationalOperator(operator(alias))) {
+ Map<String, List<Integer>> result = new HashMap<String, List<Integer>>();
+ int pos = 0;
+ for (String originAlias : inEdges(alias)) {
+ List<Integer> oneResult = new ArrayList<Integer>();
+ if (operator(alias).equals("LOCogroup")) {
+ int numKeys = opKeys(alias).get(pos).size();
+ if (numKeys > 1) { // (co)group with multiple group keys
+ throw new Exception("Penny tagging does not support complex expressions in group/cogroup.");
+ } else if (numKeys == 1) { // (co)group with single group key
+ oneResult.add(0);
+ } else { // "group all" or "group <const>"
+ // leave empty
+ }
+ } else if (operator(alias).equals("LOSort")) {
+ oneResult.addAll(generateFieldList(0, arity(alias))); // for sort, use all fields
+ } else if (operator(alias).equals("LOJoin")) {
+ if (pos == 0) oneResult.addAll(generateFieldList(0, arity(originAlias))); // for join, use all fields from one input
+ else oneResult.addAll(generateFieldList(arity(originAlias), arity(alias)));
+ } else {
+ throw new RuntimeException("Unrecognized operator: " + operator(alias));
+ }
+ result.put(originAlias, oneResult);
+ pos++;
+ }
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ private List<Integer> outgoingCrossTaskKeyFields(String alias) throws Exception {
+ String dstAlias = outEdge(alias);
+ int pos = inEdges(dstAlias).indexOf(alias);
+ if (dstAlias != null && isCrossTaskLogicalRelationalOperator(operator(dstAlias))) {
+ if (operator(dstAlias).equals("LOCogroup")) {
+ return opKeys(dstAlias).get(pos);
+ } else if (operator(dstAlias).equals("LOJoin") || operator(dstAlias).equals("LOSort")) {
+ return generateFieldList(0, arity(alias));
+ } else {
+ throw new RuntimeException("Unrecognized operator: " + operator(dstAlias));
+ }
+ } else {
+ return null;
+ }
+ }
+
+ private static List<Integer> generateFieldList(int a, int b) {
+ List<Integer> l = new ArrayList<Integer>();
+ for (int i = a; i < b; i++) l.add(i);
+ return l;
+ }
+
+ private static boolean isCrossTaskLogicalRelationalOperator(String opName) {
+ return (opName.equals("LOCogroup") || opName.equals("LOCross") || opName.equals("LOJoin") || opName.equals("LOSort"));
+ }
+
+ private static void addAgentLogicalRelationalOperator(LogicalPlan queryPlan, Map<String, LogicalRelationalOperator> aliasOperatorMap, String alias, ClassWithArgs monitorClass, InetSocketAddress masterAddr, List<String> logicalIds, Set<String> withinTaskUpstreamNeighbors, Set<String> withinTaskDownstreamNeighbors, Set<String> crossTaskDownstreamNeighbors, Map<String, List<Integer>> incomingCrossTaskKeyFields, List<Integer> outgoingCrossTaskKeyFields) throws InstantiationException, IllegalAccessException, IOException {
+ //String monitorFuncName = "monitor_" + alias;
+ MonitorAgentUDFArgs args = new MonitorAgentUDFArgs(alias, monitorClass, masterAddr, logicalIds, withinTaskUpstreamNeighbors, withinTaskDownstreamNeighbors, crossTaskDownstreamNeighbors, incomingCrossTaskKeyFields, outgoingCrossTaskKeyFields);
+
+ // create pig LogicalRelationalOperator equivalent to this syntax:
+ // "DEFINE " + monitorFuncName + " " + MonitorAgentUDF.class.getCanonicalName() + "(\'" + ObjectSerializer.serialize(args) + "\');
+ // alias + " = FOREACH " + alias + " GENERATE FLATTEN(" + monitorFuncName + "(" + monitorFieldList(monitorClass.theClass()) + "));
+
+ FuncSpec funcSpec = new FuncSpec(MonitorAgentUDF.class.getCanonicalName(), ObjectSerializer.serialize(args));
+
+ LOForEach agentOp = new LOForEach(queryPlan);
+ LogicalPlan innerPlan = new LogicalPlan();
+ agentOp.setInnerPlan(innerPlan);
+
+ boolean[] flatten = { true };
+ List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>(1);
+ LogicalRelationalOperator innerLoad = new LOInnerLoad(innerPlan, agentOp, -1); // -1 in the 3rd arg. means "*"
+ LogicalRelationalOperator innerGenerate = new LOGenerate(innerPlan, exps, flatten);
+ innerPlan.add(innerLoad);
+ innerPlan.add(innerGenerate);
+ innerPlan.connect(innerLoad, innerGenerate);
+
+ LogicalExpressionPlan lep = new LogicalExpressionPlan();
+ LogicalExpression proj = new ProjectExpression(lep, 0, -1, innerGenerate); // -1 in the 3rd arg. means "*"
+ LogicalExpression func = new UserFuncExpression(lep, funcSpec);
+ lep.add(proj);
+ lep.add(func);
+ lep.connect(func, proj);
+ exps.add(lep);
+
+ LogicalRelationalOperator opBefore = aliasOperatorMap.get(alias);
+ LogicalRelationalOperator opAfter = (LogicalRelationalOperator) queryPlan.getSuccessors(opBefore).get(0);
+ queryPlan.add(agentOp);
+ queryPlan.insertBetween(opBefore, agentOp, opAfter);
+ }
+
+ private String monitorFieldList(Class monitorClass) throws InstantiationException, IllegalAccessException {
+ Set<Integer> fields = ((MonitorAgent) monitorClass.newInstance()).furnishFieldsToMonitor();
+ if (fields == null) {
+ return "*";
+ } else {
+ StringBuffer sb = new StringBuffer();
+
+ for (Iterator<Integer> it = fields.iterator(); it.hasNext(); ) {
+ sb.append("$" + it.next());
+ if (it.hasNext()) sb.append(", ");
+ }
+ return sb.toString();
+ }
+ }
+
+ private String getTempFilename() {
+ return ("/tmp/t_" + Math.abs(rand.nextInt()));
+ }
+
+ @Override
+ public String toString() {
+ return queryPlan.toString();
+ }
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PennyServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PennyServer.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PennyServer.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PennyServer.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+
+public class PennyServer {
+
+ private final PigContext pigContext;
+ private static ExecType execType = ExecType.MAPREDUCE;
+ private static Properties properties;
+ public static void setExecType(ExecType execType) {
+ PennyServer.execType = execType;
+ }
+ public static void setProperties(Properties properties) {
+ PennyServer.properties = properties;
+ }
+
+ public PennyServer() throws ExecException {
+ pigContext = new PigContext(execType, properties);
+ }
+
+ public ParsedPigScript parse(String pigScriptFilename) throws IOException {
+ return new ParsedPigScript(pigContext, pigScriptFilename);
+ }
+
+ public PigContext getPigContext() {
+ return pigContext;
+ }
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PhysicalLocation.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PhysicalLocation.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PhysicalLocation.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/PhysicalLocation.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny;
+
+
+public class PhysicalLocation implements Location {
+
+ private final LogicalLocation logLoc;
+ private final int physId;
+
+ private static final PhysicalLocation coordinatorLocation = new PhysicalLocation("", -1);
+ public static PhysicalLocation coordinatorLocation() {
+ return coordinatorLocation;
+ }
+
+ public PhysicalLocation(LogicalLocation logLoc, int physId) {
+ this.logLoc = logLoc;
+ this.physId = physId;
+ }
+
+ public PhysicalLocation(String logId, int physId) {
+ this(new LogicalLocation(logId), physId);
+ }
+
+ @Override
+ public LogicalLocation asLogical() {
+ return logLoc;
+ }
+
+ @Override
+ public boolean isLogicalOnly() {
+ return false;
+ }
+
+ @Override
+ public String logId() {
+ return logLoc.logId();
+ }
+
+ public int physId() {
+ return physId;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof PhysicalLocation)) return false;
+ PhysicalLocation plOther = (PhysicalLocation) other;
+ return (plOther.logLoc.equals(this.logLoc) && (plOther.physId == this.physId));
+ }
+
+ @Override
+ public int hashCode() {
+ return logLoc.hashCode() + physId;
+ }
+
+ @Override
+ public String toString() {
+ if (this.equals(coordinatorLocation())) return "Loc[COORD]";
+ else return "Loc[" + logId() + ";" + physId + "]";
+ }
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTCoordinator.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTCoordinator.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTCoordinator.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTCoordinator.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny.apps.bt;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.Coordinator;
+import org.apache.pig.penny.Location;
+
+
+
+/**
+ * Coordinator for backward tracing. It collects tagged tuples, records which tags were matched,
+ * and on finish returns every tuple carrying a matched tag.
+ *
+ * <p>Two message shapes are handled:
+ * <ul>
+ * <li>("matched_tags", (tag, tag, ...)): the listed tags are recorded as matched;</li>
+ * <li>(tuple, (tag, tag, ...)): the tuple is recorded under each listed tag.</li>
+ * </ul>
+ */
+public class BTCoordinator extends Coordinator {
+
+    private final Set<String> matchedTags = new HashSet<String>();
+    private final Map<String, Collection<Tuple>> taggedTuples = new HashMap<String, Collection<Tuple>>();
+
+    /**
+     * Returns all tuples recorded under any matched tag. A matched tag with no recorded tuples
+     * contributes nothing instead of causing a NullPointerException.
+     */
+    @Override
+    public Object finish() {
+        Collection<Tuple> traceResults = new LinkedList<Tuple>();
+        for (String tag : matchedTags) {
+            Collection<Tuple> tuples = taggedTuples.get(tag);
+            if (tuples != null) {
+                traceResults.addAll(tuples);
+            }
+        }
+        return traceResults;
+    }
+
+    @Override
+    public void init(Serializable[] args) {
+    }
+
+    /**
+     * Records either a set of matched tags or a tagged tuple (see the class doc).
+     *
+     * @throws RuntimeException wrapping any ExecException from reading the message
+     */
+    @Override
+    public void receiveMessage(Location source, Tuple message) {
+        try {
+            if (message.get(0).equals("matched_tags")) {
+                Tuple tagsAsTuple = (Tuple) message.get(1);
+                for (int i = 0; i < tagsAsTuple.size(); i++) {
+                    matchedTags.add((String) tagsAsTuple.get(i));
+                }
+            } else {
+                Tuple t = (Tuple) message.get(0);
+                Tuple tagsAsTuple = (Tuple) message.get(1);
+                for (int i = 0; i < tagsAsTuple.size(); i++) {
+                    String tag = (String) tagsAsTuple.get(i);
+                    Collection<Tuple> bucket = taggedTuples.get(tag);
+                    if (bucket == null) {
+                        bucket = new LinkedList<Tuple>();
+                        taggedTuples.put(tag, bucket);
+                    }
+                    bucket.add(t);
+                }
+            }
+        } catch (ExecException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTInjectTaintMonitorAgent.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTInjectTaintMonitorAgent.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTInjectTaintMonitorAgent.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTInjectTaintMonitorAgent.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny.apps.bt;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.Location;
+import org.apache.pig.penny.MonitorAgent;
+
+
+
+/**
+ * Monitor agent that gives every observed tuple a fresh tag of the form
+ * "&lt;myLocation&gt;_&lt;n&gt;". It reports (tuple, (tag)) to the coordinator and returns the
+ * tag as the tuple's injected tag set.
+ */
+public class BTInjectTaintMonitorAgent extends MonitorAgent {
+
+    /** Sequence number appended to this agent's location to make each tag unique. */
+    private int counter = 0;
+
+    @Override
+    public void init(Serializable[] args) {
+        // no configuration
+    }
+
+    /** Returns null, i.e. no specific field subset is requested. */
+    @Override
+    public Set<Integer> furnishFieldsToMonitor() {
+        return null;
+    }
+
+    @Override
+    public Set<String> observeTuple(Tuple t, Set<String> tags) throws ExecException {
+        // Every tuple is currently a candidate; a user-supplied predicate could narrow this later.
+        String tag = communicator().myLocation() + "_" + counter;
+        counter++;
+
+        Tuple tagAsTuple = new DefaultTuple();
+        tagAsTuple.append(tag);
+        communicator().sendToCoordinator(t, tagAsTuple);
+
+        Set<String> injectedTags = new HashSet<String>();
+        injectedTags.add(tag);
+        return injectedTags;
+    }
+
+    @Override
+    public void receiveMessage(Location source, Tuple message) {
+        // this agent ignores incoming messages
+    }
+
+    @Override
+    public void finish() {
+        // nothing to flush
+    }
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTMatchTaintMonitorAgent.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTMatchTaintMonitorAgent.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTMatchTaintMonitorAgent.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTMatchTaintMonitorAgent.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny.apps.bt;
+
+import java.io.Serializable;
+import java.util.Set;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.Location;
+import org.apache.pig.penny.MonitorAgent;
+
+
+
+public class BTMatchTaintMonitorAgent extends MonitorAgent {
+
+ private String traceTuple;
+
+ @Override
+ public void finish() {
+ }
+
+ @Override
+ public Set<Integer> furnishFieldsToMonitor() {
+ return null;
+ }
+
+ @Override
+ public void init(Serializable[] args) {
+ this.traceTuple = (String) args[0];
+ }
+
+ @Override
+ public Set<String> observeTuple(Tuple t, Set<String> tags) throws ExecException {
+ if (t.toString().equals(traceTuple)) {
+ Tuple tagsAsTuple = new DefaultTuple();
+ for (String tag : tags) tagsAsTuple.append(tag);
+ communicator().sendToCoordinator("matched_tags", tagsAsTuple);
+ }
+ return FILTER_OUT;
+ }
+
+ @Override
+ public void receiveMessage(Location source, Tuple message) {
+ }
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTPropagateTaintMonitorAgent.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTPropagateTaintMonitorAgent.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTPropagateTaintMonitorAgent.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/BTPropagateTaintMonitorAgent.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny.apps.bt;
+
+import java.io.Serializable;
+import java.util.Set;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.Location;
+import org.apache.pig.penny.MonitorAgent;
+
+
+
+/**
+ * Backward-tracing monitor agent installed on every alias that is neither a LOAD nor
+ * the traced alias. It passes incoming taint tags through unchanged. Whenever a tuple
+ * carries at least one tag, it also reports that tuple and its tags to the coordinator.
+ */
+public class BTPropagateTaintMonitorAgent extends MonitorAgent {
+
+    /** Returns {@code tags} as-is, reporting (t, tags) first when {@code tags} is non-empty. */
+    @Override
+    public Set<String> observeTuple(Tuple t, Set<String> tags) throws ExecException {
+        if (tags.isEmpty()) {
+            return tags; // untainted tuple: nothing to report
+        }
+        Tuple tagTuple = new DefaultTuple();
+        for (String tag : tags) {
+            tagTuple.append(tag);
+        }
+        communicator().sendToCoordinator(t, tagTuple);
+        return tags;
+    }
+
+    /** This agent takes no arguments. */
+    @Override
+    public void init(Serializable[] args) {
+    }
+
+    @Override
+    public Set<Integer> furnishFieldsToMonitor() {
+        return null;
+    }
+
+    @Override
+    public void receiveMessage(Location source, Tuple message) {
+    }
+
+    @Override
+    public void finish() {
+    }
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/Main.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/Main.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/bt/Main.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny.apps.bt;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.ClassWithArgs;
+import org.apache.pig.penny.ParsedPigScript;
+import org.apache.pig.penny.PennyServer;
+
+
+
+/**
+ * Backward tracing app.
+ *
+ * <p>Usage: {@code Main <pig script file> <alias to trace> <tuple to trace>}. The tuple
+ * to trace must equal the target tuple's {@code toString()} text exactly.
+ *
+ * <p>Every LOAD alias gets a {@link BTInjectTaintMonitorAgent}, the traced alias gets a
+ * {@link BTMatchTaintMonitorAgent}, and every other alias gets a
+ * {@link BTPropagateTaintMonitorAgent}. The result of running the script under
+ * {@link BTCoordinator} is printed to standard output.
+ */
+public class Main {
+
+    public static void main(String[] args) throws Exception {
+        if (args.length < 3) {
+            throw new IllegalArgumentException(
+                    "Usage: Main <pig script file> <alias to trace> <tuple to trace>");
+        }
+
+        PennyServer pennyServer = new PennyServer();
+        String pigScriptFilename = args[0];
+        ParsedPigScript parsedPigScript = pennyServer.parse(pigScriptFilename);
+
+        String traceAlias = args[1]; // script alias of tuple to get trace of
+        String traceTuple = args[2]; // tuple to trace (has to match tuple.toString() exactly)
+
+        // The traced alias must exist and must NOT be a LOAD. The loop below gives every
+        // LOAD alias the inject agent first, so a LOAD trace alias would never receive
+        // the match agent and no match could ever be reported.
+        if (!parsedPigScript.aliases().contains(traceAlias)
+                || parsedPigScript.operator(traceAlias).equals("LOLoad")) {
+            throw new IllegalArgumentException("Invalid alias.");
+        }
+
+        Map<String, ClassWithArgs> monitorClasses = new HashMap<String, ClassWithArgs>();
+        for (String alias : parsedPigScript.aliases()) {
+            if (parsedPigScript.operator(alias).equals("LOLoad")) {
+                // trace source: tag every loaded tuple
+                monitorClasses.put(alias, new ClassWithArgs(BTInjectTaintMonitorAgent.class));
+            } else if (alias.equals(traceAlias)) {
+                // trace destination: report the tags reaching the target tuple
+                monitorClasses.put(alias, new ClassWithArgs(BTMatchTaintMonitorAgent.class, traceTuple));
+            } else {
+                // everything else: pass tags through
+                monitorClasses.put(alias, new ClassWithArgs(BTPropagateTaintMonitorAgent.class));
+            }
+        }
+        @SuppressWarnings("unchecked") // assumes BTCoordinator's result is a Collection<Tuple>
+        Collection<Tuple> traceResults = (Collection<Tuple>) parsedPigScript.trace(BTCoordinator.class, monitorClasses);
+        System.out.println("*** TRACE RESULTS:" + traceResults);
+    }
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CICoordinator.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CICoordinator.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CICoordinator.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CICoordinator.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny.apps.ci;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.Coordinator;
+import org.apache.pig.penny.Location;
+import org.apache.pig.penny.PhysicalLocation;
+
+
+
+public class CICoordinator extends Coordinator {
+
+ private Map<PhysicalLocation, Tuple> latestTuples = new HashMap<PhysicalLocation, Tuple>(); // latest tuple from each monitor agent
+
+ public void init(Serializable[] args) {
+ }
+
+ public Object finish() {
+ return latestTuples; // return latest tuples
+ }
+
+ public void receiveMessage(Location source, Tuple message) {
+ // keep track of latest tuple from each monitor agent
+ latestTuples.put((PhysicalLocation) source, message);
+ }
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CIMonitorAgent.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CIMonitorAgent.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CIMonitorAgent.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/CIMonitorAgent.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny.apps.ci;
+
+import java.io.Serializable;
+import java.util.Set;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.Location;
+import org.apache.pig.penny.MonitorAgent;
+
+
+
+/**
+ * Monitor agent for the crash investigator app. It numbers the tuples it observes
+ * from 0 and sends a sample of them to the coordinator as {@code (tupleNum, tuple)}.
+ * Tuple number {@code n} is sent when {@code n >= firstToMonitor} and {@code n} is a
+ * multiple of {@code skipSize}. Observed tuples are given no tags.
+ */
+public class CIMonitorAgent extends MonitorAgent {
+
+    private int firstToMonitor; // smallest tuple # eligible to be reported
+    private int skipSize;       // only tuple #s that are multiples of this are reported (> 0)
+    private int tupleNum = 0;   // # of tuples seen so far
+
+    @Override
+    public Set<Integer> furnishFieldsToMonitor() {
+        return null; // monitor all fields
+    }
+
+    /**
+     * @param args {@code args[0]} is {@code firstToMonitor} and {@code args[1]} is
+     *             {@code skipSize}, both as {@link Integer}
+     * @throws IllegalArgumentException if {@code skipSize} is not positive; it is used as
+     *             a modulus in {@link #observeTuple}, so 0 would make every call throw
+     */
+    @Override
+    public void init(Serializable[] args) {
+        firstToMonitor = (Integer) args[0];
+        skipSize = (Integer) args[1];
+        if (skipSize <= 0) {
+            throw new IllegalArgumentException("skipSize must be positive, got " + skipSize);
+        }
+    }
+
+    /** Sends {@code (tupleNum, t)} to the coordinator when this tuple is sampled. */
+    @Override
+    public Set<String> observeTuple(Tuple t, Set<String> tags) {
+        if ((tupleNum >= firstToMonitor) && (tupleNum % skipSize == 0)) {
+            communicator().sendToCoordinator(tupleNum, t);
+        }
+        tupleNum++;
+        return NO_TAGS;
+    }
+
+    /** Messages from the coordinator are ignored by this agent. */
+    @Override
+    public void receiveMessage(Location source, Tuple message) {
+    }
+
+    @Override
+    public void finish() {
+    }
+
+}
Added: pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/Main.java?rev=1092378&view=auto
==============================================================================
--- pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/Main.java (added)
+++ pig/trunk/contrib/penny/java/src/main/java/org/apache/pig/penny/apps/ci/Main.java Thu Apr 14 16:11:22 2011
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.penny.apps.ci;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.penny.ClassWithArgs;
+import org.apache.pig.penny.ParsedPigScript;
+import org.apache.pig.penny.PennyServer;
+import org.apache.pig.penny.PhysicalLocation;
+
+
+
+/**
+ * Crash investigator app.
+ */
+public class Main {
+
+ public static void main(String[] args) throws Exception {
+
+ System.out.println("***** CRASH INVESTIGATOR STARTING ...");
+
+ PennyServer pennyServer = new PennyServer();
+ String pigScriptFilename = args[0];
+ ParsedPigScript parsedPigScript = pennyServer.parse(pigScriptFilename);
+
+ Map<String, Integer> lowerBounds = new HashMap<String, Integer>(); // lower bound on tuple num for each logical location
+ Map<PhysicalLocation, Tuple> latestTuples = null;
+ for (int skipSize = 100; skipSize > 0; skipSize /= 10) {
+ System.out.println("***** TRYING SCRIPT ...");
+
+ Map<String, ClassWithArgs> monitorClasses = new HashMap<String, ClassWithArgs>();
+ for (String alias : parsedPigScript.aliases()) {
+ int lowerBound = (lowerBounds.containsKey(alias))? lowerBounds.get(alias) : 0;
+ monitorClasses.put(alias, new ClassWithArgs(CIMonitorAgent.class, lowerBound, skipSize));
+ }
+ latestTuples = (Map<PhysicalLocation, Tuple>) parsedPigScript.trace(CICoordinator.class, monitorClasses);
+
+ // update lower bounds
+ lowerBounds.clear();
+ for (Entry<PhysicalLocation, Tuple> entry : latestTuples.entrySet()) {
+ String alias = entry.getKey().logId();
+ int tupleNum = (Integer) entry.getValue().get(0);
+ if (lowerBounds.containsKey(alias)) lowerBounds.put(alias, Math.min(lowerBounds.get(alias), tupleNum));
+ else lowerBounds.put(alias, tupleNum);
+ }
+ }
+
+ Map<String, Set<Tuple>> crashCulprits = new HashMap<String, Set<Tuple>>();
+ for (Entry<PhysicalLocation, Tuple> entry : latestTuples.entrySet()) {
+ String alias = entry.getKey().logId();
+ Tuple t = (Tuple) entry.getValue().get(1);
+ if (!crashCulprits.containsKey(alias)) crashCulprits.put(alias, new HashSet<Tuple>());
+ crashCulprits.get(alias).add(t);
+ }
+
+ System.out.println("\n\n***** CRASH SUSPECTS:");
+ for (String alias : parsedPigScript.aliases()) {
+ if (crashCulprits.containsKey(alias)) {
+ System.out.println("\nAlias " + alias + ":");
+ for (Tuple t : crashCulprits.get(alias)) {
+ System.out.println("\t" + t);
+ }
+ }
+ }
+ }
+
+}