You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/05/14 20:52:54 UTC
svn commit: r944387 - in /hadoop/pig/branches/branch-0.7: CHANGES.txt
src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
test/org/apache/pig/test/TestParser.java
Author: daijy
Date: Fri May 14 18:52:53 2010
New Revision: 944387
URL: http://svn.apache.org/viewvc?rev=944387&view=rev
Log:
PIG-1403: Make Pig work with remote HDFS in secure mode
Modified:
hadoop/pig/branches/branch-0.7/CHANGES.txt
hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestParser.java
Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/CHANGES.txt?rev=944387&r1=944386&r2=944387&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/CHANGES.txt Fri May 14 18:52:53 2010
@@ -189,6 +189,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-1403: Make Pig work with remote HDFS in secure mode (daijy)
+
PIG-1391: pig unit tests leave behind files in temp directory because
MiniCluster files don't get deleted (tejas)
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=944387&r1=944386&r2=944387&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri May 14 18:52:53 2010
@@ -464,6 +464,39 @@ public class QueryParser {
LogicalOperator getOp(String alias) {
return mapAliasOp.get(alias);
}
+
+ Set<String> getRemoteHosts(String absolutePath, String defaultHost) {
+ String HAR_PREFIX = "hdfs-";
+ Set<String> result = new HashSet<String>();
+ String[] fnames = absolutePath.split(",");
+ for (String fname: fnames) {
+ // remove leading/trailing whitespace(s)
+ fname = fname.trim();
+ Path p = new Path(fname);
+ URI uri = p.toUri();
+ if(uri.isAbsolute()) {
+ String scheme = uri.getScheme();
+ if (scheme!=null && scheme.toLowerCase().equals("hdfs")||scheme.toLowerCase().equals("har")) {
+ if (uri.getHost()==null)
+ continue;
+ String thisHost = uri.getHost().toLowerCase();
+ if (scheme.toLowerCase().equals("har")) {
+ if (thisHost.startsWith(HAR_PREFIX)) {
+ thisHost = thisHost.substring(HAR_PREFIX.length());
+ }
+ }
+ if (!uri.getHost().isEmpty() &&
+ !thisHost.equals(defaultHost)) {
+ if (uri.getPort()!=-1)
+ result.add("hdfs://"+thisHost+":"+uri.getPort());
+ else
+ result.add("hdfs://"+thisHost);
+ }
+ }
+ }
+ }
+ return result;
+ }
// Check and set files to be automatically shipped for the given StreamingCommand
// Auto-shipping rules:
@@ -1347,6 +1380,36 @@ LogicalOperator LoadClause(LogicalPlan l
String absolutePath = fileNameMap.get(constructFileNameSignature(filename, funcSpec));
if (absolutePath == null) {
absolutePath = loFunc.relativeToAbsolutePath(filename, getCurrentDir(pigContext));
+
+ // PIG-1403: record every remote namenode named by this LOAD in
+ // mapreduce.job.hdfs-servers so delegation tokens can be obtained
+ // for them when running against secure HDFS.
+ // Get native host
+ String defaultFS = (String)pigContext.getProperties().get("fs.default.name");
+ URI defaultFSURI = new URI(defaultFS);
+ // NOTE(review): assumes fs.default.name is always present and parseable;
+ // a missing property would NPE in the URI constructor -- confirm callers
+ // guarantee it is set.
+ String defaultHost = defaultFSURI.getHost();
+ if (defaultHost==null)
+ defaultHost="";
+ defaultHost = defaultHost.toLowerCase();
+
+ // Hosts referenced by the LOAD path(s) that differ from the default FS host.
+ Set<String> remoteHosts = getRemoteHosts(absolutePath, defaultHost);
+
+ // Merge newly-found hosts into the existing comma-separated property
+ // value without duplicating entries already present.
+ String hdfsServersString = (String)pigContext.getProperties().get("mapreduce.job.hdfs-servers");
+ if (hdfsServersString==null) hdfsServersString="";
+ String hdfsServers[] = hdfsServersString.split(",");
+
+ for (String remoteHost : remoteHosts) {
+ boolean existing = false;
+ for (String hdfsServer:hdfsServers) {
+ if (hdfsServer.equals(remoteHost))
+ existing = true;
+ }
+ if (!existing) {
+ if (!hdfsServersString.isEmpty())
+ hdfsServersString = hdfsServersString + ",";
+ hdfsServersString = hdfsServersString + remoteHost;
+ }
+ }
+
+ // Only touch the property when there is at least one server to record.
+ if (!hdfsServersString.isEmpty())
+ pigContext.getProperties().setProperty("mapreduce.job.hdfs-servers", hdfsServersString);
fileNameMap.put(constructFileNameSignature(filename, funcSpec), absolutePath);
}
lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(absolutePath, funcSpec),
Modified: hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestParser.java?rev=944387&r1=944386&r2=944387&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestParser.java (original)
+++ hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestParser.java Fri May 14 18:52:53 2010
@@ -22,6 +22,7 @@ import static org.apache.pig.ExecType.LO
import static org.apache.pig.ExecType.MAPREDUCE;
import java.io.IOException;
+import java.util.Properties;
import junit.framework.TestCase;
@@ -86,4 +87,36 @@ protected final Log log = LogFactory.get
} catch (IOException io) {
}
}
+
+    /**
+     * PIG-1403: LOAD clauses that name a remote HDFS namenode must be
+     * recorded in mapreduce.job.hdfs-servers (so delegation tokens can be
+     * obtained for them), while local, default-FS, and host-less paths must
+     * leave the property unset.
+     *
+     * BUGFIX: the original wrapped the body in try { ... } catch (IOException
+     * io) {} -- any parse/registration failure was silently swallowed and the
+     * test passed vacuously.  The method already declares IOException, so the
+     * exception is allowed to propagate and fail the test.
+     */
+    @Test
+    public void testRemoteServerList() throws ExecException, IOException {
+        Properties pigProperties = pigServer.getPigContext().getProperties();
+        pigProperties.setProperty("fs.default.name", "hdfs://a.com:8020");
+
+        // Relative path on the default FS: no remote server expected.
+        pigServer.registerQuery("a = load '/user/pig/1.txt';");
+        assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")==null);
+
+        // Explicit hdfs:// path on the default host: still not remote.
+        pigServer.registerQuery("a = load 'hdfs://a.com/user/pig/1.txt';");
+        assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")==null);
+
+        // HAR path without a host: nothing to record.
+        pigServer.registerQuery("a = load 'har:///1.txt';");
+        assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")==null);
+
+        // Different host: must appear in the property.
+        pigServer.registerQuery("a = load 'hdfs://b.com/user/pig/1.txt';");
+        assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")!=null &&
+                pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://b.com"));
+
+        // HAR over a remote namenode: the underlying hdfs host is recorded.
+        pigServer.registerQuery("a = load 'har://hdfs-c.com/user/pig/1.txt';");
+        assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")!=null &&
+                pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://c.com"));
+
+        // Remote host with an explicit port: port is preserved.
+        pigServer.registerQuery("a = load 'hdfs://d.com:8020/user/pig/1.txt';");
+        assertTrue(pigProperties.getProperty("mapreduce.job.hdfs-servers")!=null &&
+                pigProperties.getProperty("mapreduce.job.hdfs-servers").contains("hdfs://d.com:8020"));
+    }
}