You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/01/28 17:32:14 UTC

svn commit: r1562110 - in /pig/branches/tez: shims/test/hadoop20/org/apache/pig/test/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/backend/hadoop/executionengine/tez/ test/ test/org/apache/pig/...

Author: cheolsoo
Date: Tue Jan 28 16:32:13 2014
New Revision: 1562110

URL: http://svn.apache.org/r1562110
Log:
PIG-3728: Fix TestSkewedJoin in tez mode (cheolsoo)

Modified:
    pig/branches/tez/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java
    pig/branches/tez/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
    pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java
    pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java
    pig/branches/tez/test/tez-tests

Modified: pig/branches/tez/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java?rev=1562110&r1=1562109&r2=1562110&view=diff
==============================================================================
--- pig/branches/tez/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java (original)
+++ pig/branches/tez/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java Tue Jan 28 16:32:13 2014
@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.pig.ExecType;
 
 public class MiniCluster extends MiniGenericCluster {
     private static final File CONF_DIR = new File("build/classes");
@@ -41,6 +42,11 @@ public class MiniCluster extends MiniGen
     }
 
     @Override
+    protected ExecType getExecType() {
+        return ExecType.MAPREDUCE;
+    }
+
+    @Override
     protected void setupMiniDfsAndMrClusters() {
         try {
             System.setProperty("hadoop.log.dir", "build/test/logs");

Modified: pig/branches/tez/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java?rev=1562110&r1=1562109&r2=1562110&view=diff
==============================================================================
--- pig/branches/tez/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/branches/tez/shims/test/hadoop20/org/apache/pig/test/TezMiniCluster.java Tue Jan 28 16:32:13 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.test;
 
+import org.apache.pig.ExecType;
 import org.apache.pig.test.MiniGenericCluster;
 
 /**
@@ -28,6 +29,11 @@ public class TezMiniCluster extends Mini
     }
 
     @Override
+    protected ExecType getExecType() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     protected void setupMiniDfsAndMrClusters() {
         throw new UnsupportedOperationException();
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java?rev=1562110&r1=1562109&r2=1562110&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java Tue Jan 28 16:32:13 2014
@@ -119,7 +119,7 @@ public class POSkewedJoin extends Physic
     }
 
     @Override
-  public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         return null;
     }
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java?rev=1562110&r1=1562109&r2=1562110&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POPartitionRearrangeTez.java Tue Jan 28 16:32:13 2014
@@ -203,8 +203,9 @@ public class POPartitionRearrangeTez ext
             // We've already collected sampleMap in PigProcessor
             distMap = PigProcessor.sampleMap;
         } else {
-            throw new RuntimeException(this.getClass().getSimpleName() +
-                    " used but no key distribution found");
+            LOG.info("Key distribution map is empty");
+            inited = true;
+            return;
         }
 
         long start = System.currentTimeMillis();

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1562110&r1=1562109&r2=1562110&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Tue Jan 28 16:32:13 2014
@@ -215,9 +215,12 @@ public class PigProcessor implements Log
         BroadcastKVReader reader = (BroadcastKVReader) logicalInput.getReader();
         reader.next();
         Object val = reader.getCurrentValue();
-        NullableTuple nTup = (NullableTuple) val;
-        Tuple t = (Tuple) nTup.getValueAsPigType();
-        sampleMap = (Map<String, Object>) t.get(0);
+        if (val != null) {
+            // Sample is not empty
+            NullableTuple nTup = (NullableTuple) val;
+            Tuple t = (Tuple) nTup.getValueAsPigType();
+            sampleMap = (Map<String, Object>) t.get(0);
+        }
     }
 
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java?rev=1562110&r1=1562109&r2=1562110&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java Tue Jan 28 16:32:13 2014
@@ -53,8 +53,9 @@ public class SkewedPartitionerTez extend
             // We've collected sampleMap in PigProcessor
             distMap = PigProcessor.sampleMap;
         } else {
-            throw new RuntimeException(this.getClass().getSimpleName() +
-                    " used but no key distribution found");
+            LOG.info("Key distribution map is empty");
+            inited = true;
+            return;
         }
 
         long start = System.currentTimeMillis();

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java?rev=1562110&r1=1562109&r2=1562110&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java Tue Jan 28 16:32:13 2014
@@ -57,8 +57,9 @@ public class WeightedRangePartitionerTez
             // We've collected sampleMap in PigProcessor
             quantileMap = PigProcessor.sampleMap;
         } else {
-            throw new RuntimeException(this.getClass().getSimpleName()
-                    + " used but no quantiles found");
+            LOG.info("Quantiles map is empty");
+            inited = true;
+            return;
         }
 
         long start = System.currentTimeMillis();

Modified: pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java?rev=1562110&r1=1562109&r2=1562110&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java Tue Jan 28 16:32:13 2014
@@ -44,9 +44,9 @@ public class TestAccumulator {
     private static final String INPUT_FILE2 = "AccumulatorInput2.txt";
     private static final String INPUT_FILE3 = "AccumulatorInput3.txt";
     private static final String INPUT_FILE4 = "AccumulatorInput4.txt";
+    private static final String INPUT_DIR = "build/test/data";
 
     private PigServer pigServer;
-    private static String execType = System.getProperty("test.exec.type");
     private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
     @BeforeClass
@@ -64,10 +64,7 @@ public class TestAccumulator {
 
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
-        new File(INPUT_FILE1).delete();
-        new File(INPUT_FILE2).delete();
-        new File(INPUT_FILE3).delete();
-        new File(INPUT_FILE4).delete();
+        deleteFiles();
         cluster.shutDown();
     }
 
@@ -75,7 +72,7 @@ public class TestAccumulator {
     public void setUp() throws Exception {
         // Drop stale configuration from previous test run
         cluster.getProperties().remove(PigConfiguration.OPT_ACCUMULATOR);
-        pigServer = new PigServer(execType, cluster.getProperties());
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
     }
 
     @After
@@ -85,7 +82,9 @@ public class TestAccumulator {
     }
 
     private static void createFiles() throws IOException {
-        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE1));
+        new File(INPUT_DIR).mkdir();
+
+        PrintWriter w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE1));
 
         w.println("100\tapple");
         w.println("200\torange");
@@ -96,9 +95,9 @@ public class TestAccumulator {
         w.println("400\tapple");
         w.close();
 
-        Util.copyFromLocalToCluster(cluster, INPUT_FILE1, INPUT_FILE1);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1);
 
-        w = new PrintWriter(new FileWriter(INPUT_FILE2));
+        w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE2));
 
         w.println("100\t");
         w.println("100\t");
@@ -107,9 +106,9 @@ public class TestAccumulator {
         w.println("300\tstrawberry");
         w.close();
 
-        Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE2, INPUT_FILE2);
 
-        w = new PrintWriter(new FileWriter(INPUT_FILE3));
+        w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE3));
 
         w.println("100\t1.0");
         w.println("100\t2.0");
@@ -124,9 +123,9 @@ public class TestAccumulator {
         w.println("400\t");
         w.close();
 
-        Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE3, INPUT_FILE3);
 
-        w = new PrintWriter(new FileWriter(INPUT_FILE4));
+        w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE4));
 
         w.println("100\thttp://ibm.com,ibm");
         w.println("100\thttp://ibm.com,ibm");
@@ -134,7 +133,11 @@ public class TestAccumulator {
         w.println("300\thttp://sun.com,sun");
         w.close();
 
-        Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE4, INPUT_FILE4);
+    }
+
+    private static void deleteFiles() {
+        Util.deleteDirectory(new File(INPUT_DIR));
     }
 
     @Test

Modified: pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java?rev=1562110&r1=1562109&r2=1562110&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java Tue Jan 28 16:32:13 2014
@@ -31,7 +31,10 @@ import java.io.PrintWriter;
 import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.pig.ExecType;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -41,9 +44,9 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.test.utils.TestHelper;
-import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestSkewedJoin {
@@ -54,25 +57,37 @@ public class TestSkewedJoin {
     private static final String INPUT_FILE5 = "SkewedJoinInput5.txt";
     private static final String INPUT_FILE6 = "SkewedJoinInput6.txt";
     private static final String INPUT_FILE7 = "SkewedJoinInput7.txt";
+    private static final String INPUT_DIR = "build/test/data";
+    private static final String OUTPUT_DIR = "skwedjoin";
 
     private PigServer pigServer;
-    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+    private static FileSystem fs;
+    private static MiniGenericCluster cluster;
 
     @Before
     public void setUp() throws Exception {
         pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.maxtuple", "5");
         pigServer.getPigContext().getProperties().setProperty("pig.skewedjoin.reduce.memusage", "0.01");
+    }
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        cluster = MiniGenericCluster.buildCluster();
+        fs = cluster.getFileSystem();
         createFiles();
     }
 
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
+        deleteFiles();
         cluster.shutDown();
     }
 
-    private void createFiles() throws IOException {
-        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE1));
+    private static void createFiles() throws IOException {
+        new File(INPUT_DIR).mkdir();
+
+        PrintWriter w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE1));
 
         int k = 0;
         for(int j=0; j<120; j++) {
@@ -86,7 +101,7 @@ public class TestSkewedJoin {
 
         w.close();
 
-        PrintWriter w2 = new PrintWriter(new FileWriter(INPUT_FILE2));
+        PrintWriter w2 = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE2));
         w2.println("100\tapple1");
         w2.println("100\tapple2");
         w2.println("100\tapple2");
@@ -97,7 +112,7 @@ public class TestSkewedJoin {
 
         w2.close();
 
-        PrintWriter w3 = new PrintWriter(new FileWriter(INPUT_FILE3));
+        PrintWriter w3 = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE3));
         w3.println("100\tapple1");
         w3.println("100\tapple2");
         w3.println("200\torange1");
@@ -108,14 +123,14 @@ public class TestSkewedJoin {
 
         w3.close();
 
-        PrintWriter w4 = new PrintWriter(new FileWriter(INPUT_FILE4));
+        PrintWriter w4 = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE4));
         for(int i=0; i < 100; i++) {
             w4.println("[a100#apple1,a100#apple2,a200#orange1,a200#orange2,a300#strawberry,a300#strawberry2,a400#pear]");
         }
         w4.close();
 
         // Create a file with null keys
-        PrintWriter w5 = new PrintWriter(new FileWriter(INPUT_FILE5));
+        PrintWriter w5 = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE5));
         for(int i=0; i < 10; i++) {
             w5.println("\tapple1");
         }
@@ -127,7 +142,7 @@ public class TestSkewedJoin {
         w5.println("100\t");
         w5.close();
 
-        PrintWriter w6 = new PrintWriter(new FileWriter(INPUT_FILE6));
+        PrintWriter w6 = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE6));
 
         for(int i=0; i<300; i++) {
             for(int j=0; j<5; j++) {
@@ -136,7 +151,7 @@ public class TestSkewedJoin {
         }
         w6.close();
 
-        PrintWriter w7 = new PrintWriter(new FileWriter(INPUT_FILE7));
+        PrintWriter w7 = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE7));
 
         for(int i=0; i<300; i = i+3) {
             for(int j=0; j<2; j++) {
@@ -145,33 +160,18 @@ public class TestSkewedJoin {
         }
         w7.close();
 
-        Util.copyFromLocalToCluster(cluster, INPUT_FILE1, INPUT_FILE1);
-        Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
-        Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
-        Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4);
-        Util.copyFromLocalToCluster(cluster, INPUT_FILE5, INPUT_FILE5);
-        Util.copyFromLocalToCluster(cluster, INPUT_FILE6, INPUT_FILE6);
-        Util.copyFromLocalToCluster(cluster, INPUT_FILE7, INPUT_FILE7);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        new File(INPUT_FILE1).delete();
-        new File(INPUT_FILE2).delete();
-        new File(INPUT_FILE3).delete();
-        new File(INPUT_FILE4).delete();
-        new File(INPUT_FILE5).delete();
-        new File(INPUT_FILE6).delete();
-        new File(INPUT_FILE7).delete();
-        Util.deleteDirectory(new File("skewedjoin"));
-
-        Util.deleteFile(cluster, INPUT_FILE1);
-        Util.deleteFile(cluster, INPUT_FILE2);
-        Util.deleteFile(cluster, INPUT_FILE3);
-        Util.deleteFile(cluster, INPUT_FILE4);
-        Util.deleteFile(cluster, INPUT_FILE5);
-        Util.deleteFile(cluster, INPUT_FILE6);
-        Util.deleteFile(cluster, INPUT_FILE7);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE2, INPUT_FILE2);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE3, INPUT_FILE3);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE4, INPUT_FILE4);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE5, INPUT_FILE5);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE6, INPUT_FILE6);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE7, INPUT_FILE7);
+    }
+
+    private static void deleteFiles() throws IOException {
+        Util.deleteDirectory(new File(INPUT_DIR));
+        Util.deleteDirectory(new File(OUTPUT_DIR));
     }
 
     @Test
@@ -205,7 +205,7 @@ public class TestSkewedJoin {
 
     @Test
     public void testSkewedJoinWithNoProperties() throws IOException{
-        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
 
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
@@ -271,36 +271,40 @@ public class TestSkewedJoin {
         }
     }
 
-
     @Test
     public void testSkewedJoinKeyPartition() throws IOException {
         try{
-             Util.deleteFile(cluster, "skewedjoin");
+             Util.deleteFile(cluster, OUTPUT_DIR);
         }catch(Exception e){
             // it is ok if directory not exist
         }
 
          pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
          pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
-
-
          pigServer.registerQuery("E = join A by id, B by id using 'skewed' parallel 7;");
-         pigServer.store("E", "skewedjoin");
+         pigServer.store("E", OUTPUT_DIR);
 
          int[][] lineCount = new int[3][7];
 
-         new File("skewedjoin").mkdir();
+         new File(OUTPUT_DIR).mkdir();
+         FileStatus[] outputFiles = fs.listStatus(new Path(OUTPUT_DIR), new PathFilter() {
+                @Override
+                public boolean accept(Path p) {
+                    return !p.getName().startsWith("_");
+                }
+             });
          // check how many times a key appear in each part- file
-         for(int i=0; i<7; i++) {
-             Util.copyFromClusterToLocal(cluster, "skewedjoin/part-r-0000"+i, "skewedjoin/part-r-0000"+i);
-
-             BufferedReader reader = new BufferedReader(new FileReader("skewedjoin/part-r-0000"+i));
-               String line = null;
-               while((line = reader.readLine()) != null) {
-                  String[] cols = line.split("\t");
-                  int key = Integer.parseInt(cols[0])/100 -1;
-                  lineCount[key][i] ++;
-              }
+         for (int i=0; i<7; i++) {
+             String filename = outputFiles[i].getPath().toString();
+             Util.copyFromClusterToLocal(cluster, filename, OUTPUT_DIR + "/" + i);
+             BufferedReader reader = new BufferedReader(new FileReader(OUTPUT_DIR + "/" + i));
+             String line = null;
+             while((line = reader.readLine()) != null) {
+                 String[] cols = line.split("\t");
+                 int key = Integer.parseInt(cols[0])/100 -1;
+                 lineCount[key][i] ++;
+             }
+             reader.close();
          }
 
          int fc = 0;

Modified: pig/branches/tez/test/tez-tests
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/tez-tests?rev=1562110&r1=1562109&r2=1562110&view=diff
==============================================================================
--- pig/branches/tez/test/tez-tests (original)
+++ pig/branches/tez/test/tez-tests Tue Jan 28 16:32:13 2014
@@ -3,6 +3,7 @@
 **/TestTezLauncher.java
 **/TestCombiner.java
 **/TestAccumulator.java
+**/TestSkewedJoin.java
 **/TestSplitStore.java
 **/TestCustomPartitioner.java
 ## TODO: Runs fine individually. Hangs with file.out.index not found when run together. Likely Tez Bug