You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by Apache Wiki <wi...@apache.org> on 2007/11/21 00:04:55 UTC
[Pig Wiki] Update of "PigAbstractionFrontEnd" by AntonioMagnaghi
Dear Wiki user,
You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.
The following page has been changed by AntonioMagnaghi:
http://wiki.apache.org/pig/PigAbstractionFrontEnd
New page:
= Data-Storage Based Pig Front-End =
This is a sample code fragment where `PigContext.java` has been adapted to use the Data Storage API defined above.
{{{
-
- // configuration for connecting to hadoop
- transient private JobConf conf = null;
@@ -79,16 +81,21 @@
+ // configuration information for file system(s)
+ transient private DataStorageProperties fileSystemConf;
+
//main file system that jobs and shell commands access
- transient private FileSystem dfs;
+ transient private DataStorage dfs;
}}}
{{{
@@ -195,21 +199,32 @@
}
}
-
- lfs = FileSystem.getNamed("local", conf);
-
- mLogger.info("Connecting to hadoop file system at: " + conf.get("fs.default.name"));
- dfs = FileSystem.get(conf);
+ HadoopDataStorageConfiguration conf = new HadoopDataStorageConfiguration();
+ fileSystemConf = conf;
+
+ lfs = new HadoopFileSystem(new URI(...), conf);
+
+ mLogger.info("Connecting to hadoop file system at: " + fileSystemConf.getValue("fs.default.name"));
+
+ dfs = new HadoopFileSystem (new URI(...), conf);
+
+ mLogger.info("Connecting to map-reduce job tracker at: " + conf.getValue("mapred.job.tracker"));
+
- mLogger.info("Connecting to map-reduce job tracker at: " + conf.get("mapred.job.tracker"));
- jobTracker = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, JobSubmissionProtocol.versionID, JobTracker.getAddress(conf), conf);
- jobClient = new JobClient(conf);
+ HadoopExecutionEngineConfiguration execConf = new HadoopExecutionEngineConfiguration();
+ backEndConf = execConf;
+ jobClient = new HadoopExecutionEngine(execConf);
}else{
- conf = new JobConf();
- lfs = FileSystem.getNamed("local", conf);
- dfs = lfs; // for local execution, the "dfs" is the local file system
+ HadoopDataStorageConfiguration conf = new HadoopDataStorageConfiguration();
+ fileSystemConf = conf;
+
+ lfs = new HadoopFileSystem(new URI(...), new HadoopDataStorageConfiguration(conf));
+
+ dfs = lfs; // for local execution, the "dfs" is the local file system
}
}catch (IOException e){
}}}
These are sample code fragments from `PigServer.java`. Operations that previously utilized the Hadoop file system directly have now been adapted to use the Data Storage API defined above.
{{{
@@ -485,37 +565,99 @@
* @return
* @throws IOException
*/
- public long fileSize(String filename) throws IOException {
- FileSystem dfs = pigContext.getDfs();
- Path p = new Path(filename);
- long len = dfs.getLength(p);
- long replication = dfs.getDefaultReplication();
- return len * replication;
+ public long fileSize(String name) throws IOException {
+ try {
+ DataStorage dfs = pigContext.getDfs();
+ DataStorageElementDescriptor elem = dfs.asElement(name);
+ DataStorageProperties elemStats = elem.getStatistics();
+
+ long len = new Long(elemStats.getValue("length.bytes").toString());
+ long replication = new Long(elemStats.getValue("replication").toString());
+
+ return len * replication;
+ }
+ catch (DataStorageException e)
+ {
+ return 0;
+ }
}
}}}
{{{
- public boolean deleteFile(String filename) throws IOException {
- return pigContext.getDfs().delete(new Path(filename));
+ public boolean deleteFile(String name) throws IOException {
+ try {
+ DataStorage ds = pigContext.getDfs();
+ DataStorageElementDescriptor elem = ds.asElement(name);
+
+ return elem.delete();
+ }
+ catch (DataStorageException e)
+ {
+ throw new IOException(e);
+ }
}
}}}