Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/03 23:00:17 UTC
svn commit: r399432 [2/2] - in /lucene/hadoop/trunk: ./ src/contrib/
src/contrib/streaming/ src/contrib/streaming/src/
src/contrib/streaming/src/java/ src/contrib/streaming/src/java/org/
src/contrib/streaming/src/java/org/apache/ src/contrib/streaming/...
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java Wed May 3 14:00:13 2006
@@ -0,0 +1,302 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.text.DecimalFormat;
+import java.io.*;
+import java.net.*;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.jar.*;
+
+/** Utilities not available elsewhere in Hadoop. */
+public class StreamUtil
+{
+
+  /** Load and return the named class, qualifying a bare class name with
+   *  defaultPackage; returns null, rather than throwing, if the class
+   *  cannot be loaded. */
+  public static Class goodClassOrNull(String className, String defaultPackage)
+  {
+    if(className.indexOf('.') == -1 && defaultPackage != null) {
+      className = defaultPackage + "." + className;
+    }
+    Class clazz = null;
+    try {
+      clazz = Class.forName(className);
+    } catch(ClassNotFoundException cnf) {
+      // fall through: return null
+    } catch(LinkageError le) {
+      // fall through: return null
+    }
+    return clazz;
+  }
+
+ /** @return a jar file path or a base directory or null if not found.
+ */
+ public static String findInClasspath(String className)
+ {
+ String relPath = className;
+ if (!relPath.startsWith("/")) {
+ relPath = "/" + relPath;
+ }
+ relPath = relPath.replace('.', '/');
+ relPath += ".class";
+
+ java.net.URL classUrl = StreamUtil.class.getResource(relPath);
+
+ String codePath;
+ if (classUrl != null) {
+ boolean inJar = classUrl.getProtocol().equals("jar");
+ codePath = classUrl.toString();
+ if(codePath.startsWith("jar:")) {
+ codePath = codePath.substring("jar:".length());
+ }
+ if(codePath.startsWith("file:")) { // can have both
+ codePath = codePath.substring("file:".length());
+ }
+ if(inJar) {
+ // A jar spec: remove class suffix in /path/my.jar!/package/Class
+ int bang = codePath.lastIndexOf('!');
+ codePath = codePath.substring(0, bang);
+ } else {
+ // A class spec: remove the /my/package/Class.class portion
+ int pos = codePath.lastIndexOf(relPath);
+ if(pos == -1) {
+ throw new IllegalArgumentException(
+ "invalid codePath: className=" + className + " codePath=" + codePath);
+ }
+ codePath = codePath.substring(0, pos);
+ }
+ } else {
+ codePath = null;
+ }
+ return codePath;
+ }
+
+ // copied from TaskRunner
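+  /** Expand the entries of jarFile into toDir, creating parent
+   *  directories as needed; directory entries themselves are skipped. */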
+ static void unJar(File jarFile, File toDir) throws IOException {
+ JarFile jar = new JarFile(jarFile);
+ try {
+ Enumeration entries = jar.entries();
+ while (entries.hasMoreElements()) {
+ JarEntry entry = (JarEntry)entries.nextElement();
+ if (!entry.isDirectory()) {
+ InputStream in = jar.getInputStream(entry);
+ try {
+ File file = new File(toDir, entry.getName());
+ file.getParentFile().mkdirs();
+ OutputStream out = new FileOutputStream(file);
+ try {
+ byte[] buffer = new byte[8192];
+ int i;
+ while ((i = in.read(buffer)) != -1) {
+ out.write(buffer, 0, i);
+ }
+ } finally {
+ out.close();
+ }
+ } finally {
+ in.close();
+ }
+ }
+ }
+ } finally {
+ jar.close();
+ }
+ }
+
+  final static long KB = 1024L;
+ final static long MB = 1024L * KB;
+ final static long GB = 1024L * MB;
+ final static long TB = 1024L * GB;
+ final static long PB = 1024L * TB;
+
+ static DecimalFormat dfm = new DecimalFormat("####.000");
+ static DecimalFormat ifm = new DecimalFormat("###,###,###,###,###");
+
+ public static String dfmt(double d)
+ {
+ return dfm.format(d);
+ }
+ public static String ifmt(double d)
+ {
+ return ifm.format(d);
+ }
+
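+  /** Render a byte count in the largest fitting unit, appending the exact
+   *  count in parentheses for values of 1 KB and up; for example,
+   *  formatBytes(123456789L) returns "117.738 MB (123,456,789 bytes)". */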
+ public static String formatBytes(long numBytes)
+ {
+ StringBuffer buf = new StringBuffer();
+ boolean bDetails = true;
+ double num = numBytes;
+
+ if(numBytes < KB) {
+ buf.append(numBytes + " B");
+ bDetails = false;
+ } else if(numBytes < MB) {
+ buf.append(dfmt(num/KB) + " KB");
+ } else if(numBytes < GB) {
+ buf.append(dfmt(num/MB) + " MB");
+ } else if(numBytes < TB) {
+ buf.append(dfmt(num/GB) + " GB");
+ } else if(numBytes < PB) {
+ buf.append(dfmt(num/TB) + " TB");
+ } else {
+ buf.append(dfmt(num/PB) + " PB");
+ }
+ if(bDetails) {
+ buf.append(" (" + ifmt(numBytes) + " bytes)");
+ }
+ return buf.toString();
+ }
+
+ public static String formatBytes2(long numBytes)
+ {
+ StringBuffer buf = new StringBuffer();
+ long u = 0;
+ if(numBytes >= TB) {
+ u = numBytes/TB;
+ numBytes -= u*TB;
+ buf.append(u + " TB ");
+ }
+ if(numBytes >= GB) {
+ u = numBytes/GB;
+ numBytes -= u*GB;
+ buf.append(u + " GB ");
+ }
+ if(numBytes >= MB) {
+ u = numBytes/MB;
+ numBytes -= u*MB;
+ buf.append(u + " MB ");
+ }
+ if(numBytes >= KB) {
+ u = numBytes/KB;
+ numBytes -= u*KB;
+ buf.append(u + " KB ");
+ }
+    buf.append(numBytes + " B"); // append the remaining bytes, even if zero
+ return buf.toString();
+ }
+
+ static Environment env;
+ static String HOST;
+
+ static {
+ try {
+ env = new Environment();
+ HOST = env.get("HOST").toString();
+ } catch(IOException io) {
+ io.printStackTrace();
+ }
+ }
+
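+  /** Drains an InputStream on its own thread, optionally echoing each line
+   *  to a DataOutputStream; used by exec() below to consume a subprocess's
+   *  stdout and stderr. */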
+ static class StreamConsumer extends Thread
+ {
+ StreamConsumer(InputStream in, OutputStream out)
+ {
+ this.bin = new LineNumberReader(
+ new BufferedReader(new InputStreamReader(in)));
+ if(out != null) {
+ this.bout = new DataOutputStream(out);
+ }
+ }
+    public void run()
+    {
+      try {
+        String line;
+        while((line=bin.readLine()) != null) {
+          if(bout != null) {
+            bout.writeUTF(line); //writeChars
+            bout.writeChar('\n');
+          }
+        }
+        if(bout != null) {
+          bout.flush();
+        }
+      } catch(IOException io) {
+        // ignore: the stream being consumed has closed
+      }
+    }
+ LineNumberReader bin;
+ DataOutputStream bout;
+ }
+
+ static void exec(String arg, PrintStream log)
+ {
+ exec( new String[] {arg}, log );
+ }
+
+ static void exec(String[] args, PrintStream log)
+ {
+ try {
+ log.println("Exec: start: " + Arrays.asList(args));
+ Process proc = Runtime.getRuntime().exec(args);
+ new StreamConsumer(proc.getErrorStream(), log).start();
+ new StreamConsumer(proc.getInputStream(), log).start();
+ int status = proc.waitFor();
+      // a non-zero exit status is logged but not treated as an error
+      log.println("Exec: status=" + status + ": " + Arrays.asList(args));
+ } catch(InterruptedException in) {
+ in.printStackTrace();
+ } catch(IOException io) {
+ io.printStackTrace();
+ }
+ }
+
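+  /** Rewrite url so that its host part is the canonical, fully-qualified
+   *  host name; on any lookup or parse failure the input is returned
+   *  unchanged. */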
+ static String qualifyHost(String url)
+ {
+ try {
+ return qualifyHost(new URL(url)).toString();
+ } catch(IOException io) {
+ return url;
+ }
+ }
+
+ static URL qualifyHost(URL url)
+ {
+ try {
+ InetAddress a = InetAddress.getByName(url.getHost());
+ String qualHost = a.getCanonicalHostName();
+ URL q = new URL(url.getProtocol(), qualHost, url.getPort(), url.getFile());
+ return q;
+ } catch(IOException io) {
+ return url;
+ }
+ }
+
+  static String slurp(File f) throws IOException
+  {
+    int len = (int)f.length();
+    byte[] buf = new byte[len];
+    FileInputStream in = new FileInputStream(f);
+    try {
+      // a single read() may return fewer bytes than requested
+      int off = 0, n;
+      while(off < len && (n = in.read(buf, off, len - off)) != -1) {
+        off += n;
+      }
+    } finally {
+      in.close();
+    }
+    return new String(buf);
+  }
+
+ static private Environment env_;
+
+ static Environment env()
+ {
+ if(env_ != null) {
+ return env_;
+ }
+ try {
+ env_ = new Environment();
+ } catch(IOException io) {
+ io.printStackTrace();
+ }
+ return env_;
+ }
+
+}
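
For orientation, a minimal sketch of how these utilities might be exercised
(StreamUtilExample is a hypothetical wrapper; the StreamUtil calls match the
signatures added above):

    import org.apache.hadoop.streaming.StreamUtil;

    public class StreamUtilExample {
      public static void main(String[] args) {
        // human-readable byte counts
        System.out.println(StreamUtil.formatBytes(123456789L));
        // prints: 117.738 MB (123,456,789 bytes)

        // resolve a bare class name against a default package; returns
        // null instead of throwing when the class cannot be loaded
        Class c = StreamUtil.goodClassOrNull("StreamJob",
                                             "org.apache.hadoop.streaming");
        if (c != null) {
          // the jar file or base directory that provides the class
          System.out.println(StreamUtil.findInClasspath(c.getName()));
        }
      }
    }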
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Wed May 3 14:00:13 2006
@@ -0,0 +1,108 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+/** A way to interpret XML fragments as Mapper input records.
+ * Values are XML subtrees delimited by configurable tags.
+ * Keys could be the value of a certain attribute in the XML subtree,
+ * but this is left to the stream processor application.
+ * @author Michel Tourn
+ */
+public class StreamXmlRecordReader extends StreamBaseRecordReader
+{
+ public StreamXmlRecordReader(
+ FSDataInputStream in, long start, long end,
+ String splitName, Reporter reporter, JobConf job)
+ throws IOException
+ {
+ super(in, start, end, splitName, reporter, job);
+ beginMark_ = checkJobGet("stream.recordreader.begin");
+ endMark_ = checkJobGet("stream.recordreader.end");
+ }
+
+ String checkJobGet(String prop) throws IOException
+ {
+ String val = job_.get(prop);
+ if(val == null) {
+ throw new IOException("JobConf: missing required property: " + prop);
+ }
+ return val;
+ }
+
+ public void seekNextRecordBoundary() throws IOException
+ {
+ System.out.println("@@@start seekNext " + in_.getPos());
+ readUntilMatch(beginMark_, null);
+ System.out.println("@@@end seekNext " + in_.getPos());
+ }
+
+ public synchronized boolean next(Writable key, Writable value)
+ throws IOException
+ {
+ long pos = in_.getPos();
+ if (pos >= end_)
+ return false;
+
+ StringBuffer buf = new StringBuffer();
+ readUntilMatch(endMark_, buf);
+ numRecStats(buf);
+ return true;
+ }
+
+  void readUntilMatch(String pat, StringBuffer outBuf) throws IOException
+  {
+    char[] cpat = pat.toCharArray();
+    int m = 0;
+    int msup = cpat.length;
+    // Naive matcher: assumes the delimiter pattern never overlaps itself
+    // in the input (no prefix of pat recurs inside pat).
+    while (true) {
+      int b = in_.read();
+      if (b == -1)
+        break;
+
+      char c = (char)b; // this assumes eight-bit matching. OK with UTF-8
+      if (c == cpat[m]) {
+        m++;
+        if(m==msup) { // consumed the whole pattern, including its last char
+          break;
+        }
+      } else {
+        m = 0;
+      }
+      if(outBuf != null) {
+        outBuf.append(c);
+      }
+    }
+    System.out.println("@@@START readUntilMatch(" + pat + ", " + outBuf + "\n@@@END readUntilMatch");
+  }
+
+
+ String beginMark_;
+ String endMark_;
+}
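
The reader is driven entirely by JobConf settings; here is a minimal sketch of
the two required properties (XmlReaderConfigExample and the <doc> tag values
are assumptions for illustration; the property names are the ones checked by
checkJobGet in the constructor above):

    import org.apache.hadoop.mapred.JobConf;

    public class XmlReaderConfigExample {
      static JobConf configure() {
        JobConf job = new JobConf();
        // both properties are required: checkJobGet throws IOException
        // if either one is missing
        job.set("stream.recordreader.begin", "<doc>");
        job.set("stream.recordreader.end", "</doc>");
        return job;
      }
    }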
Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?rev=399432&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Wed May 3 14:00:13 2006
@@ -0,0 +1,117 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests hadoopStreaming in MapReduce local mode.
+ * It requires the Unix utilities tr and uniq.
+ */
+public class TestStreaming extends TestCase
+{
+
+  // map command:    tr . \n   (split dot-separated words onto lines)
+  // reduce command: uniq
+ String INPUT_FILE = "input.txt";
+ String OUTPUT_DIR = "out";
+ String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
+ String map = "/usr/bin/tr . \\n"; // split words into lines. Careful with spaces in args
+ String reduce = "/usr/bin/uniq";
+ String outputExpect = "are\t\nblue\t\nbunnies\t\npink\t\nred\t\nroses\t\nviolets\t\n";
+
+ StreamJob job;
+
+ public TestStreaming() throws IOException
+ {
+ // trunk/src/contrib/streaming --> trunk/build/contrib/streaming/test/data
+ String userDir = System.getProperty("user.dir");
+ String antTestDir = System.getProperty("test.build.data", userDir);
+ if(! userDir.equals(antTestDir)) {
+ // because changes to user.dir are ignored by File
+ throw new IllegalStateException("user.dir != test.build.data. The junit Ant task must be forked.");
+ }
+
+ boolean fromAntJunit = System.getProperty("test.build.data") != null;
+ if(fromAntJunit) {
+ new File(antTestDir).mkdirs();
+ File outFile = new File(antTestDir, getClass().getName()+".log");
+ PrintStream out = new PrintStream(new FileOutputStream(outFile));
+ System.setOut(out);
+ System.setErr(out);
+ }
+ System.out.println("test.build.data=" + antTestDir);
+ }
+ void createInput() throws IOException
+ {
+    String path = new File(".", INPUT_FILE).getAbsolutePath(); // needed from junit forked vm
+ DataOutputStream out = new DataOutputStream(new FileOutputStream(path));
+ out.writeBytes(input);
+ out.close();
+ }
+
+ public void testCommandLine()
+ {
+ try {
+ createInput();
+ boolean mayExit = false;
+
+ // During tests, the default Configuration will use a local mapred
+ // So don't specify -config or -cluster
+ String argv[] = new String[] {
+ "-input", INPUT_FILE,
+ "-output", OUTPUT_DIR,
+ "-mapper", map,
+ "-reducer", reduce,
+ /*"-debug",*/
+ "-verbose"
+ };
+
+ job = new StreamJob(argv, mayExit);
+ job.go();
+ File outFile = new File(".", OUTPUT_DIR + "/part-00000").getAbsoluteFile();
+ String output = StreamUtil.slurp(outFile);
+ System.out.println("outEx=" + outputExpect);
+ System.out.println(" out=" + output);
+ assertEquals(outputExpect, output);
+
+ } catch(Exception e) {
+ failTrace(e);
+ }
+  }
+
+ void failTrace(Exception e)
+ {
+ StringWriter sw = new StringWriter();
+ e.printStackTrace(new PrintWriter(sw));
+ fail(sw.toString());
+ }
+
+  public static void main(String[] args) throws Exception
+ {
+ new TestStreaming().testCommandLine();
+ }
+
+}