Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/22 09:43:46 UTC
svn commit: r1783988 [16/24] - in /pig/branches/spark: ./ bin/ conf/
contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...
Added: pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java (added)
+++ pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
+
+public class SparkMiniCluster extends MiniGenericCluster {
+ private static final File CONF_DIR = new File("build/classes");
+ private static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml");
+ private static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml");
+ private static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml");
+ private static final File YARN_CONF_FILE = new File(CONF_DIR, "yarn-site.xml");
+
+ private Configuration m_dfs_conf = null;
+ protected MiniMRYarnCluster m_mr = null;
+ private Configuration m_mr_conf = null;
+
+ private static final Log LOG = LogFactory
+ .getLog(SparkMiniCluster.class);
+ private ExecType spark = new SparkExecType();
+ SparkMiniCluster() {
+
+ }
+
+ @Override
+ public ExecType getExecType() {
+ return spark;
+ }
+
+ @Override
+ protected void setupMiniDfsAndMrClusters() {
+ try {
+ deleteConfFiles();
+ CONF_DIR.mkdirs();
+
+ // Build mini DFS cluster
+ Configuration hdfsConf = new Configuration();
+ m_dfs = new MiniDFSCluster.Builder(hdfsConf)
+ .numDataNodes(2)
+ .format(true)
+ .racks(null)
+ .build();
+ m_fileSys = m_dfs.getFileSystem();
+ m_dfs_conf = m_dfs.getConfiguration(0);
+
+ //Create user home directory
+ m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());
+ // Write core-site.xml
+ Configuration core_site = new Configuration(false);
+ core_site.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+ core_site.writeXml(new FileOutputStream(CORE_CONF_FILE));
+
+ Configuration hdfs_site = new Configuration(false);
+ for (Map.Entry<String, String> conf : m_dfs_conf) {
+ if (ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()), "programatically")) {
+ hdfs_site.set(conf.getKey(), m_dfs_conf.getRaw(conf.getKey()));
+ }
+ }
+ hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE));
+
+ // Build mini YARN cluster
+ m_mr = new MiniMRYarnCluster("PigMiniCluster", 2);
+ m_mr.init(m_dfs_conf);
+ m_mr.start();
+ m_mr_conf = m_mr.getConfig();
+ m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ System.getProperty("java.class.path"));
+
+ Configuration mapred_site = new Configuration(false);
+ Configuration yarn_site = new Configuration(false);
+ for (Map.Entry<String, String> conf : m_mr_conf) {
+ if (ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()), "programatically")) {
+ if (conf.getKey().contains("yarn")) {
+ yarn_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
+ } else if (!conf.getKey().startsWith("dfs")){
+ mapred_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
+ }
+ }
+ }
+
+ mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE));
+ yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE));
+
+ m_conf = m_mr_conf;
+ System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
+ System.setProperty("hadoop.log.dir", "build/test/logs");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+
+ }
+ }
+
+ @Override
+ protected void shutdownMiniMrClusters() {
+ deleteConfFiles();
+ if (m_mr != null) {
+ m_mr.stop();
+ m_mr = null;
+ }
+ }
+
+ private void deleteConfFiles() {
+
+ if(CORE_CONF_FILE.exists()) {
+ CORE_CONF_FILE.delete();
+ }
+ if(HDFS_CONF_FILE.exists()) {
+ HDFS_CONF_FILE.delete();
+ }
+ if(MAPRED_CONF_FILE.exists()) {
+ MAPRED_CONF_FILE.delete();
+ }
+ if(YARN_CONF_FILE.exists()) {
+ YARN_CONF_FILE.delete();
+ }
+ }
+
+ static public Launcher getLauncher() {
+ return new SparkLauncher();
+ }
+}
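
The new SparkMiniCluster persists only the properties that the mini DFS and YARN clusters set at runtime, so Pig-on-Spark processes launched against the conf dir see the same endpoints. A minimal standalone sketch of that filtering idiom, assuming a Configuration populated via set() calls; the misspelled "programatically" source tag is Hadoop's own spelling, and the class and file names here are illustrative:

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.Map;

    import org.apache.commons.lang.ArrayUtils;
    import org.apache.hadoop.conf.Configuration;

    public class ConfDumpSketch {
        // Copy only the runtime-set properties of src into a fresh
        // Configuration and write them as an XML resource that a
        // separately spawned JVM can load from its classpath.
        static void dumpProgrammaticProps(Configuration src, String outFile)
                throws IOException {
            Configuration out = new Configuration(false); // no default resources
            for (Map.Entry<String, String> e : src) {
                // Hadoop tags properties set via set() with "programatically" (sic)
                if (ArrayUtils.contains(src.getPropertySources(e.getKey()),
                        "programatically")) {
                    out.set(e.getKey(), src.getRaw(e.getKey()));
                }
            }
            out.writeXml(new FileOutputStream(outFile));
        }
    }
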
Modified: pig/branches/spark/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBZip.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestBZip.java Wed Feb 22 09:43:41 2017
@@ -43,7 +43,6 @@ import org.apache.hadoop.mapreduce.Input
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -67,16 +66,10 @@ public class TestBZip {
@Parameters(name = "pig.bzip.use.hadoop.inputformat = {0}.")
public static Iterable<Object[]> data() {
- if ( HadoopShims.isHadoopYARN() ) {
- return Arrays.asList(new Object[][] {
- { false },
- { true }
- });
- } else {
- return Arrays.asList(new Object[][] {
- { false }
- });
- }
+ return Arrays.asList(new Object[][] {
+ { false },
+ { true }
+ });
}
public TestBZip (Boolean useBzipFromHadoop) {
Modified: pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java Wed Feb 22 09:43:41 2017
@@ -130,6 +130,7 @@ import org.apache.pig.data.DefaultBagFac
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.ReadToEndLoader;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -3206,29 +3207,31 @@ public class TestBuiltin {
@Test
public void testUniqueID() throws Exception {
Util.resetStateForExecModeSwitch();
- String inputFileName = "testUniqueID.txt";
- Util.createInputFile(cluster, inputFileName, new String[]
- {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"});
Properties copyproperties = new Properties();
copyproperties.putAll(cluster.getProperties());
PigServer pigServer = new PigServer(cluster.getExecType(), copyproperties);
- pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", "10");
+
+ // running with 2 mappers each taking 5 records
+ String TMP_DIR = FileLocalizer.getTemporaryPath(pigServer.getPigContext()).toUri().getPath();
+ Util.createInputFile(cluster, TMP_DIR + "/input1.txt", new String[] {"1\n2\n3\n4\n5"});
+ Util.createInputFile(cluster, TMP_DIR + "/input2.txt", new String[] {"1\n2\n3\n4\n5"});
pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true");
- pigServer.registerQuery("A = load '" + inputFileName + "' as (name);");
+
+ pigServer.registerQuery("A = load '" + TMP_DIR + "' as (name);");
pigServer.registerQuery("B = foreach A generate name, UniqueID();");
Iterator<Tuple> iter = pigServer.openIterator("B");
if (!Util.isSparkExecType(cluster.getExecType())) {
- assertEquals(iter.next().get(1), "0-0");
- assertEquals(iter.next().get(1), "0-1");
- assertEquals(iter.next().get(1), "0-2");
- assertEquals(iter.next().get(1), "0-3");
- assertEquals(iter.next().get(1), "0-4");
- assertEquals(iter.next().get(1), "1-0");
- assertEquals(iter.next().get(1), "1-1");
- assertEquals(iter.next().get(1), "1-2");
- assertEquals(iter.next().get(1), "1-3");
- assertEquals(iter.next().get(1), "1-4");
- } else{
+ assertEquals("0-0", iter.next().get(1));
+ assertEquals("0-1", iter.next().get(1));
+ assertEquals("0-2", iter.next().get(1));
+ assertEquals("0-3", iter.next().get(1));
+ assertEquals("0-4", iter.next().get(1));
+ assertEquals("1-0", iter.next().get(1));
+ assertEquals("1-1", iter.next().get(1));
+ assertEquals("1-2", iter.next().get(1));
+ assertEquals("1-3", iter.next().get(1));
+ assertEquals("1-4", iter.next().get(1));
+ } else {
//there will be 2 InputSplits when mapred.max.split.size is 10(byte) for the testUniqueID.txt(20 bytes)
//Split0:
// 1\n
@@ -3244,34 +3247,35 @@ public class TestBuiltin {
// 5\n
//The size of Split0 is 12 not 10 because LineRecordReader#nextKeyValue will read one more line
//More detail see PIG-4383
- assertEquals(iter.next().get(1), "0-0");
- assertEquals(iter.next().get(1), "0-1");
- assertEquals(iter.next().get(1), "0-2");
- assertEquals(iter.next().get(1), "0-3");
- assertEquals(iter.next().get(1), "0-4");
- assertEquals(iter.next().get(1), "0-5");
- assertEquals(iter.next().get(1), "1-0");
- assertEquals(iter.next().get(1), "1-1");
- assertEquals(iter.next().get(1), "1-2");
- assertEquals(iter.next().get(1), "1-3");
+ assertEquals("0-0", iter.next().get(1));
+ assertEquals("0-1", iter.next().get(1));
+ assertEquals("0-2", iter.next().get(1));
+ assertEquals("0-3", iter.next().get(1));
+ assertEquals("0-4", iter.next().get(1));
+ assertEquals("0-5", iter.next().get(1));
+ assertEquals("1-0", iter.next().get(1));
+ assertEquals("1-1", iter.next().get(1));
+ assertEquals("1-2", iter.next().get(1));
+ assertEquals("1-3", iter.next().get(1));
}
- Util.deleteFile(cluster, inputFileName);
+ Util.deleteFile(cluster, TMP_DIR + "/input1.txt");
+ Util.deleteFile(cluster, TMP_DIR + "/input2.txt");
}
@Test
public void testRANDOMWithJob() throws Exception {
Util.resetStateForExecModeSwitch();
- String inputFileName = "testRANDOM.txt";
- Util.createInputFile(cluster, inputFileName, new String[]
- {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"});
-
Properties copyproperties = new Properties();
copyproperties.putAll(cluster.getProperties());
PigServer pigServer = new PigServer(cluster.getExecType(), copyproperties);
- // running with two mappers
- pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", "10");
+
+ // running with 2 mappers each taking 5 records
+ String TMP_DIR = FileLocalizer.getTemporaryPath(pigServer.getPigContext()).toUri().getPath();
+ Util.createInputFile(cluster, TMP_DIR + "/input1.txt", new String[] {"1\n2\n3\n4\n5"});
+ Util.createInputFile(cluster, TMP_DIR + "/input2.txt", new String[] {"1\n2\n3\n4\n5"});
pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true");
- pigServer.registerQuery("A = load '" + inputFileName + "' as (name);");
+
+ pigServer.registerQuery("A = load '" + TMP_DIR + "' as (name);");
pigServer.registerQuery("B = foreach A generate name, RANDOM();");
Iterator<Tuple> iter = pigServer.openIterator("B");
double [] mapper1 = new double[5];
@@ -3294,7 +3298,8 @@ public class TestBuiltin {
for( int i = 0; i < 5; i++ ){
assertNotEquals(mapper1[i], mapper2[i], 0.0001);
}
- Util.deleteFile(cluster, inputFileName);
+ Util.deleteFile(cluster, TMP_DIR + "/input1.txt");
+ Util.deleteFile(cluster, TMP_DIR + "/input2.txt");
}
Added: pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java (added)
+++ pig/branches/spark/test/org/apache/pig/test/TestConfigurationUtil.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+
+import java.util.Properties;
+
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestConfigurationUtil {
+
+ @Test
+ public void testExpandForAlternativeNames() {
+ Properties properties = null;
+ properties = ConfigurationUtil.expandForAlternativeNames("fs.df.interval", "500");
+ Assert.assertEquals(1,properties.size());
+ Assert.assertEquals("500",properties.get("fs.df.interval"));
+
+ properties = ConfigurationUtil.expandForAlternativeNames("dfs.df.interval", "600");
+ Assert.assertEquals(2,properties.size());
+ Assert.assertEquals("600",properties.get("fs.df.interval"));
+ Assert.assertEquals("600",properties.get("dfs.df.interval"));
+
+ properties = ConfigurationUtil.expandForAlternativeNames("", "");
+ Assert.assertEquals(1,properties.size());
+ Assert.assertEquals("",properties.get(""));
+
+ }
+}
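
The assertion counts above encode the contract being tested: a key with no alternative name comes back alone, a deprecated dfs.* key is expanded to both its old and new fs.* spellings, and even the empty key round-trips. A hypothetical sketch of that behavior, for illustration only; the real mapping lives in ConfigurationUtil and covers more than this one prefix:

    import java.util.Properties;

    public class ExpandSketch {
        // Illustrative stand-in for ConfigurationUtil.expandForAlternativeNames.
        static Properties expandForAlternativeNames(String name, String value) {
            Properties p = new Properties();
            p.put(name, value);
            // Mirror a deprecated dfs.* key to its fs.* replacement, matching
            // the dfs.df.interval -> fs.df.interval case in the test.
            if (name.startsWith("dfs.")) {
                p.put("fs." + name.substring("dfs.".length()), value);
            }
            return p;
        }
    }
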
Modified: pig/branches/spark/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCounters.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCounters.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCounters.java Wed Feb 22 09:43:41 2017
@@ -30,17 +30,17 @@ import java.util.Map;
import java.util.Random;
import org.apache.hadoop.fs.Path;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
-import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.junit.AfterClass;
+import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -49,8 +49,8 @@ import org.junit.runners.JUnit4;
public class TestCounters {
String file = "input.txt";
- static MiniCluster cluster = MiniCluster.buildCluster();
-
+ static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+
final int MAX = 100*1000;
Random r = new Random();
@@ -59,7 +59,7 @@ public class TestCounters {
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
-
+
@Test
public void testMapOnly() throws IOException, ExecException {
int count = 0;
@@ -70,13 +70,13 @@ public class TestCounters {
if(t > 50) count ++;
}
pw.close();
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = filter a by $0 > 50;");
pigServer.registerQuery("c = foreach b generate $0 - 50;");
ExecJob job = pigServer.store("c", "output_map_only");
PigStats pigStats = job.getStatistics();
-
+
//counting the no. of bytes in the output file
//long filesize = cluster.getFileSystem().getFileStatus(new Path("output_map_only")).getLen();
InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
@@ -85,9 +85,9 @@ public class TestCounters {
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output_map_only"), true);
@@ -98,7 +98,7 @@ public class TestCounters {
JobGraph jg = pigStats.getJobGraph();
Iterator<JobStats> iter = jg.iterator();
while (iter.hasNext()) {
- MRJobStats js = (MRJobStats) iter.next();
+ JobStats js = iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
@@ -123,20 +123,20 @@ public class TestCounters {
count ++;
}
pw.close();
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = filter a by $0 > 50;");
pigServer.registerQuery("c = foreach b generate $0 - 50;");
ExecJob job = pigServer.store("c", "output_map_only", "BinStorage");
PigStats pigStats = job.getStatistics();
-
+
InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
"output_map_only", pigServer.getPigContext()),
pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
cluster.getFileSystem().delete(new Path(file), true);
@@ -149,8 +149,8 @@ public class TestCounters {
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
- MRJobStats js = (MRJobStats) iter.next();
-
+ JobStats js = iter.next();
+
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -158,7 +158,7 @@ public class TestCounters {
assertEquals(0, js.getReduceInputRecords());
assertEquals(0, js.getReduceOutputRecords());
}
-
+
System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
assertEquals(filesize, pigStats.getBytesWritten());
}
@@ -183,7 +183,7 @@ public class TestCounters {
if(nos[i] > 0) count ++;
}
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group;");
@@ -195,7 +195,7 @@ public class TestCounters {
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
cluster.getFileSystem().delete(new Path(file), true);
@@ -208,7 +208,7 @@ public class TestCounters {
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
- MRJobStats js = (MRJobStats) iter.next();
+ JobStats js = iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -242,7 +242,7 @@ public class TestCounters {
if(nos[i] > 0) count ++;
}
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group;");
@@ -253,9 +253,9 @@ public class TestCounters {
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
@@ -266,7 +266,7 @@ public class TestCounters {
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
- MRJobStats js = (MRJobStats) iter.next();
+ JobStats js = iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -300,7 +300,7 @@ public class TestCounters {
if(nos[i] > 0) count ++;
}
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
@@ -311,20 +311,20 @@ public class TestCounters {
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
System.out.println("============================================");
System.out.println("Test case MapCombineReduce");
System.out.println("============================================");
-
+
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
- MRJobStats js = (MRJobStats) iter.next();
+ JobStats js = iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -337,7 +337,7 @@ public class TestCounters {
System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
assertEquals(filesize, pigStats.getBytesWritten());
}
-
+
@Test
public void testMapCombineReduceBinStorage() throws IOException, ExecException {
int count = 0;
@@ -358,20 +358,20 @@ public class TestCounters {
if(nos[i] > 0) count ++;
}
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
ExecJob job = pigServer.store("c", "output", "BinStorage");
PigStats pigStats = job.getStatistics();
-
+
InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
@@ -379,11 +379,11 @@ public class TestCounters {
System.out.println("============================================");
System.out.println("Test case MapCombineReduce");
System.out.println("============================================");
-
+
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
- MRJobStats js = (MRJobStats) iter.next();
+ JobStats js = iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -399,6 +399,8 @@ public class TestCounters {
@Test
public void testMultipleMRJobs() throws IOException, ExecException {
+ Assume.assumeTrue("Skip this test for TEZ. Assert is done only for first MR job",
+ Util.isMapredExecType(cluster.getExecType()));
int count = 0;
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
int [] nos = new int[10];
@@ -413,38 +415,38 @@ public class TestCounters {
}
pw.close();
- for(int i = 0; i < 10; i++) {
+ for(int i = 0; i < 10; i++) {
if(nos[i] > 0) count ++;
}
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = order a by $0;");
pigServer.registerQuery("c = group b by $0;");
pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
ExecJob job = pigServer.store("d", "output");
PigStats pigStats = job.getStatistics();
-
+
InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
-
+
System.out.println("============================================");
System.out.println("Test case MultipleMRJobs");
System.out.println("============================================");
-
+
JobGraph jp = pigStats.getJobGraph();
- MRJobStats js = (MRJobStats)jp.getSinks().get(0);
-
+ JobStats js = (JobStats)jp.getSinks().get(0);
+
System.out.println("Job id: " + js.getName());
System.out.println(jp.toString());
-
+
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -453,12 +455,12 @@ public class TestCounters {
assertEquals(count, js.getReduceInputRecords());
System.out.println("Reduce output records : " + js.getReduceOutputRecords());
assertEquals(count, js.getReduceOutputRecords());
-
+
System.out.println("Hdfs bytes written : " + js.getHdfsBytesWritten());
assertEquals(filesize, js.getHdfsBytesWritten());
}
-
+
@Test
public void testMapOnlyMultiQueryStores() throws Exception {
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
@@ -467,8 +469,8 @@ public class TestCounters {
pw.println(t);
}
pw.close();
-
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE,
+
+ PigServer pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
pigServer.setBatchOn();
pigServer.registerQuery("a = load '" + file + "';");
@@ -479,22 +481,22 @@ public class TestCounters {
List<ExecJob> jobs = pigServer.executeBatch();
PigStats stats = jobs.get(0).getStatistics();
assertTrue(stats.getOutputLocations().size() == 2);
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
- MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0);
-
+ JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
+
Map<String, Long> entry = js.getMultiStoreCounters();
long counter = 0;
for (Long val : entry.values()) {
counter += val;
}
-
- assertEquals(MAX, counter);
- }
-
+
+ assertEquals(MAX, counter);
+ }
+
@Test
public void testMultiQueryStores() throws Exception {
int[] nums = new int[100];
@@ -505,13 +507,13 @@ public class TestCounters {
nums[t]++;
}
pw.close();
-
+
int groups = 0;
for (int i : nums) {
if (i > 0) groups++;
}
-
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE,
+
+ PigServer pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
pigServer.setBatchOn();
pigServer.registerQuery("a = load '" + file + "';");
@@ -525,29 +527,29 @@ public class TestCounters {
pigServer.registerQuery("store g into '/tmp/outout2';");
List<ExecJob> jobs = pigServer.executeBatch();
PigStats stats = jobs.get(0).getStatistics();
-
+
assertTrue(stats.getOutputLocations().size() == 2);
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
- MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0);
-
+ JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
+
Map<String, Long> entry = js.getMultiStoreCounters();
long counter = 0;
for (Long val : entry.values()) {
counter += val;
}
-
- assertEquals(groups, counter);
- }
-
- /*
+
+ assertEquals(groups, counter);
+ }
+
+ /*
* IMPORTANT NOTE:
* COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE -
* SEE PIG-1286 - UNCOMMENT WHEN IT IS FIXED
- */
+ */
// @Test
// public void testLocal() throws IOException, ExecException {
// int count = 0;
@@ -566,7 +568,7 @@ public class TestCounters {
// }
// pw.close();
//
-// for(int i = 0; i < 10; i++)
+// for(int i = 0; i < 10; i++)
// if(nos[i] > 0)
// count ++;
//
@@ -580,56 +582,56 @@ public class TestCounters {
// pigServer.registerQuery("c = group b by $0;");
// pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
// PigStats pigStats = pigServer.store("d", "file://" + out.getAbsolutePath()).getStatistics();
-// InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+// InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), cluster.getExecType(), pigServer.getPigContext().getDfs());
// long filesize = 0;
// while(is.read() != -1) filesize++;
-//
+//
// is.close();
// out.delete();
-//
+//
// //Map<String, Map<String, String>> stats = pigStats.getPigStats();
-//
+//
// assertEquals(10, pigStats.getRecordsWritten());
// assertEquals(110, pigStats.getBytesWritten());
//
// }
@Test
- public void testJoinInputCounters() throws Exception {
+ public void testJoinInputCounters() throws Exception {
testInputCounters("join");
}
-
+
@Test
- public void testCogroupInputCounters() throws Exception {
+ public void testCogroupInputCounters() throws Exception {
testInputCounters("cogroup");
}
-
+
@Test
- public void testSkewedInputCounters() throws Exception {
+ public void testSkewedInputCounters() throws Exception {
testInputCounters("skewed");
}
-
+
@Test
- public void testSelfJoinInputCounters() throws Exception {
+ public void testSelfJoinInputCounters() throws Exception {
testInputCounters("self-join");
}
-
+
private static boolean multiInputCreated = false;
-
+
private static int count = 0;
-
- private void testInputCounters(String keyword) throws Exception {
+
+ private void testInputCounters(String keyword) throws Exception {
String file1 = "multi-input1.txt";
String file2 = "multi-input2.txt";
-
+
String output = keyword;
-
+
if (keyword.equals("self-join")) {
file2 = file1;
keyword = "join";
}
-
- final int MAX_NUM_RECORDS = 100;
+
+ final int MAX_NUM_RECORDS = 100;
if (!multiInputCreated) {
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file1));
for (int i = 0; i < MAX_NUM_RECORDS; i++) {
@@ -637,7 +639,7 @@ public class TestCounters {
pw.println(t);
}
pw.close();
-
+
PrintWriter pw2 = new PrintWriter(Util.createInputFile(cluster, file2));
for (int i = 0; i < MAX_NUM_RECORDS; i++) {
int t = r.nextInt(100);
@@ -649,8 +651,8 @@ public class TestCounters {
pw2.close();
multiInputCreated = true;
}
-
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE,
+
+ PigServer pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
pigServer.setBatchOn();
pigServer.registerQuery("a = load '" + file1 + "';");
@@ -661,7 +663,7 @@ public class TestCounters {
pigServer.registerQuery("c = join a by $0, b by $0 using 'skewed';");
}
ExecJob job = pigServer.store("c", output + "_output");
-
+
PigStats stats = job.getStatistics();
assertTrue(stats.isSuccessful());
List<InputStats> inputs = stats.getInputStats();
@@ -680,4 +682,46 @@ public class TestCounters {
}
}
}
+
+ @Test
+ public void testSplitUnionOutputCounters() throws Exception {
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, "splitunion-input"));
+ for (int i = 0; i < 10; i++) {
+ pw.println(i);
+ }
+ pw.close();
+ String query =
+ "a = load 'splitunion-input';" +
+ "split a into b if $0 < 5, c otherwise;" +
+ "d = union b, c;";
+
+ pigServer.registerQuery(query);
+
+ ExecJob job = pigServer.store("d", "splitunion-output-0", "PigStorage");
+ PigStats stats1 = job.getStatistics();
+
+ query =
+ "a = load 'splitunion-input';" +
+ "split a into b if $0 < 3, c if $0 > 2 and $0 < 6, d if $0 > 5;" +
+ "e = distinct d;" +
+ "f = union b, c, e;";
+
+ pigServer.registerQuery(query);
+
+ job = pigServer.store("f", "splitunion-output-1", "PigStorage");
+ PigStats stats2 = job.getStatistics();
+
+ PigStats[] pigStats = new PigStats[]{stats1, stats2};
+ for (int i = 0; i < 2; i++) {
+ PigStats stats = pigStats[i];
+ assertTrue(stats.isSuccessful());
+ List<OutputStats> outputs = stats.getOutputStats();
+ assertEquals(1, outputs.size());
+ OutputStats output = outputs.get(0);
+ assertEquals("splitunion-output-" + i, output.getName());
+ assertEquals(10, output.getNumberRecords());
+ assertEquals(20, output.getBytes());
+ }
+ }
}
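
Two generalizations run through this file: MRJobStats downcasts become the backend-neutral JobStats, and assertions that only hold on MapReduce are now guarded with JUnit's Assume so the test is reported as skipped rather than failed on other engines. A minimal sketch of that skip idiom; the exec-type predicate below is a stand-in for the Util.isMapredExecType(cluster.getExecType()) call used in the diff:

    import org.junit.Assume;
    import org.junit.Test;

    public class BackendSpecificTestSketch {
        // Stand-in predicate; Pig's tests consult the mini cluster's ExecType.
        private boolean runningOnMapReduce() {
            return "mr".equalsIgnoreCase(System.getProperty("test.exec.type", "mr"));
        }

        @Test
        public void mapReduceOnlyAssertions() {
            // When the condition is false, assumeTrue aborts the test as
            // "skipped" instead of failing it.
            Assume.assumeTrue("Assert is done only for the first MR job",
                    runningOnMapReduce());
            // ... MapReduce-specific assertions would follow here ...
        }
    }
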
Modified: pig/branches/spark/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestDataBag.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestDataBag.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestDataBag.java Wed Feb 22 09:43:41 2017
@@ -17,17 +17,36 @@
*/
package org.apache.pig.test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
-import java.util.*;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
-
-
-import org.apache.pig.data.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.DistinctDataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.InternalDistinctBag;
+import org.apache.pig.data.InternalSortedBag;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.SingleTupleBag;
+import org.apache.pig.data.SortedDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.Spillable;
import org.junit.After;
import org.junit.Test;
@@ -36,7 +55,7 @@ import org.junit.Test;
/**
* This class will exercise the basic Pig data model and members. It tests for proper behavior in
* assignment and comparison, as well as function application.
- *
+ *
* @author dnm
*/
public class TestDataBag {
@@ -590,7 +609,7 @@ public class TestDataBag {
}
mgr.forceSpill();
}
-
+
assertEquals("Size of distinct data bag is incorrect", rightAnswer.size(), b.size());
// Read tuples back, hopefully they come out in the same order.
@@ -719,14 +738,14 @@ public class TestDataBag {
@Test
public void testDefaultBagFactory() throws Exception {
BagFactory f = BagFactory.getInstance();
-
+
DataBag bag = f.newDefaultBag();
DataBag sorted = f.newSortedBag(null);
DataBag distinct = f.newDistinctBag();
assertTrue("Expected a default bag", (bag instanceof DefaultDataBag));
assertTrue("Expected a sorted bag", (sorted instanceof SortedDataBag));
- assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));
+ assertTrue("Expected a distinct bag", (distinct instanceof DistinctDataBag));
}
@Test
@@ -756,7 +775,7 @@ public class TestDataBag {
try {
BagFactory f = BagFactory.getInstance();
} catch (RuntimeException re) {
- assertEquals("Expected does not extend BagFactory message",
+ assertEquals("Expected does not extend BagFactory message",
"Provided factory org.apache.pig.test.TestDataBag does not extend BagFactory!",
re.getMessage());
caughtIt = true;
@@ -775,7 +794,7 @@ public class TestDataBag {
BagFactory.resetSelf();
}
-
+
@Test
public void testNonSpillableDataBagEquals1() throws Exception {
String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -789,7 +808,7 @@ public class TestDataBag {
}
assertEquals(bg1, bg2);
}
-
+
@Test
public void testNonSpillableDataBagEquals2() throws Exception {
String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -804,7 +823,7 @@ public class TestDataBag {
}
assertEquals(bg1, bg2);
}
-
+
@Test
public void testDefaultDataBagEquals1() throws Exception {
String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -820,7 +839,7 @@ public class TestDataBag {
}
assertEquals(bg1, bg2);
}
-
+
@Test
public void testDefaultDataBagEquals2() throws Exception {
String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
@@ -837,35 +856,35 @@ public class TestDataBag {
}
assertEquals(bg1, bg2);
}
-
- public void testInternalCachedBag() throws Exception {
+
+ public void testInternalCachedBag() throws Exception {
// check adding empty tuple
DataBag bg0 = new InternalCachedBag();
bg0.add(TupleFactory.getInstance().newTuple());
bg0.add(TupleFactory.getInstance().newTuple());
assertEquals(bg0.size(), 2);
-
+
// check equal of bags
DataBag bg1 = new InternalCachedBag(1, 0.5f);
assertEquals(bg1.size(), 0);
-
+
String[][] tupleContents = new String[][] {{"a", "b"},{"c", "d" }, { "e", "f"} };
for (int i = 0; i < tupleContents.length; i++) {
bg1.add(Util.createTuple(tupleContents[i]));
}
-
+
// check size, and isSorted(), isDistinct()
assertEquals(bg1.size(), 3);
assertFalse(bg1.isSorted());
assertFalse(bg1.isDistinct());
-
+
tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
DataBag bg2 = new InternalCachedBag(1, 0.5f);
for (int i = 0; i < tupleContents.length; i++) {
bg2.add(Util.createTuple(tupleContents[i]));
}
assertEquals(bg1, bg2);
-
+
// check bag with data written to disk
DataBag bg3 = new InternalCachedBag(1, 0.0f);
tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
@@ -873,7 +892,7 @@ public class TestDataBag {
bg3.add(Util.createTuple(tupleContents[i]));
}
assertEquals(bg1, bg3);
-
+
// check iterator
Iterator<Tuple> iter = bg3.iterator();
DataBag bg4 = new InternalCachedBag(1, 0.0f);
@@ -881,7 +900,7 @@ public class TestDataBag {
bg4.add(iter.next());
}
assertEquals(bg3, bg4);
-
+
// call iterator methods with irregular order
iter = bg3.iterator();
assertTrue(iter.hasNext());
@@ -894,46 +913,46 @@ public class TestDataBag {
assertFalse(iter.hasNext());
assertFalse(iter.hasNext());
assertEquals(bg3, bg5);
-
-
+
+
bg4.clear();
- assertEquals(bg4.size(), 0);
+ assertEquals(bg4.size(), 0);
}
-
- public void testInternalSortedBag() throws Exception {
-
+
+ public void testInternalSortedBag() throws Exception {
+
// check adding empty tuple
DataBag bg0 = new InternalSortedBag();
bg0.add(TupleFactory.getInstance().newTuple());
bg0.add(TupleFactory.getInstance().newTuple());
assertEquals(bg0.size(), 2);
-
+
// check equal of bags
DataBag bg1 = new InternalSortedBag();
assertEquals(bg1.size(), 0);
-
+
String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"c", "d" }};
for (int i = 0; i < tupleContents.length; i++) {
bg1.add(Util.createTuple(tupleContents[i]));
}
-
+
// check size, and isSorted(), isDistinct()
assertEquals(bg1.size(), 3);
assertTrue(bg1.isSorted());
assertFalse(bg1.isDistinct());
-
+
tupleContents = new String[][] {{"c", "d" }, {"a", "b"},{ "e", "f"} };
DataBag bg2 = new InternalSortedBag();
for (int i = 0; i < tupleContents.length; i++) {
bg2.add(Util.createTuple(tupleContents[i]));
}
assertEquals(bg1, bg2);
-
+
Iterator<Tuple> iter = bg1.iterator();
iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-
+
// check bag with data written to disk
DataBag bg3 = new InternalSortedBag(1, 0.0f, null);
tupleContents = new String[][] {{ "e", "f"}, {"c", "d" }, {"a", "b"}};
@@ -941,17 +960,17 @@ public class TestDataBag {
bg3.add(Util.createTuple(tupleContents[i]));
}
assertEquals(bg1, bg3);
-
+
iter = bg3.iterator();
iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
- iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-
+ iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+
// call iterator methods with irregular order
iter = bg3.iterator();
assertTrue(iter.hasNext());
assertTrue(iter.hasNext());
-
+
DataBag bg4 = new InternalSortedBag(1, 0.0f, null);
bg4.add(iter.next());
bg4.add(iter.next());
@@ -959,21 +978,21 @@ public class TestDataBag {
bg4.add(iter.next());
assertFalse(iter.hasNext());
assertFalse(iter.hasNext());
- assertEquals(bg3, bg4);
-
+ assertEquals(bg3, bg4);
+
// check clear
bg3.clear();
assertEquals(bg3.size(), 0);
-
+
// test with all data spill out
- DataBag bg5 = new InternalSortedBag();
+ DataBag bg5 = new InternalSortedBag();
for(int j=0; j<3; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg5.add(Util.createTuple(tupleContents[i]));
- }
+ }
bg5.spill();
}
-
+
assertEquals(bg5.size(), 9);
iter = bg5.iterator();
for(int i=0; i<3; i++) {
@@ -983,21 +1002,21 @@ public class TestDataBag {
iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
}
for(int i=0; i<3; i++) {
- iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+ iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
}
-
+
// test with most data spill out, with some data in memory
// and merge of spill files
- DataBag bg6 = new InternalSortedBag();
+ DataBag bg6 = new InternalSortedBag();
for(int j=0; j<104; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg6.add(Util.createTuple(tupleContents[i]));
- }
+ }
if (j != 103) {
bg6.spill();
}
}
-
+
assertEquals(bg6.size(), 104*3);
iter = bg6.iterator();
for(int i=0; i<104; i++) {
@@ -1007,55 +1026,55 @@ public class TestDataBag {
iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
}
for(int i=0; i<104; i++) {
- iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
+ iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
}
-
+
// check two implementation of sorted bag can compare correctly
- DataBag bg7 = new SortedDataBag(null);
+ DataBag bg7 = new SortedDataBag(null);
for(int j=0; j<104; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg7.add(Util.createTuple(tupleContents[i]));
- }
+ }
if (j != 103) {
bg7.spill();
}
}
assertEquals(bg6, bg7);
}
-
- public void testInternalDistinctBag() throws Exception {
+
+ public void testInternalDistinctBag() throws Exception {
// check adding empty tuple
DataBag bg0 = new InternalDistinctBag();
bg0.add(TupleFactory.getInstance().newTuple());
bg0.add(TupleFactory.getInstance().newTuple());
assertEquals(bg0.size(), 1);
-
+
// check equal of bags
DataBag bg1 = new InternalDistinctBag();
assertEquals(bg1.size(), 0);
-
+
String[][] tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
for (int i = 0; i < tupleContents.length; i++) {
bg1.add(Util.createTuple(tupleContents[i]));
}
-
+
// check size, and isSorted(), isDistinct()
assertEquals(bg1.size(), 3);
assertFalse(bg1.isSorted());
assertTrue(bg1.isDistinct());
-
+
tupleContents = new String[][] {{"a", "b" }, {"e", "d"}, {"e", "d"}, { "e", "f"} };
DataBag bg2 = new InternalDistinctBag();
for (int i = 0; i < tupleContents.length; i++) {
bg2.add(Util.createTuple(tupleContents[i]));
}
assertEquals(bg1, bg2);
-
+
Iterator<Tuple> iter = bg1.iterator();
iter.next().equals(Util.createTuple(new String[] {"a", "b"}));
iter.next().equals(Util.createTuple(new String[] {"c", "d"}));
iter.next().equals(Util.createTuple(new String[] {"e", "f"}));
-
+
// check bag with data written to disk
DataBag bg3 = new InternalDistinctBag(1, 0.0f);
tupleContents = new String[][] {{ "e", "f"}, {"a", "b"}, {"e", "d" }, {"a", "b"}, {"e", "f"}};
@@ -1064,13 +1083,13 @@ public class TestDataBag {
}
assertEquals(bg2, bg3);
assertEquals(bg3.size(), 3);
-
-
+
+
// call iterator methods with irregular order
iter = bg3.iterator();
assertTrue(iter.hasNext());
assertTrue(iter.hasNext());
-
+
DataBag bg4 = new InternalDistinctBag(1, 0.0f);
bg4.add(iter.next());
bg4.add(iter.next());
@@ -1078,73 +1097,73 @@ public class TestDataBag {
bg4.add(iter.next());
assertFalse(iter.hasNext());
assertFalse(iter.hasNext());
- assertEquals(bg3, bg4);
-
+ assertEquals(bg3, bg4);
+
// check clear
bg3.clear();
assertEquals(bg3.size(), 0);
-
+
// test with all data spill out
- DataBag bg5 = new InternalDistinctBag();
+ DataBag bg5 = new InternalDistinctBag();
for(int j=0; j<3; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg5.add(Util.createTuple(tupleContents[i]));
- }
+ }
bg5.spill();
}
-
+
assertEquals(bg5.size(), 3);
-
-
+
+
// test with most data spill out, with some data in memory
// and merge of spill files
- DataBag bg6 = new InternalDistinctBag();
+ DataBag bg6 = new InternalDistinctBag();
for(int j=0; j<104; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg6.add(Util.createTuple(tupleContents[i]));
- }
+ }
if (j != 103) {
bg6.spill();
}
}
-
- assertEquals(bg6.size(), 3);
-
+
+ assertEquals(bg6.size(), 3);
+
// check two implementation of sorted bag can compare correctly
- DataBag bg7 = new DistinctDataBag();
+ DataBag bg7 = new DistinctDataBag();
for(int j=0; j<104; j++) {
for (int i = 0; i < tupleContents.length; i++) {
bg7.add(Util.createTuple(tupleContents[i]));
- }
+ }
if (j != 103) {
bg7.spill();
}
}
assertEquals(bg6, bg7);
}
-
+
// See PIG-1231
@Test
public void testDataBagIterIdempotent() throws Exception {
DataBag bg0 = new DefaultDataBag();
processDataBag(bg0, true);
-
+
DataBag bg1 = new DistinctDataBag();
processDataBag(bg1, true);
-
+
DataBag bg2 = new InternalDistinctBag();
processDataBag(bg2, true);
-
+
DataBag bg3 = new InternalSortedBag();
processDataBag(bg3, true);
-
+
DataBag bg4 = new SortedDataBag(null);
processDataBag(bg4, true);
-
+
DataBag bg5 = new InternalCachedBag(0, 0);
processDataBag(bg5, false);
}
-
+
// See PIG-1285
@Test
public void testSerializeSingleTupleBag() throws Exception {
@@ -1159,7 +1178,7 @@ public class TestDataBag {
dfBag.readFields(dis);
assertTrue(dfBag.equals(stBag));
}
-
+
// See PIG-2550
static class MyCustomTuple extends DefaultTuple {
private static final long serialVersionUID = 8156382697467819543L;
@@ -1184,7 +1203,23 @@ public class TestDataBag {
Tuple t2 = iter.next();
assertTrue(t2.equals(t));
}
-
+
+ // See PIG-4260
+ @Test
+ public void testSpillArrayBackedList() throws Exception {
+ Tuple[] tuples = new Tuple[2];
+ tuples[0] = TupleFactory.getInstance().newTuple(1);
+ tuples[0].set(0, "first");
+ tuples[1] = TupleFactory.getInstance().newTuple(1);
+ tuples[1].set(0, "second");
+ DefaultDataBag bag = new DefaultDataBag(Arrays.asList(tuples));
+ bag.spill();
+ Iterator<Tuple> iter = bag.iterator();
+ assertEquals(tuples[0], iter.next());
+ assertEquals(tuples[1], iter.next());
+ assertFalse(iter.hasNext());
+ }
+
void processDataBag(DataBag bg, boolean doSpill) {
Tuple t = TupleFactory.getInstance().newTuple(new Integer(0));
bg.add(t);
@@ -1194,7 +1229,7 @@ public class TestDataBag {
assertTrue(iter.hasNext());
iter.next();
assertFalse(iter.hasNext());
- assertFalse("hasNext should be idempotent", iter.hasNext());
+ assertFalse("hasNext should be idempotent", iter.hasNext());
}
}
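
The new testSpillArrayBackedList (PIG-4260) feeds DefaultDataBag a list from Arrays.asList, which is fixed-size: structural mutation of it throws, so a spill implementation that presumably tried to clear or shrink its backing list after writing tuples to disk would fail on such bags. A short demonstration of the list property the test leans on:

    import java.util.Arrays;
    import java.util.List;

    public class FixedSizeListSketch {
        public static void main(String[] args) {
            List<String> backing = Arrays.asList("first", "second");
            try {
                // Arrays.asList is backed by the array itself; clear() is a
                // structural change and throws UnsupportedOperationException.
                backing.clear();
            } catch (UnsupportedOperationException e) {
                System.out.println("fixed-size list rejects clear()");
            }
        }
    }
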
Modified: pig/branches/spark/test/org/apache/pig/test/TestDivide.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestDivide.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestDivide.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestDivide.java Wed Feb 22 09:43:41 2017
@@ -20,6 +20,9 @@ package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.math.RoundingMode;
import java.util.Map;
import java.util.Random;
@@ -53,7 +56,7 @@ public class TestDivide {
public void testOperator() throws ExecException {
// int TRIALS = 10;
byte[] types = { DataType.BAG, DataType.BOOLEAN, DataType.BYTEARRAY, DataType.CHARARRAY,
- DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG,
+ DataType.DOUBLE, DataType.FLOAT, DataType.INTEGER, DataType.LONG, DataType.BIGDECIMAL,
DataType.DATETIME, DataType.MAP, DataType.TUPLE };
// Map<Byte,String> map = GenRandomData.genTypeToNameMap();
System.out.println("Testing DIVIDE operator");
@@ -250,6 +253,33 @@ public class TestDivide {
assertEquals(null, (Long)resl.result);
break;
}
+ case DataType.BIGDECIMAL: {
+ MathContext mc = new MathContext(Divide.BIGDECIMAL_MINIMAL_SCALE, RoundingMode.HALF_UP);
+ BigDecimal inpf1 = new BigDecimal(r.nextDouble(),mc);
+ BigDecimal inpf2 = new BigDecimal(r.nextDouble(),mc);
+ lt.setValue(inpf1);
+ rt.setValue(inpf2);
+ Result resf = op.getNextBigDecimal();
+ BigDecimal expected = inpf1.divide(inpf2, 2 * Divide.BIGDECIMAL_MINIMAL_SCALE + 1, RoundingMode.HALF_UP);
+ assertEquals(expected, (BigDecimal)resf.result);
+
+ // test with null in lhs
+ lt.setValue(null);
+ rt.setValue(inpf2);
+ resf = op.getNextBigDecimal();
+ assertEquals(null, (BigDecimal)resf.result);
+ // test with null in rhs
+ lt.setValue(inpf1);
+ rt.setValue(null);
+ resf = op.getNextBigDecimal();
+ assertEquals(null, (BigDecimal)resf.result);
+ // test divide by 0
+ lt.setValue(inpf1);
+ rt.setValue(new BigDecimal(0.0f,mc));
+ resf = op.getNextBigDecimal();
+ assertEquals(null, (BigDecimal)resf.result);
+ break;
+ }
case DataType.DATETIME:
DateTime inpdt1 = new DateTime(r.nextLong());
DateTime inpdt2 = new DateTime(r.nextLong());
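
The BIGDECIMAL branch above fixes both scale and rounding mode because BigDecimal.divide with no scale argument throws ArithmeticException whenever the quotient has a non-terminating decimal expansion; the test mirrors the 2 * Divide.BIGDECIMAL_MINIMAL_SCALE + 1 digits with HALF_UP that Pig's Divide operator is expected to produce. A self-contained illustration of why the explicit scale matters:

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public class BigDecimalDivideSketch {
        public static void main(String[] args) {
            BigDecimal one = BigDecimal.ONE;
            BigDecimal three = new BigDecimal(3);

            try {
                one.divide(three); // 1/3 = 0.333... never terminates
            } catch (ArithmeticException e) {
                System.out.println("unbounded divide failed: " + e.getMessage());
            }

            // Bounding scale and rounding makes the result well-defined.
            System.out.println(one.divide(three, 5, RoundingMode.HALF_UP)); // 0.33333
        }
    }
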
Modified: pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java Wed Feb 22 09:43:41 2017
@@ -23,13 +23,13 @@ import static org.junit.Assert.assertTru
import java.io.File;
import java.io.FileWriter;
+import java.io.IOException;
import java.io.PrintWriter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pig.PigRunner;
-import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.junit.AfterClass;
@@ -38,16 +38,15 @@ import org.junit.Test;
public class TestEmptyInputDir {
- private static MiniCluster cluster;
+ private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
private static final String EMPTY_DIR = "emptydir";
private static final String INPUT_FILE = "input";
private static final String OUTPUT_FILE = "output";
private static final String PIG_FILE = "test.pig";
-
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- cluster = MiniCluster.buildCluster();
FileSystem fs = cluster.getFileSystem();
if (!fs.mkdirs(new Path(EMPTY_DIR))) {
throw new Exception("failed to create empty dir");
@@ -64,7 +63,35 @@ public class TestEmptyInputDir {
public static void tearDownAfterClass() throws Exception {
cluster.shutDown();
}
-
+
+ @Test
+ public void testGroupBy() throws Exception {
+ PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+ w.println("A = load '" + EMPTY_DIR + "';");
+ w.println("B = group A by $0;");
+ w.println("store B into '" + OUTPUT_FILE + "';");
+ w.close();
+
+ try {
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+ PigStats stats = PigRunner.run(args, null);
+
+ assertTrue(stats.isSuccessful());
+
+ // This assert fails on 205 due to MAPREDUCE-3606
+ if (Util.isMapredExecType(cluster.getExecType())
+ && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+ MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+ assertEquals(0, js.getNumberMaps());
+ }
+
+ assertEmptyOutputFile();
+ } finally {
+ new File(PIG_FILE).delete();
+ Util.deleteFile(cluster, OUTPUT_FILE);
+ }
+ }
+
@Test
public void testSkewedJoin() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -73,31 +100,28 @@ public class TestEmptyInputDir {
w.println("C = join B by $0, A by $0 using 'skewed';");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
PigStats stats = PigRunner.run(args, null);
-
+
assertTrue(stats.isSuccessful());
- // the sampler job has zero maps
- MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
-
+
// This assert fails on 205 due to MAPREDUCE-3606
- if (!Util.isHadoop205()&&!Util.isHadoop1_x())
- assertEquals(0, js.getNumberMaps());
-
- FileSystem fs = cluster.getFileSystem();
- FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
- assertTrue(status.isDir());
- assertEquals(0, status.getLen());
- // output directory isn't empty
- assertTrue(fs.listStatus(status.getPath()).length > 0);
+ if (Util.isMapredExecType(cluster.getExecType())
+ && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+ // the sampler job has zero maps
+ MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+ assertEquals(0, js.getNumberMaps());
+ }
+
+ assertEmptyOutputFile();
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void testMergeJoin() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -106,32 +130,28 @@ public class TestEmptyInputDir {
w.println("C = join A by $0, B by $0 using 'merge';");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
- // the indexer job has zero maps
- MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
-
+
+ assertTrue(stats.isSuccessful());
+
// This assert fails on 205 due to MAPREDUCE-3606
- if (!Util.isHadoop205()&&!Util.isHadoop1_x())
- assertEquals(0, js.getNumberMaps());
-
- FileSystem fs = cluster.getFileSystem();
- FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
- assertTrue(status.isDir());
- assertEquals(0, status.getLen());
-
- // output directory isn't empty
- assertTrue(fs.listStatus(status.getPath()).length > 0);
+ if (Util.isMapredExecType(cluster.getExecType())
+ && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+ // the indexer job has zero maps
+ MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+ assertEquals(0, js.getNumberMaps());
+ }
+
+ assertEmptyOutputFile();
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void testFRJoin() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -140,55 +160,44 @@ public class TestEmptyInputDir {
w.println("C = join A by $0, B by $0 using 'repl';");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
- // the indexer job has zero maps
- MRJobStats js = (MRJobStats)stats.getJobGraph().getSources().get(0);
-
+
+ assertTrue(stats.isSuccessful());
+
// This assert fails on 205 due to MAPREDUCE-3606
- if (!Util.isHadoop205()&&!Util.isHadoop1_x())
- assertEquals(0, js.getNumberMaps());
-
- FileSystem fs = cluster.getFileSystem();
- FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
- assertTrue(status.isDir());
- assertEquals(0, status.getLen());
-
- // output directory isn't empty
- assertTrue(fs.listStatus(status.getPath()).length > 0);
+ if (Util.isMapredExecType(cluster.getExecType())
+ && !Util.isHadoop205() && !Util.isHadoop1_x()) {
+ MRJobStats js = (MRJobStats) stats.getJobGraph().getSources().get(0);
+ assertEquals(0, js.getNumberMaps());
+ }
+
+ assertEmptyOutputFile();
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void testRegularJoin() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
w.println("A = load '" + INPUT_FILE + "';");
w.println("B = load '" + EMPTY_DIR + "';");
- w.println("C = join B by $0, A by $0;");
+ w.println("C = join B by $0, A by $0 PARALLEL 0;");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
-
- FileSystem fs = cluster.getFileSystem();
- FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
- assertTrue(status.isDir());
- assertEquals(0, status.getLen());
-
- // output directory isn't empty
- assertTrue(fs.listStatus(status.getPath()).length > 0);
-
+
+ assertTrue(stats.isSuccessful());
+
+ assertEmptyOutputFile();
+
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
@@ -203,19 +212,19 @@ public class TestEmptyInputDir {
w.println("C = join B by $0 right outer, A by $0;");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
- assertEquals(2, stats.getNumberRecords(OUTPUT_FILE));
+
+ assertTrue(stats.isSuccessful());
+ assertEquals(2, stats.getNumberRecords(OUTPUT_FILE));
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
}
}
-
+
@Test
public void testLeftOuterJoin() throws Exception {
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
@@ -224,16 +233,88 @@ public class TestEmptyInputDir {
w.println("C = join B by $0 left outer, A by $0;");
w.println("store C into '" + OUTPUT_FILE + "';");
w.close();
-
+
+ try {
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+ PigStats stats = PigRunner.run(args, null);
+
+ assertTrue(stats.isSuccessful());
+ assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
+ } finally {
+ new File(PIG_FILE).delete();
+ Util.deleteFile(cluster, OUTPUT_FILE);
+ }
+ }
+
+ @Test
+ public void testBloomJoin() throws Exception {
+ PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+ w.println("A = load '" + INPUT_FILE + "' as (x:int);");
+ w.println("B = load '" + EMPTY_DIR + "' as (x:int);");
+ w.println("C = join B by $0, A by $0 using 'bloom';");
+ w.println("D = join A by $0, B by $0 using 'bloom';");
+ w.println("store C into '" + OUTPUT_FILE + "';");
+ w.println("store D into 'output1';");
+ w.close();
+
+ try {
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
+ PigStats stats = PigRunner.run(args, null);
+
+ assertTrue(stats.isSuccessful());
+ assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
+ assertEquals(0, stats.getNumberRecords("output1"));
+ assertEmptyOutputFile();
+ } finally {
+ new File(PIG_FILE).delete();
+ Util.deleteFile(cluster, OUTPUT_FILE);
+ Util.deleteFile(cluster, "output1");
+ }
+ }
+
+ @Test
+ public void testBloomJoinOuter() throws Exception {
+ PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+ w.println("A = load '" + INPUT_FILE + "' as (x:int);");
+ w.println("B = load '" + EMPTY_DIR + "' as (x:int);");
+ w.println("C = join B by $0 left outer, A by $0 using 'bloom';");
+ w.println("D = join A by $0 left outer, B by $0 using 'bloom';");
+ w.println("E = join B by $0 right outer, A by $0 using 'bloom';");
+ w.println("F = join A by $0 right outer, B by $0 using 'bloom';");
+ w.println("store C into '" + OUTPUT_FILE + "';");
+ w.println("store D into 'output1';");
+ w.println("store E into 'output2';");
+ w.println("store F into 'output3';");
+ w.close();
+
try {
- String[] args = { PIG_FILE };
+ String[] args = { "-x", cluster.getExecType().name(), PIG_FILE, };
PigStats stats = PigRunner.run(args, null);
-
- assertTrue(stats.isSuccessful());
- assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
+
+ assertTrue(stats.isSuccessful());
+ assertEquals(0, stats.getNumberRecords(OUTPUT_FILE));
+ assertEquals(2, stats.getNumberRecords("output1"));
+ assertEquals(2, stats.getNumberRecords("output2"));
+ assertEquals(0, stats.getNumberRecords("output3"));
+ assertEmptyOutputFile();
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
+ Util.deleteFile(cluster, "output1");
+ Util.deleteFile(cluster, "output2");
+ Util.deleteFile(cluster, "output3");
}
}
+
+ private void assertEmptyOutputFile() throws IllegalArgumentException, IOException {
+ FileSystem fs = cluster.getFileSystem();
+ FileStatus status = fs.getFileStatus(new Path(OUTPUT_FILE));
+ assertTrue(status.isDir());
+ assertEquals(0, status.getLen());
+ // output directory isn't empty; it contains a single empty part file
+ FileStatus[] files = fs.listStatus(status.getPath(), Util.getSuccessMarkerPathFilter());
+ assertEquals(1, files.length);
+ assertEquals(0, files[0].getLen());
+ assertTrue(files[0].getPath().getName().startsWith("part-"));
+ }
}
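
Note for readers of this diff: the new assertEmptyOutputFile() helper relies on
Util.getSuccessMarkerPathFilter(), which is not shown here. A minimal sketch of
what such a filter plausibly looks like, assuming it merely hides the _SUCCESS
marker written by FileOutputCommitter so that only part files are listed:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    // Hypothetical sketch of Util.getSuccessMarkerPathFilter(): accept
    // every path except the _SUCCESS marker, so the listing contains
    // only the job's part-* output files.
    public static PathFilter getSuccessMarkerPathFilter() {
        return new PathFilter() {
            @Override
            public boolean accept(Path p) {
                return !p.getName().equals("_SUCCESS");
            }
        };
    }

With the marker filtered out, the assertions above can insist on exactly one
empty part- file in the output directory.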
Modified: pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java Wed Feb 22 09:43:41 2017
@@ -200,11 +200,11 @@ public class TestErrorHandlingStoreFunc
private void updatePigProperties(boolean allowErrors, long minErrors,
double errorThreshold) {
Properties properties = pigServer.getPigContext().getProperties();
- properties.put(PigConfiguration.PIG_ALLOW_STORE_ERRORS,
+ properties.put(PigConfiguration.PIG_ERROR_HANDLING_ENABLED,
Boolean.toString(allowErrors));
- properties.put(PigConfiguration.PIG_ERRORS_MIN_RECORDS,
+ properties.put(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS,
Long.toString(minErrors));
- properties.put(PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT,
+ properties.put(PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT,
Double.toString(errorThreshold));
}
}
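
The three property keys above were renamed to the consolidated error-handling
names in PigConfiguration. A hedged usage sketch (the concrete values are
illustrative only; prefer the constants, as the test does, and consult the
PigConfiguration javadoc for the exact semantics of each key):

    Properties props = pigServer.getPigContext().getProperties();
    // Illustrative values only: enable error handling on store and set
    // a minimum error-record count and an error-percentage threshold.
    props.put(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, "true");
    props.put(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS, "100");
    props.put(PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT, "0.3");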
Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java Wed Feb 22 09:43:41 2017
@@ -291,7 +291,7 @@ public class TestEvalPipeline {
myMap.put("long", new Long(1));
myMap.put("float", new Float(1.0));
myMap.put("double", new Double(1.0));
- myMap.put("dba", new DataByteArray(new String("bytes").getBytes()));
+ myMap.put("dba", new DataByteArray(new String("1234").getBytes()));
myMap.put("map", mapInMap);
myMap.put("tuple", tuple);
myMap.put("bag", bag);
@@ -794,32 +794,31 @@ public class TestEvalPipeline {
}
@Test
- public void testMapUDFfail() throws Exception{
+ public void testMapUDFWithImplicitTypeCast() throws Exception{
int LOOP_COUNT = 2;
File tmpFile = Util.createTempFileDelOnExit("test", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
for(int i = 0; i < LOOP_COUNT; i++) {
- for(int j=0;j<LOOP_COUNT;j+=2){
- ps.println(i+"\t"+j);
- ps.println(i+"\t"+j);
- }
+ ps.println(i);
}
ps.close();
pigServer.registerQuery("A = LOAD '"
+ Util.generateURI(tmpFile.toString(), pigContext) + "';");
pigServer.registerQuery("B = foreach A generate " + MapUDF.class.getName() + "($0) as mymap;"); //the argument does not matter
- String query = "C = foreach B {"
- + "generate mymap#'dba' * 10;"
- + "};";
+ String query = "C = foreach B generate mymap#'dba' * 10; ";
pigServer.registerQuery(query);
- try {
- pigServer.openIterator("C");
- Assert.fail("Error expected.");
- } catch (Exception e) {
- e.getMessage().contains("Cannot determine");
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+ if(!iter.hasNext()) Assert.fail("No output found");
+ int numIdentity = 0;
+ while(iter.hasNext()){
+ Tuple t = iter.next();
+ Assert.assertEquals(new Integer(12340), (Integer)t.get(0));
+ ++numIdentity;
}
+ Assert.assertEquals(LOOP_COUNT, numIdentity);
}
@Test
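
The rename from testMapUDFfail to testMapUDFWithImplicitTypeCast tracks a
behavior change: mymap#'dba' now yields DataByteArray("1234"), and multiplying
a bytearray by an int no longer errors out but triggers an implicit cast. A
simplified sketch of the arithmetic the assert encodes (Pig's actual cast path
goes through its bytearray-to-int conversion, not parseInt):

    // Pig implicitly casts the bytearray "1234" to int before the multiply:
    int value = Integer.parseInt("1234");  // 1234
    int result = value * 10;               // 12340, the expected tuple value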
Modified: pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java Wed Feb 22 09:43:41 2017
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalMap;
@@ -38,10 +37,10 @@ import org.apache.pig.impl.builtin.FindQ
import org.junit.Test;
public class TestFindQuantiles {
-
+
private static TupleFactory tFact = TupleFactory.getInstance();
private static final float epsilon = 0.0001f;
-
+
@Test
public void testFindQuantiles() throws Exception {
final int numSamples = 97778;
@@ -50,7 +49,7 @@ public class TestFindQuantiles {
System.out.println("sum: " + sum);
assertTrue(sum > (1-epsilon) && sum < (1+epsilon));
}
-
+
@Test
public void testFindQuantiles2() throws Exception {
final int numSamples = 30000;
@@ -86,7 +85,7 @@ public class TestFindQuantiles {
}
private float[] getProbVec(Tuple values) throws Exception {
- float[] probVec = new float[values.size()];
+ float[] probVec = new float[values.size()];
for(int i = 0; i < values.size(); i++) {
probVec[i] = (Float)values.get(i);
}
@@ -95,7 +94,7 @@ public class TestFindQuantiles {
private DataBag generateRandomSortedSamples(int numSamples, int max) throws Exception {
Random rand = new Random(1000);
- List<Tuple> samples = new ArrayList<Tuple>();
+ List<Tuple> samples = new ArrayList<Tuple>();
for (int i=0; i<numSamples; i++) {
Tuple t = tFact.newTuple(1);
t.set(0, rand.nextInt(max));
@@ -106,7 +105,7 @@ public class TestFindQuantiles {
}
private DataBag generateUniqueSamples(int numSamples) throws Exception {
- DataBag samples = BagFactory.getInstance().newDefaultBag();
+ DataBag samples = BagFactory.getInstance().newDefaultBag();
for (int i=0; i<numSamples; i++) {
Tuple t = tFact.newTuple(1);
t.set(0, new Integer(23));
@@ -121,9 +120,9 @@ public class TestFindQuantiles {
in.set(0, new Integer(numReduceres));
in.set(1, samples);
-
+
FindQuantiles fq = new FindQuantiles();
-
+
Map<String, Object> res = fq.exec(in);
return res;
}
@@ -135,12 +134,11 @@ public class TestFindQuantiles {
InternalMap weightedPartsData = (InternalMap) res.get(FindQuantiles.WEIGHTED_PARTS);
Iterator<Object> it = weightedPartsData.values().iterator();
float[] probVec = getProbVec((Tuple)it.next());
- new DiscreteProbabilitySampleGenerator(probVec);
float sum = 0.0f;
for (float f : probVec) {
sum += f;
}
return sum;
}
-
+
}
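
With the DiscreteProbabilitySampleGenerator dependency removed, the
getAvgProbability() helper reduces to checking that the weighted-parts vector
returned by FindQuantiles forms a probability distribution. In essence, the
check the tests perform is:

    float sum = 0.0f;
    for (float f : probVec) {
        sum += f;
    }
    // The weights should sum to ~1.0 within epsilon (0.0001f above).
    assertTrue(sum > (1 - epsilon) && sum < (1 + epsilon));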
Modified: pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Wed Feb 22 09:43:41 2017
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Random;
import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.data.Tuple;
import org.apache.pig.test.utils.TestHelper;
import org.junit.Test;
@@ -105,6 +106,31 @@ public class TestForEachNestedPlanLocal
}
@Test
+ public void testNestedCrossTwoRelationsLimit() throws Exception {
+ Storage.Data data = Storage.resetData(pig);
+ data.set("input",
+ Storage.tuple(Storage.bag(Storage.tuple(1, 1), Storage.tuple(1, 2)), Storage.bag(Storage.tuple(1, 3), Storage.tuple(1, 4))),
+ Storage.tuple(Storage.bag(Storage.tuple(2, 1), Storage.tuple(2, 2)), Storage.bag(Storage.tuple(2, 3))),
+ Storage.tuple(Storage.bag(Storage.tuple(3, 1)), Storage.bag(Storage.tuple(3, 2))));
+
+ pig.setBatchOn();
+ pig.registerQuery("A = load 'input' using mock.Storage() as (bag1:bag{tup1:tuple(f1:int, f2:int)}, bag2:bag{tup2:tuple(f3:int, f4:int)});");
+ pig.registerQuery("B = foreach A {"
+ + "crossed = cross bag1, bag2;"
+ + "filtered = filter crossed by f1 == f3;"
+ + "lmt = limit filtered 1;"
+ + "generate FLATTEN(lmt);" + "}");
+ pig.registerQuery("store B into 'output' using mock.Storage();");
+
+ pig.executeBatch();
+
+ List<Tuple> actualResults = data.get("output");
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {"(1, 1, 1, 3)", "(2, 1, 2, 3)", "(3, 1, 3, 2)"});
+ Util.checkQueryOutputs(actualResults.iterator(), expectedResults);
+ }
+
+ @Test
public void testNestedCrossTwoRelationsComplex() throws Exception {
File[] tmpFiles = generateDataSetFilesForNestedCross();
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStringAsByteArray(new String[] {
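
The expected tuples in the new testNestedCrossTwoRelationsLimit follow from
crossing bag1 with bag2 per input row, keeping pairs whose first fields match,
and limiting to one result. A plain-Java model of that nested plan for the
first input row (a sketch, not Pig's actual evaluator):

    // For one row: cross the two bags, filter on f1 == f3, take 1.
    int[][] bag1 = { {1, 1}, {1, 2} };
    int[][] bag2 = { {1, 3}, {1, 4} };
    outer:
    for (int[] t1 : bag1) {
        for (int[] t2 : bag2) {
            if (t1[0] == t2[0]) {                      // filter: f1 == f3
                System.out.printf("(%d, %d, %d, %d)%n",
                        t1[0], t1[1], t2[0], t2[1]);   // prints (1, 1, 1, 3)
                break outer;                           // limit 1
            }
        }
    }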
Modified: pig/branches/spark/test/org/apache/pig/test/TestGFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGFCross.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGFCross.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGFCross.java Wed Feb 22 09:43:41 2017
@@ -20,6 +20,7 @@ package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -50,6 +51,7 @@ public class TestGFCross {
public void testSerial() throws Exception {
Configuration cfg = new Configuration();
cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "1");
+ cfg.set(MRConfiguration.TASK_ID, "task_1473802673416_1808_m_000000");
UDFContext.getUDFContext().addJobConf(cfg);
Tuple t = TupleFactory.getInstance().newTuple(2);
@@ -66,6 +68,7 @@ public class TestGFCross {
public void testParallelSet() throws Exception {
Configuration cfg = new Configuration();
cfg.set(PigImplConstants.PIG_CROSS_PARALLELISM + ".1", "10");
+ cfg.set(MRConfiguration.TASK_ID, "task_14738102975522_0001_r_000000");
UDFContext.getUDFContext().addJobConf(cfg);
Tuple t = TupleFactory.getInstance().newTuple(2);
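
Both tests now seed MRConfiguration.TASK_ID in the job conf because GFCross
derives per-task behavior from the task id; without one set, the UDF has
nothing to parse under the UDFContext-based test harness. A hedged sketch of
the lookup the UDF presumably performs:

    Configuration conf = UDFContext.getUDFContext().getJobConf();
    // Assumed: GFCross reads the task id (e.g. task_..._m_000000) and
    // uses it to pick this task's slice of the cross-product partitions.
    String taskId = conf.get(MRConfiguration.TASK_ID);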
Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Wed Feb 22 09:43:41 2017
@@ -28,6 +28,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.FileWriter;
+import java.io.FilenameFilter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
@@ -970,7 +971,6 @@ public class TestGrunt {
@Test
public void testStopOnFailure() throws Throwable {
- Assume.assumeTrue("Skip this test for TEZ", Util.isMapredExecType(cluster.getExecType()));
PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
PigContext context = server.getPigContext();
context.getProperties().setProperty("stop.on.failure", ""+true);
@@ -1569,4 +1569,20 @@ public class TestGrunt {
}
assertTrue(found);
}
+
+ @Test
+ public void testGruntUtf8() throws Throwable {
+ String command = "mkdir 测试\n" +
+ "quit\n";
+ System.setProperty("jline.WindowsTerminal.directConsole", "false");
+ System.setIn(new ByteArrayInputStream(command.getBytes()));
+ org.apache.pig.PigRunner.run(new String[] {"-x", "local"}, null);
+ File[] partFiles = new File(".").listFiles(new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ return name.equals("测试");
+ }
+ });
+ assertEquals(1, partFiles.length);
+ new File("测试").delete();
+ }
}
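
On the new testGruntUtf8: disabling jline.WindowsTerminal.directConsole forces
jline to read from the redirected System.in, so the scripted mkdir of the
UTF-8 directory name behaves the same on Windows as elsewhere; the test then
verifies the directory was created with its name intact and deletes it.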