You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/30 21:13:48 UTC
svn commit: r1591449 - in /pig/branches/tez: ./
shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/
shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/
src/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/e...
Author: cheolsoo
Date: Wed Apr 30 19:13:47 2014
New Revision: 1591449
URL: http://svn.apache.org/r1591449
Log:
Merge latest trunk changes
Added:
pig/branches/tez/test/org/apache/pig/parser/TestQueryParserUtils.java
- copied unchanged from r1591410, pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java
Modified:
pig/branches/tez/ (props changed)
pig/branches/tez/CHANGES.txt
pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/branches/tez/src/org/apache/pig/PigServer.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigStatsOutputSizeReader.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
pig/branches/tez/src/org/apache/pig/impl/util/UriUtil.java
pig/branches/tez/src/org/apache/pig/impl/util/Utils.java
pig/branches/tez/src/org/apache/pig/parser/QueryParserUtils.java
pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java
pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
pig/branches/tez/src/pig-default.properties (props changed)
pig/branches/tez/test/org/apache/pig/test/TestMRJobStats.java
pig/branches/tez/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2 (props changed)
Propchange: pig/branches/tez/
------------------------------------------------------------------------------
Merged /pig/trunk:r1591007-1591410
Modified: pig/branches/tez/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/tez/CHANGES.txt?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/CHANGES.txt (original)
+++ pig/branches/tez/CHANGES.txt Wed Apr 30 19:13:47 2014
@@ -32,6 +32,8 @@ PIG-2207: Support custom counters for ag
IMPROVEMENTS
+PIG-3672: Pig should not check for hardcoded file system implementations (rohini)
+
PIG-3860: Refactor PigStatusReporter and PigLogger for non-MR execution engine (cheolsoo)
PIG-3865: Remodel the XMLLoader to work to be faster and more maintainable (aseldawy via daijy)
@@ -117,6 +119,8 @@ PIG-3882: Multiquery off mode execution
BUG FIXES
+PIG-3859: auto local mode should not modify reducer configuration (aniket486)
+
PIG-3909: Type Casting issue (daijy)
PIG-3905: 0.12.1 release can't be build for Hadoop2 (daijy)
Modified: pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Wed Apr 30 19:13:47 2014
@@ -119,14 +119,33 @@ public class HadoopShims {
public static void unsetConf(Configuration conf, String key) {
// Not supported in Hadoop 0.20/1.x
}
-
+
/**
- * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
+ * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
* @param conf
* @param taskAttemptID
*/
public static void setTaskAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
conf.set("mapred.task.id", taskAttemptID.toString());
}
-
+
+ /**
+ * Returns whether the given path has a FileSystem implementation.
+ *
+ * @param path path
+ * @param conf configuration
+ * @return true if the given path's scheme has a FileSystem implementation,
+ * false otherwise
+ */
+ public static boolean hasFileSystemImpl(Path path, Configuration conf) {
+ String scheme = path.toUri().getScheme();
+ if (scheme != null) {
+ String fsImpl = conf.get("fs." + scheme + ".impl");
+ if (fsImpl == null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
}
Modified: pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Wed Apr 30 19:13:47 2014
@@ -18,7 +18,10 @@
package org.apache.pig.backend.hadoop.executionengine.shims;
import java.io.IOException;
+import java.lang.reflect.Method;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -41,6 +44,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop23.PigJobControl;
public class HadoopShims {
+
+ private static Log LOG = LogFactory.getLog(HadoopShims.class);
+ private static Method getFileSystemClass;
+
static public JobContext cloneJobContext(JobContext original) throws IOException, InterruptedException {
JobContext newContext = ContextFactory.cloneContext(original,
new JobConf(original.getConfiguration()));
@@ -103,7 +110,7 @@ public class HadoopShims {
public static JobControl newJobControl(String groupName, int timeToSleep) {
return new PigJobControl(groupName, timeToSleep);
}
-
+
public static long getDefaultBlockSize(FileSystem fs, Path path) {
return fs.getDefaultBlockSize(path);
}
@@ -111,22 +118,63 @@ public class HadoopShims {
public static Counters getCounters(Job job) throws IOException, InterruptedException {
return new Counters(job.getJob().getCounters());
}
-
+
public static boolean isJobFailed(TaskReport report) {
return report.getCurrentStatus()==TIPStatus.FAILED;
}
-
+
public static void unsetConf(Configuration conf, String key) {
conf.unset(key);
}
-
+
/**
- * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
+ * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
* @param conf
* @param taskAttemptID
*/
public static void setTaskAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
conf.setInt("mapreduce.job.application.attempt.id", taskAttemptID.getId());
}
-
+
+ /**
+ * Returns whether the given path has a FileSystem implementation.
+ *
+ * @param path path
+ * @param conf configuration
+ * @return true if the given path's scheme has a FileSystem implementation,
+ * false otherwise
+ */
+ public static boolean hasFileSystemImpl(Path path, Configuration conf) {
+ String scheme = path.toUri().getScheme();
+ if (scheme != null) {
+ // Hadoop 0.23
+ if (conf.get("fs.file.impl") != null) {
+ String fsImpl = conf.get("fs." + scheme + ".impl");
+ if (fsImpl == null) {
+ return false;
+ }
+ } else {
+ // Hadoop 2.x HADOOP-7549
+ if (getFileSystemClass == null) {
+ try {
+ getFileSystemClass = FileSystem.class.getDeclaredMethod(
+ "getFileSystemClass", String.class, Configuration.class);
+ } catch (NoSuchMethodException e) {
+ LOG.warn("Error while trying to determine if path " + path +
+ " has a filesystem implementation");
+ // Assume has implementation to be safe
+ return true;
+ }
+ }
+ try {
+ Object fs = getFileSystemClass.invoke(null, scheme, conf);
+ return fs == null ? false : true;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
}
Modified: pig/branches/tez/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigServer.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigServer.java Wed Apr 30 19:13:47 2014
@@ -1831,7 +1831,7 @@ public class PigServer {
Set<String> uniqueStoreLoc = new HashSet<String>();
for(LOStore store : storeOps) {
String fileName = store.getFileSpec().getFileName();
- if(!uniqueStoreLoc.add(fileName) && UriUtil.isHDFSFileOrLocalOrS3N(fileName)) {
+ if(!uniqueStoreLoc.add(fileName) && UriUtil.isHDFSFileOrLocalOrS3N(fileName, new Configuration(true))) {
throw new RuntimeException("Script contains 2 or more STORE statements writing to same location : "+ fileName);
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java Wed Apr 30 19:13:47 2014
@@ -35,15 +35,15 @@ public class FileBasedOutputSizeReader i
private static final Log log = LogFactory.getLog(FileBasedOutputSizeReader.class);
- /**
+ /**
* Returns whether the given POStore is supported by this output size reader
* or not. We check whether the uri scheme of output file is one of hdfs,
* local, and s3.
* @param sto POStore
*/
@Override
- public boolean supports(POStore sto) {
- return UriUtil.isHDFSFileOrLocalOrS3N(getLocationUri(sto));
+ public boolean supports(POStore sto, Configuration conf) {
+ return UriUtil.isHDFSFileOrLocalOrS3N(getLocationUri(sto), conf);
}
/**
@@ -53,7 +53,7 @@ public class FileBasedOutputSizeReader i
*/
@Override
public long getOutputSize(POStore sto, Configuration conf) throws IOException {
- if (!supports(sto)) {
+ if (!supports(sto, conf)) {
log.warn("'" + sto.getStoreFunc().getClass().getName()
+ "' is not supported by " + getClass().getName());
return -1;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Wed Apr 30 19:13:47 2014
@@ -109,7 +109,7 @@ public class InputSizeReducerEstimator i
// the input file location might be a list of comma separated files,
// separate them out
for (String location : LoadFunc.getPathStrings(ld.getLFile().getFileName())) {
- if (UriUtil.isHDFSFileOrLocalOrS3N(location)) {
+ if (UriUtil.isHDFSFileOrLocalOrS3N(location, conf)) {
Path path = new Path(location);
FileSystem fs = path.getFileSystem(conf);
FileStatus[] status = fs.globStatus(path);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Apr 30 19:13:47 2014
@@ -569,7 +569,7 @@ public class JobControlCompiler{
// override with the default conf to run in local mode
for (Entry<String, String> entry : defaultConf) {
String key = entry.getKey();
- if (key.equals("mapred.reduce.tasks")) {
+ if (key.equals("mapred.reduce.tasks") || key.equals("mapreduce.job.reduces")) {
// this must not be set back to the default in case it has been set to 0 for example.
continue;
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Apr 30 19:13:47 2014
@@ -443,7 +443,7 @@ public class MapReduceLauncher extends L
// Clean up all the intermediate data
for (String path : intermediateVisitor.getIntermediate()) {
// Skip non-file system paths such as hbase, see PIG-3617
- if (Utils.hasFileSystemImpl(new Path(path), conf)) {
+ if (HadoopShims.hasFileSystemImpl(new Path(path), conf)) {
FileLocalizer.delete(path, pc);
}
}
@@ -731,7 +731,7 @@ public class MapReduceLauncher extends L
if(shouldMarkOutputDir(job)) {
Path outputPath = new Path(store.getSFile().getFileName());
String scheme = outputPath.toUri().getScheme();
- if (Utils.hasFileSystemImpl(outputPath, job.getJobConf())) {
+ if (HadoopShims.hasFileSystemImpl(outputPath, job.getJobConf())) {
FileSystem fs = outputPath.getFileSystem(job.getJobConf());
if (fs.exists(outputPath)) {
// create a file in the folder to mark it
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Wed Apr 30 19:13:47 2014
@@ -56,9 +56,9 @@ public final class PigHadoopLogger imple
return logger;
}
- public void destory() {
+ public void destroy() {
if (reporter != null) {
- reporter.destory();
+ reporter.destroy();
}
reporter = null;
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigStatsOutputSizeReader.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigStatsOutputSizeReader.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigStatsOutputSizeReader.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigStatsOutputSizeReader.java Wed Apr 30 19:13:47 2014
@@ -30,7 +30,7 @@ import org.apache.pig.classification.Int
* HBaseStorage), the output size cannot always be computed as the total size of
* output files.
*
- * @see FileBasedOutputSizeReader
+ * @see FileBasedOutputSizeReader
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@@ -38,12 +38,13 @@ public interface PigStatsOutputSizeReade
static final String OUTPUT_SIZE_READER_KEY = "pig.stats.output.size.reader";
- /**
+ /**
* Returns whether the given PSStore is supported by this output size reader
* or not.
* @param sto POStore
+ * @param conf Configuration
*/
- public boolean supports(POStore sto);
+ public boolean supports(POStore sto, Configuration conf);
/**
* Returns the size of output in bytes. If the size of output cannot be
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Wed Apr 30 19:13:47 2014
@@ -139,7 +139,7 @@ public class PigProcessor implements Log
sampleMap = null;
sampleVertex = null;
if (pigHadoopLogger != null) {
- pigHadoopLogger.destory();
+ pigHadoopLogger.destroy();
pigHadoopLogger = null;
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Wed Apr 30 19:13:47 2014
@@ -46,6 +46,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -86,8 +87,8 @@ public class MapRedUtil {
Map<E, Pair<Integer, Integer>> reducerMap = new HashMap<E, Pair<Integer, Integer>>();
// use local file system to get the keyDistFile
- Configuration conf = new Configuration(false);
-
+ Configuration conf = new Configuration(false);
+
if (mapConf.get("yarn.resourcemanager.principal")!=null) {
conf.set("yarn.resourcemanager.principal", mapConf.get("yarn.resourcemanager.principal"));
}
@@ -122,7 +123,7 @@ public class MapRedUtil {
Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
// Used to replace the maxIndex with the number of reducers
if (maxIndex < minIndex) {
- maxIndex = totalReducers[0] + maxIndex;
+ maxIndex = totalReducers[0] + maxIndex;
}
E keyT;
@@ -170,12 +171,12 @@ public class MapRedUtil {
public static void setupUDFContext(Configuration job) throws IOException {
UDFContext udfc = UDFContext.getUDFContext();
udfc.addJobConf(job);
- // don't deserialize in front-end
+ // don't deserialize in front-end
if (udfc.isUDFConfEmpty()) {
udfc.deserialize();
}
}
-
+
/**
* Sets up output and log dir paths for a single-store streaming job
*
@@ -188,7 +189,7 @@ public class MapRedUtil {
Configuration conf) throws IOException {
// set out filespecs
String outputPathString = st.getSFile().getFileName();
- if (!outputPathString.contains("://") || outputPathString.startsWith("hdfs://")) {
+ if (HadoopShims.hasFileSystemImpl(new Path(outputPathString), conf)) {
conf.set("pig.streaming.log.dir",
new Path(outputPathString, JobControlCompiler.LOG_DIR).toString());
}
@@ -246,11 +247,11 @@ public class MapRedUtil {
/**
* Get all files recursively from the given list of files
- *
+ *
* @param files a list of FileStatus
* @param conf the configuration object
* @return the list of fileStatus that contains all the files in the given
- * list and, recursively, all the files inside the directories in
+ * list and, recursively, all the files inside the directories in
* the given list
* @throws IOException
*/
@@ -268,12 +269,12 @@ public class MapRedUtil {
result.add(file);
}
}
- log.info("Total input paths to process : " + result.size());
+ log.info("Total input paths to process : " + result.size());
return result;
}
-
+
private static void addInputPathRecursively(List<FileStatus> result,
- FileSystem fs, Path path, PathFilter inputFilter)
+ FileSystem fs, Path path, PathFilter inputFilter)
throws IOException {
for (FileStatus stat: fs.listStatus(path, inputFilter)) {
if (stat.isDir()) {
@@ -285,6 +286,7 @@ public class MapRedUtil {
}
private static final PathFilter hiddenFileFilter = new PathFilter(){
+ @Override
public boolean accept(Path p){
String name = p.getName();
return !name.startsWith("_") && !name.startsWith(".");
@@ -320,7 +322,7 @@ public class MapRedUtil {
return cmp == 0 ? 0 : cmp < 0 ? -1 : 1;
}
};
-
+
private static final class ComparableSplit implements Comparable<ComparableSplit> {
private InputSplit rawInputSplit;
private HashSet<Node> nodes;
@@ -331,32 +333,32 @@ public class MapRedUtil {
nodes = new HashSet<Node>();
this.id = id;
}
-
+
void add(Node node) {
nodes.add(node);
}
-
+
void removeFromNodes() {
for (Node node : nodes)
node.remove(this);
}
-
+
public InputSplit getSplit() {
return rawInputSplit;
}
-
+
@Override
public boolean equals(Object other) {
if (other == null || !(other instanceof ComparableSplit))
return false;
return (compareTo((ComparableSplit) other) == 0);
}
-
+
@Override
public int hashCode() {
return 41;
}
-
+
@Override
public int compareTo(ComparableSplit other) {
try {
@@ -370,41 +372,41 @@ public class MapRedUtil {
}
}
}
-
+
private static class DummySplit extends InputSplit {
private long length;
-
+
@Override
public String[] getLocations() {
return null;
}
-
+
@Override
public long getLength() {
return length;
}
-
+
public void setLength(long length) {
this.length = length;
}
}
-
+
private static class Node {
private long length = 0;
private ArrayList<ComparableSplit> splits;
private boolean sorted;
-
+
Node() throws IOException, InterruptedException {
length = 0;
splits = new ArrayList<ComparableSplit>();
sorted = false;
}
-
+
void add(ComparableSplit split) throws IOException, InterruptedException {
splits.add(split);
length++;
}
-
+
void remove(ComparableSplit split) {
if (!sorted)
sort();
@@ -414,23 +416,23 @@ public class MapRedUtil {
length--;
}
}
-
+
void sort() {
if (!sorted) {
Collections.sort(splits);
sorted = true;
}
}
-
+
ArrayList<ComparableSplit> getSplits() {
return splits;
}
-
+
public long getLength() {
return length;
}
}
-
+
public static List<List<InputSplit>> getCombinePigSplits(List<InputSplit>
oneInputSplits, long maxCombinedSplitSize, Configuration conf)
throws IOException, InterruptedException {
@@ -439,13 +441,13 @@ public class MapRedUtil {
List<List<InputSplit>> result = new ArrayList<List<InputSplit>>();
List<Long> resultLengths = new ArrayList<Long>();
long comparableSplitId = 0;
-
+
int size = 0, nSplits = oneInputSplits.size();
InputSplit lastSplit = null;
int emptyCnt = 0;
for (InputSplit split : oneInputSplits) {
if (split.getLength() == 0) {
- emptyCnt++;
+ emptyCnt++;
continue;
}
if (split.getLength() >= maxCombinedSplitSize) {
@@ -462,7 +464,7 @@ public class MapRedUtil {
HashSet<String> locationSeen = new HashSet<String>();
for (String location : locations)
{
- if (!locationSeen.contains(location))
+ if (!locationSeen.contains(location))
{
Node node = nodeMap.get(location);
if (node == null) {
@@ -489,7 +491,7 @@ public class MapRedUtil {
ArrayList<ComparableSplit> splits = node.getSplits();
for (ComparableSplit split : splits) {
if (!seen.contains(split.getSplit())) {
- // remove duplicates. The set has to be on the raw input split not the
+ // remove duplicates. The set has to be on the raw input split not the
// comparable input split as the latter overrides the compareTo method
// so its equality semantics is changed and not we want here
seen.add(split.getSplit());
@@ -498,7 +500,7 @@ public class MapRedUtil {
}
}
}
-
+
int combinedSplitLen = 0;
for (PigSplit split : result)
combinedSplitLen += split.getNumPaths();
@@ -582,7 +584,7 @@ public class MapRedUtil {
for (Node node : nodes) {
for (ComparableSplit split : node.getSplits()) {
if (!seen.contains(split.getSplit())) {
- // remove duplicates. The set has to be on the raw input split not the
+ // remove duplicates. The set has to be on the raw input split not the
// comparable input split as the latter overrides the compareTo method
// so its equality semantics is changed and not we want here
seen.add(split.getSplit());
@@ -590,7 +592,7 @@ public class MapRedUtil {
}
}
}
-
+
/* verification code
int combinedSplitLen = 0;
for (PigSplit split : result)
@@ -603,7 +605,7 @@ public class MapRedUtil {
long totalSize = 0;
ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
ArrayList<ComparableSplit> combinedComparableSplits = new ArrayList<ComparableSplit>();
-
+
int splitLen = leftoverSplits.size();
for (int i = 0; i < splitLen; i++)
{
@@ -650,26 +652,26 @@ public class MapRedUtil {
combinedSplitLen += split.getNumPaths();
if (combinedSplitLen != nSplits-emptyCnt)
throw new AssertionError("number of combined splits ["+combinedSplitLen+"] does not match the number of original splits ["+nSplits+"].");
-
+
long totalLen = 0;
for (PigSplit split : result)
totalLen += split.getLength();
-
+
long origTotalLen = 0;
for (InputSplit split : oneInputSplits)
origTotalLen += split.getLength();
if (totalLen != origTotalLen)
throw new AssertionError("The total length ["+totalLen+"] does not match the original ["+origTotalLen+"]");
- */
+ */
log.info("Total input paths (combined) to process : " + result.size());
return result;
}
-
+
private static void removeSplits(List<ComparableSplit> splits) {
for (ComparableSplit split: splits)
split.removeFromNodes();
}
-
+
public String inputSplitToString(InputSplit[] splits) throws IOException, InterruptedException {
// debugging purpose only
StringBuilder st = new StringBuilder();
@@ -682,11 +684,11 @@ public class MapRedUtil {
st.append("Input split["+i+"]:\n Length = "+ splits[i].getLength()+"\n Locations:\n");
for (String location : splits[i].getLocations())
st.append(" "+location+"\n");
- st.append("\n-----------------------\n");
+ st.append("\n-----------------------\n");
}
return st.toString();
}
-
+
/* verification code: debug purpose only
public String inputSplitToString(ArrayList<ComparableSplit> splits) throws IOException, InterruptedException {
StringBuilder st = new StringBuilder();
@@ -699,7 +701,7 @@ public class MapRedUtil {
st.append("Input split["+i+"]:\n Length = "+ splits.get(i).getSplit().getLength()+"\n Locations:\n");
for (String location : splits.get(i).getSplit().getLocations())
st.append(" "+location+"\n");
- st.append("\n-----------------------\n");
+ st.append("\n-----------------------\n");
}
return st.toString();
}
Modified: pig/branches/tez/src/org/apache/pig/impl/util/UriUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/UriUtil.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/UriUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/UriUtil.java Wed Apr 30 19:13:47 2014
@@ -17,24 +17,25 @@
*/
package org.apache.pig.impl.util;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+
public class UriUtil {
public static boolean isHDFSFile(String uri){
if(uri == null)
return false;
- if(uri.startsWith("/") || uri.startsWith("hdfs:") || uri.startsWith("viewfs:")) {
+ if (uri.startsWith("/") || uri.startsWith("hdfs:") || uri.startsWith("viewfs:") ||
+ uri.startsWith("hftp:") || uri.startsWith("webhdfs:")) {
return true;
}
return false;
}
- public static boolean isHDFSFileOrLocalOrS3N(String uri){
+ public static boolean isHDFSFileOrLocalOrS3N(String uri, Configuration conf){
if(uri == null)
return false;
- if(uri.startsWith("/") || uri.matches("[A-Za-z]:.*") || uri.startsWith("hdfs:")
- || uri.startsWith("viewfs:") || uri.startsWith("file:") || uri.startsWith("s3n:")) {
- return true;
- }
- return false;
+ return HadoopShims.hasFileSystemImpl(new Path(uri), conf);
}
-
+
}
Modified: pig/branches/tez/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/Utils.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/Utils.java Wed Apr 30 19:13:47 2014
@@ -579,25 +579,6 @@ public class Utils {
return baos.toString();
}
- /**
- * Returns whether the give path has a FileSystem implementation.
- *
- * @param path path
- * @param conf configuration
- * @return true if the give path's scheme has a FileSystem implementation,
- * false otherwise
- */
- public static boolean hasFileSystemImpl(Path path, Configuration conf) {
- String scheme = path.toUri().getScheme();
- if (scheme != null) {
- String fsImpl = conf.get("fs." + scheme + ".impl");
- if (fsImpl == null) {
- return false;
- }
- }
- return true;
- }
-
public static boolean isLocal(PigContext pigContext, Configuration conf) {
return pigContext.getExecType().isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
}
Modified: pig/branches/tez/src/org/apache/pig/parser/QueryParserUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/parser/QueryParserUtils.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/parser/QueryParserUtils.java (original)
+++ pig/branches/tez/src/org/apache/pig/parser/QueryParserUtils.java Wed Apr 30 19:13:47 2014
@@ -31,6 +31,7 @@ import org.antlr.runtime.CommonTokenStre
import org.antlr.runtime.RecognitionException;
import org.antlr.runtime.tree.CommonTree;
import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigConfiguration;
@@ -38,6 +39,8 @@ import org.apache.pig.StoreFuncInterface
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
@@ -56,13 +59,13 @@ public class QueryParserUtils {
return str;
}
- public static void attachStorePlan(String scope, LogicalPlan lp, String fileName, String func,
+ public static void attachStorePlan(String scope, LogicalPlan lp, String fileName, String func,
Operator input, String alias, PigContext pigContext) throws FrontendException {
func = func == null ? pigContext.getProperties().getProperty(PigConfiguration.PIG_DEFAULT_STORE_FUNC, PigStorage.class.getName()) : func;
FuncSpec funcSpec = new FuncSpec( func );
StoreFuncInterface stoFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec( funcSpec );
-
+
fileName = removeQuotes( fileName );
FileSpec fileSpec = new FileSpec( fileName, funcSpec );
String sig = alias + "_" + LogicalPlanBuilder.newOperatorKey(scope);
@@ -87,25 +90,23 @@ public class QueryParserUtils {
ElementDescriptor el = dfs.asElement(desc);
return new Path(el.toString());
}
-
+
static void setHdfsServers(String absolutePath, PigContext pigContext) throws URISyntaxException {
// Get native host
String defaultFS = (String)pigContext.getProperties().get("fs.default.name");
if (defaultFS==null)
defaultFS = (String)pigContext.getProperties().get("fs.defaultFS");
-
+
URI defaultFSURI = new URI(defaultFS);
- String defaultHost = defaultFSURI.getHost();
- if (defaultHost == null) defaultHost = "";
-
- defaultHost = defaultHost.toLowerCase();
-
- Set<String> remoteHosts = getRemoteHosts(absolutePath, defaultHost);
-
+
+ Configuration conf = new Configuration(true);
+ ConfigurationUtil.mergeConf(conf, ConfigurationUtil.toConfiguration(pigContext.getProperties()));
+ Set<String> remoteHosts = getRemoteHosts(absolutePath, defaultFSURI, conf);
+
String hdfsServersString = (String)pigContext.getProperties().get("mapreduce.job.hdfs-servers");
if (hdfsServersString == null) hdfsServersString = "";
String hdfsServers[] = hdfsServersString.split(",");
-
+
for (String remoteHost : remoteHosts) {
boolean existing = false;
for (String hdfsServer : hdfsServers) {
@@ -120,43 +121,50 @@ public class QueryParserUtils {
hdfsServersString = hdfsServersString + remoteHost;
}
}
-
+
if (!hdfsServersString.isEmpty()) {
pigContext.getProperties().setProperty("mapreduce.job.hdfs-servers", hdfsServersString);
}
}
- static Set<String> getRemoteHosts(String absolutePath, String defaultHost) {
- String HAR_PREFIX = "hdfs-";
- Set<String> result = new HashSet<String>();
- String[] fnames = absolutePath.split(",");
- for (String fname: fnames) {
- // remove leading/trailing whitespace(s)
- fname = fname.trim();
- Path p = new Path(fname);
- URI uri = p.toUri();
- if(uri.isAbsolute()) {
- String scheme = uri.getScheme();
- if (scheme!=null && scheme.toLowerCase().equals("hdfs")||scheme.toLowerCase().equals("har")) {
- if (uri.getHost()==null)
- continue;
- String thisHost = uri.getHost().toLowerCase();
- if (scheme.toLowerCase().equals("har")) {
- if (thisHost.startsWith(HAR_PREFIX)) {
- thisHost = thisHost.substring(HAR_PREFIX.length());
- }
- }
- if (!uri.getHost().isEmpty() &&
- !thisHost.equals(defaultHost)) {
- if (uri.getPort()!=-1)
- result.add("hdfs://"+thisHost+":"+uri.getPort());
- else
- result.add("hdfs://"+thisHost);
- }
- }
- }
- }
- return result;
+ static Set<String> getRemoteHosts(String absolutePath, URI defaultFSURI, Configuration conf) {
+ String defaultHost = defaultFSURI.getHost() == null ? "" : defaultFSURI.getHost().toLowerCase();
+ String defaultScheme = defaultFSURI.getScheme() == null ? "" : defaultFSURI.getScheme().toLowerCase();
+
+ Set<String> result = new HashSet<String>();
+ String[] fnames = absolutePath.split(",");
+ for (String fname : fnames) {
+ // remove leading/trailing whitespace(s)
+ Path path = new Path(fname.trim());
+ URI uri = path.toUri();
+ if (uri.isAbsolute()) { // If it has scheme
+ String thisHost = uri.getHost() == null ? "" : uri.getHost().toLowerCase();
+ String scheme = uri.getScheme().toLowerCase();
+ // If host and scheme are same continue
+ if (scheme.equals(defaultScheme) && (thisHost.equals(defaultHost) || thisHost.isEmpty())) {
+ continue;
+ }
+ String authority = uri.getAuthority() == null ? "" : uri.getAuthority()
+ .toLowerCase();
+ if (scheme.equals("har")) {
+ String[] parts = authority.split("-", 2);
+ scheme = parts[0];
+ if (parts.length < 2) {
+ authority = "";
+ } else {
+ authority = parts[1];
+ }
+ if (scheme.isEmpty() || (scheme.equals(defaultScheme) &&
+ authority.equals(defaultFSURI.getAuthority()))) {
+ continue;
+ }
+ } else if (!HadoopShims.hasFileSystemImpl(path, conf)) {
+ continue;
+ }
+ result.add(scheme + "://" + authority);
+ }
+ }
+ return result;
}
static String constructFileNameSignature(String fileName, FuncSpec funcSpec) {
@@ -218,11 +226,11 @@ public class QueryParserUtils {
return null;
}
-
+
static QueryParser createParser(CommonTokenStream tokens) {
return createParser(tokens, 0);
}
-
+
static QueryParser createParser(CommonTokenStream tokens, int lineOffset) {
QueryParser parser = new QueryParser(tokens);
PigParserNodeAdaptor adaptor = new PigParserNodeAdaptor(
Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java Wed Apr 30 19:13:47 2014
@@ -346,7 +346,7 @@ public abstract class JobStats extends O
for (String className : readerNames.split(",")) {
reader = (PigStatsOutputSizeReader) PigContext.instantiateFuncFromSpec(className);
- if (reader.supports(sto)) {
+ if (reader.supports(sto, conf)) {
LOG.info("using output size reader: " + className);
try {
return reader.getOutputSize(sto, conf);
Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Wed Apr 30 19:13:47 2014
@@ -46,7 +46,7 @@ public class PigStatusReporter extends S
return reporter;
}
- public void destory() {
+ public void destroy() {
context = null;
}
Propchange: pig/branches/tez/src/pig-default.properties
------------------------------------------------------------------------------
Merged /pig/trunk/src/pig-default.properties:r1591007-1591410
Modified: pig/branches/tez/test/org/apache/pig/test/TestMRJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestMRJobStats.java?rev=1591449&r1=1591448&r2=1591449&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestMRJobStats.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestMRJobStats.java Wed Apr 30 19:13:47 2014
@@ -25,7 +25,6 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Properties;
@@ -190,7 +189,7 @@ public class TestMRJobStats {
* @param sto POStore
*/
@Override
- public boolean supports(POStore sto) {
+ public boolean supports(POStore sto, Configuration conf) {
return true;
}
Propchange: pig/branches/tez/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
------------------------------------------------------------------------------
Merged /pig/trunk/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2:r1591007-1591410