You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/07/12 19:33:28 UTC
svn commit: r963377 - in /hadoop/pig/branches/branch-0.7: ./
src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/impl/logicalLayer/parser/ test/org/apache/pig/test/
Author: rding
Date: Mon Jul 12 17:33:28 2010
New Revision: 963377
URL: http://svn.apache.org/viewvc?rev=963377&view=rev
Log:
PIG-1490: Make Pig storers work with remote HDFS in secure mode
Modified:
hadoop/pig/branches/branch-0.7/CHANGES.txt
hadoop/pig/branches/branch-0.7/src/org/apache/pig/StoreFunc.java
hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestParser.java
Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/CHANGES.txt?rev=963377&r1=963376&r2=963377&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/CHANGES.txt Mon Jul 12 17:33:28 2010
@@ -196,6 +196,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-1490: Make Pig storers work with remote HDFS in secure mode (rding)
+
PIG-1484: BinStorage should support comma seperated path (daijy)
PIG-1467: order by fail when set "fs.file.impl.disable.cache" to true (daijy)
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/StoreFunc.java?rev=963377&r1=963376&r2=963377&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/StoreFunc.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/StoreFunc.java Mon Jul 12 17:33:28 2010
@@ -162,8 +162,8 @@ public abstract class StoreFunc implemen
*/
public static void cleanupOnFailureImpl(String location, Job job)
throws IOException {
- FileSystem fs = FileSystem.get(job.getConfiguration());
Path path = new Path(location);
+ FileSystem fs = path.getFileSystem(job.getConfiguration());
if(fs.exists(path)){
fs.delete(path, true);
}
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=963377&r1=963376&r2=963377&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon Jul 12 17:33:28 2010
@@ -504,9 +504,9 @@ public class MapReduceLauncher extends L
}
private void createSuccessFile(Job job, POStore store) throws IOException {
- if(shouldMarkOutputDir(job)) {
- FileSystem fs = FileSystem.get(job.getJobConf());
+ if(shouldMarkOutputDir(job)) {
Path outputPath = new Path(store.getSFile().getFileName());
+ FileSystem fs = outputPath.getFileSystem(job.getJobConf());
if(fs.exists(outputPath)){
// create a file in the folder to mark it
Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=963377&r1=963376&r2=963377&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Jul 12 17:33:28 2010
@@ -504,6 +504,41 @@ public class QueryParser {
return result;
}
+    /**
+     * Records in the "mapreduce.job.hdfs-servers" property every host that
+     * {@code getRemoteHosts} reports for {@code absolutePath}, so that loads and
+     * stores on remote HDFS clusters work in secure mode (PIG-1490).
+     *
+     * Hosts already present in the comma-separated property are not appended again,
+     * and the property is only written when the resulting list is non-empty.
+     *
+     * @param absolutePath absolute load/store location, possibly on a remote cluster
+     * @param pigContext   context whose properties are read and updated in place
+     * @throws URISyntaxException if "fs.default.name" is set but is not a valid URI
+     */
+    void setHdfsServers(String absolutePath, PigContext pigContext) throws URISyntaxException {
+        // Host of the native (default) file system. An unset property used to throw a
+        // NullPointerException from new URI(null); it is now treated like a host-less
+        // URI (e.g. "file:///"), i.e. as the empty string.
+        String defaultFS = pigContext.getProperties().getProperty("fs.default.name");
+        String defaultHost = (defaultFS == null) ? null : new URI(defaultFS).getHost();
+        // Locale.ROOT: host names must not be lower-cased with locale rules (e.g. Turkish "I").
+        defaultHost = (defaultHost == null) ? "" : defaultHost.toLowerCase(java.util.Locale.ROOT);
+
+        Set<String> remoteHosts = getRemoteHosts(absolutePath, defaultHost);
+
+        String hdfsServersString = pigContext.getProperties().getProperty("mapreduce.job.hdfs-servers");
+        if (hdfsServersString == null) {
+            hdfsServersString = "";
+        }
+        // Snapshot of the servers registered before this call; remoteHosts is a Set,
+        // so a host appended below can never be encountered a second time.
+        String[] hdfsServers = hdfsServersString.split(",");
+        StringBuilder merged = new StringBuilder(hdfsServersString);
+
+        for (String remoteHost : remoteHosts) {
+            boolean existing = false;
+            for (String hdfsServer : hdfsServers) {
+                if (hdfsServer.equals(remoteHost)) {
+                    existing = true;
+                    break;
+                }
+            }
+            if (!existing) {
+                if (merged.length() > 0) {
+                    merged.append(',');
+                }
+                merged.append(remoteHost);
+            }
+        }
+
+        if (merged.length() > 0) {
+            pigContext.getProperties().setProperty("mapreduce.job.hdfs-servers", merged.toString());
+        }
+    }
+
// Check and set files to be automatically shipped for the given StreamingCommand
// Auto-shipping rules:
// 1. If the command begins with either perl or python assume that the
@@ -1388,36 +1423,8 @@ LogicalOperator LoadClause(LogicalPlan l
absolutePath = loFunc.relativeToAbsolutePath(filename, getCurrentDir(pigContext));
if (absolutePath!=null) {
- // Get native host
- String defaultFS = (String)pigContext.getProperties().get("fs.default.name");
- URI defaultFSURI = new URI(defaultFS);
- String defaultHost = defaultFSURI.getHost();
- if (defaultHost==null)
- defaultHost="";
- defaultHost = defaultHost.toLowerCase();
-
- Set<String> remoteHosts = getRemoteHosts(absolutePath, defaultHost);
-
- String hdfsServersString = (String)pigContext.getProperties().get("mapreduce.job.hdfs-servers");
- if (hdfsServersString==null) hdfsServersString="";
- String hdfsServers[] = hdfsServersString.split(",");
-
- for (String remoteHost : remoteHosts) {
- boolean existing = false;
- for (String hdfsServer:hdfsServers) {
- if (hdfsServer.equals(remoteHost))
- existing = true;
- }
- if (!existing) {
- if (!hdfsServersString.isEmpty())
- hdfsServersString = hdfsServersString + ",";
- hdfsServersString = hdfsServersString + remoteHost;
- }
- }
-
- if (!hdfsServersString.isEmpty())
- pigContext.getProperties().setProperty("mapreduce.job.hdfs-servers", hdfsServersString);
- }
+ setHdfsServers(absolutePath, pigContext);
+ }
fileNameMap.put(constructFileNameSignature(filename, funcSpec), absolutePath);
}
lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(absolutePath, funcSpec),
@@ -2542,6 +2549,9 @@ LogicalOperator StoreClause(LogicalPlan
String absolutePath = fileNameMap.get(constructFileNameSignature(fileName, funcSpec));
if (absolutePath == null) {
absolutePath = stoFunc.relToAbsPathForStoreLocation(fileName, getCurrentDir(pigContext));
+ if (absolutePath != null) {
+ setHdfsServers(absolutePath, pigContext);
+ }
fileNameMap.put(constructFileNameSignature(fileName, funcSpec), absolutePath);
}
LogicalOperator store = new LOStore(lp, new OperatorKey(scope, getNextId()),
Modified: hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestParser.java?rev=963377&r1=963376&r2=963377&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestParser.java (original)
+++ hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestParser.java Mon Jul 12 17:33:28 2010
@@ -109,6 +109,40 @@ protected final Log log = LogFactory.get
pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://d.com:8020"));
} catch (IOException io) {
+ fail(io.getMessage());
+ }
+ }
+
+    /**
+     * Store locations on a non-default file system must be registered in
+     * "mapreduce.job.hdfs-servers" (PIG-1490); local paths must not be.
+     * IOExceptions are allowed to propagate so JUnit reports the full stack
+     * trace instead of only the exception message.
+     */
+    @Test
+    public void testRemoteServerList2() throws ExecException, IOException {
+        Properties pigProperties = pigServer.getPigContext().getProperties();
+
+        pigServer.setBatchOn();
+
+        pigServer.registerQuery("a = load '/user/pig/1.txt';");
+        pigServer.registerQuery("store a into '/user/pig/1.txt';");
+        assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers") == null);
+
+        pigServer.registerQuery("store a into 'hdfs://b.com/user/pig/1.txt';");
+        String servers = pigProperties.getProperty("mapreduce.job.hdfs-servers");
+        assertTrue(servers != null && servers.contains("hdfs://b.com"));
+
+        // A har:// location is expected to register its underlying HDFS host
+        // (har://hdfs-c.com:8020 -> hdfs://c.com:8020).
+        pigServer.registerQuery("store a into 'har://hdfs-c.com:8020/user/pig/1.txt';");
+        servers = pigProperties.getProperty("mapreduce.job.hdfs-servers");
+        assertTrue(servers != null && servers.contains("hdfs://c.com:8020"));
+
+        pigServer.registerQuery("store a into 'hdfs://d.com:8020/user/pig/1.txt';");
+        servers = pigProperties.getProperty("mapreduce.job.hdfs-servers");
+        assertTrue(servers != null && servers.contains("hdfs://d.com:8020"));
+
+        // Earlier registrations must be kept, not overwritten.
+        assertTrue(servers.contains("hdfs://b.com") && servers.contains("hdfs://c.com:8020"));
}
}