You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by an...@apache.org on 2014/01/25 06:25:45 UTC
svn commit: r1561258 -
/pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java
Author: aniket486
Date: Sat Jan 25 05:25:44 2014
New Revision: 1561258
URL: http://svn.apache.org/r1561258
Log:
PIG-3463 Add missing test file (aniket486)
Added:
pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java
Added: pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java?rev=1561258&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestAutoLocalMode.java Sat Jan 25 05:25:44 2014
@@ -0,0 +1,155 @@
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAutoLocalMode {
+
+ static MiniCluster cluster = MiniCluster.buildCluster();
+ private PigServer pigServer;
+ private File logFile;
+
+ private String miniFileName;
+ private String smallFileName;
+ private String bigFileName;
+
+ private File createInputFile(String filename, long minSizeInBytes) throws IOException {
+ File tmpFile = Util.createTempFileDelOnExit(filename, "txt");
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ Random r = new Random(1);
+ int rand;
+ while (tmpFile.length() < minSizeInBytes) {
+ rand = r.nextInt(100);
+ ps.println(rand);
+ }
+ ps.close();
+ return tmpFile;
+ }
+
+ @SuppressWarnings("resource")
+ private boolean checkLogFileMessage(String[] messages) {
+ BufferedReader reader = null;
+
+ try {
+ reader = new BufferedReader(new FileReader(logFile));
+ List<String> logMessages=new ArrayList<String>();
+ String line;
+ while ((line=reader.readLine())!=null)
+ {
+ logMessages.add(line);
+ }
+ // Messages should appear in the expected order
+ int i = 0;
+ for(String logMsg : logMessages) {
+ if (logMsg.contains(messages[i])) {
+ i++;
+ if(i == messages.length) {
+ return true;
+ }
+ }
+ }
+ return false;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception{
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer.getPigContext().getExecutionEngine().setProperty(PigConfiguration.PIG_AUTO_LOCAL_ENABLED, String.valueOf("true"));
+ pigServer.getPigContext().getExecutionEngine().setProperty(PigConfiguration.PIG_AUTO_LOCAL_INPUT_MAXBYTES, "200");
+
+ Logger logger = Logger.getLogger(JobControlCompiler.class);
+ logger.removeAllAppenders();
+ logger.setLevel(Level.INFO);
+ SimpleLayout layout = new SimpleLayout();
+ logFile = File.createTempFile("log", "");
+ FileAppender appender = new FileAppender(layout, logFile.toString(), false, false, 0);
+ logger.addAppender(appender);
+
+ miniFileName = createInputFile("miniFile", 10).getAbsolutePath();
+ smallFileName = createInputFile("smallFile", 100).getAbsolutePath();
+ bigFileName = createInputFile("bigFile", 1000).getAbsolutePath();
+ }
+
+ @AfterClass
+ public static void oneTimeTearDown() throws Exception {
+ cluster.shutDown();
+ }
+
+ @Test
+ public void testSmallJob() throws IOException {
+ pigServer.registerQuery("A = LOAD '"
+ + Util.generateURI(Util.encodeEscape(smallFileName), pigServer
+ .getPigContext()) + "' AS (num:int);");
+ pigServer.registerQuery("B = filter A by 1 == 0;");
+ pigServer.openIterator("B");
+
+ assertTrue(checkLogFileMessage(new String[]{JobControlCompiler.SMALL_JOB_LOG_MSG}));
+ }
+
+ @Test
+ public void testBigJob() throws IOException {
+ pigServer.registerQuery("A = LOAD '"
+ + Util.generateURI(Util.encodeEscape(bigFileName), pigServer
+ .getPigContext()) + "' AS (num:int);");
+ pigServer.registerQuery("B = filter A by 1 == 0;");
+ pigServer.openIterator("B");
+
+ assertTrue(checkLogFileMessage(new String[]{JobControlCompiler.BIG_JOB_LOG_MSG}));
+ }
+
+ @Test
+ public void testReplicatedJoin() throws IOException {
+ pigServer.registerQuery("A1 = LOAD '"
+ + Util.generateURI(Util.encodeEscape(smallFileName), pigServer
+ .getPigContext()) + "' AS (num:int);");
+ pigServer.registerQuery("A2 = LOAD '"
+ + Util.generateURI(Util.encodeEscape(miniFileName), pigServer
+ .getPigContext()) + "' AS (num:int);");
+ pigServer.registerQuery("A = join A1 by num, A2 by num using 'replicated';");
+ pigServer.registerQuery("B = filter A by 1 == 0;");
+ pigServer.openIterator("B");
+
+ assertTrue(checkLogFileMessage(new String[]{JobControlCompiler.SMALL_JOB_LOG_MSG}));
+ }
+
+ @Test
+ public void testOrderBy() throws IOException {
+ pigServer.registerQuery("A1 = LOAD '"
+ + Util.generateURI(Util.encodeEscape(bigFileName), pigServer
+ .getPigContext()) + "' AS (num:int);");
+ pigServer.registerQuery("A = filter A1 by num == 1;");
+ pigServer.registerQuery("B = order A by num;");
+ pigServer.openIterator("B");
+
+ assertTrue(checkLogFileMessage(new String[]{
+ JobControlCompiler.BIG_JOB_LOG_MSG,
+ JobControlCompiler.SMALL_JOB_LOG_MSG,
+ JobControlCompiler.SMALL_JOB_LOG_MSG}));
+ }
+
+
+}