You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/04/30 16:00:20 UTC
svn commit: r1591297 - in /pig/trunk: ./
shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/
shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/
src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengi...
Author: rohini
Date: Wed Apr 30 14:00:20 2014
New Revision: 1591297
URL: http://svn.apache.org/r1591297
Log:
PIG-3672: Pig should not check for hardcoded file system implementations (rohini)
Added:
pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/trunk/src/org/apache/pig/PigServer.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigStatsOutputSizeReader.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
pig/trunk/src/org/apache/pig/impl/util/UriUtil.java
pig/trunk/src/org/apache/pig/impl/util/Utils.java
pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java
pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
pig/trunk/test/org/apache/pig/test/TestMRJobStats.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Apr 30 14:00:20 2014
@@ -32,6 +32,8 @@ PIG-2207: Support custom counters for ag
IMPROVEMENTS
+PIG-3672: Pig should not check for hardcoded file system implementations (rohini)
+
PIG-3860: Refactor PigStatusReporter and PigLogger for non-MR execution engine (cheolsoo)
PIG-3865: Remodel the XMLLoader to work to be faster and more maintainable (aseldawy via daijy)
Modified: pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Wed Apr 30 14:00:20 2014
@@ -119,14 +119,33 @@ public class HadoopShims {
public static void unsetConf(Configuration conf, String key) {
// Not supported in Hadoop 0.20/1.x
}
-
+
/**
- * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
+ * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
* @param conf
* @param taskAttemptID
*/
public static void setTaskAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
conf.set("mapred.task.id", taskAttemptID.toString());
}
-
+
+ /**
+ * Returns whether the give path has a FileSystem implementation.
+ *
+ * @param path path
+ * @param conf configuration
+ * @return true if the give path's scheme has a FileSystem implementation,
+ * false otherwise
+ */
+ public static boolean hasFileSystemImpl(Path path, Configuration conf) {
+ String scheme = path.toUri().getScheme();
+ if (scheme != null) {
+ String fsImpl = conf.get("fs." + scheme + ".impl");
+ if (fsImpl == null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
}
Modified: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Wed Apr 30 14:00:20 2014
@@ -18,7 +18,10 @@
package org.apache.pig.backend.hadoop.executionengine.shims;
import java.io.IOException;
+import java.lang.reflect.Method;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -41,6 +44,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop23.PigJobControl;
public class HadoopShims {
+
+ private static Log LOG = LogFactory.getLog(HadoopShims.class);
+ private static Method getFileSystemClass;
+
static public JobContext cloneJobContext(JobContext original) throws IOException, InterruptedException {
JobContext newContext = ContextFactory.cloneContext(original,
new JobConf(original.getConfiguration()));
@@ -103,7 +110,7 @@ public class HadoopShims {
public static JobControl newJobControl(String groupName, int timeToSleep) {
return new PigJobControl(groupName, timeToSleep);
}
-
+
public static long getDefaultBlockSize(FileSystem fs, Path path) {
return fs.getDefaultBlockSize(path);
}
@@ -111,22 +118,63 @@ public class HadoopShims {
public static Counters getCounters(Job job) throws IOException, InterruptedException {
return new Counters(job.getJob().getCounters());
}
-
+
public static boolean isJobFailed(TaskReport report) {
return report.getCurrentStatus()==TIPStatus.FAILED;
}
-
+
public static void unsetConf(Configuration conf, String key) {
conf.unset(key);
}
-
+
/**
- * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
+ * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
* @param conf
* @param taskAttemptID
*/
public static void setTaskAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
conf.setInt("mapreduce.job.application.attempt.id", taskAttemptID.getId());
}
-
+
+ /**
+ * Returns whether the give path has a FileSystem implementation.
+ *
+ * @param path path
+ * @param conf configuration
+ * @return true if the give path's scheme has a FileSystem implementation,
+ * false otherwise
+ */
+ public static boolean hasFileSystemImpl(Path path, Configuration conf) {
+ String scheme = path.toUri().getScheme();
+ if (scheme != null) {
+ // Hadoop 0.23
+ if (conf.get("fs.file.impl") != null) {
+ String fsImpl = conf.get("fs." + scheme + ".impl");
+ if (fsImpl == null) {
+ return false;
+ }
+ } else {
+ // Hadoop 2.x HADOOP-7549
+ if (getFileSystemClass == null) {
+ try {
+ getFileSystemClass = FileSystem.class.getDeclaredMethod(
+ "getFileSystemClass", String.class, Configuration.class);
+ } catch (NoSuchMethodException e) {
+ LOG.warn("Error while trying to determine if path " + path +
+ " has a filesystem implementation");
+ // Assume has implementation to be safe
+ return true;
+ }
+ }
+ try {
+ Object fs = getFileSystemClass.invoke(null, scheme, conf);
+ return fs == null ? false : true;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
}
Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Wed Apr 30 14:00:20 2014
@@ -127,7 +127,7 @@ public class PigServer {
public static final String PRETTY_PRINT_SCHEMA_PROPERTY = "pig.pretty.print.schema";
private static final String PIG_LOCATION_CHECK_STRICT = "pig.location.check.strict";
- /*
+ /*
* The data structure to support grunt shell operations.
* The grunt shell can only work on one graph at a time.
* If a script is contained inside another script, the grunt
@@ -176,10 +176,10 @@ public class PigServer {
/**
* @param execTypeString can be 'mapreduce' or 'local'. Local mode will
* use Hadoop's local job runner to execute the job on the local machine.
- * Mapreduce mode will connect to a cluster to execute the job. If
+ * Mapreduce mode will connect to a cluster to execute the job. If
* execTypeString is not one of these two, Pig will deduce the ExecutionEngine
* if it is on the classpath and use it for the backend execution.
- * @throws ExecException
+ * @throws ExecException
* @throws IOException
*/
public PigServer(String execTypeString) throws ExecException, IOException {
@@ -293,7 +293,7 @@ public class PigServer {
/**
* Current DAG
- *
+ *
* @return
*/
public Graph getCurrentDAG() {
@@ -368,7 +368,7 @@ public class PigServer {
* should be followed by {@link PigServer#executeBatch(boolean)} with
* argument as false. Do Not use {@link PigServer#executeBatch()} after
* calling this method as that will re-parse and build the script.
- *
+ *
* @throws IOException
*/
public void parseAndBuild() throws IOException {
@@ -383,7 +383,7 @@ public class PigServer {
/**
* Submits a batch of Pig commands for execution.
- *
+ *
* @return list of jobs being executed
* @throws IOException
*/
@@ -395,7 +395,7 @@ public class PigServer {
* Submits a batch of Pig commands for execution. Parse and build of script
* should be skipped if user called {@link PigServer#parseAndBuild()}
* before. Pass false as an argument in which case.
- *
+ *
* @param parseAndBuild
* @return
* @throws IOException
@@ -1049,7 +1049,7 @@ public class PigServer {
* @param lps Stream to print the logical tree
* @param eps Stream to print the ExecutionEngine trees. If null, then will print to files
* @param dir Directory to print ExecutionEngine trees. If null, will use eps
- * @param suffix Suffix of file names
+ * @param suffix Suffix of file names
* @throws IOException if the requested alias cannot be found.
*/
public void explain(String alias,
@@ -1063,7 +1063,7 @@ public class PigServer {
try {
pigContext.inExplain = true;
buildStorePlan( alias );
-
+
//Only add root xml node if all plans are being written to same stream.
if (format == "xml" && lps == eps) {
lps.println("<plan>");
@@ -1083,7 +1083,7 @@ public class PigServer {
if (format.equals("xml") && lps == eps) {
lps.println("</plan>");
}
-
+
if (markAsExecute) {
currDAG.markAsExecuted();
}
@@ -1408,13 +1408,13 @@ public class PigServer {
}
return op;
}
-
+
/**
* Returns data associated with LogicalPlan. It makes
* sense to call this method only after a query/script
* has been registered with one of the {@link #registerQuery(String)}
* or {@link #registerScript(InputStream)} methods.
- *
+ *
* @return LogicalPlanData
*/
public LogicalPlanData getLogicalPlanData() {
@@ -1730,7 +1730,7 @@ public class PigServer {
private void compile(LogicalPlan lp) throws FrontendException {
DanglingNestedNodeRemover DanglingNestedNodeRemover = new DanglingNestedNodeRemover( lp );
DanglingNestedNodeRemover.visit();
-
+
new ColumnAliasConversionVisitor(lp).visit();
new SchemaAliasVisitor(lp).visit();
new ScalarVisitor(lp, pigContext, scope).visit();
@@ -1813,14 +1813,14 @@ public class PigServer {
/**
* This method checks whether the multiple sinks (STORE) use the same
* "file-based" location. If yes, throws a RuntimeException
- *
+ *
* @param storeOps
*/
private void checkDuplicateStoreLoc(Set<LOStore> storeOps) {
Set<String> uniqueStoreLoc = new HashSet<String>();
for(LOStore store : storeOps) {
String fileName = store.getFileSpec().getFileName();
- if(!uniqueStoreLoc.add(fileName) && UriUtil.isHDFSFileOrLocalOrS3N(fileName)) {
+ if(!uniqueStoreLoc.add(fileName) && UriUtil.isHDFSFileOrLocalOrS3N(fileName, new Configuration(true))) {
throw new RuntimeException("Script contains 2 or more STORE statements writing to same location : "+ fileName);
}
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java Wed Apr 30 14:00:20 2014
@@ -35,15 +35,15 @@ public class FileBasedOutputSizeReader i
private static final Log log = LogFactory.getLog(FileBasedOutputSizeReader.class);
- /**
+ /**
* Returns whether the given POStore is supported by this output size reader
* or not. We check whether the uri scheme of output file is one of hdfs,
* local, and s3.
* @param sto POStore
*/
@Override
- public boolean supports(POStore sto) {
- return UriUtil.isHDFSFileOrLocalOrS3N(getLocationUri(sto));
+ public boolean supports(POStore sto, Configuration conf) {
+ return UriUtil.isHDFSFileOrLocalOrS3N(getLocationUri(sto), conf);
}
/**
@@ -53,7 +53,7 @@ public class FileBasedOutputSizeReader i
*/
@Override
public long getOutputSize(POStore sto, Configuration conf) throws IOException {
- if (!supports(sto)) {
+ if (!supports(sto, conf)) {
log.warn("'" + sto.getStoreFunc().getClass().getName()
+ "' is not supported by " + getClass().getName());
return -1;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Wed Apr 30 14:00:20 2014
@@ -109,7 +109,7 @@ public class InputSizeReducerEstimator i
// the input file location might be a list of comma separated files,
// separate them out
for (String location : LoadFunc.getPathStrings(ld.getLFile().getFileName())) {
- if (UriUtil.isHDFSFileOrLocalOrS3N(location)) {
+ if (UriUtil.isHDFSFileOrLocalOrS3N(location, conf)) {
Path path = new Path(location);
FileSystem fs = path.getFileSystem(conf);
FileStatus[] status = fs.globStatus(path);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Apr 30 14:00:20 2014
@@ -96,6 +96,7 @@ public class MapReduceLauncher extends L
private boolean aggregateWarning = false;
+ @Override
public void kill() {
try {
log.debug("Receive kill signal");
@@ -112,6 +113,7 @@ public class MapReduceLauncher extends L
}
}
+ @Override
public void killJob(String jobID, Configuration conf) throws BackendException {
try {
if (conf != null) {
@@ -440,7 +442,7 @@ public class MapReduceLauncher extends L
// Clean up all the intermediate data
for (String path : intermediateVisitor.getIntermediate()) {
// Skip non-file system paths such as hbase, see PIG-3617
- if (Utils.hasFileSystemImpl(new Path(path), conf)) {
+ if (HadoopShims.hasFileSystemImpl(new Path(path), conf)) {
FileLocalizer.delete(path, pc);
}
}
@@ -728,7 +730,7 @@ public class MapReduceLauncher extends L
if(shouldMarkOutputDir(job)) {
Path outputPath = new Path(store.getSFile().getFileName());
String scheme = outputPath.toUri().getScheme();
- if (Utils.hasFileSystemImpl(outputPath, job.getJobConf())) {
+ if (HadoopShims.hasFileSystemImpl(outputPath, job.getJobConf())) {
FileSystem fs = outputPath.getFileSystem(job.getJobConf());
if (fs.exists(outputPath)) {
// create a file in the folder to mark it
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigStatsOutputSizeReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigStatsOutputSizeReader.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigStatsOutputSizeReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigStatsOutputSizeReader.java Wed Apr 30 14:00:20 2014
@@ -30,7 +30,7 @@ import org.apache.pig.classification.Int
* HBaseStorage), the output size cannot always be computed as the total size of
* output files.
*
- * @see FileBasedOutputSizeReader
+ * @see FileBasedOutputSizeReader
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@@ -38,12 +38,13 @@ public interface PigStatsOutputSizeReade
static final String OUTPUT_SIZE_READER_KEY = "pig.stats.output.size.reader";
- /**
+ /**
* Returns whether the given PSStore is supported by this output size reader
* or not.
* @param sto POStore
+ * @param conf Configuration
*/
- public boolean supports(POStore sto);
+ public boolean supports(POStore sto, Configuration conf);
/**
* Returns the size of output in bytes. If the size of output cannot be
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Wed Apr 30 14:00:20 2014
@@ -46,6 +46,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -67,7 +68,7 @@ import org.apache.pig.impl.util.Utils;
public class MapRedUtil {
private static Log log = LogFactory.getLog(MapRedUtil.class);
-
+
public static final String FILE_SYSTEM_NAME = "fs.default.name";
/**
@@ -85,8 +86,8 @@ public class MapRedUtil {
Map<E, Pair<Integer, Integer>> reducerMap = new HashMap<E, Pair<Integer, Integer>>();
// use local file system to get the keyDistFile
- Configuration conf = new Configuration(false);
-
+ Configuration conf = new Configuration(false);
+
if (mapConf.get("yarn.resourcemanager.principal")!=null) {
conf.set("yarn.resourcemanager.principal", mapConf.get("yarn.resourcemanager.principal"));
}
@@ -121,7 +122,7 @@ public class MapRedUtil {
Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
// Used to replace the maxIndex with the number of reducers
if (maxIndex < minIndex) {
- maxIndex = totalReducers[0] + maxIndex;
+ maxIndex = totalReducers[0] + maxIndex;
}
E keyT;
@@ -131,7 +132,7 @@ public class MapRedUtil {
// it in the reducer map
Tuple keyTuple = TupleFactory.getInstance().newTuple();
for (int i=0; i < idxTuple.size() - 2; i++) {
- keyTuple.append(idxTuple.get(i));
+ keyTuple.append(idxTuple.get(i));
}
keyT = (E) keyTuple;
} else {
@@ -169,12 +170,12 @@ public class MapRedUtil {
public static void setupUDFContext(Configuration job) throws IOException {
UDFContext udfc = UDFContext.getUDFContext();
udfc.addJobConf(job);
- // don't deserialize in front-end
+ // don't deserialize in front-end
if (udfc.isUDFConfEmpty()) {
udfc.deserialize();
}
}
-
+
/**
* Sets up output and log dir paths for a single-store streaming job
*
@@ -187,7 +188,7 @@ public class MapRedUtil {
Configuration conf) throws IOException {
// set out filespecs
String outputPathString = st.getSFile().getFileName();
- if (!outputPathString.contains("://") || outputPathString.startsWith("hdfs://")) {
+ if (HadoopShims.hasFileSystemImpl(new Path(outputPathString), conf)) {
conf.set("pig.streaming.log.dir",
new Path(outputPathString, JobControlCompiler.LOG_DIR).toString());
}
@@ -245,11 +246,11 @@ public class MapRedUtil {
/**
* Get all files recursively from the given list of files
- *
+ *
* @param files a list of FileStatus
* @param conf the configuration object
* @return the list of fileStatus that contains all the files in the given
- * list and, recursively, all the files inside the directories in
+ * list and, recursively, all the files inside the directories in
* the given list
* @throws IOException
*/
@@ -267,12 +268,12 @@ public class MapRedUtil {
result.add(file);
}
}
- log.info("Total input paths to process : " + result.size());
+ log.info("Total input paths to process : " + result.size());
return result;
}
-
+
private static void addInputPathRecursively(List<FileStatus> result,
- FileSystem fs, Path path, PathFilter inputFilter)
+ FileSystem fs, Path path, PathFilter inputFilter)
throws IOException {
for (FileStatus stat: fs.listStatus(path, inputFilter)) {
if (stat.isDir()) {
@@ -284,6 +285,7 @@ public class MapRedUtil {
}
private static final PathFilter hiddenFileFilter = new PathFilter(){
+ @Override
public boolean accept(Path p){
String name = p.getName();
return !name.startsWith("_") && !name.startsWith(".");
@@ -319,7 +321,7 @@ public class MapRedUtil {
return cmp == 0 ? 0 : cmp < 0 ? -1 : 1;
}
};
-
+
private static final class ComparableSplit implements Comparable<ComparableSplit> {
private InputSplit rawInputSplit;
private HashSet<Node> nodes;
@@ -330,32 +332,32 @@ public class MapRedUtil {
nodes = new HashSet<Node>();
this.id = id;
}
-
+
void add(Node node) {
nodes.add(node);
}
-
+
void removeFromNodes() {
for (Node node : nodes)
node.remove(this);
}
-
+
public InputSplit getSplit() {
return rawInputSplit;
}
-
+
@Override
public boolean equals(Object other) {
if (other == null || !(other instanceof ComparableSplit))
return false;
return (compareTo((ComparableSplit) other) == 0);
}
-
+
@Override
public int hashCode() {
return 41;
}
-
+
@Override
public int compareTo(ComparableSplit other) {
try {
@@ -369,41 +371,41 @@ public class MapRedUtil {
}
}
}
-
+
private static class DummySplit extends InputSplit {
private long length;
-
+
@Override
public String[] getLocations() {
return null;
}
-
+
@Override
public long getLength() {
return length;
}
-
+
public void setLength(long length) {
this.length = length;
}
}
-
+
private static class Node {
private long length = 0;
private ArrayList<ComparableSplit> splits;
private boolean sorted;
-
+
Node() throws IOException, InterruptedException {
length = 0;
splits = new ArrayList<ComparableSplit>();
sorted = false;
}
-
+
void add(ComparableSplit split) throws IOException, InterruptedException {
splits.add(split);
length++;
}
-
+
void remove(ComparableSplit split) {
if (!sorted)
sort();
@@ -413,23 +415,23 @@ public class MapRedUtil {
length--;
}
}
-
+
void sort() {
if (!sorted) {
Collections.sort(splits);
sorted = true;
}
}
-
+
ArrayList<ComparableSplit> getSplits() {
return splits;
}
-
+
public long getLength() {
return length;
}
}
-
+
public static List<List<InputSplit>> getCombinePigSplits(List<InputSplit>
oneInputSplits, long maxCombinedSplitSize, Configuration conf)
throws IOException, InterruptedException {
@@ -438,13 +440,13 @@ public class MapRedUtil {
List<List<InputSplit>> result = new ArrayList<List<InputSplit>>();
List<Long> resultLengths = new ArrayList<Long>();
long comparableSplitId = 0;
-
+
int size = 0, nSplits = oneInputSplits.size();
InputSplit lastSplit = null;
int emptyCnt = 0;
for (InputSplit split : oneInputSplits) {
if (split.getLength() == 0) {
- emptyCnt++;
+ emptyCnt++;
continue;
}
if (split.getLength() >= maxCombinedSplitSize) {
@@ -461,7 +463,7 @@ public class MapRedUtil {
HashSet<String> locationSeen = new HashSet<String>();
for (String location : locations)
{
- if (!locationSeen.contains(location))
+ if (!locationSeen.contains(location))
{
Node node = nodeMap.get(location);
if (node == null) {
@@ -488,7 +490,7 @@ public class MapRedUtil {
ArrayList<ComparableSplit> splits = node.getSplits();
for (ComparableSplit split : splits) {
if (!seen.contains(split.getSplit())) {
- // remove duplicates. The set has to be on the raw input split not the
+ // remove duplicates. The set has to be on the raw input split not the
// comparable input split as the latter overrides the compareTo method
// so its equality semantics is changed and not we want here
seen.add(split.getSplit());
@@ -497,7 +499,7 @@ public class MapRedUtil {
}
}
}
-
+
int combinedSplitLen = 0;
for (PigSplit split : result)
combinedSplitLen += split.getNumPaths();
@@ -581,7 +583,7 @@ public class MapRedUtil {
for (Node node : nodes) {
for (ComparableSplit split : node.getSplits()) {
if (!seen.contains(split.getSplit())) {
- // remove duplicates. The set has to be on the raw input split not the
+ // remove duplicates. The set has to be on the raw input split not the
// comparable input split as the latter overrides the compareTo method
// so its equality semantics is changed and not we want here
seen.add(split.getSplit());
@@ -589,7 +591,7 @@ public class MapRedUtil {
}
}
}
-
+
/* verification code
int combinedSplitLen = 0;
for (PigSplit split : result)
@@ -602,7 +604,7 @@ public class MapRedUtil {
long totalSize = 0;
ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
ArrayList<ComparableSplit> combinedComparableSplits = new ArrayList<ComparableSplit>();
-
+
int splitLen = leftoverSplits.size();
for (int i = 0; i < splitLen; i++)
{
@@ -649,26 +651,26 @@ public class MapRedUtil {
combinedSplitLen += split.getNumPaths();
if (combinedSplitLen != nSplits-emptyCnt)
throw new AssertionError("number of combined splits ["+combinedSplitLen+"] does not match the number of original splits ["+nSplits+"].");
-
+
long totalLen = 0;
for (PigSplit split : result)
totalLen += split.getLength();
-
+
long origTotalLen = 0;
for (InputSplit split : oneInputSplits)
origTotalLen += split.getLength();
if (totalLen != origTotalLen)
throw new AssertionError("The total length ["+totalLen+"] does not match the original ["+origTotalLen+"]");
- */
+ */
log.info("Total input paths (combined) to process : " + result.size());
return result;
}
-
+
private static void removeSplits(List<ComparableSplit> splits) {
for (ComparableSplit split: splits)
split.removeFromNodes();
}
-
+
public String inputSplitToString(InputSplit[] splits) throws IOException, InterruptedException {
// debugging purpose only
StringBuilder st = new StringBuilder();
@@ -681,11 +683,11 @@ public class MapRedUtil {
st.append("Input split["+i+"]:\n Length = "+ splits[i].getLength()+"\n Locations:\n");
for (String location : splits[i].getLocations())
st.append(" "+location+"\n");
- st.append("\n-----------------------\n");
+ st.append("\n-----------------------\n");
}
return st.toString();
}
-
+
/* verification code: debug purpose only
public String inputSplitToString(ArrayList<ComparableSplit> splits) throws IOException, InterruptedException {
StringBuilder st = new StringBuilder();
@@ -698,7 +700,7 @@ public class MapRedUtil {
st.append("Input split["+i+"]:\n Length = "+ splits.get(i).getSplit().getLength()+"\n Locations:\n");
for (String location : splits.get(i).getSplit().getLocations())
st.append(" "+location+"\n");
- st.append("\n-----------------------\n");
+ st.append("\n-----------------------\n");
}
return st.toString();
}
Modified: pig/trunk/src/org/apache/pig/impl/util/UriUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/UriUtil.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/UriUtil.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/UriUtil.java Wed Apr 30 14:00:20 2014
@@ -17,24 +17,25 @@
*/
package org.apache.pig.impl.util;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+
public class UriUtil {
public static boolean isHDFSFile(String uri){
if(uri == null)
return false;
- if(uri.startsWith("/") || uri.startsWith("hdfs:") || uri.startsWith("viewfs:")) {
+ if (uri.startsWith("/") || uri.startsWith("hdfs:") || uri.startsWith("viewfs:") ||
+ uri.startsWith("hftp:") || uri.startsWith("webhdfs:")) {
return true;
}
return false;
}
- public static boolean isHDFSFileOrLocalOrS3N(String uri){
+ public static boolean isHDFSFileOrLocalOrS3N(String uri, Configuration conf){
if(uri == null)
return false;
- if(uri.startsWith("/") || uri.matches("[A-Za-z]:.*") || uri.startsWith("hdfs:")
- || uri.startsWith("viewfs:") || uri.startsWith("file:") || uri.startsWith("s3n:")) {
- return true;
- }
- return false;
+ return HadoopShims.hasFileSystemImpl(new Path(uri), conf);
}
-
+
}
Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Wed Apr 30 14:00:20 2014
@@ -525,27 +525,6 @@ public class Utils {
return baos.toString();
}
- /**
- * Returns whether the give path has a FileSystem implementation.
- *
- * @param path path
- * @param conf configuration
- * @return true if the give path's scheme has a FileSystem implementation,
- * false otherwise
- */
- public static boolean hasFileSystemImpl(Path path, Configuration conf) {
- String scheme = path.toUri().getScheme();
- if (scheme != null) {
- String fsImpl = conf.get("fs." + scheme + ".impl");
- if (fsImpl == null) {
- return false;
- }
- }
- return true;
- }
-
-
-
public static boolean isLocal(PigContext pigContext, Configuration conf) {
return pigContext.getExecType().isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
}
Modified: pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java Wed Apr 30 14:00:20 2014
@@ -31,6 +31,7 @@ import org.antlr.runtime.CommonTokenStre
import org.antlr.runtime.RecognitionException;
import org.antlr.runtime.tree.CommonTree;
import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigConfiguration;
@@ -38,6 +39,8 @@ import org.apache.pig.StoreFuncInterface
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
@@ -56,13 +59,13 @@ public class QueryParserUtils {
return str;
}
- public static void attachStorePlan(String scope, LogicalPlan lp, String fileName, String func,
+ public static void attachStorePlan(String scope, LogicalPlan lp, String fileName, String func,
Operator input, String alias, PigContext pigContext) throws FrontendException {
func = func == null ? pigContext.getProperties().getProperty(PigConfiguration.PIG_DEFAULT_STORE_FUNC, PigStorage.class.getName()) : func;
FuncSpec funcSpec = new FuncSpec( func );
StoreFuncInterface stoFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec( funcSpec );
-
+
fileName = removeQuotes( fileName );
FileSpec fileSpec = new FileSpec( fileName, funcSpec );
String sig = alias + "_" + LogicalPlanBuilder.newOperatorKey(scope);
@@ -87,25 +90,23 @@ public class QueryParserUtils {
ElementDescriptor el = dfs.asElement(desc);
return new Path(el.toString());
}
-
+
static void setHdfsServers(String absolutePath, PigContext pigContext) throws URISyntaxException {
// Get native host
String defaultFS = (String)pigContext.getProperties().get("fs.default.name");
if (defaultFS==null)
defaultFS = (String)pigContext.getProperties().get("fs.defaultFS");
-
+
URI defaultFSURI = new URI(defaultFS);
- String defaultHost = defaultFSURI.getHost();
- if (defaultHost == null) defaultHost = "";
-
- defaultHost = defaultHost.toLowerCase();
-
- Set<String> remoteHosts = getRemoteHosts(absolutePath, defaultHost);
-
+
+ Configuration conf = new Configuration(true);
+ ConfigurationUtil.mergeConf(conf, ConfigurationUtil.toConfiguration(pigContext.getProperties()));
+ Set<String> remoteHosts = getRemoteHosts(absolutePath, defaultFSURI, conf);
+
String hdfsServersString = (String)pigContext.getProperties().get("mapreduce.job.hdfs-servers");
if (hdfsServersString == null) hdfsServersString = "";
String hdfsServers[] = hdfsServersString.split(",");
-
+
for (String remoteHost : remoteHosts) {
boolean existing = false;
for (String hdfsServer : hdfsServers) {
@@ -120,43 +121,50 @@ public class QueryParserUtils {
hdfsServersString = hdfsServersString + remoteHost;
}
}
-
+
if (!hdfsServersString.isEmpty()) {
pigContext.getProperties().setProperty("mapreduce.job.hdfs-servers", hdfsServersString);
}
}
- static Set<String> getRemoteHosts(String absolutePath, String defaultHost) {
- String HAR_PREFIX = "hdfs-";
- Set<String> result = new HashSet<String>();
- String[] fnames = absolutePath.split(",");
- for (String fname: fnames) {
- // remove leading/trailing whitespace(s)
- fname = fname.trim();
- Path p = new Path(fname);
- URI uri = p.toUri();
- if(uri.isAbsolute()) {
- String scheme = uri.getScheme();
- if (scheme!=null && scheme.toLowerCase().equals("hdfs")||scheme.toLowerCase().equals("har")) {
- if (uri.getHost()==null)
- continue;
- String thisHost = uri.getHost().toLowerCase();
- if (scheme.toLowerCase().equals("har")) {
- if (thisHost.startsWith(HAR_PREFIX)) {
- thisHost = thisHost.substring(HAR_PREFIX.length());
- }
- }
- if (!uri.getHost().isEmpty() &&
- !thisHost.equals(defaultHost)) {
- if (uri.getPort()!=-1)
- result.add("hdfs://"+thisHost+":"+uri.getPort());
- else
- result.add("hdfs://"+thisHost);
- }
- }
- }
- }
- return result;
+ static Set<String> getRemoteHosts(String absolutePath, URI defaultFSURI, Configuration conf) {
+ String defaultHost = defaultFSURI.getHost() == null ? "" : defaultFSURI.getHost().toLowerCase();
+ String defaultScheme = defaultFSURI.getScheme() == null ? "" : defaultFSURI.getScheme().toLowerCase();
+
+ Set<String> result = new HashSet<String>();
+ String[] fnames = absolutePath.split(",");
+ for (String fname : fnames) {
+ // remove leading/trailing whitespace(s)
+ Path path = new Path(fname.trim());
+ URI uri = path.toUri();
+ if (uri.isAbsolute()) { // If it has scheme
+ String thisHost = uri.getHost() == null ? "" : uri.getHost().toLowerCase();
+ String scheme = uri.getScheme().toLowerCase();
+ // If host and scheme are same continue
+ if (scheme.equals(defaultScheme) && (thisHost.equals(defaultHost) || thisHost.isEmpty())) {
+ continue;
+ }
+ String authority = uri.getAuthority() == null ? "" : uri.getAuthority()
+ .toLowerCase();
+ if (scheme.equals("har")) {
+ String[] parts = authority.split("-", 2);
+ scheme = parts[0];
+ if (parts.length < 2) {
+ authority = "";
+ } else {
+ authority = parts[1];
+ }
+ if (scheme.isEmpty() || (scheme.equals(defaultScheme) &&
+ authority.equals(defaultFSURI.getAuthority()))) {
+ continue;
+ }
+ } else if (!HadoopShims.hasFileSystemImpl(path, conf)) {
+ continue;
+ }
+ result.add(scheme + "://" + authority);
+ }
+ }
+ return result;
}
static String constructFileNameSignature(String fileName, FuncSpec funcSpec) {
@@ -218,11 +226,11 @@ public class QueryParserUtils {
return null;
}
-
+
static QueryParser createParser(CommonTokenStream tokens) {
return createParser(tokens, 0);
}
-
+
static QueryParser createParser(CommonTokenStream tokens, int lineOffset) {
QueryParser parser = new QueryParser(tokens);
PigParserNodeAdaptor adaptor = new PigParserNodeAdaptor(
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Wed Apr 30 14:00:20 2014
@@ -346,7 +346,7 @@ public abstract class JobStats extends O
for (String className : readerNames.split(",")) {
reader = (PigStatsOutputSizeReader) PigContext.instantiateFuncFromSpec(className);
- if (reader.supports(sto)) {
+ if (reader.supports(sto, conf)) {
LOG.info("using output size reader: " + className);
try {
return reader.getOutputSize(sto, conf);
Added: pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java?rev=1591297&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java (added)
+++ pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java Wed Apr 30 14:00:20 2014
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.parser;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Properties;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.test.Util;
+import org.junit.Test;
+
+public class TestQueryParserUtils {
+
+ @Test
+ public void testSetHDFSServers() throws Exception {
+ Properties props = new Properties();
+ props.setProperty("fs.default.name", "hdfs://nn1:8020/tmp");
+ PigContext pc = new PigContext(ExecType.LOCAL, props);
+
+ //No scheme/host
+ QueryParserUtils.setHdfsServers("hdfs:///tmp", pc);
+ assertEquals(null, props.getProperty("mapreduce.job.hdfs-servers"));
+ QueryParserUtils.setHdfsServers("/tmp", pc);
+ assertEquals(null, props.getProperty("mapreduce.job.hdfs-servers"));
+ QueryParserUtils.setHdfsServers("tmp", pc);
+ assertEquals(null, props.getProperty("mapreduce.job.hdfs-servers"));
+
+ // Same as default host and scheme
+ QueryParserUtils.setHdfsServers("hdfs://nn1/tmp", pc);
+ assertEquals(null, props.getProperty("mapreduce.job.hdfs-servers"));
+ QueryParserUtils.setHdfsServers("hdfs://nn1:8020/tmp", pc);
+ assertEquals(null, props.getProperty("mapreduce.job.hdfs-servers"));
+
+ // Same host different scheme
+ QueryParserUtils.setHdfsServers("hftp://nn1/tmp", pc);
+ assertEquals("hftp://nn1", props.getProperty("mapreduce.job.hdfs-servers"));
+ QueryParserUtils.setHdfsServers("hftp://nn1:50070/tmp", pc);
+ assertEquals("hftp://nn1,hftp://nn1:50070", props.getProperty("mapreduce.job.hdfs-servers"));
+ // There should be no duplicates
+ QueryParserUtils.setHdfsServers("hftp://nn1:50070/tmp", pc);
+ assertEquals("hftp://nn1,hftp://nn1:50070", props.getProperty("mapreduce.job.hdfs-servers"));
+
+ // har
+ props.remove("mapreduce.job.hdfs-servers");
+ QueryParserUtils.setHdfsServers("har:///tmp", pc);
+ assertEquals(null, props.getProperty("mapreduce.job.hdfs-servers"));
+ QueryParserUtils.setHdfsServers("har://hdfs-nn1:8020/tmp", pc);
+ assertEquals(null, props.getProperty("mapreduce.job.hdfs-servers"));
+ QueryParserUtils.setHdfsServers("har://hdfs-nn1/tmp", pc);
+ assertEquals("hdfs://nn1", props.getProperty("mapreduce.job.hdfs-servers"));
+
+ // Non existing filesystem scheme
+ props.remove("mapreduce.job.hdfs-servers");
+ QueryParserUtils.setHdfsServers("hello://nn1/tmp", pc);
+ assertEquals(null, props.getProperty("mapreduce.job.hdfs-servers"));
+
+ if(Util.isHadoop23() || Util.isHadoop2_0()) {
+ // webhdfs
+ props.remove("mapreduce.job.hdfs-servers");
+ QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc);
+ assertEquals("webhdfs://nn1", props.getProperty("mapreduce.job.hdfs-servers"));
+ QueryParserUtils.setHdfsServers("webhdfs://nn1:50070/tmp", pc);
+ assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty("mapreduce.job.hdfs-servers"));
+
+ // har with webhfs
+ QueryParserUtils.setHdfsServers("har://webhdfs-nn1:50070/tmp", pc);
+ assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty("mapreduce.job.hdfs-servers"));
+ QueryParserUtils.setHdfsServers("har://webhdfs-nn2:50070/tmp", pc);
+ assertEquals("webhdfs://nn1,webhdfs://nn1:50070,webhdfs://nn2:50070", props.getProperty("mapreduce.job.hdfs-servers"));
+ props.remove("mapreduce.job.hdfs-servers");
+ QueryParserUtils.setHdfsServers("har://webhdfs-nn1/tmp", pc);
+ assertEquals("webhdfs://nn1", props.getProperty("mapreduce.job.hdfs-servers"));
+
+ //viewfs
+ props.remove("mapreduce.job.hdfs-servers");
+ QueryParserUtils.setHdfsServers("viewfs:/tmp", pc);
+ assertEquals("viewfs://", props.getProperty("mapreduce.job.hdfs-servers"));
+ QueryParserUtils.setHdfsServers("viewfs:///tmp", pc);
+ assertEquals("viewfs://", props.getProperty("mapreduce.job.hdfs-servers"));
+ QueryParserUtils.setHdfsServers("viewfs://cluster1/tmp", pc);
+ assertEquals("viewfs://,viewfs://cluster1", props.getProperty("mapreduce.job.hdfs-servers"));
+
+ //har with viewfs
+ props.remove("mapreduce.job.hdfs-servers");
+ QueryParserUtils.setHdfsServers("har://viewfs/tmp", pc);
+ assertEquals("viewfs://", props.getProperty("mapreduce.job.hdfs-servers"));
+ QueryParserUtils.setHdfsServers("har://viewfs-cluster1/tmp", pc);
+ assertEquals("viewfs://,viewfs://cluster1", props.getProperty("mapreduce.job.hdfs-servers"));
+
+
+ }
+
+
+ }
+
+}
Modified: pig/trunk/test/org/apache/pig/test/TestMRJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMRJobStats.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMRJobStats.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMRJobStats.java Wed Apr 30 14:00:20 2014
@@ -25,7 +25,6 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Properties;
@@ -190,7 +189,7 @@ public class TestMRJobStats {
* @param sto POStore
*/
@Override
- public boolean supports(POStore sto) {
+ public boolean supports(POStore sto, Configuration conf) {
return true;
}