You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/03/17 23:44:19 UTC
svn commit: r924536 - in /hadoop/pig/trunk: ./ lib/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
test/org/apache/pig/test/
Author: pradeepkth
Date: Wed Mar 17 22:44:18 2010
New Revision: 924536
URL: http://svn.apache.org/viewvc?rev=924536&view=rev
Log:
PIG-1287: Use hadoop-0.20.2 with pig 0.7.0 release (pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/build.xml
hadoop/pig/trunk/lib/hadoop20.jar
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Mar 17 22:44:18 2010
@@ -68,6 +68,8 @@ manner (rding via pradeepkth)
IMPROVEMENTS
+PIG-1287: Use hadoop-0.20.2 with pig 0.7.0 release (pradeepkth)
+
PIG-1257: PigStorage per the new load-store redesign should support splitting
of bzip files (pradeepkth)
Modified: hadoop/pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/build.xml?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
--- hadoop/pig/trunk/build.xml (original)
+++ hadoop/pig/trunk/build.xml Wed Mar 17 22:44:18 2010
@@ -533,7 +533,6 @@
<!-- Excluded under Windows.-->
<exclude name="**/TestHBaseStorage.java" if="isWindows" />
<!-- Excluced because we don't want to run them -->
- <exclude name="**/TestCounters.java" />
<exclude name="**/PigExecTestCase.java" />
<exclude name="**/TypeCheckingTestUtil.java" />
<exclude name="**/TypeGraphPrinter.java" />
Modified: hadoop/pig/trunk/lib/hadoop20.jar
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/lib/hadoop20.jar?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
Binary files - no diff available.
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Mar 17 22:44:18 2010
@@ -325,12 +325,13 @@ public class MapReduceLauncher extends L
// scripts mess up the stats reporting from hadoop.
List<String> rji = stats.getRootJobIDs();
if ( (rji != null && rji.size() == 1 && finalStores == 1) || pc.getExecType() == ExecType.LOCAL ) {
- if(stats.getRecordsWritten()==-1) {
+ // currently counters are not working in local mode - see PIG-1286
+ if(stats.getRecordsWritten()==-1 || pc.getExecType() == ExecType.LOCAL) {
log.info("Records written : Unable to determine number of records written");
} else {
log.info("Records written : " + stats.getRecordsWritten());
}
- if(stats.getBytesWritten()==-1) {
+ if(stats.getBytesWritten()==-1 || pc.getExecType() == ExecType.LOCAL) {
log.info("Bytes written : Unable to determine number of bytes written");
} else {
log.info("Bytes written : " + stats.getBytesWritten());
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Wed Mar 17 22:44:18 2010
@@ -134,7 +134,7 @@ public class PigCombiner {
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
pigHadoopLogger.setAggregate(aggregateWarning);
- pigHadoopLogger.setReporter(context);
+ pigHadoopLogger.setTaskIOContext(context);
PhysicalOperator.setPigLogger(pigHadoopLogger);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Wed Mar 17 22:44:18 2010
@@ -19,7 +19,8 @@ package org.apache.pig.backend.hadoop.ex
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
/**
@@ -39,7 +40,7 @@ public final class PigHadoopLogger imple
}
private static Log log = LogFactory.getLog(PigHadoopLogger.class);
- private Progressable reporter = null;
+ private TaskInputOutputContext<?, ?, ?, ?> taskIOContext = null;
private boolean aggregate = false;
private PigHadoopLogger() {
@@ -49,8 +50,9 @@ public final class PigHadoopLogger imple
public void warn(Object o, String msg, Enum warningEnum) {
String displayMessage = o.getClass().getName() + ": " + msg;
if(aggregate) {
- if(reporter != null) {
- //reporter.incrCounter(warningEnum, 1);
+ if(taskIOContext != null) {
+ Counter c = taskIOContext.getCounter(warningEnum);
+ c.increment(1);
} else {
//TODO:
//in local mode of execution if the PigHadoopLogger is used initially,
@@ -67,12 +69,12 @@ public final class PigHadoopLogger imple
}
}
- public Progressable getReporter() {
- return reporter;
+ public TaskInputOutputContext<?, ?, ?, ?> getTaskIOContext() {
+ return taskIOContext;
}
- public synchronized void setReporter(Progressable rep) {
- this.reporter = rep;
+ public synchronized void setTaskIOContext(TaskInputOutputContext<?, ?, ?, ?> tioc) {
+ this.taskIOContext = tioc;
}
public boolean getAggregate() {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Wed Mar 17 22:44:18 2010
@@ -211,7 +211,7 @@ public abstract class PigMapBase extends
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
pigHadoopLogger.setAggregate(aggregateWarning);
- pigHadoopLogger.setReporter(context);
+ pigHadoopLogger.setTaskIOContext(context);
PhysicalOperator.setPigLogger(pigHadoopLogger);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Wed Mar 17 22:44:18 2010
@@ -338,7 +338,7 @@ public class PigMapReduce {
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
pigHadoopLogger.setAggregate(aggregateWarning);
- pigHadoopLogger.setReporter(context);
+ pigHadoopLogger.setTaskIOContext(context);
PhysicalOperator.setPigLogger(pigHadoopLogger);
@@ -550,7 +550,7 @@ public class PigMapReduce {
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
pigHadoopLogger.setAggregate(aggregateWarning);
- pigHadoopLogger.setReporter(context);
+ pigHadoopLogger.setTaskIOContext(context);
PhysicalOperator.setPigLogger(pigHadoopLogger);
for (POStore store: stores) {
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java?rev=924536&r1=924535&r2=924536&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java Wed Mar 17 22:44:18 2010
@@ -513,50 +513,55 @@ public class TestCounters extends TestCa
}
- @Test
- public void testLocal() throws IOException, ExecException {
- int count = 0;
- //PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
- File file = File.createTempFile("data", ".txt");
- PrintWriter pw = new PrintWriter(new FileOutputStream(file));
- int [] nos = new int[10];
- for(int i = 0; i < 10; i++)
- nos[i] = 0;
-
- for(int i = 0; i < MAX; i++) {
- int index = r.nextInt(10);
- int value = r.nextInt(100);
- nos[index] += value;
- pw.println(index + "\t" + value);
- }
- pw.close();
-
- for(int i = 0; i < 10; i++)
- if(nos[i] > 0)
- count ++;
-
- File out = File.createTempFile("output", ".txt");
- out.delete();
- PigServer pigServer = new PigServer("local");
- // FileLocalizer is initialized before using HDFS by previous tests
- FileLocalizer.setInitialized(false);
- pigServer.registerQuery("a = load '" + Util.encodeEscape(file.toString()) + "';");
- pigServer.registerQuery("b = order a by $0;");
- pigServer.registerQuery("c = group b by $0;");
- pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
- PigStats pigStats = pigServer.store("d", "file://" + out.getAbsolutePath()).getStatistics();
- InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
- long filesize = 0;
- while(is.read() != -1) filesize++;
-
- is.close();
- out.delete();
-
- //Map<String, Map<String, String>> stats = pigStats.getPigStats();
-
- assertEquals(10, pigStats.getRecordsWritten());
- assertEquals(110, pigStats.getBytesWritten());
-
- }
+ /*
+ * IMPORTANT NOTE:
+ * COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE -
+ * SEE PIG-1286 - UNCOMMENT WHEN IT IS FIXED
+ */
+// @Test
+// public void testLocal() throws IOException, ExecException {
+// int count = 0;
+// //PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
+// File file = File.createTempFile("data", ".txt");
+// PrintWriter pw = new PrintWriter(new FileOutputStream(file));
+// int [] nos = new int[10];
+// for(int i = 0; i < 10; i++)
+// nos[i] = 0;
+//
+// for(int i = 0; i < MAX; i++) {
+// int index = r.nextInt(10);
+// int value = r.nextInt(100);
+// nos[index] += value;
+// pw.println(index + "\t" + value);
+// }
+// pw.close();
+//
+// for(int i = 0; i < 10; i++)
+// if(nos[i] > 0)
+// count ++;
+//
+// File out = File.createTempFile("output", ".txt");
+// out.delete();
+// PigServer pigServer = new PigServer("local");
+// // FileLocalizer is initialized before using HDFS by previous tests
+// FileLocalizer.setInitialized(false);
+// pigServer.registerQuery("a = load '" + Util.encodeEscape(file.toString()) + "';");
+// pigServer.registerQuery("b = order a by $0;");
+// pigServer.registerQuery("c = group b by $0;");
+// pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
+// PigStats pigStats = pigServer.store("d", "file://" + out.getAbsolutePath()).getStatistics();
+// InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+// long filesize = 0;
+// while(is.read() != -1) filesize++;
+//
+// is.close();
+// out.delete();
+//
+// //Map<String, Map<String, String>> stats = pigStats.getPigStats();
+//
+// assertEquals(10, pigStats.getRecordsWritten());
+// assertEquals(110, pigStats.getBytesWritten());
+//
+// }
}