You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/12/01 01:29:59 UTC

svn commit: r1040839 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/ test/org/apache/pig/test/

Author: thejas
Date: Wed Dec  1 00:29:58 2010
New Revision: 1040839

URL: http://svn.apache.org/viewvc?rev=1040839&view=rev
Log:
PIG-1747: pattern match classes for matching patterns in physical plan 

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PatternNode.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PatternPlan.java
    pig/trunk/test/org/apache/pig/test/TestPhyPatternMatch.java
Modified:
    pig/trunk/CHANGES.txt

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1040839&r1=1040838&r2=1040839&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Dec  1 00:29:58 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1747: pattern match classes for matching patterns in physical plan (thejas)
+
 PIG-1707: Allow pig build to pull from alternate maven repo to enable building
 against newer hadoop versions (pradeepkth)
 

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PatternNode.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PatternNode.java?rev=1040839&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PatternNode.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PatternNode.java Wed Dec  1 00:29:58 2010
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+
+
+/**
+ * Used for finding/representing a pattern in the plan
+ * This class represents a node in the pattern
+ */
+public class PatternNode extends Operator{
+
+
+    public PatternNode(OperatorPlan p) {
+        super("pattern", p);
+    }
+
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Does this node have to be the leaf node
+     */
+    private boolean isLeafNode = false;
+
+    /**
+     * Does this node have to be the source node
+     */
+    private boolean isSourceNode = false;
+
+//    /**
+//     * Is this node optional
+//     */
+//    private boolean isOptional;
+
+    /**
+     * The class this node in plan should be an instance of
+     */
+    private Class<?> className;
+
+
+
+    /**
+     * The node in plan that this pattern node matched 
+     */
+    private Object match;
+
+
+    /**
+     * @return the isLeafNode
+     */
+    public boolean isLeafNode() {
+        return isLeafNode;
+    }
+
+
+    /**
+     * Set isLeafNode to true if the node must be a source
+     * @param isLeafNode 
+     */
+    public void setLeafNode(boolean isLeafNode) {
+        this.isLeafNode = isLeafNode;
+    }
+
+
+    /**
+     * @return the isSourceNode
+     */
+    public boolean isSourceNode() {
+        return isSourceNode;
+    }
+
+
+    /**
+     * Set isSourceNode to true if the node must be a source
+     * @param isSourceNode 
+     */
+    public void setSourceNode(boolean isSourceNode) {
+        this.isSourceNode = isSourceNode;
+    }
+
+    /**
+     * @return the className
+     */
+    public Class<?> getClassName() {
+        return className;
+    }
+
+
+    /**
+     * @param className the className to set
+     */
+    public void setClassName(Class<?> className) {
+        this.className = className;
+    }
+
+
+    /**
+     * @return the match
+     */
+    public Object getMatch() {
+        return match;
+    }
+
+
+    /**
+     * @param match the match to set
+     */
+    public void setMatch(Object match) {
+        this.match = match;
+    }
+
+
+    @Override
+    public void accept(org.apache.pig.newplan.PlanVisitor v)
+            throws FrontendException {
+        //should not be called
+        throw new RuntimeException("function not implemented");
+    }
+
+
+    @Override
+    public boolean isEqual(Operator operator) throws FrontendException {
+        //dummy
+        return false;
+    }
+
+    @Override
+    public String toString(){
+        String str ="";
+        if(className != null){
+            str = className.toString();
+        }
+        return str;
+    }
+
+
+
+
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PatternPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PatternPlan.java?rev=1040839&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PatternPlan.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PatternPlan.java Wed Dec  1 00:29:58 2010
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.impl.plan.Operator;
+import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.newplan.BaseOperatorPlan;
+
+
+/**
+ * Used for finding/representing a pattern in the plan
+ * This class represents the pattern
+ * Finds only a single matching pattern
+ * This is finding a sub-graph( represented by pattern) in the graph(plan)
+ */
+public class PatternPlan extends BaseOperatorPlan{
+
+    // this is used to keep track of any nodes whose match is to be reset
+    // when we backtrack after finding mismatch
+    ArrayList<PatternNode> ptNodesVisited = new ArrayList<PatternNode>();
+    OperatorPlan currentPlan = null;
+
+    /**
+     * Return true if the given plan has nodes that match the pattern 
+     * represented by this class
+     * If a match is found, the PatterNodes in the plan will return non 
+     * null node for getMatch(). 
+     * @param inpPlan - input plan to match
+     * @return true if match is found
+     */
+    public boolean match(OperatorPlan<? extends Operator<?>> inpPlan){
+        reset();
+
+        PatternPlan pattern = this;
+        currentPlan = inpPlan;
+
+        if(pattern.size() == 0){
+            return true;
+        }
+
+        PatternNode ptNode = (PatternNode) pattern.getSinks().get(0);
+        //try matching the pattern with the plan, starting with ptNode
+        Iterator it = currentPlan.iterator();
+        while(it.hasNext()){
+            Operator<?> plOp = (Operator<?>) it.next();
+            if(match(ptNode, plOp)){
+                if(this.size() != ptNodesVisited.size()){
+                    //BUG
+                    throw new RuntimeException("invalid size of pattern nodes visited");
+                }
+
+                return true;
+            }
+
+        }
+        return false;
+    }
+
+    /**
+     * Reset the matching information if the pattern has been used to find 
+     * a match
+     */
+    void reset(){
+        Iterator<org.apache.pig.newplan.Operator> iter =  this.getOperators();
+        while(iter.hasNext()){
+            PatternNode ptNode = (PatternNode) iter.next();
+            ptNode.setMatch(null);
+        }
+        ptNodesVisited.clear(); 
+
+    }
+
+    /**
+     * Check if the pattern node ptNode matches given Operator plOp
+     * @param ptNode
+     * @param plOp
+     * @return
+     */
+    // to suppress warnings from currentPlan.getPredecessors and 
+    // getSuccessors. 
+    @SuppressWarnings("unchecked") 
+    private boolean match(PatternNode ptNode, Operator plOp) {
+        if(ptNode.getMatch() != null && ptNode.getMatch() == plOp){
+            return true;
+        }
+
+        Class<?> ptClass = ptNode.getClassName();
+        Class<?> plClass = plOp.getClass();
+        //        if(!ptClass.getClass().isInstance(plOp)){
+        //            return false;
+        //        }
+        if(ptClass != plClass){
+            return false;
+        }
+
+        if(ptNode.isLeafNode()){
+            //pattern node requires matching plan node to be a sink/leaf
+            if(currentPlan.getSuccessors(plOp) != null 
+                    && currentPlan.getSuccessors(plOp).size() > 0){
+                return false;
+            }
+        }
+
+        if(ptNode.isSourceNode()){
+            //pattern node requires matching plan node to be a source/root
+            if(currentPlan.getPredecessors(plOp) != null 
+                    && currentPlan.getPredecessors(plOp).size() > 0){
+                return false;
+            }
+        }
+        //set this as match for now, it also indicates that this node is expected
+        // to match this operator while traversing other nodes
+        ptNode.setMatch(plOp);
+        int ptNodesVisitedIdx = ptNodesVisited.size();
+        ptNodesVisited.add(ptNode);
+
+        //try matching predecessors of this pattern node with the plan predecessors
+        List<org.apache.pig.newplan.Operator> ptPreds = this.getPredecessors(ptNode);
+        List<Operator<?>> plPreds = currentPlan.getPredecessors(plOp);
+        if(! match(ptPreds, plPreds)){
+            resetNewlyMatchedPtNodes(ptNodesVisitedIdx);
+            return false;
+        }
+
+        //try matching successors of this pattern node with the plan successors
+        List<org.apache.pig.newplan.Operator> ptSuccs = this.getSuccessors(ptNode);
+        List<Operator<?>> plSuccs = currentPlan.getSuccessors(plOp);
+        if(! match(ptSuccs, plSuccs)){
+            resetNewlyMatchedPtNodes(ptNodesVisitedIdx);
+            return false;
+        }
+
+        return true;
+    }
+
+    private void resetNewlyMatchedPtNodes(int ptNodesVisitedIdx) {
+        for(int i=ptNodesVisited.size() - 1; i >= ptNodesVisitedIdx; i--){
+            ptNodesVisited.get(i).setMatch(null);
+            ptNodesVisited.remove(i);
+        }
+    }
+
+
+    /**
+     * try matching list of pattern nodes with list of plan nodes . these are
+     * either predecessors or successors of a matching node
+     * if pattern nodes is a ordered subset of plan nodes, return true
+     * @param ptList list of pattern nodes
+     * @param plList list of plan nodes
+     * @return true if matched
+     */
+    private boolean match(List<org.apache.pig.newplan.Operator> ptList,
+            List<Operator<?>> plList) {
+
+        if(ptList == null || ptList.size() == 0){
+            return true;
+        }
+        if(plList == null){
+            return false;
+        }
+
+        // pattern list has to be smaller than plan list , as it is going
+        // to be a subset
+        if(ptList.size() > plList.size()){
+            return false;
+        }
+
+        int plStart = 0;
+        int ptIdx = 0;
+
+        // while there are sufficient nodes in list to be matched with list
+        // in pattern
+        while( (plList.size() - plStart) >=  ptList.size()){
+            // try matching the plan list with pattern list starting from
+            // plStart
+            for(int i = plStart; i < plList.size(); i++){
+                Operator<?> plNode = plList.get(i);
+                PatternNode ptNode = (PatternNode) ptList.get(ptIdx);
+                if(ptNode.getMatch() != null){
+                    if(plNode != ptNode.getMatch()){
+                        // an already matched node has to match same node again
+                        // if not start comparing again from next plan position
+                        ptIdx = 0;
+                        plStart++;
+                        break;                        
+                    }else{
+                        //already matched
+                        ptIdx++;
+                        if(ptIdx == ptList.size()){
+                            //matched the patter nodes list
+                            return true;
+                        }
+                    }
+                }
+                else if(!match(ptNode, plNode)){
+                    //not matched, start comparing again from next plan position
+                    ptIdx = 0;
+                    plStart++;
+                    break;
+                }else{
+                    //matched
+                    ptIdx++;
+                    if(ptIdx == ptList.size()){
+                        //matched the patter nodes list
+                        return true;
+                    }
+                }
+            }
+        }
+
+        return false;
+
+    }
+
+    /**
+     * This function can be used to create a new PatternPlan if the pattern
+     * nodes have at most one parent/child, and they are connected to each other.
+     * The PatternNode corresponding to the i'th class in classList will be
+     * the predecessor of the one corresponding to i+1'th class.
+     * @param classList
+     * @return new PatterPlan corresponding to classList
+     */
+    public static PatternPlan create(Class<?>[] classList) {
+        PatternPlan ptPlan = new PatternPlan();
+        PatternNode prevNode = null;
+        for(Class<?> ptClass : classList){
+            PatternNode ptNode = new PatternNode(ptPlan);
+            ptNode.setClassName(ptClass);
+            ptPlan.add(ptNode);
+            if(prevNode != null){
+                ptPlan.connect(prevNode, ptNode);
+            }
+            prevNode = ptNode;
+        }
+        return ptPlan;
+    }
+
+}

Added: pig/trunk/test/org/apache/pig/test/TestPhyPatternMatch.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPhyPatternMatch.java?rev=1040839&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPhyPatternMatch.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestPhyPatternMatch.java Wed Dec  1 00:29:58 2010
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PatternNode;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PatternPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TestPhyPatternMatch {
+  
+    int opKeyNum = 0;
+    
+    @Before
+    public void setUp() throws Exception {
+        opKeyNum = 0;
+    }
+    
+    
+    @Test
+    public void testSingleNodePattern() throws PlanException{
+        
+        //create pattern 
+        PatternPlan ptPlan = new PatternPlan();
+        PatternNode ptFilNode = new PatternNode(ptPlan);
+        ptFilNode.setClassName(POFilter.class);
+        ptPlan.add(ptFilNode);
+                
+        //create plan with single ForEach node
+        PhysicalPlan pplan = new PhysicalPlan();
+        POForEach fe = new POForEach(getNewOpKey()); 
+        pplan.add(fe);
+
+        // verify that match is false
+        boolean matched = ptPlan.match(pplan);
+        assertFalse("plan not matched", matched);
+
+        
+        //add a filter to the plan (fe -> fil)
+        POFilter fil = new POFilter(getNewOpKey()); 
+        pplan.add(fil);
+        pplan.connect(fe, fil);
+        
+        //verify that pattern matches
+        matched = ptPlan.match(pplan);
+        assertTrue("plan matched", matched);
+        assertEquals(" class matched ", ptFilNode.getMatch(), fil);
+        
+        //test leaf/source settings in pattern node
+        ptFilNode.setSourceNode(true);
+        assertFalse("plan matched", ptPlan.match(pplan));
+        
+        ptFilNode.setSourceNode(false);
+        ptFilNode.setLeafNode(true);
+        assertTrue("plan matched", ptPlan.match(pplan));
+        
+        
+        
+    }
+
+    @Test
+    public void testTwoNodePattern() throws PlanException{
+        //create pattern (foreach -> filter)
+        Class<?>[] nodes = {POForEach.class, POFilter.class};
+        PatternPlan ptPlan = PatternPlan.create(nodes);
+        PatternNode ptFilNode = (PatternNode) ptPlan.getSinks().get(0);
+        PatternNode ptFENode = (PatternNode) ptPlan.getSources().get(0);
+
+        
+        //create plan with single Filter node
+        PhysicalPlan pplan = new PhysicalPlan();
+        POFilter fil = new POFilter(getNewOpKey()); 
+        pplan.add(fil);
+
+        // verify that match is false
+        assertFalse("plan not matched", ptPlan.match(pplan));
+        
+        //verify that there is no match in the pattern nodes
+        assertEquals("null match", ptFilNode.getMatch(), null);
+        assertEquals("null match", ptFENode.getMatch(), null);
+
+        //add a foreach to the plan (fe -> fil)
+        POForEach fe = new POForEach(getNewOpKey()); 
+        pplan.add(fe);
+        pplan.connect(fe, fil);
+
+        // verify that match is true
+        assertTrue("plan matched", ptPlan.match(pplan));
+
+        // set leaf and source properties and try again
+        ptFilNode.setLeafNode(true);
+        ptFENode.setSourceNode(true);
+        assertTrue("plan matched", ptPlan.match(pplan));        
+        
+        //add a store to the plan to make it (fe -> fil -> store)
+        POStore store = new POStore(getNewOpKey());
+        pplan.add(store);
+        pplan.connect(fil, store);
+        
+        // match should fail because filter pt node leaf property is set
+        assertFalse("plan matched", ptPlan.match(pplan));
+        //reset patter filter node leaf property
+        ptFilNode.setLeafNode(false);
+        
+        
+        // verify that match is true
+        assertTrue("plan matched", ptPlan.match(pplan));
+        assertEquals("filter pt node match", ptFilNode.getMatch(), fil);
+        assertEquals("foreach pt node match", ptFENode.getMatch(), fe);
+               
+        
+        //add a store to the plan to make it (fe -> fe -> fil -> store)
+        POForEach fe2 = new POForEach(getNewOpKey());
+        pplan.add(fe2);
+        pplan.connect(fe2, fe);
+        
+        // match fails because fe pattern node is set to be source
+        assertFalse("plan matched", ptPlan.match(pplan));
+        ptFENode.setSourceNode(false);
+        
+        // verify that match is true
+        assertTrue("plan matched", ptPlan.match(pplan));
+        assertEquals("filter pt node match", ptFilNode.getMatch(), fil);
+        assertEquals("foreach pt node match", ptFENode.getMatch(), fe);
+        
+        //create new plan (fil -> fe)
+        PhysicalPlan pplan2 = new PhysicalPlan();
+        POFilter fil2 = new POFilter(getNewOpKey()); 
+        pplan.add(fil2);
+        POForEach fe21 = new POForEach(getNewOpKey()); 
+        pplan.add(fe21);
+        pplan.connect(fil2, fe21);
+
+        //verify that plan does not match
+        assertFalse("plan not matched", ptPlan.match(pplan2));
+        assertEquals("null match", ptFilNode.getMatch(), null);
+        assertEquals("null match", ptFENode.getMatch(), null);     
+        
+    }
+    
+    
+    @Test
+    public void testThreeNodePatternLinear() throws PlanException{
+        
+        //create pattern (fil -> FE -> store) 
+        PatternPlan ptPlan = new PatternPlan();
+        PatternNode ptFilNode = createPtNode(ptPlan, POFilter.class);
+        PatternNode ptFENode = createPtNode(ptPlan, POForEach.class);
+        PatternNode ptStNode = createPtNode(ptPlan, POStore.class);
+
+        ptPlan.connect(ptFilNode, ptFENode);
+        ptPlan.connect(ptFENode, ptStNode);
+        
+        //create plan fil -> fil -> fe -> store
+        PhysicalPlan pplan = new PhysicalPlan();
+        POFilter fil = new POFilter(getNewOpKey()); 
+        pplan.add(fil);
+        assertFalse("plan not matched", ptPlan.match(pplan));
+        assertEquals("null match", ptFilNode.getMatch(), null);
+        assertEquals("null match", ptFENode.getMatch(), null);
+        assertEquals("null match", ptStNode.getMatch(), null);
+        
+        POForEach fe = new POForEach(getNewOpKey()); 
+        pplan.add(fe);
+        pplan.connect(fil, fe);
+        assertFalse("plan not matched", ptPlan.match(pplan));
+        
+        POFilter fil2 = new POFilter(getNewOpKey()); 
+        pplan.add(fil2);
+        pplan.connect(fil2, fil);
+        assertFalse("plan not matched", ptPlan.match(pplan));
+        
+        POStore store = new POStore(getNewOpKey()); 
+        pplan.add(store);
+        pplan.connect(fe, store);
+        assertTrue("plan matched", ptPlan.match(pplan));
+        
+        assertEquals("test match node", ptFilNode.getMatch(), fil);
+        assertEquals("test match node", ptFENode.getMatch(), fe);
+        assertEquals("test match node", ptStNode.getMatch(), store);
+        
+        
+        
+    }
+    
+    
+    @Test
+    public void testThreeNodePatternTwoParents() throws PlanException, ExecException{
+        //create pattern fe   fil
+        //                \   /
+        //                frJoin
+        PatternPlan ptPlan = new PatternPlan();
+        PatternNode ptFilNode = createPtNode(ptPlan, POFilter.class);
+        PatternNode ptFENode = createPtNode(ptPlan, POForEach.class);
+        PatternNode ptJoinNode = createPtNode(ptPlan, POFRJoin.class);
+
+        ptPlan.connect(ptFilNode, ptJoinNode);
+        ptPlan.connect(ptFENode, ptJoinNode);
+        
+        //create plan 
+        PhysicalPlan pplan = new PhysicalPlan();
+        POFilter fil = new POFilter(getNewOpKey()); 
+        pplan.add(fil);
+        assertFalse("plan not matched", ptPlan.match(pplan));
+        
+        POForEach fe = new POForEach(getNewOpKey()); 
+        pplan.add(fe);
+        assertFalse("plan not matched", ptPlan.match(pplan));
+        
+        POFRJoin join = new POFRJoin(getNewOpKey(), 0, null,
+                new ArrayList<List<PhysicalPlan>>(), null, null, 0, false,null); 
+        pplan.add(join);
+        pplan.connect(fil, join);
+        pplan.connect(fe, join);
+        assertTrue("plan matched", ptPlan.match(pplan));
+        assertEquals("test match node", ptFilNode.getMatch(), fil);
+        assertEquals("test match node", ptFENode.getMatch(), fe);
+        assertEquals("test match node", ptJoinNode.getMatch(), join);
+        
+    }
+    
+    
+    private PatternNode createPtNode(PatternPlan ptPlan, Class<?> ptClass) {
+        PatternNode ptNode = new PatternNode(ptPlan);
+        ptNode.setClassName(ptClass);
+        ptPlan.add(ptNode);
+        return ptNode;
+        
+    }
+
+
+    private OperatorKey getNewOpKey() {
+        return new OperatorKey("", ++opKeyNum);
+    }
+    
+    
+}