Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/22 09:43:46 UTC

svn commit: r1783988 [16/24] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...

Added: pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java (added)
+++ pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
+
+public class SparkMiniCluster extends MiniGenericCluster {
+    private static final File CONF_DIR = new File("build/classes");
+    private static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml");
+    private static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml");
+    private static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml");
+    private static final File YARN_CONF_FILE = new File(CONF_DIR, "yarn-site.xml");
+
+    private Configuration m_dfs_conf = null;
+    protected MiniMRYarnCluster m_mr = null;
+    private Configuration m_mr_conf = null;
+
+    private static final Log LOG = LogFactory
+            .getLog(SparkMiniCluster.class);
+    private ExecType spark = new SparkExecType();
+    SparkMiniCluster() {
+
+    }
+
+    @Override
+    public ExecType getExecType() {
+        return spark;
+    }
+
+    @Override
+    protected void setupMiniDfsAndMrClusters() {
+        try {
+            deleteConfFiles();
+            CONF_DIR.mkdirs();
+
+            // Build mini DFS cluster
+            Configuration hdfsConf = new Configuration();
+            m_dfs = new MiniDFSCluster.Builder(hdfsConf)
+                    .numDataNodes(2)
+                    .format(true)
+                    .racks(null)
+                    .build();
+            m_fileSys = m_dfs.getFileSystem();
+            m_dfs_conf = m_dfs.getConfiguration(0);
+
+            //Create user home directory
+            m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());
+            // Write core-site.xml
+            Configuration core_site = new Configuration(false);
+            core_site.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+            core_site.writeXml(new FileOutputStream(CORE_CONF_FILE));
+
+            Configuration hdfs_site = new Configuration(false);
+            for (Map.Entry<String, String> conf : m_dfs_conf) {
+                if (ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()), "programatically")) { // "programatically" [sic]: the source tag Hadoop's Configuration.set() records, matched verbatim
+                    hdfs_site.set(conf.getKey(), m_dfs_conf.getRaw(conf.getKey()));
+                }
+            }
+            hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE));
+
+            // Build mini YARN cluster
+            m_mr = new MiniMRYarnCluster("PigMiniCluster", 2);
+            m_mr.init(m_dfs_conf);
+            m_mr.start();
+            m_mr_conf = m_mr.getConfig();
+            m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                    System.getProperty("java.class.path"));
+
+            Configuration mapred_site = new Configuration(false);
+            Configuration yarn_site = new Configuration(false);
+            for (Map.Entry<String, String> conf : m_mr_conf) {
+                if (ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()), "programatically")) { // [sic], as above
+                    if (conf.getKey().contains("yarn")) {
+                        yarn_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
+                    } else if (!conf.getKey().startsWith("dfs")){
+                        mapred_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
+                    }
+                }
+            }
+
+            mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE));
+            yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE));
+
+            m_conf = m_mr_conf;
+            System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
+            System.setProperty("hadoop.log.dir", "build/test/logs");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+
+        }
+    }
+
+    @Override
+    protected void shutdownMiniMrClusters() {
+        deleteConfFiles();
+        if (m_mr != null) {
+            m_mr.stop();
+            m_mr = null;
+        }
+    }
+
+    private void deleteConfFiles() {
+
+        if(CORE_CONF_FILE.exists()) {
+            CORE_CONF_FILE.delete();
+        }
+        if(HDFS_CONF_FILE.exists()) {
+            HDFS_CONF_FILE.delete();
+        }
+        if(MAPRED_CONF_FILE.exists()) {
+            MAPRED_CONF_FILE.delete();
+        }
+        if(YARN_CONF_FILE.exists()) {
+            YARN_CONF_FILE.delete();
+        }
+    }
+
+    static public Launcher getLauncher() {
+        return new SparkLauncher();
+    }
+}
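
For reference, the core idiom above -- writing only the programmatically set entries of a live Configuration out as site XML under build/classes so the Spark backend can pick them up from the classpath -- as a minimal standalone sketch. ConfDumpSketch and dumpProgrammaticEntries are illustrative names, not part of this commit:

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.Map;

    import org.apache.commons.lang.ArrayUtils;
    import org.apache.hadoop.conf.Configuration;

    public class ConfDumpSketch {
        // Copies only the programmatically set entries of src into an empty
        // Configuration and serializes them as Hadoop site XML. The string
        // "programatically" (misspelled) is the source tag Hadoop's own
        // Configuration.set() records, so it must be matched verbatim.
        static void dumpProgrammaticEntries(Configuration src, String path) throws IOException {
            Configuration out = new Configuration(false);  // false: load no default resources
            for (Map.Entry<String, String> e : src) {
                if (ArrayUtils.contains(src.getPropertySources(e.getKey()), "programatically")) {
                    out.set(e.getKey(), src.getRaw(e.getKey()));
                }
            }
            FileOutputStream fos = new FileOutputStream(path);
            try {
                out.writeXml(fos);
            } finally {
                fos.close();
            }
        }
    }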

Modified: pig/branches/spark/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBZip.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestBZip.java Wed Feb 22 09:43:41 2017
@@ -43,7 +43,6 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -67,16 +66,10 @@ public class TestBZip {
 
     @Parameters(name = "pig.bzip.use.hadoop.inputformat = {0}.")
     public static Iterable<Object[]> data() {
-        if ( HadoopShims.isHadoopYARN() ) {
-            return Arrays.asList(new Object[][] {
-                { false  },
-                { true   }
-            });
-        } else {
-            return Arrays.asList(new Object[][] {
-                { false }
-            });
-        }
+        return Arrays.asList(new Object[][] {
+            { false  },
+            { true   }
+        });
     }
 
     public TestBZip (Boolean useBzipFromHadoop) {
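
For context, the @Parameters method above now yields both variants unconditionally, since the pre-YARN code path guarded by HadoopShims.isHadoopYARN() is gone. A minimal JUnit 4 skeleton of the wiring (class and method names here are illustrative only):

    import java.util.Arrays;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameters;

    @RunWith(Parameterized.class)
    public class ParameterizedSketch {
        private final boolean useHadoopInputFormat;

        public ParameterizedSketch(Boolean useHadoopInputFormat) {
            this.useHadoopInputFormat = useHadoopInputFormat;
        }

        // Every @Test runs once per Object[] returned here; each array is
        // passed to the constructor above.
        @Parameters(name = "pig.bzip.use.hadoop.inputformat = {0}.")
        public static Iterable<Object[]> data() {
            return Arrays.asList(new Object[][] { { false }, { true } });
        }

        @Test
        public void runsOncePerParameter() {
            System.out.println("inputformat flag: " + useHadoopInputFormat);
        }
    }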

Modified: pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java Wed Feb 22 09:43:41 2017
@@ -130,6 +130,7 @@ import org.apache.pig.data.DefaultBagFac
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -3206,29 +3207,31 @@ public class TestBuiltin {
     @Test
     public void testUniqueID() throws Exception {
         Util.resetStateForExecModeSwitch();
-        String inputFileName = "testUniqueID.txt";
-        Util.createInputFile(cluster, inputFileName, new String[]
-            {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"});
         Properties copyproperties = new Properties();
         copyproperties.putAll(cluster.getProperties());
         PigServer pigServer = new PigServer(cluster.getExecType(), copyproperties);
-        pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", "10");
+
+        // running with 2 mappers each taking 5 records
+        String TMP_DIR = FileLocalizer.getTemporaryPath(pigServer.getPigContext()).toUri().getPath();
+        Util.createInputFile(cluster, TMP_DIR + "/input1.txt", new String[] {"1\n2\n3\n4\n5"});
+        Util.createInputFile(cluster, TMP_DIR + "/input2.txt", new String[] {"1\n2\n3\n4\n5"});
         pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true");
-        pigServer.registerQuery("A = load '" + inputFileName + "' as (name);");
+
+        pigServer.registerQuery("A = load '" + TMP_DIR + "' as (name);");
         pigServer.registerQuery("B = foreach A generate name, UniqueID();");
         Iterator<Tuple> iter = pigServer.openIterator("B");
         if (!Util.isSparkExecType(cluster.getExecType())) {
-            assertEquals(iter.next().get(1), "0-0");
-            assertEquals(iter.next().get(1), "0-1");
-            assertEquals(iter.next().get(1), "0-2");
-            assertEquals(iter.next().get(1), "0-3");
-            assertEquals(iter.next().get(1), "0-4");
-            assertEquals(iter.next().get(1), "1-0");
-            assertEquals(iter.next().get(1), "1-1");
-            assertEquals(iter.next().get(1), "1-2");
-            assertEquals(iter.next().get(1), "1-3");
-            assertEquals(iter.next().get(1), "1-4");
-        } else{
+            assertEquals("0-0", iter.next().get(1));
+            assertEquals("0-1", iter.next().get(1));
+            assertEquals("0-2", iter.next().get(1));
+            assertEquals("0-3", iter.next().get(1));
+            assertEquals("0-4", iter.next().get(1));
+            assertEquals("1-0", iter.next().get(1));
+            assertEquals("1-1", iter.next().get(1));
+            assertEquals("1-2", iter.next().get(1));
+            assertEquals("1-3", iter.next().get(1));
+            assertEquals("1-4", iter.next().get(1));
+        } else {
             //there will be 2 InputSplits when mapred.max.split.size is 10(byte) for the testUniqueID.txt(20 bytes)
             //Split0:
             //            1\n
@@ -3244,34 +3247,35 @@ public class TestBuiltin {
             //            5\n
             //The size of Split0 is 12 not 10 because LineRecordReader#nextKeyValue will read one more line
             //More detail see PIG-4383
-            assertEquals(iter.next().get(1), "0-0");
-            assertEquals(iter.next().get(1), "0-1");
-            assertEquals(iter.next().get(1), "0-2");
-            assertEquals(iter.next().get(1), "0-3");
-            assertEquals(iter.next().get(1), "0-4");
-            assertEquals(iter.next().get(1), "0-5");
-            assertEquals(iter.next().get(1), "1-0");
-            assertEquals(iter.next().get(1), "1-1");
-            assertEquals(iter.next().get(1), "1-2");
-            assertEquals(iter.next().get(1), "1-3");
+            assertEquals("0-0", iter.next().get(1));
+            assertEquals("0-1", iter.next().get(1));
+            assertEquals("0-2", iter.next().get(1));
+            assertEquals("0-3", iter.next().get(1));
+            assertEquals("0-4", iter.next().get(1));
+            assertEquals("0-5", iter.next().get(1));
+            assertEquals("1-0", iter.next().get(1));
+            assertEquals("1-1", iter.next().get(1));
+            assertEquals("1-2", iter.next().get(1));
+            assertEquals("1-3", iter.next().get(1));
         }
-        Util.deleteFile(cluster, inputFileName);
+        Util.deleteFile(cluster, TMP_DIR + "/input1.txt");
+        Util.deleteFile(cluster, TMP_DIR + "/input2.txt");
     }
 
     @Test
     public void testRANDOMWithJob() throws Exception {
         Util.resetStateForExecModeSwitch();
-        String inputFileName = "testRANDOM.txt";
-        Util.createInputFile(cluster, inputFileName, new String[]
-            {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"});
-
         Properties copyproperties = new Properties();
         copyproperties.putAll(cluster.getProperties());
         PigServer pigServer = new PigServer(cluster.getExecType(), copyproperties);
-        // running with two mappers
-        pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", "10");
+
+        // running with 2 mappers each taking 5 records
+        String TMP_DIR = FileLocalizer.getTemporaryPath(pigServer.getPigContext()).toUri().getPath();
+        Util.createInputFile(cluster, TMP_DIR + "/input1.txt", new String[] {"1\n2\n3\n4\n5"});
+        Util.createInputFile(cluster, TMP_DIR + "/input2.txt", new String[] {"1\n2\n3\n4\n5"});
         pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true");
-        pigServer.registerQuery("A = load '" + inputFileName + "' as (name);");
+
+        pigServer.registerQuery("A = load '" + TMP_DIR + "' as (name);");
         pigServer.registerQuery("B = foreach A generate name, RANDOM();");
         Iterator<Tuple> iter = pigServer.openIterator("B");
         double [] mapper1 = new double[5];
@@ -3294,7 +3298,8 @@ public class TestBuiltin {
         for( int i = 0; i < 5; i++ ){
             assertNotEquals(mapper1[i], mapper2[i], 0.0001);
         }
-        Util.deleteFile(cluster, inputFileName);
+        Util.deleteFile(cluster, TMP_DIR + "/input1.txt");
+        Util.deleteFile(cluster, TMP_DIR + "/input2.txt");
     }
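
The rewritten setup in testUniqueID and testRANDOMWithJob above replaces the fragile mapred.max.split.size=10 trick with one input file per desired mapper plus pig.noSplitCombination. Condensed into a standalone helper sketch, using only calls that appear in the diff itself (TwoMapperSetupSketch is an illustrative name, assumed to live in org.apache.pig.test so Util and MiniGenericCluster resolve):

    package org.apache.pig.test;

    import java.util.Properties;

    import org.apache.pig.PigServer;
    import org.apache.pig.impl.io.FileLocalizer;

    public class TwoMapperSetupSketch {
        // Returns a PigServer for which loading tmpDir runs with two map tasks:
        // one small file per mapper, with pig.noSplitCombination stopping Pig
        // from merging the two splits back into one.
        static PigServer twoMapperServer(MiniGenericCluster cluster) throws Exception {
            Properties props = new Properties();
            props.putAll(cluster.getProperties());
            PigServer pigServer = new PigServer(cluster.getExecType(), props);

            String tmpDir = FileLocalizer.getTemporaryPath(pigServer.getPigContext()).toUri().getPath();
            Util.createInputFile(cluster, tmpDir + "/input1.txt", new String[] {"1\n2\n3\n4\n5"});
            Util.createInputFile(cluster, tmpDir + "/input2.txt", new String[] {"1\n2\n3\n4\n5"});
            pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true");
            return pigServer;
        }
    }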
 
 

Added: pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java (added)
+++ pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+
+import java.util.Properties;
+
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestConfigurationUtil {
+
+    @Test
+    public void testExpandForAlternativeNames() {
+        Properties properties = null;
+        properties = ConfigurationUtil.expandForAlternativeNames("fs.df.interval", "500");
+        Assert.assertEquals(1,properties.size());
+        Assert.assertEquals("500",properties.get("fs.df.interval"));
+
+        properties = ConfigurationUtil.expandForAlternativeNames("dfs.df.interval", "600");
+        Assert.assertEquals(2,properties.size());
+        Assert.assertEquals("600",properties.get("fs.df.interval"));
+        Assert.assertEquals("600",properties.get("dfs.df.interval"));
+
+        properties = ConfigurationUtil.expandForAlternativeNames("", "");
+        Assert.assertEquals(1,properties.size());
+        Assert.assertEquals("",properties.get(""));
+
+    }
+}
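
Restated as a tiny usage sketch, here is the contract the new test above implies; the behavior shown is taken from the assertions, not from the implementation (which is not part of this diff):

    import java.util.Properties;

    import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;

    public class ExpandSketch {
        public static void main(String[] args) {
            // A key with no deprecated alias passes through unchanged.
            Properties p1 = ConfigurationUtil.expandForAlternativeNames("fs.df.interval", "500");
            System.out.println(p1);  // one entry: fs.df.interval=500

            // A deprecated dfs.* key is kept and also mirrored to its fs.*
            // replacement, so both spellings carry the same value.
            Properties p2 = ConfigurationUtil.expandForAlternativeNames("dfs.df.interval", "600");
            System.out.println(p2);  // two entries: dfs.df.interval=600, fs.df.interval=600
        }
    }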

Modified: pig/branches/spark/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCounters.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCounters.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCounters.java Wed Feb 22 09:43:41 2017
@@ -30,17 +30,17 @@ import java.util.Map;
 import java.util.Random;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
-import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -49,8 +49,8 @@ import org.junit.runners.JUnit4;
 public class TestCounters {
     String file = "input.txt";
 
-    static MiniCluster cluster = MiniCluster.buildCluster();
-    
+    static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+
     final int MAX = 100*1000;
     Random r = new Random();
 
@@ -59,7 +59,7 @@ public class TestCounters {
     public static void oneTimeTearDown() throws Exception {
         cluster.shutDown();
     }
-    
+
     @Test
     public void testMapOnly() throws IOException, ExecException {
         int count = 0;
@@ -70,13 +70,13 @@ public class TestCounters {
             if(t > 50) count ++;
         }
         pw.close();
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = filter a by $0 > 50;");
         pigServer.registerQuery("c = foreach b generate $0 - 50;");
         ExecJob job = pigServer.store("c", "output_map_only");
         PigStats pigStats = job.getStatistics();
-        
+
         //counting the no. of bytes in the output file
         //long filesize = cluster.getFileSystem().getFileStatus(new Path("output_map_only")).getLen();
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
@@ -85,9 +85,9 @@ public class TestCounters {
 
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
-        
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output_map_only"), true);
 
@@ -98,7 +98,7 @@ public class TestCounters {
         JobGraph jg = pigStats.getJobGraph();
         Iterator<JobStats> iter = jg.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();                    
+            JobStats js = iter.next();
 
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
@@ -123,20 +123,20 @@ public class TestCounters {
                 count ++;
         }
         pw.close();
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = filter a by $0 > 50;");
         pigServer.registerQuery("c = foreach b generate $0 - 50;");
         ExecJob job = pigServer.store("c", "output_map_only", "BinStorage");
         PigStats pigStats = job.getStatistics();
-        
+
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
                 "output_map_only", pigServer.getPigContext()),
                 pigServer.getPigContext());
 
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
 
         cluster.getFileSystem().delete(new Path(file), true);
@@ -149,8 +149,8 @@ public class TestCounters {
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();
-        
+            JobStats js = iter.next();
+
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -158,7 +158,7 @@ public class TestCounters {
             assertEquals(0, js.getReduceInputRecords());
             assertEquals(0, js.getReduceOutputRecords());
         }
-            
+
         System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
         assertEquals(filesize, pigStats.getBytesWritten());
     }
@@ -183,7 +183,7 @@ public class TestCounters {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group;");
@@ -195,7 +195,7 @@ public class TestCounters {
 
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
 
         cluster.getFileSystem().delete(new Path(file), true);
@@ -208,7 +208,7 @@ public class TestCounters {
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();
+            JobStats js = iter.next();
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -242,7 +242,7 @@ public class TestCounters {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group;");
@@ -253,9 +253,9 @@ public class TestCounters {
                 pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
-        
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
 
@@ -266,7 +266,7 @@ public class TestCounters {
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();
+            JobStats js = iter.next();
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -300,7 +300,7 @@ public class TestCounters {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
@@ -311,20 +311,20 @@ public class TestCounters {
                 pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
- 
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
 
         System.out.println("============================================");
         System.out.println("Test case MapCombineReduce");
         System.out.println("============================================");
-        
+
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();
+            JobStats js = iter.next();
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -337,7 +337,7 @@ public class TestCounters {
         System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
         assertEquals(filesize, pigStats.getBytesWritten());
     }
-     
+
     @Test
     public void testMapCombineReduceBinStorage() throws IOException, ExecException {
         int count = 0;
@@ -358,20 +358,20 @@ public class TestCounters {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
 
         ExecJob job = pigServer.store("c", "output", "BinStorage");
         PigStats pigStats = job.getStatistics();
-        
+
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
                 pigServer.getPigContext()), pigServer.getPigContext());
 
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
@@ -379,11 +379,11 @@ public class TestCounters {
         System.out.println("============================================");
         System.out.println("Test case MapCombineReduce");
         System.out.println("============================================");
- 
+
         JobGraph jp = pigStats.getJobGraph();
         Iterator<JobStats> iter = jp.iterator();
         while (iter.hasNext()) {
-            MRJobStats js = (MRJobStats) iter.next();
+            JobStats js = iter.next();
             System.out.println("Map input records : " + js.getMapInputRecords());
             assertEquals(MAX, js.getMapInputRecords());
             System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -399,6 +399,8 @@ public class TestCounters {
 
     @Test
     public void testMultipleMRJobs() throws IOException, ExecException {
+        Assume.assumeTrue("Skip this test for TEZ. Assert is done only for first MR job",
+                Util.isMapredExecType(cluster.getExecType()));
         int count = 0;
         PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
         int [] nos = new int[10];
@@ -413,38 +415,38 @@ public class TestCounters {
         }
         pw.close();
 
-        for(int i = 0; i < 10; i++) { 
+        for(int i = 0; i < 10; i++) {
             if(nos[i] > 0) count ++;
         }
 
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = order a by $0;");
         pigServer.registerQuery("c = group b by $0;");
         pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
         ExecJob job = pigServer.store("d", "output");
         PigStats pigStats = job.getStatistics();
-        
+
         InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
                 pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
-        
+
         is.close();
-        
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
-        
+
         System.out.println("============================================");
         System.out.println("Test case MultipleMRJobs");
         System.out.println("============================================");
-        
+
         JobGraph jp = pigStats.getJobGraph();
-        MRJobStats js = (MRJobStats)jp.getSinks().get(0);
-        
+        JobStats js = (JobStats)jp.getSinks().get(0);
+
         System.out.println("Job id: " + js.getName());
         System.out.println(jp.toString());
-        
+
         System.out.println("Map input records : " + js.getMapInputRecords());
         assertEquals(MAX, js.getMapInputRecords());
         System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -453,12 +455,12 @@ public class TestCounters {
         assertEquals(count, js.getReduceInputRecords());
         System.out.println("Reduce output records : " + js.getReduceOutputRecords());
         assertEquals(count, js.getReduceOutputRecords());
-        
+
         System.out.println("Hdfs bytes written : " + js.getHdfsBytesWritten());
         assertEquals(filesize, js.getHdfsBytesWritten());
 
     }
-    
+
     @Test
     public void testMapOnlyMultiQueryStores() throws Exception {
         PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
@@ -467,8 +469,8 @@ public class TestCounters {
             pw.println(t);
         }
         pw.close();
-        
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, 
+
+        PigServer pigServer = new PigServer(cluster.getExecType(),
                 cluster.getProperties());
         pigServer.setBatchOn();
         pigServer.registerQuery("a = load '" + file + "';");
@@ -479,22 +481,22 @@ public class TestCounters {
         List<ExecJob> jobs = pigServer.executeBatch();
         PigStats stats = jobs.get(0).getStatistics();
         assertTrue(stats.getOutputLocations().size() == 2);
-        
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
 
-        MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0);
-        
+        JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
+
         Map<String, Long> entry = js.getMultiStoreCounters();
         long counter = 0;
         for (Long val : entry.values()) {
             counter += val;
         }
-        
-        assertEquals(MAX, counter);       
-    }    
-    
+
+        assertEquals(MAX, counter);
+    }
+
     @Test
     public void testMultiQueryStores() throws Exception {
         int[] nums = new int[100];
@@ -505,13 +507,13 @@ public class TestCounters {
             nums[t]++;
         }
         pw.close();
-        
+
         int groups = 0;
         for (int i : nums) {
             if (i > 0) groups++;
         }
-        
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, 
+
+        PigServer pigServer = new PigServer(cluster.getExecType(),
                 cluster.getProperties());
         pigServer.setBatchOn();
         pigServer.registerQuery("a = load '" + file + "';");
@@ -525,29 +527,29 @@ public class TestCounters {
         pigServer.registerQuery("store g into '/tmp/outout2';");
         List<ExecJob> jobs = pigServer.executeBatch();
         PigStats stats = jobs.get(0).getStatistics();
-        
+
         assertTrue(stats.getOutputLocations().size() == 2);
-               
+
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
 
-        MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0);
-        
+        JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
+
         Map<String, Long> entry = js.getMultiStoreCounters();
         long counter = 0;
         for (Long val : entry.values()) {
             counter += val;
         }
-        
-        assertEquals(groups, counter);       
-    }    
-    
-    /*    
+
+        assertEquals(groups, counter);
+    }
+
+    /*
      * IMPORTANT NOTE:
      * COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE -
      * SEE PIG-1286 - UNCOMMENT WHEN IT IS FIXED
-     */ 
+     */
 //    @Test
 //    public void testLocal() throws IOException, ExecException {
 //        int count = 0;
@@ -566,7 +568,7 @@ public class TestCounters {
 //        }
 //        pw.close();
 //
-//        for(int i = 0; i < 10; i++) 
+//        for(int i = 0; i < 10; i++)
 //            if(nos[i] > 0)
 //                count ++;
 //
@@ -580,56 +582,56 @@ public class TestCounters {
 //        pigServer.registerQuery("c = group b by $0;");
 //        pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
 //        PigStats pigStats = pigServer.store("d", "file://" + out.getAbsolutePath()).getStatistics();
-//        InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+//        InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), cluster.getExecType(), pigServer.getPigContext().getDfs());
 //        long filesize = 0;
 //        while(is.read() != -1) filesize++;
-//        
+//
 //        is.close();
 //        out.delete();
-//        
+//
 //        //Map<String, Map<String, String>> stats = pigStats.getPigStats();
-//        
+//
 //        assertEquals(10, pigStats.getRecordsWritten());
 //        assertEquals(110, pigStats.getBytesWritten());
 //
 //    }
 
     @Test
-    public void testJoinInputCounters() throws Exception {        
+    public void testJoinInputCounters() throws Exception {
         testInputCounters("join");
     }
-    
+
     @Test
-    public void testCogroupInputCounters() throws Exception {        
+    public void testCogroupInputCounters() throws Exception {
         testInputCounters("cogroup");
     }
-    
+
     @Test
-    public void testSkewedInputCounters() throws Exception {        
+    public void testSkewedInputCounters() throws Exception {
         testInputCounters("skewed");
     }
-    
+
     @Test
-    public void testSelfJoinInputCounters() throws Exception {        
+    public void testSelfJoinInputCounters() throws Exception {
         testInputCounters("self-join");
     }
-    
+
     private static boolean multiInputCreated = false;
-    
+
     private static int count = 0;
-            
-    private void testInputCounters(String keyword) throws Exception {  
+
+    private void testInputCounters(String keyword) throws Exception {
         String file1 = "multi-input1.txt";
         String file2 = "multi-input2.txt";
-        
+
         String output = keyword;
-        
+
         if (keyword.equals("self-join")) {
             file2 = file1;
             keyword = "join";
         }
-         
-        final int MAX_NUM_RECORDS = 100; 
+
+        final int MAX_NUM_RECORDS = 100;
         if (!multiInputCreated) {
             PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file1));
             for (int i = 0; i < MAX_NUM_RECORDS; i++) {
@@ -637,7 +639,7 @@ public class TestCounters {
                 pw.println(t);
             }
             pw.close();
-                        
+
             PrintWriter pw2 = new PrintWriter(Util.createInputFile(cluster, file2));
             for (int i = 0; i < MAX_NUM_RECORDS; i++) {
                 int t = r.nextInt(100);
@@ -649,8 +651,8 @@ public class TestCounters {
             pw2.close();
             multiInputCreated = true;
         }
-        
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, 
+
+        PigServer pigServer = new PigServer(cluster.getExecType(),
                 cluster.getProperties());
         pigServer.setBatchOn();
         pigServer.registerQuery("a = load '" + file1 + "';");
@@ -661,7 +663,7 @@ public class TestCounters {
             pigServer.registerQuery("c = join a by $0, b by $0 using 'skewed';");
         }
         ExecJob job = pigServer.store("c", output + "_output");
-        
+
         PigStats stats = job.getStatistics();
         assertTrue(stats.isSuccessful());
         List<InputStats> inputs = stats.getInputStats();
@@ -680,4 +682,46 @@ public class TestCounters {
             }
         }
     }
+
+    @Test
+    public void testSplitUnionOutputCounters() throws Exception {
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, "splitunion-input"));
+        for (int i = 0; i < 10; i++) {
+            pw.println(i);
+        }
+        pw.close();
+        String query =
+                "a = load 'splitunion-input';" +
+                "split a into b if $0 < 5, c otherwise;" +
+                "d = union b, c;";
+
+        pigServer.registerQuery(query);
+
+        ExecJob job = pigServer.store("d", "splitunion-output-0", "PigStorage");
+        PigStats stats1 = job.getStatistics();
+
+        query =
+                "a = load 'splitunion-input';" +
+                "split a into b if $0 < 3, c if $0 > 2 and $0 < 6, d if $0 > 5;" +
+                "e = distinct d;" +
+                "f = union b, c, e;";
+
+        pigServer.registerQuery(query);
+
+        job = pigServer.store("f", "splitunion-output-1", "PigStorage");
+        PigStats stats2 = job.getStatistics();
+
+        PigStats[] pigStats = new PigStats[]{stats1, stats2};
+        for (int i = 0; i < 2; i++) {
+            PigStats stats = pigStats[i];
+            assertTrue(stats.isSuccessful());
+            List<OutputStats> outputs = stats.getOutputStats();
+            assertEquals(1, outputs.size());
+            OutputStats output = outputs.get(0);
+            assertEquals("splitunion-output-" + i, output.getName());
+            assertEquals(10, output.getNumberRecords());
+            assertEquals(20, output.getBytes());
+        }
+    }
 }
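
The changes above make TestCounters engine-neutral: MRJobStats casts become the generic JobStats, ExecType.MAPREDUCE becomes cluster.getExecType(), and MR-only assertions are gated with JUnit's Assume rather than failing on other engines. A minimal sketch of the gating pattern (class and method names illustrative, assuming the org.apache.pig.test helpers):

    package org.apache.pig.test;

    import org.junit.Assume;
    import org.junit.Test;

    public class EngineGatingSketch {
        static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();

        @Test
        public void mrOnlyCounters() {
            // assumeTrue reports the test as skipped (not failed) when the
            // condition is false, so Tez/Spark runs of the suite stay green
            // without weakening the MapReduce coverage.
            Assume.assumeTrue("Counter assertions only apply to the MR engine",
                    Util.isMapredExecType(cluster.getExecType()));
            // ... MR-specific JobStats assertions would go here ...
        }
    }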

Modified: pig/branches/spark/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestDataBag.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestDataBag.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestDataBag.java Wed Feb 22 09:43:41 2017
@@ -17,17 +17,36 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
-import java.util.*;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
-
-
-import org.apache.pig.data.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.DistinctDataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.InternalDistinctBag;
+import org.apache.pig.data.InternalSortedBag;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.SingleTupleBag;
+import org.apache.pig.data.SortedDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.Spillable;
 import org.junit.After;
 import org.junit.Test;
@@ -36,7 +55,7 @@ import org.junit.Test;
 /**
  * This class will exercise the basic Pig data model and members. It tests for proper behavior in
  * assignment and comparison, as well as function application.
- * 
+ *
  * @author dnm
  */
 public class TestDataBag  {
@@ -590,7 +609,7 @@ public class TestDataBag  {
             }
             mgr.forceSpill();
         }
-        
+
        assertEquals("Size of distinct data bag is incorrect", rightAnswer.size(), b.size());
 
         // Read tuples back, hopefully they come out in the same order.
@@ -719,14 +738,14 @@ public class TestDataBag  {
     @Test
     public void testDefaultBagFactory() throws Exception {
         BagFactory f = BagFactory.getInstance();
-       
+
         DataBag bag = f.newDefaultBag();
         DataBag sorted = f.newSortedBag(null);
         DataBag distinct = f.newDistinctBag();
 
         assertTrue("Expected a default bag", (bag instanceof DefaultDataBag));
         assertTrue("Expected a sorted bag", (sorted instanceof SortedDataBag));
-        assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));         
+        assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));
     }
 
     @Test
@@ -756,7 +775,7 @@ public class TestDataBag  {
         try {
             BagFactory f = BagFactory.getInstance();
         } catch (RuntimeException re) {
-            assertEquals("Expected does not extend BagFactory message", 
+            assertEquals("Expected does not extend BagFactory message",
                 "Provided factory org.apache.pig.test.TestDataBag does not extend BagFactory!",
                 re.getMessage());
             caughtIt = true;
@@ -775,7 +794,7 @@ public class TestDataBag  {
 
         BagFactory.resetSelf();
     }
-    
+
     @Test
     public void testNonSpillableDataBagEquals1() throws Exception {
         String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -789,7 +808,7 @@ public class TestDataBag  {
         }
         assertEquals(bg1, bg2);
     }
-    
+
     @Test
     public void testNonSpillableDataBagEquals2() throws Exception {
         String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -804,7 +823,7 @@ public class TestDataBag  {
         }
         assertEquals(bg1, bg2);
     }
-    
+
     @Test
     public void testDefaultDataBagEquals1() throws Exception {
         String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -820,7 +839,7 @@ public class TestDataBag  {
         }
         assertEquals(bg1, bg2);
     }
-    
+
     @Test
     public void testDefaultDataBagEquals2() throws Exception {
         String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -837,35 +856,35 @@ public class TestDataBag  {
         }
         assertEquals(bg1, bg2);
     }
-    
-    public void testInternalCachedBag() throws Exception {    
+
+    public void testInternalCachedBag() throws Exception {
     	// check adding empty tuple
     	DataBag bg0 = new InternalCachedBag();
     	bg0.add(TupleFactory.getInstance().newTuple());
     	bg0.add(TupleFactory.getInstance().newTuple());
     	assertEquals(bg0.size(), 2);
-    	
+
     	// check equal of bags
     	DataBag bg1 = new InternalCachedBag(1, 0.5f);
     	assertEquals(bg1.size(), 0);
-    	
+
     	String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
     	for (int i = 0; i < tupleContents.length; i++) {
             bg1.add(Util.createTuple(tupleContents[i]));
         }
-    	
+
     	// check size, and isSorted(), isDistinct()
     	assertEquals(bg1.size(), 3);
     	assertFalse(bg1.isSorted());
     	assertFalse(bg1.isDistinct());
-    	
+
     	tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
     	DataBag bg2 = new InternalCachedBag(1, 0.5f);
         for (int i = 0; i < tupleContents.length; i++) {
              bg2.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg2);
-        
+
         // check bag with data written to disk
         DataBag bg3 = new InternalCachedBag(1, 0.0f);
         tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
@@ -873,7 +892,7 @@ public class TestDataBag  {
             bg3.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg3);
-        
+
         // check iterator
         Iterator<Tuple> iter = bg3.iterator();
         DataBag bg4 = new InternalCachedBag(1, 0.0f);
@@ -881,7 +900,7 @@ public class TestDataBag  {
         	bg4.add(iter.next());
         }
         assertEquals(bg3, bg4);
-        
+
         // call iterator methods with irregular order
         iter = bg3.iterator();
         assertTrue(iter.hasNext());
@@ -894,46 +913,46 @@ public class TestDataBag  {
         assertFalse(iter.hasNext());
         assertFalse(iter.hasNext());
         assertEquals(bg3, bg5);
-        
-        
+
+
         bg4.clear();
-        assertEquals(bg4.size(), 0);        
+        assertEquals(bg4.size(), 0);
     }
-    
-    public void testInternalSortedBag() throws Exception {    
-    	
+
+    public void testInternalSortedBag() throws Exception {
+
     	// check adding empty tuple
     	DataBag bg0 = new InternalSortedBag();
     	bg0.add(TupleFactory.getInstance().newTuple());
     	bg0.add(TupleFactory.getInstance().newTuple());
     	assertEquals(bg0.size(), 2);
-    	
+
     	// check equal of bags
     	DataBag bg1 = new InternalSortedBag();
     	assertEquals(bg1.size(), 0);
-    	
+
     	String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"c", "d" }};
     	for (int i = 0; i < tupleContents.length; i++) {
             bg1.add(Util.createTuple(tupleContents[i]));
         }
-    	
+
     	// check size, and isSorted(), isDistinct()
     	assertEquals(bg1.size(), 3);
     	assertTrue(bg1.isSorted());
     	assertFalse(bg1.isDistinct());
-    	
+
     	tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
     	DataBag bg2 = new InternalSortedBag();
         for (int i = 0; i < tupleContents.length; i++) {
              bg2.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg2);
-        
+
         Iterator<Tuple> iter = bg1.iterator();
         iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
         iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
         iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-        
+
         // check bag with data written to disk
         DataBag bg3 = new InternalSortedBag(1, 0.0f, null);
         tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
@@ -941,17 +960,17 @@ public class TestDataBag  {
             bg3.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg3);
-        
+
         iter = bg3.iterator();
         iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
         iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
-        iter.next().equals(Util.createTuple(new String[] {"e", "f"}));                
-        
+        iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+
         // call iterator methods with irregular order
         iter = bg3.iterator();
         assertTrue(iter.hasNext());
         assertTrue(iter.hasNext());
-        
+
         DataBag bg4 = new InternalSortedBag(1, 0.0f, null);
         bg4.add(iter.next());
         bg4.add(iter.next());
@@ -959,21 +978,21 @@ public class TestDataBag  {
         bg4.add(iter.next());
         assertFalse(iter.hasNext());
         assertFalse(iter.hasNext());
-        assertEquals(bg3, bg4);        
-        
+        assertEquals(bg3, bg4);
+
         // check clear
         bg3.clear();
         assertEquals(bg3.size(), 0);
-        
+
         // test with all data spill out
-        DataBag bg5 = new InternalSortedBag();        
+        DataBag bg5 = new InternalSortedBag();
         for(int j=0; j<3; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg5.add(Util.createTuple(tupleContents[i]));
-        	}     
+        	}
         	bg5.spill();
         }
-        
+
         assertEquals(bg5.size(), 9);
         iter = bg5.iterator();
         for(int i=0; i<3; i++) {
@@ -983,21 +1002,21 @@ public class TestDataBag  {
         	iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
         }
         for(int i=0; i<3; i++) {
-        	iter.next().equals(Util.createTuple(new String[] {"e", "f"}));   
+        	iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
         }
-        
+
         // test with most data spill out, with some data in memory
         // and merge of spill files
-        DataBag bg6 = new InternalSortedBag();        
+        DataBag bg6 = new InternalSortedBag();
         for(int j=0; j<104; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg6.add(Util.createTuple(tupleContents[i]));
-        	}        	
+        	}
         	if (j != 103) {
         		bg6.spill();
         	}
         }
-        
+
         assertEquals(bg6.size(), 104*3);
         iter = bg6.iterator();
         for(int i=0; i<104; i++) {
@@ -1007,55 +1026,55 @@ public class TestDataBag  {
         	iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
         }
         for(int i=0; i<104; i++) {
-        	iter.next().equals(Util.createTuple(new String[] {"e", "f"}));   
+        	iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
         }
-        
+
         // check two implementation of sorted bag can compare correctly
-        DataBag bg7 = new SortedDataBag(null);        
+        DataBag bg7 = new SortedDataBag(null);
         for(int j=0; j<104; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg7.add(Util.createTuple(tupleContents[i]));
-        	}        	
+        	}
         	if (j != 103) {
         		bg7.spill();
         	}
         }
         assertEquals(bg6, bg7);
     }
-    
-    public void testInternalDistinctBag() throws Exception {    
+
+    public void testInternalDistinctBag() throws Exception {
     	// check adding empty tuple
     	DataBag bg0 = new InternalDistinctBag();
     	bg0.add(TupleFactory.getInstance().newTuple());
     	bg0.add(TupleFactory.getInstance().newTuple());
     	assertEquals(bg0.size(), 1);
-    	
+
     	// check equal of bags
     	DataBag bg1 = new InternalDistinctBag();
     	assertEquals(bg1.size(), 0);
-    	
+
     	String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
     	for (int i = 0; i < tupleContents.length; i++) {
             bg1.add(Util.createTuple(tupleContents[i]));
         }
-    	
+
     	// check size, and isSorted(), isDistinct()
     	assertEquals(bg1.size(), 3);
     	assertFalse(bg1.isSorted());
     	assertTrue(bg1.isDistinct());
-    	
+
     	tupleContents = new String[][] {{"a", "b" }, {"e", "d"}, {"e", "d"}, { "e", "f"} };
     	DataBag bg2 = new InternalDistinctBag();
         for (int i = 0; i < tupleContents.length; i++) {
              bg2.add(Util.createTuple(tupleContents[i]));
         }
         assertEquals(bg1, bg2);
-        
+
         Iterator<Tuple> iter = bg1.iterator();
         iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
         iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
         iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-        
+
         // check bag with data written to disk
         DataBag bg3 = new InternalDistinctBag(1, 0.0f);
         tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
@@ -1064,13 +1083,13 @@ public class TestDataBag  {
         }
         assertEquals(bg2, bg3);
         assertEquals(bg3.size(), 3);
-              
-        
+
+
         // call iterator methods with irregular order
         iter = bg3.iterator();
         assertTrue(iter.hasNext());
         assertTrue(iter.hasNext());
-        
+
         DataBag bg4 = new InternalDistinctBag(1, 0.0f);
         bg4.add(iter.next());
         bg4.add(iter.next());
@@ -1078,73 +1097,73 @@ public class TestDataBag  {
         bg4.add(iter.next());
         assertFalse(iter.hasNext());
         assertFalse(iter.hasNext());
-        assertEquals(bg3, bg4);        
-        
+        assertEquals(bg3, bg4);
+
         // check clear
         bg3.clear();
         assertEquals(bg3.size(), 0);
-        
+
         // test with all data spill out
-        DataBag bg5 = new InternalDistinctBag();        
+        DataBag bg5 = new InternalDistinctBag();
         for(int j=0; j<3; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg5.add(Util.createTuple(tupleContents[i]));
-        	}        
+        	}
         	bg5.spill();
         }
-        
+
         assertEquals(bg5.size(), 3);
-    
-        
+
+
         // test with most data spill out, with some data in memory
         // and merge of spill files
-        DataBag bg6 = new InternalDistinctBag();        
+        DataBag bg6 = new InternalDistinctBag();
         for(int j=0; j<104; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg6.add(Util.createTuple(tupleContents[i]));
-        	}        	
+        	}
         	if (j != 103) {
         		bg6.spill();
         	}
         }
-        
-        assertEquals(bg6.size(), 3);       
-        
+
+        assertEquals(bg6.size(), 3);
+
         // check two implementation of sorted bag can compare correctly
-        DataBag bg7 = new DistinctDataBag();        
+        DataBag bg7 = new DistinctDataBag();
         for(int j=0; j<104; j++) {
         	for (int i = 0; i < tupleContents.length; i++) {
         		bg7.add(Util.createTuple(tupleContents[i]));
-        	}        	
+        	}
         	if (j != 103) {
         		bg7.spill();
         	}
         }
         assertEquals(bg6, bg7);
     }
-    
+
     // See PIG-1231
     @Test
     public void testDataBagIterIdempotent() throws Exception {
         DataBag bg0 = new DefaultDataBag();
         processDataBag(bg0, true);
-        
+
         DataBag bg1 = new DistinctDataBag();
         processDataBag(bg1, true);
-        
+
         DataBag bg2 = new InternalDistinctBag();
         processDataBag(bg2, true);
-        
+
         DataBag bg3 = new InternalSortedBag();
         processDataBag(bg3, true);
-        
+
         DataBag bg4 = new SortedDataBag(null);
         processDataBag(bg4, true);
-        
+
         DataBag bg5 = new InternalCachedBag(0, 0);
         processDataBag(bg5, false);
     }
-    
+
     // See PIG-1285
     @Test
     public void testSerializeSingleTupleBag() throws Exception {
@@ -1159,7 +1178,7 @@ public class TestDataBag  {
         dfBag.readFields(dis);
         assertTrue(dfBag.equals(stBag));
     }
-    
+
     // See PIG-2550
     static class MyCustomTuple extends DefaultTuple {
         private static final long serialVersionUID = 8156382697467819543L;
@@ -1184,7 +1203,23 @@ public class TestDataBag  {
         Tuple t2 = iter.next();
         assertTrue(t2.equals(t));
     }
-    
+
+    // See PIG-4260
+    @Test
+    public void testSpillArrayBackedList() throws Exception {
+        Tuple[] tuples = new Tuple[2];
+        tuples[0] = TupleFactory.getInstance().newTuple(1);
+        tuples[0].set(0, "first");
+        tuples[1] = TupleFactory.getInstance().newTuple(1);
+        tuples[1].set(0, "second");
+        DefaultDataBag bag = new DefaultDataBag(Arrays.asList(tuples));
+        bag.spill();
+        Iterator<Tuple> iter = bag.iterator();
+        assertEquals(tuples[0], iter.next());
+        assertEquals(tuples[1], iter.next());
+        assertFalse(iter.hasNext());
+    }
+
     void processDataBag(DataBag bg, boolean doSpill) {
         Tuple t = TupleFactory.getInstance().newTuple(new Integer(0));
         bg.add(t);
@@ -1194,7 +1229,7 @@ public class TestDataBag  {
         assertTrue(iter.hasNext());
         iter.next();
         assertFalse(iter.hasNext());
-        assertFalse("hasNext should be idempotent", iter.hasNext());        
+        assertFalse("hasNext should be idempotent", iter.hasNext());
     }
 }
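As an aside for anyone tracing the distinct-bag assertions above: the behaviour under test (duplicates collapsing even after spills) is easy to reproduce standalone. A minimal sketch, assuming only the Pig jar on the classpath; the class name is illustrative:

import java.util.Iterator;

import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalDistinctBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class DistinctBagSketch {
    public static void main(String[] args) throws Exception {
        TupleFactory tf = TupleFactory.getInstance();
        DataBag bag = new InternalDistinctBag();
        // Add the same tuple five times; a distinct bag stores one copy.
        for (int i = 0; i < 5; i++) {
            Tuple t = tf.newTuple(2);
            t.set(0, "a");
            t.set(1, "b");
            bag.add(t);
        }
        bag.spill();                    // push in-memory contents to disk
        System.out.println(bag.size()); // 1 -- duplicates collapse across spills
        for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
            System.out.println(it.next());
        }
    }
}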
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestDivide.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestDivide.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestDivide.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestDivide.java Wed Feb 22 09:43:41 2017
@@ -20,6 +20,9 @@ package org.apache.pig.test;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
 import java.util.Map;
 import java.util.Random;
 
@@ -53,7 +56,7 @@ public class TestDivide {
     public void testOperator() throws ExecException {
         // int TRIALS = 10;
         byte[] types = { DataType.BAG, DataType.BOOLEAN, DataType.BYTEARRAY, DataType.CHARARRAY,
-                        DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG,
+                        DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG, DataType.BIGDECIMAL,
                         DataType.DATETIME, DataType.MAP, DataType.TUPLE };
         // Map<Byte,String> map = GenRandomData.genTypeToNameMap();
         System.out.println("Testing DIVIDE operator");
@@ -250,6 +253,33 @@ public class TestDivide {
                 assertEquals(null, (Long)resl.result);
                 break;
             }
+            case DataType.BIGDECIMAL: {
+                MathContext mc = new MathContext(Divide.BIGDECIMAL_MINIMAL_SCALE, RoundingMode.HALF_UP);
+                BigDecimal inpf1 = new BigDecimal(r.nextDouble(), mc);
+                BigDecimal inpf2 = new BigDecimal(r.nextDouble(), mc);
+                lt.setValue(inpf1);
+                rt.setValue(inpf2);
+                Result resf = op.getNextBigDecimal();
+                BigDecimal expected = inpf1.divide(inpf2, 2 * Divide.BIGDECIMAL_MINIMAL_SCALE + 1, RoundingMode.HALF_UP);
+                assertEquals(expected, (BigDecimal)resf.result);
+
+                // test with null in lhs
+                lt.setValue(null);
+                rt.setValue(inpf2);
+                resf = op.getNextBigDecimal();
+                assertEquals(null, (BigDecimal)resf.result);
+                // test with null in rhs
+                lt.setValue(inpf1);
+                rt.setValue(null);
+                resf = op.getNextBigDecimal();
+                assertEquals(null, (BigDecimal)resf.result);
+                // test divide by 0
+                lt.setValue(inpf1);
+                rt.setValue(new BigDecimal(0.0f, mc));
+                resf = op.getNextBigDecimal();
+                assertEquals(null, (BigDecimal)resf.result);
+                break;
+            }
             case DataType.DATETIME:
                 DateTime inpdt1 = new DateTime(r.nextLong());
                 DateTime inpdt2 = new DateTime(r.nextLong());
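The expected quotient in the new BIGDECIMAL case is pinned to scale 2 * Divide.BIGDECIMAL_MINIMAL_SCALE + 1 with HALF_UP rounding because BigDecimal.divide without an explicit scale throws on non-terminating decimal expansions. A quick standalone illustration (the scale 13 here is arbitrary, not Pig's constant):

import java.math.BigDecimal;
import java.math.RoundingMode;

public class BigDecimalDivideSketch {
    public static void main(String[] args) {
        BigDecimal three = new BigDecimal(3);

        // Without an explicit scale, 1/3 has no exact decimal representation:
        try {
            BigDecimal.ONE.divide(three);
        } catch (ArithmeticException e) {
            System.out.println("non-terminating: " + e.getMessage());
        }

        // With a fixed scale and rounding mode the division always succeeds,
        // which is why the test pins the expected quotient to a known scale.
        System.out.println(BigDecimal.ONE.divide(three, 13, RoundingMode.HALF_UP));
        // prints 0.3333333333333
    }
}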

Modified: pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java Wed Feb 22 09:43:41 2017
@@ -23,13 +23,13 @@ import static org.junit.Assert.assertTru
 
 import java.io.File;
 import java.io.FileWriter;
+import java.io.IOException;
 import java.io.PrintWriter;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.pig.PigRunner;
-import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.junit.AfterClass;
@@ -38,16 +38,15 @@ import org.junit.Test;
 
 public class TestEmptyInputDir {
 
-    private static MiniCluster cluster; 
+    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
     private static final String EMPTY_DIR = "emptydir";
     private static final String INPUT_FILE = "input";
     private static final String OUTPUT_FILE = "output";
     private static final String PIG_FILE = "test.pig";
 
-    
+
     @BeforeClass
     public static void setUpBeforeClass() throws Exception {
-        cluster = MiniCluster.buildCluster();
         FileSystem fs = cluster.getFileSystem();
         if (!fs.mkdirs(new Path(EMPTY_DIR))) {
             throw new Exception("failed to create empty dir");
@@ -64,7 +63,35 @@ public class TestEmptyInputDir {
     public static void tearDownAfterClass() throws Exception {
         cluster.shutDown();
     }
-    
+
+    @Test
+    public void testGroupBy() throws Exception {
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + EMPTY_DIR + "';");
+        w.println("B = group A by $0;");
+        w.println("store B into '" + OUTPUT_FILE + "';");
+        w.close();
+
+        try {
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+            PigStats stats = PigRunner.run(args, null);
+
+            assertTrue(stats.isSuccessful());
+
+            // This assert fails on 205 due to MAPREDUCE-3606
+            if (Util.isMapredExecType(cluster.getExecType())
+                    && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+                MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+                assertEquals(0, js.getNumberMaps());
+            }
+
+            assertEmptyOutputFile();
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+        }
+    }
+
     @Test
     public void testSkewedJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -73,31 +100,28 @@ public class TestEmptyInputDir {
         w.println("C = join B by $0, A by $0 using 'skewed';");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
+
             assertTrue(stats.isSuccessful());
-            // the sampler job has zero maps
-            MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
-            
+
             // This assert fails on 205 due to MAPREDUCE-3606
-            if (!Util.isHadoop205()&&!Util.isHadoop1_x())
-                assertEquals(0, js.getNumberMaps()); 
-            
-            FileSystem fs = cluster.getFileSystem();
-            FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
-            assertTrue(status.isDir());
-            assertEquals(0, status.getLen());
-            // output directory isn't empty
-            assertTrue(fs.listStatus(status.getPath()).length > 0);
+            if (Util.isMapredExecType(cluster.getExecType())
+                    && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+                // the sampler job has zero maps
+                MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+                assertEquals(0, js.getNumberMaps());
+            }
+
+            assertEmptyOutputFile();
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
-    
+
     @Test
     public void testMergeJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -106,32 +130,28 @@ public class TestEmptyInputDir {
         w.println("C = join A by $0, B by $0 using 'merge';");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
-            assertTrue(stats.isSuccessful());    
-            // the indexer job has zero maps
-            MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
-            
+
+            assertTrue(stats.isSuccessful());
+
             // This assert fails on 205 due to MAPREDUCE-3606
-            if (!Util.isHadoop205()&&!Util.isHadoop1_x())
-                assertEquals(0, js.getNumberMaps()); 
-            
-            FileSystem fs = cluster.getFileSystem();
-            FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
-            assertTrue(status.isDir());
-            assertEquals(0, status.getLen());
-            
-            // output directory isn't empty
-            assertTrue(fs.listStatus(status.getPath()).length > 0);            
+            if (Util.isMapredExecType(cluster.getExecType())
+                    && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+                // the indexer job has zero maps
+                MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+                assertEquals(0, js.getNumberMaps());
+            }
+
+            assertEmptyOutputFile();
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
-    
+
     @Test
     public void testFRJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -140,55 +160,44 @@ public class TestEmptyInputDir {
         w.println("C = join A by $0, B by $0 using 'repl';");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
-            assertTrue(stats.isSuccessful());    
-            // the indexer job has zero maps
-            MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
-            
+
+            assertTrue(stats.isSuccessful());
+
             // This assert fails on 205 due to MAPREDUCE-3606
-            if (!Util.isHadoop205()&&!Util.isHadoop1_x())
-                assertEquals(0, js.getNumberMaps()); 
-            
-            FileSystem fs = cluster.getFileSystem();
-            FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
-            assertTrue(status.isDir());
-            assertEquals(0, status.getLen());
-            
-            // output directory isn't empty
-            assertTrue(fs.listStatus(status.getPath()).length > 0);            
+            if (Util.isMapredExecType(cluster.getExecType())
+                    && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+                MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+                assertEquals(0, js.getNumberMaps());
+            }
+
+            assertEmptyOutputFile();
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
-    
+
     @Test
     public void testRegularJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
         w.println("A = load '" + INPUT_FILE + "';");
         w.println("B = load '" + EMPTY_DIR + "';");
-        w.println("C = join B by $0, A by $0;");
+        w.println("C = join B by $0, A by $0 PARALLEL 0;");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
-            assertTrue(stats.isSuccessful());   
-            
-            FileSystem fs = cluster.getFileSystem();
-            FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
-            assertTrue(status.isDir());
-            assertEquals(0, status.getLen());
-            
-            // output directory isn't empty
-            assertTrue(fs.listStatus(status.getPath()).length > 0);            
-            
+
+            assertTrue(stats.isSuccessful());
+
+            assertEmptyOutputFile();
+
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
@@ -203,19 +212,19 @@ public class TestEmptyInputDir {
         w.println("C = join B by $0 right outer, A by $0;");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
-            assertTrue(stats.isSuccessful());               
-            assertEquals(2, stats.getNumberRecords(OUTPUT_FILE));                  
+
+            assertTrue(stats.isSuccessful());
+            assertEquals(2, stats.getNumberRecords(OUTPUT_FILE));
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
         }
     }
-    
+
     @Test
     public void testLeftOuterJoin() throws Exception {
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -224,16 +233,88 @@ public class TestEmptyInputDir {
         w.println("C = join B by $0 left outer, A by $0;");
         w.println("store C into '" + OUTPUT_FILE + "';");
         w.close();
-        
+
+        try {
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+            PigStats stats = PigRunner.run(args, null);
+
+            assertTrue(stats.isSuccessful());
+            assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+        }
+    }
+
+    @Test
+    public void testBloomJoin() throws Exception {
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (x:int);");
+        w.println("B = load '" + EMPTY_DIR + "' as (x:int);");
+        w.println("C = join B by $0, A by $0 using 'bloom';");
+        w.println("D = join A by $0, B by $0 using 'bloom';");
+        w.println("store C into '" + OUTPUT_FILE + "';");
+        w.println("store D into 'output1';");
+        w.close();
+
+        try {
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+            PigStats stats = PigRunner.run(args, null);
+
+            assertTrue(stats.isSuccessful());
+            assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
+            assertEquals(0, stats.getNumberRecords("output1"));
+            assertEmptyOutputFile();
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+            Util.deleteFile(cluster, "output1");
+        }
+    }
+
+    @Test
+    public void testBloomJoinOuter() throws Exception {
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (x:int);");
+        w.println("B = load '" + EMPTY_DIR + "' as (x:int);");
+        w.println("C = join B by $0 left outer, A by $0 using 'bloom';");
+        w.println("D = join A by $0 left outer, B by $0 using 'bloom';");
+        w.println("E = join B by $0 right outer, A by $0 using 'bloom';");
+        w.println("F = join A by $0 right outer, B by $0 using 'bloom';");
+        w.println("store C into '" + OUTPUT_FILE + "';");
+        w.println("store D into 'output1';");
+        w.println("store E into 'output2';");
+        w.println("store F into 'output3';");
+        w.close();
+
         try {
-            String[] args = { PIG_FILE };
+            String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
             PigStats stats = PigRunner.run(args, null);
-     
-            assertTrue(stats.isSuccessful());               
-            assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));                  
+
+            assertTrue(stats.isSuccessful());
+            assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
+            assertEquals(2, stats.getNumberRecords("output1"));
+            assertEquals(2, stats.getNumberRecords("output2"));
+            assertEquals(0, stats.getNumberRecords("output3"));
+            assertEmptyOutputFile();
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
+            Util.deleteFile(cluster, "output1");
+            Util.deleteFile(cluster, "output2");
+            Util.deleteFile(cluster, "output3");
         }
     }
+
+    private void assertEmptyOutputFile() throws IllegalArgumentException, IOException {
+        FileSystem fs = cluster.getFileSystem();
+        FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
+        assertTrue(status.isDir());
+        assertEquals(0, status.getLen());
+        // the output directory isn't empty: it contains exactly one empty part file
+        FileStatus[] files = fs.listStatus(status.getPath(), Util.getSuccessMarkerPathFilter());
+        assertEquals(1, files.length);
+        assertEquals(0, files[0].getLen());
+        assertTrue(files[0].getPath().getName().startsWith("part-"));
+    }
 }
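Note that assertEmptyOutputFile() depends on Util.getSuccessMarkerPathFilter(), which this diff does not show. A hypothetical filter with the behaviour the assertions rely on, hiding Hadoop bookkeeping entries such as the _SUCCESS marker so that only part-* files are listed, could look like:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

// Hypothetical stand-in for Util.getSuccessMarkerPathFilter(); the real
// helper is not part of this diff. It hides bookkeeping files such as the
// _SUCCESS marker and _logs directory so only part-* outputs are listed.
public class HiddenFileFilterSketch {
    public static PathFilter successMarkerFilter() {
        return new PathFilter() {
            @Override
            public boolean accept(Path path) {
                String name = path.getName();
                return !name.startsWith("_") && !name.startsWith(".");
            }
        };
    }
}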

Modified: pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java Wed Feb 22 09:43:41 2017
@@ -200,11 +200,11 @@ public class TestErrorHandlingStoreFunc
     private void updatePigProperties(boolean allowErrors, long minErrors,
             double errorThreshold) {
         Properties properties = pigServer.getPigContext().getProperties();
-        properties.put(PigConfiguration.PIG_ALLOW_STORE_ERRORS,
+        properties.put(PigConfiguration.PIG_ERROR_HANDLING_ENABLED,
                 Boolean.toString(allowErrors));
-        properties.put(PigConfiguration.PIG_ERRORS_MIN_RECORDS,
+        properties.put(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS,
                 Long.toString(minErrors));
-        properties.put(PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT,
+        properties.put(PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT,
                 Double.toString(errorThreshold));
     }
 }
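For reference, the renamed keys are ordinary Pig properties and can be set the same way outside the test harness. A minimal sketch mirroring updatePigProperties; the values below are placeholders, not recommended defaults:

import java.util.Properties;

import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;

public class ErrorHandlingConfigSketch {
    public static void main(String[] args) throws Exception {
        PigServer pigServer = new PigServer("local");
        Properties props = pigServer.getPigContext().getProperties();
        // The same three keys the test sets: enable store-time error
        // handling, require a minimum number of error records before the
        // threshold applies, and cap the tolerated error percentage.
        props.put(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, "true");
        props.put(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS, "100");
        props.put(PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT, "0.3");
    }
}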

Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java Wed Feb 22 09:43:41 2017
@@ -291,7 +291,7 @@ public class TestEvalPipeline {
             myMap.put("long", new Long(1));
             myMap.put("float", new Float(1.0));
             myMap.put("double", new Double(1.0));
-            myMap.put("dba", new DataByteArray(new String("bytes").getBytes()));
+            myMap.put("dba", new DataByteArray(new String("1234").getBytes()));
             myMap.put("map", mapInMap);
             myMap.put("tuple", tuple);
             myMap.put("bag", bag);
@@ -794,32 +794,31 @@ public class TestEvalPipeline {
     }
 
     @Test
-    public void testMapUDFfail() throws Exception{
+    public void testMapUDFWithImplicitTypeCast() throws Exception{
         int LOOP_COUNT = 2;
         File tmpFile = Util.createTempFileDelOnExit("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < LOOP_COUNT; i++) {
-            for(int j=0;j<LOOP_COUNT;j+=2){
-                ps.println(i+"\t"+j);
-                ps.println(i+"\t"+j);
-            }
+            ps.println(i);
         }
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter
-        String query = "C = foreach B {"
-        + "generate mymap#'dba' * 10;"
-        + "};";
+        String query = "C = foreach B generate mymap#'dba' * 10; ";
 
         pigServer.registerQuery(query);
-        try {
-            pigServer.openIterator("C");
-            Assert.fail("Error expected.");
-        } catch (Exception e) {
-            e.getMessage().contains("Cannot determine");
+
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        if(!iter.hasNext()) Assert.fail("No output found");
+        int numIdentity = 0;
+        while(iter.hasNext()){
+            Tuple t = iter.next();
+            Assert.assertEquals(new Integer(12340), (Integer)t.get(0));
+            ++numIdentity;
         }
+        Assert.assertEquals(LOOP_COUNT, numIdentity);
     }
 
     @Test
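The data change above (the map value "bytes" becoming "1234") is what lets the renamed test pass: mymap#'dba' arrives as a bytearray, and the implicit cast triggered by mymap#'dba' * 10 must be able to parse the bytes as a number. Loosely, and outside Pig's actual cast code:

import org.apache.pig.data.DataByteArray;

public class ByteArrayCastSketch {
    public static void main(String[] args) {
        // The map value must hold numeric text, because the implicit
        // bytearray -> int cast amounts to parsing the bytes as a number.
        DataByteArray dba = new DataByteArray("1234".getBytes());
        int value = Integer.parseInt(dba.toString()); // rough equivalent only
        System.out.println(value * 10);               // 12340, as the test expects
    }
}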

Modified: pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java Wed Feb 22 09:43:41 2017
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalMap;
@@ -38,10 +37,10 @@ import org.apache.pig.impl.builtin.FindQ
 import org.junit.Test;
 
 public class TestFindQuantiles {
-    
+
     private static TupleFactory tFact = TupleFactory.getInstance();
     private static final float epsilon = 0.0001f;
-    
+
     @Test
     public void testFindQuantiles() throws Exception {
        final int numSamples = 97778;
@@ -50,7 +49,7 @@ public class TestFindQuantiles {
        System.out.println("sum: " + sum);
        assertTrue(sum > (1-epsilon) && sum < (1+epsilon));
     }
-    
+
     @Test
     public void testFindQuantiles2() throws Exception {
        final int numSamples = 30000;
@@ -86,7 +85,7 @@ public class TestFindQuantiles {
     }
 
     private float[] getProbVec(Tuple values) throws Exception {
-        float[] probVec = new float[values.size()];        
+        float[] probVec = new float[values.size()];
         for(int i = 0; i < values.size(); i++) {
             probVec[i] = (Float)values.get(i);
         }
@@ -95,7 +94,7 @@ public class TestFindQuantiles {
 
     private DataBag generateRandomSortedSamples(int numSamples, int max) throws Exception {
         Random rand = new Random(1000);
-        List<Tuple> samples = new ArrayList<Tuple>(); 
+        List<Tuple> samples = new ArrayList<Tuple>();
         for (int i=0; i<numSamples; i++) {
             Tuple t = tFact.newTuple(1);
             t.set(0, rand.nextInt(max));
@@ -106,7 +105,7 @@ public class TestFindQuantiles {
     }
 
     private DataBag generateUniqueSamples(int numSamples) throws Exception {
-        DataBag samples = BagFactory.getInstance().newDefaultBag(); 
+        DataBag samples = BagFactory.getInstance().newDefaultBag();
         for (int i=0; i<numSamples; i++) {
             Tuple t = tFact.newTuple(1);
             t.set(0, new Integer(23));
@@ -121,9 +120,9 @@ public class TestFindQuantiles {
 
         in.set(0, new Integer(numReduceres));
         in.set(1, samples);
-        
+
         FindQuantiles fq = new FindQuantiles();
-        
+
         Map<String, Object> res = fq.exec(in);
         return res;
     }
@@ -135,12 +134,11 @@ public class TestFindQuantiles {
         InternalMap weightedPartsData = (InternalMap) res.get(FindQuantiles.WEIGHTED_PARTS);
         Iterator<Object> it = weightedPartsData.values().iterator();
         float[] probVec = getProbVec((Tuple)it.next());
-        new DiscreteProbabilitySampleGenerator(probVec);
         float sum = 0.0f;
         for (float f : probVec) {
             sum += f;
         }
         return sum;
     }
-    
+
 }
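With the DiscreteProbabilitySampleGenerator dependency removed, the remaining check is pure arithmetic: the weighted-parts vector must sum to 1 within epsilon. The same check as a standalone sketch:

public class ProbVecSketch {
    private static final float EPSILON = 0.0001f;

    // The quantile tests accept a weighted-parts vector only if its
    // entries form a probability distribution, i.e. sum to 1 within EPSILON.
    static boolean isProbabilityVector(float[] probVec) {
        float sum = 0.0f;
        for (float f : probVec) {
            sum += f;
        }
        return sum > (1 - EPSILON) && sum < (1 + EPSILON);
    }

    public static void main(String[] args) {
        System.out.println(isProbabilityVector(new float[] {0.25f, 0.25f, 0.5f})); // true
        System.out.println(isProbabilityVector(new float[] {0.6f, 0.6f}));         // false
    }
}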

Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Wed Feb 22 09:43:41 2017
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Random;
 
 import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.Test;
@@ -105,6 +106,31 @@ public class TestForEachNestedPlanLocal
     }
 
     @Test
+    public void testNestedCrossTwoRelationsLimit() throws Exception {
+        Storage.Data data = Storage.resetData(pig);
+        data.set("input",
+                Storage.tuple(Storage.bag(Storage.tuple(1, 1), Storage.tuple(1, 2)), Storage.bag(Storage.tuple(1, 3), Storage.tuple(1, 4))),
+                Storage.tuple(Storage.bag(Storage.tuple(2, 1), Storage.tuple(2, 2)), Storage.bag(Storage.tuple(2, 3))),
+                Storage.tuple(Storage.bag(Storage.tuple(3, 1)), Storage.bag(Storage.tuple(3, 2))));
+
+        pig.setBatchOn();
+        pig.registerQuery("A = load 'input' using mock.Storage() as (bag1:bag{tup1:tuple(f1:int, f2:int)}, bag2:bag{tup2:tuple(f3:int, f4:int)});");
+        pig.registerQuery("B = foreach A {"
+                + "crossed = cross bag1, bag2;"
+                + "filtered = filter crossed by f1 == f3;"
+                + "lmt = limit filtered 1;"
+                + "generate FLATTEN(lmt);" + "}");
+        pig.registerQuery("store B into 'output' using mock.Storage();");
+
+        pig.executeBatch();
+
+        List<Tuple> actualResults = data.get("output");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                new String[] {"(1, 1, 1, 3)", "(2, 1, 2, 3)", "(3, 1, 3, 2)"});
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+    }
+
+    @Test
     public void testNestedCrossTwoRelationsComplex() throws Exception {
         File[] tmpFiles = generateDataSetFilesForNestedCross();
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStringAsByteArray(new String[] {

Modified: pig/branches/spark/test/org/apache/pig/test/TestGFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGFCross.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGFCross.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGFCross.java Wed Feb 22 09:43:41 2017
@@ -20,6 +20,7 @@ package org.apache.pig.test;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -50,6 +51,7 @@ public class TestGFCross {
     public void testSerial() throws Exception {
         Configuration cfg = new Configuration();
         cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "1");
+        cfg.set(MRConfiguration.TASK_ID, "task_1473802673416_1808_m_000000");
         UDFContext.getUDFContext().addJobConf(cfg);
         Tuple t = TupleFactory.getInstance().newTuple(2);
 
@@ -66,6 +68,7 @@ public class TestGFCross {
     public void testParallelSet() throws Exception {
         Configuration cfg = new Configuration();
         cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "10");
+        cfg.set(MRConfiguration.TASK_ID, "task_14738102975522_0001_r_000000");
         UDFContext.getUDFContext().addJobConf(cfg);
         Tuple t = TupleFactory.getInstance().newTuple(2);
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Wed Feb 22 09:43:41 2017
@@ -28,6 +28,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
+import java.io.FilenameFilter;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
@@ -970,7 +971,6 @@ public class TestGrunt {
 
     @Test
     public void testStopOnFailure() throws Throwable {
-        Assume.assumeTrue("Skip this test for TEZ", Util.isMapredExecType(cluster.getExecType()));
         PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
         PigContext context = server.getPigContext();
         context.getProperties().setProperty("stop.on.failure", ""+true);
@@ -1569,4 +1569,20 @@ public class TestGrunt {
         }
         assertTrue(found);
     }
+
+    @Test
+    public void testGruntUtf8() throws Throwable {
+        String command = "mkdir 测试\n" +
+                "quit\n";
+        System.setProperty("jline.WindowsTerminal.directConsole", "false");
+        System.setIn(new ByteArrayInputStream(command.getBytes()));
+        org.apache.pig.PigRunner.run(new String[] {"-x", "local"}, null);
+        File[] partFiles = new File(".").listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                return name.equals("测试");
+            }
+        });
+        assertEquals(1, partFiles.length);
+        new File("测试").delete();
+    }
 }
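testGruntUtf8 drives Grunt by swapping System.in for an in-memory byte stream. The general pattern, sketched with an explicit UTF-8 encoding and a restore of the original stream (something a suite running several such tests would want):

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class StdinDrivenCliSketch {
    public static void main(String[] args) throws Exception {
        InputStream originalIn = System.in;
        try {
            // Feed scripted keystrokes to an interactive shell under test.
            // Encoding the bytes explicitly as UTF-8 is the point of the
            // testGruntUtf8 case above: the console must round-trip 测试.
            String commands = "mkdir 测试\nquit\n";
            System.setIn(new ByteArrayInputStream(commands.getBytes(StandardCharsets.UTF_8)));
            // ... run the CLI entry point here, e.g. PigRunner.run(...)
        } finally {
            System.setIn(originalIn); // always restore the real stdin
        }
    }
}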