You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/03 23:00:17 UTC

svn commit: r399432 [2/2] - in /lucene/hadoop/trunk: ./ src/contrib/ src/contrib/streaming/ src/contrib/streaming/src/ src/contrib/streaming/src/java/ src/contrib/streaming/src/java/org/ src/contrib/streaming/src/java/org/apache/ src/contrib/streaming/...

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java Wed May  3 14:00:13 2006
@@ -0,0 +1,302 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.text.DecimalFormat;
+import java.io.*;
+import java.net.*;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.jar.*;
+
+/** Utilities not available elsewhere in Hadoop.
+ *  
+ */
+public class StreamUtil
+{
+
+  /** Resolves a class by name, yielding null rather than an exception on failure.
+   *  @param className a class name; a name without any '.' is treated as unqualified
+   *  @param defaultPackage package prepended to an unqualified name; ignored when null
+   *  @return the loaded class, or null if it cannot be found or linked
+   */
+  public static Class goodClassOrNull(String className, String defaultPackage)
+  {
+    String fullName = className;
+    boolean unqualified = className.indexOf('.') < 0;
+    if(unqualified && defaultPackage != null) {
+      fullName = defaultPackage + "." + className;
+    }
+    try {
+      return Class.forName(fullName);
+    } catch(ClassNotFoundException notFound) {
+      // lookup failures are reported to the caller as null
+      return null;
+    } catch(LinkageError notLinkable) {
+      return null;
+    }
+  }
+  
+   /** Locates the classpath element that provides a class.
+    *  @param className a class name in dotted form, e.g. my.package.Class
+    *  @return a jar file path or a base directory or null if not found.
+    *  @throws IllegalArgumentException if a class found outside a jar has a
+    *    resource URL that does not contain the class's relative path
+    */
+   public static String findInClasspath(String className) 
+   {
+    // Resource name of the class file: /my/package/Class.class
+    String resource = className.startsWith("/") ? className : "/" + className;
+    resource = resource.replace('.', '/') + ".class";
+
+    java.net.URL location = StreamUtil.class.getResource(resource);
+    if (location == null) {
+      return null;
+    }
+
+    boolean fromJar = "jar".equals(location.getProtocol());
+    String path = location.toString();
+    if(path.startsWith("jar:")) {
+      path = path.substring(4);
+    }
+    if(path.startsWith("file:")) { // a jar URL can carry both prefixes
+      path = path.substring(5);
+    }
+
+    if(fromJar) {
+      // A jar spec /path/my.jar!/package/Class: keep what precedes the last '!'
+      return path.substring(0, path.lastIndexOf('!'));
+    }
+
+    // A class spec: drop the trailing /my/package/Class.class portion
+    int cut = path.lastIndexOf(resource);
+    if(cut == -1) {
+      throw new IllegalArgumentException(
+        "invalid codePath: className=" + className + " codePath=" + path);
+    }
+    return path.substring(0, cut);
+  }
+
+  /** Extracts every file entry of a jar below a directory, creating parent
+   *  directories as needed. Directory entries themselves are skipped.
+   *  (Originally copied from TaskRunner.)
+   *  @param jarFile the jar archive to read
+   *  @param toDir the directory that receives the extracted files
+   *  @throws IOException if the jar cannot be read, a file cannot be written,
+   *    or an entry name would resolve to a location outside toDir
+   */
+  static void unJar(File jarFile, File toDir) throws IOException {
+    // Canonical prefix used to reject entry names such as "../../x"
+    String root = toDir.getCanonicalPath();
+    if (!root.endsWith(File.separator)) {
+      root = root + File.separator;
+    }
+    JarFile jar = new JarFile(jarFile);
+    try {
+      Enumeration entries = jar.entries();
+      while (entries.hasMoreElements()) {
+        JarEntry entry = (JarEntry)entries.nextElement();
+        if (entry.isDirectory()) {
+          continue;
+        }
+        File file = new File(toDir, entry.getName());
+        if (!file.getCanonicalPath().startsWith(root)) {
+          throw new IOException(
+            "jar entry resolves outside " + toDir + ": " + entry.getName());
+        }
+        File parent = file.getParentFile();
+        parent.mkdirs();
+        if (!parent.isDirectory()) {
+          throw new IOException("cannot create directory: " + parent);
+        }
+        InputStream in = jar.getInputStream(entry);
+        try {
+          OutputStream out = new FileOutputStream(file);
+          try {
+            byte[] buffer = new byte[8192];
+            int n;
+            while ((n = in.read(buffer)) != -1) {
+              out.write(buffer, 0, n);
+            }
+          } finally {
+            out.close();
+          }
+        } finally {
+          in.close();
+        }
+      }
+    } finally {
+      jar.close();
+    }
+  }
+  
+
+  
+  /** Binary size units in bytes; each is 1024 times the previous one. */
+  static final long KB = 1L << 10;
+  static final long MB = KB << 10;
+  static final long GB = MB << 10;
+  static final long TB = GB << 10;
+  static final long PB = TB << 10;
+
+  /** Decimal formatter used by dfmt(): always three fraction digits. */
+  static DecimalFormat dfm = new DecimalFormat("####.000");
+  /** Integer formatter used by ifmt(): digits grouped in threes. */
+  static DecimalFormat ifm = new DecimalFormat("###,###,###,###,###");
+  
+  /** Formats a number with the shared decimal pattern "####.000".
+   *  @param d the value to format
+   *  @return the formatted text
+   */
+  public static String dfmt(double d)
+  {
+    String text = dfm.format(d);
+    return text;
+  }
+  /** Formats a number with the shared grouped-integer pattern.
+   *  @param d the value to format
+   *  @return the formatted text
+   */
+  public static String ifmt(double d)
+  {
+    String text = ifm.format(d);
+    return text;
+  }
+  
+  /** Renders a byte count using the largest fitting binary unit (B up to PB).
+   *  Counts of 1 KB or more are followed by the exact byte count in
+   *  parentheses, e.g. "1.500 KB (1,536 bytes)"; smaller counts are
+   *  rendered as e.g. "512 B".
+   *  @param numBytes the byte count
+   *  @return the human-readable text
+   */
+  public static String formatBytes(long numBytes)
+  {
+    if(numBytes < KB) {
+      return numBytes + " B";
+    }
+
+    double num = numBytes;
+    String scaled;
+    if(numBytes < MB) {
+      scaled = dfmt(num/KB) + " KB";
+    } else if(numBytes < GB) {
+      scaled = dfmt(num/MB) + " MB";
+    } else if(numBytes < TB) {
+      scaled = dfmt(num/GB) + " GB";
+    } else if(numBytes < PB) {
+      scaled = dfmt(num/TB) + " TB";
+    } else {
+      scaled = dfmt(num/PB) + " PB";
+    }
+    // Format the long itself: converting it to double first (as ifmt does)
+    // drops the low-order digits of counts above 2^53.
+    return scaled + " (" + ifm.format(numBytes) + " bytes)";
+  }
+
+  /** Breaks a byte count down into whole TB, GB, MB and KB parts plus the
+   *  remaining bytes, e.g. 2053 gives "2 KB 5 B". Units whose part is zero are
+   *  omitted, except the trailing byte part which is always present.
+   *  @param numBytes the byte count
+   *  @return the itemized text
+   */
+  public static String formatBytes2(long numBytes)
+  {
+    long[] sizes = { TB, GB, MB, KB };
+    String[] labels = { " TB ", " GB ", " MB ", " KB " };
+
+    StringBuffer buf = new StringBuffer();
+    long rest = numBytes;
+    for(int i = 0; i < sizes.length; i++) {
+      if(rest >= sizes[i]) {
+        buf.append(rest / sizes[i]).append(labels[i]);
+        rest %= sizes[i];
+      }
+    }
+    // The byte part is what is left after all larger units were taken out
+    // (not the quotient of the last unit printed).
+    buf.append(rest).append(" B"); //even if zero
+    return buf.toString();
+  }
+
+  /** Environment instance created once when this class is loaded;
+   *  stays null if its construction fails with an IOException. */
+  static Environment env;
+  /** String form of the "HOST" entry of env, or null when there is none. */
+  static String HOST;
+  
+  static {
+    try {
+      env = new Environment();
+      // A missing HOST entry must not abort class initialization with a
+      // NullPointerException; HOST is simply left null in that case.
+      Object host = env.get("HOST");
+      if(host != null) {
+        HOST = host.toString();
+      }
+    } catch(IOException io) {
+      io.printStackTrace();
+    }
+  }
+
+  /** Thread that drains an input stream line by line and, when an output
+   *  stream was supplied, copies every line to it.
+   */
+  static class StreamConsumer extends Thread
+  {
+    /** @param in the stream to drain
+     *  @param out receives the copied lines; may be null to discard them
+     */
+    StreamConsumer(InputStream in, OutputStream out)
+    {
+      this.bin = new LineNumberReader(
+        new BufferedReader(new InputStreamReader(in)));
+      if(out != null) {
+        this.bout = new DataOutputStream(out);
+      }
+    }
+
+    /** Reads until end of stream, then flushes the output if there is one. */
+    public void run()
+    {
+      try {
+        String line;
+        while((line=bin.readLine()) != null) {
+          if(bout != null) {
+            // NOTE(review): writeUTF/writeChar emit DataOutput framing (a length
+            // prefix and a two-byte char), not plain text -- confirm that the
+            // readers of this stream expect that.
+            bout.writeUTF(line); //writeChars
+            bout.writeChar('\n');
+          }
+        }
+        // bout is null when no output was given; flushing it unguarded
+        // ended the thread with a NullPointerException.
+        if(bout != null) {
+          bout.flush();
+        }
+      } catch(IOException io) {        
+        // best effort: an I/O failure just ends the copy
+      }
+    }
+    LineNumberReader bin;
+    DataOutputStream bout;
+  }
+
+  /** Runs a command that consists of a single argument.
+   *  Delegates to exec(String[], PrintStream) with a one-element array.
+   *  @param arg the only element of the command array
+   *  @param log receives the progress messages
+   */
+  static void exec(String arg, PrintStream log)
+  {
+    String[] command = new String[1];
+    command[0] = arg;
+    exec(command, log);
+  }
+  
+  /** Runs a command, waits for it to exit, and reports start and exit status
+   *  on log. The child's stdout and stderr are passed to log through two
+   *  StreamConsumer threads. Failures are printed as stack traces, not thrown.
+   *  @param args the command and its arguments
+   *  @param log receives the progress messages and the child's output
+   */
+  static void exec(String[] args, PrintStream log)
+  {
+    try {
+      log.println("Exec: start: " + Arrays.asList(args));
+      Process proc = Runtime.getRuntime().exec(args);
+      Thread errPump = new StreamConsumer(proc.getErrorStream(), log);
+      Thread outPump = new StreamConsumer(proc.getInputStream(), log);
+      errPump.start();
+      outPump.start();
+      int status = proc.waitFor();
+      // Wait for both pumps so that all child output has been copied
+      // before the status line is written.
+      errPump.join();
+      outPump.join();
+      //if status != 0
+      log.println("Exec: status=" + status + ": " + Arrays.asList(args));
+    } catch(InterruptedException in) {
+      in.printStackTrace();
+      // keep the interrupt visible to the caller
+      Thread.currentThread().interrupt();
+    } catch(IOException io) {
+      io.printStackTrace();
+    }
+  }
+  
+  /** String variant of qualifyHost(URL).
+   *  @param url the URL text
+   *  @return the text of the qualified URL, or url itself when it cannot be
+   *    parsed as a URL
+   */
+  static String qualifyHost(String url)
+  {
+    URL parsed;
+    try {
+      parsed = new URL(url);
+    } catch(IOException malformed) {
+      return url;
+    }
+    return qualifyHost(parsed).toString();
+  }
+  
+  /** Rebuilds a URL with its host replaced by the canonical host name that
+   *  InetAddress reports for it; protocol, port and file are carried over.
+   *  @param url the URL to qualify
+   *  @return the rebuilt URL, or url itself if the lookup or the rebuild
+   *    fails with an IOException
+   */
+  static URL qualifyHost(URL url)
+  {    
+    URL result = url;
+    try {
+      String canonical = InetAddress.getByName(url.getHost()).getCanonicalHostName();
+      result = new URL(url.getProtocol(), canonical, url.getPort(), url.getFile());
+    } catch(IOException unresolved) {
+      // fall back to the URL as given
+    }
+    return result;
+  }
+  
+  /** Reads a whole file into a String, decoding with the platform default
+   *  charset.
+   *  @param f the file to read; its length must fit in an int
+   *  @return the file content
+   *  @throws IOException if the file cannot be opened or read
+   */
+  static String slurp(File f) throws IOException
+  {
+    int len = (int)f.length();
+    byte[] buf = new byte[len];
+    int filled = 0;
+    FileInputStream in = new FileInputStream(f);
+    try {
+      // A single read() may deliver fewer bytes than requested, so loop
+      // until the buffer is full or the file ends early.
+      while(filled < len) {
+        int n = in.read(buf, filled, len - filled);
+        if(n == -1) {
+          break;
+        }
+        filled += n;
+      }
+    } finally {
+      in.close();
+    }
+    return new String(buf, 0, filled);
+  }
+  
+  /** Instance handed out by env(); null until a creation attempt succeeds. */
+  static private Environment env_;
+  
+  /** Returns the cached Environment, creating it on first use.
+   *  @return the Environment, or null if creating it failed with an
+   *    IOException (the stack trace is printed and a later call retries)
+   */
+  static Environment env()
+  {
+    if(env_ == null) {
+      try {
+        env_ = new Environment();
+      } catch(IOException io) {      
+        io.printStackTrace();
+      }
+    }
+    return env_;
+  }
+
+}

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Wed May  3 14:00:13 2006
@@ -0,0 +1,108 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+/** A way to interpret XML fragments as Mapper input records.
+ *  Values are XML subtrees delimited by configurable tags.
+ *  Keys could be the value of a certain attribute in the XML subtree, 
+ *  but this is left to the stream processor application.
+ *  @author Michel Tourn
+ */
+public class StreamXmlRecordReader extends StreamBaseRecordReader 
+{
+  /** Creates the reader and loads both record marks from the job.
+   *  All arguments are handed unchanged to the StreamBaseRecordReader
+   *  constructor.
+   *  @throws IOException if the property stream.recordreader.begin or
+   *    stream.recordreader.end is missing
+   */
+  public StreamXmlRecordReader(
+    FSDataInputStream in, long start, long end, 
+    String splitName, Reporter reporter, JobConf job)
+    throws IOException
+  {
+    super(in, start, end, splitName, reporter, job);
+    String beginTag = checkJobGet("stream.recordreader.begin");
+    String endTag = checkJobGet("stream.recordreader.end");
+    this.beginMark_ = beginTag;
+    this.endMark_ = endTag;
+  }
+
+  /** Looks up a mandatory property in job_.
+   *  @param prop the property name
+   *  @return the property value, never null
+   *  @throws IOException if job_ has no value for prop
+   */
+  String checkJobGet(String prop) throws IOException
+  {
+    String val = job_.get(prop);
+    if(val != null) {
+      return val;
+    }
+    throw new IOException("JobConf: missing required property: " + prop);
+  }
+  
+  /** Reads forward with readUntilMatch(beginMark_, null), discarding what is
+   *  read, and prints the stream position before and after.
+   *  @throws IOException if reading the input fails
+   */
+  public void seekNextRecordBoundary() throws IOException
+  {
+    // NOTE(review): the "@@@" lines look like leftover debug tracing on
+    // stdout -- confirm whether they are still wanted.
+    long before = in_.getPos();
+    System.out.println("@@@start seekNext " + before);
+    readUntilMatch(beginMark_, null);
+    long after = in_.getPos();
+    System.out.println("@@@end   seekNext " + after);
+  }
+    
+  /** Reads one record: everything readUntilMatch collects while scanning for
+   *  endMark_, which is then passed to numRecStats.
+   *  NOTE(review): neither key nor value is written to here -- confirm that
+   *  the record is meant to reach the caller some other way.
+   *  @param key not used by this method
+   *  @param value not used by this method
+   *  @return false once the stream position has reached end_, true otherwise
+   *  @throws IOException if reading the input fails
+   */
+  public synchronized boolean next(Writable key, Writable value)
+   throws IOException
+  {
+    boolean pastEnd = in_.getPos() >= end_;
+    if (pastEnd) {
+      return false;
+    }
+
+    StringBuffer record = new StringBuffer();
+    readUntilMatch(endMark_, record);
+    numRecStats(record);
+    return true;
+  }
+
+  /** Consumes input up to and including the next occurrence of a pattern, or
+   *  up to the end of the stream if the pattern does not occur.
+   *  Each byte read is compared as one char, so only patterns made of
+   *  single-byte characters can match.
+   *  @param pat the pattern to look for; an empty pattern matches at once
+   *  @param outBuf when non-null, receives every character consumed,
+   *    including those of the matched pattern
+   *  @throws IOException if reading the input fails
+   */
+  void readUntilMatch(String pat, StringBuffer outBuf) throws IOException 
+  {
+    char[] cpat = pat.toCharArray();
+    int msup = cpat.length;
+
+    // fail[i] = length of the longest proper prefix of cpat[0..i] that is also
+    // a suffix of it. After a mismatch the scan resumes from that prefix
+    // instead of restarting at 0, so overlapping starts ("<<tag>") still match.
+    int[] fail = new int[msup];
+    for (int i = 1, k = 0; i < msup; i++) {
+      while (k > 0 && cpat[i] != cpat[k]) {
+        k = fail[k - 1];
+      }
+      if (cpat[i] == cpat[k]) {
+        k++;
+      }
+      fail[i] = k;
+    }
+
+    int m = 0; // number of pattern characters currently matched
+    // Stop only when ALL msup characters matched (the old test m==msup-1
+    // stopped one character early and left it unread in the stream).
+    while (m < msup) {
+      int b = in_.read();
+      if (b == -1)
+        break;
+
+      char c = (char)b; // this assumes eight-bit matching. OK with UTF-8
+      if(outBuf != null) {
+        outBuf.append(c);
+      }
+      while (m > 0 && c != cpat[m]) {
+        m = fail[m - 1];
+      }
+      if (c == cpat[m]) {
+        m++;
+      }
+    }
+    // NOTE(review): looks like leftover debug tracing (prints the whole
+    // buffer for every call) -- confirm whether it is still wanted.
+    System.out.println("@@@START readUntilMatch(" + pat + ", " + outBuf + "\n@@@END readUntilMatch");
+  }
+  
+  
+  /** Pattern scanned for by seekNextRecordBoundary(); set in the constructor
+   *  from the job property stream.recordreader.begin. */
+  String beginMark_;
+  /** Pattern scanned for by next(); set in the constructor from the job
+   *  property stream.recordreader.end. */
+  String endMark_;
+}

Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Wed May  3 14:00:13 2006
@@ -0,0 +1,117 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests hadoopStreaming in MapReduce local mode.
+ * It requires the Unix utilities tr and uniq.
+ */
+public class TestStreaming extends TestCase
+{
+
+  // map command: tr . \n (puts every dot-separated word on its own line)
+  // reduce command: uniq
+  // Name of the input file created by createInput() in the current directory
+  String INPUT_FILE = "input.txt";
+  // Job output directory; the test reads part-00000 from it
+  String OUTPUT_DIR = "out";
+  // Content written to INPUT_FILE
+  String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
+  String map = "/usr/bin/tr . \\n"; // split words into lines. Careful with spaces in args
+  String reduce = "/usr/bin/uniq";
+  // Expected content of part-00000: each distinct word once, in sorted
+  // order, every line ending with a tab
+  String outputExpect = "are\t\nblue\t\nbunnies\t\npink\t\nred\t\nroses\t\nviolets\t\n";
+
+  // The job under test; created in testCommandLine()
+  StreamJob job;
+
+  /** Checks the working directory and, when the test.build.data property is
+   *  set, sends System.out and System.err to a log file in that directory.
+   *  @throws IllegalStateException if test.build.data is set to something
+   *    other than user.dir
+   *  @throws IOException if the log file cannot be opened
+   */
+  public TestStreaming() throws IOException
+  {
+    // trunk/src/contrib/streaming --> trunk/build/contrib/streaming/test/data
+    String workDir = System.getProperty("user.dir");
+    String configured = System.getProperty("test.build.data");
+    String antTestDir = (configured != null) ? configured : workDir;
+    if(!workDir.equals(antTestDir)) {
+      // because changes to user.dir are ignored by File
+      throw new IllegalStateException("user.dir != test.build.data. The junit Ant task must be forked.");
+    }
+
+    if(configured != null) {
+      // started from the Ant junit task: capture all output in a log file
+      new File(antTestDir).mkdirs();
+      String logName = getClass().getName()+".log";
+      PrintStream log = new PrintStream(new FileOutputStream(new File(antTestDir, logName)));
+      System.setOut(log);
+      System.setErr(log);
+    }
+    System.out.println("test.build.data=" + antTestDir);
+  }
+  /** Writes the test input text to INPUT_FILE in the current directory.
+   *  @throws IOException if the file cannot be created or written
+   */
+  void createInput() throws IOException
+  {
+    String path = new File(".", INPUT_FILE).getAbsolutePath();// needed from junit forked vm
+    DataOutputStream out = new DataOutputStream(new FileOutputStream(path));
+    try {
+      out.writeBytes(input);
+    } finally {
+      // close even when the write fails so the file handle is not leaked
+      out.close();
+    }
+  }
+
+  /** Runs a StreamJob built from command-line style arguments over the
+   *  generated input and compares part-00000 of the output directory with
+   *  outputExpect. Any exception is turned into a test failure that carries
+   *  its stack trace.
+   */
+  public void testCommandLine()
+  {
+    try {
+      createInput();
+
+      // During tests, the default Configuration will use a local mapred
+      // So don't specify -config or -cluster
+      String[] argv = {
+          "-input", INPUT_FILE,
+          "-output", OUTPUT_DIR,
+          "-mapper", map,
+          "-reducer", reduce,
+          /*"-debug",*/
+          "-verbose"
+      };
+      boolean mayExit = false;
+
+      job = new StreamJob(argv, mayExit);
+      job.go();
+
+      File resultFile = new File(".", OUTPUT_DIR + "/part-00000").getAbsoluteFile();
+      String actual = StreamUtil.slurp(resultFile);
+      System.out.println("outEx=" + outputExpect);
+      System.out.println("  out=" + actual);
+      assertEquals(outputExpect, actual);
+    } catch(Exception e) {
+      failTrace(e);
+    }
+  }
+
+  /** Fails the test, using the full stack trace of e as the message.
+   *  @param e the exception that made the test fail
+   */
+  void failTrace(Exception e)
+  {
+    StringWriter trace = new StringWriter();
+    PrintWriter printer = new PrintWriter(trace);
+    e.printStackTrace(printer);
+    String message = trace.toString();
+    fail(message);
+  }
+
+
+
+  /** Runs testCommandLine() directly, without a JUnit runner. */
+  public static void main(String[]args) throws Exception
+  {
+    TestStreaming test = new TestStreaming();
+    test.testCommandLine();
+  }
+
+}