Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/19 01:08:08 UTC
svn commit: r447626 [3/3] - in /lucene/hadoop/trunk: ./ src/contrib/
src/contrib/streaming/
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/contrib/streaming/src/test/org/apache/hadoop/streaming/
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Mon Sep 18 16:08:06 2006
@@ -42,91 +42,82 @@
*
* @author Michel Tourn
*/
-public class StreamXmlRecordReader extends StreamBaseRecordReader
-{
- public StreamXmlRecordReader(
- FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
- throws IOException
- {
+public class StreamXmlRecordReader extends StreamBaseRecordReader {
+
+ public StreamXmlRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
+ JobConf job, FileSystem fs) throws IOException {
super(in, split, reporter, job, fs);
-
+
beginMark_ = checkJobGet(CONF_NS + "begin");
- endMark_ = checkJobGet(CONF_NS + "end");
+ endMark_ = checkJobGet(CONF_NS + "end");
- maxRecSize_= job_.getInt(CONF_NS + "maxrec", 50*1000);
- lookAhead_ = job_.getInt(CONF_NS + "lookahead", 2*maxRecSize_);
+ maxRecSize_ = job_.getInt(CONF_NS + "maxrec", 50 * 1000);
+ lookAhead_ = job_.getInt(CONF_NS + "lookahead", 2 * maxRecSize_);
synched_ = false;
-
+
slowMatch_ = job_.getBoolean(CONF_NS + "slowmatch", false);
- if(slowMatch_) {
- beginPat_ = makePatternCDataOrMark(beginMark_);
- endPat_ = makePatternCDataOrMark(endMark_);
+ if (slowMatch_) {
+ beginPat_ = makePatternCDataOrMark(beginMark_);
+ endPat_ = makePatternCDataOrMark(endMark_);
}
}
-
+
int numNext = 0;
- public synchronized boolean next(Writable key, Writable value)
- throws IOException
- {
+
+ public synchronized boolean next(Writable key, Writable value) throws IOException {
long pos = in_.getPos();
numNext++;
if (pos >= end_) {
return false;
}
-
+
DataOutputBuffer buf = new DataOutputBuffer();
- if(!readUntilMatchBegin()) {
- return false;
+ if (!readUntilMatchBegin()) {
+ return false;
}
- if(!readUntilMatchEnd(buf)) {
- return false;
+ if (!readUntilMatchEnd(buf)) {
+ return false;
}
-
+
// There is only one elem..key/value splitting is not done here.
- byte [] record = new byte[buf.getLength()];
+ byte[] record = new byte[buf.getLength()];
System.arraycopy(buf.getData(), 0, record, 0, record.length);
-
- numRecStats(record, 0, record.length);
- ((Text)key).set(record);
- ((Text)value).set("");
-
+ numRecStats(record, 0, record.length);
+
+ ((Text) key).set(record);
+ ((Text) value).set("");
+
/*if(numNext < 5) {
- System.out.println("@@@ " + numNext + ". true next k=|" + key.toString().replaceAll("[\\r\\n]", " ")
- + "|, len=" + buf.length() + " v=|" + value.toString().replaceAll("[\\r\\n]", " ") + "|");
- }*/
+ System.out.println("@@@ " + numNext + ". true next k=|" + key.toString().replaceAll("[\\r\\n]", " ")
+ + "|, len=" + buf.length() + " v=|" + value.toString().replaceAll("[\\r\\n]", " ") + "|");
+ }*/
return true;
}
-
- public void seekNextRecordBoundary() throws IOException
- {
+
+ public void seekNextRecordBoundary() throws IOException {
readUntilMatchBegin();
}
-
- boolean readUntilMatchBegin() throws IOException
- {
- if(slowMatch_) {
- return slowReadUntilMatch(beginPat_, false, null);
+
+ boolean readUntilMatchBegin() throws IOException {
+ if (slowMatch_) {
+ return slowReadUntilMatch(beginPat_, false, null);
} else {
- return fastReadUntilMatch(beginMark_, false, null);
+ return fastReadUntilMatch(beginMark_, false, null);
}
}
-
- private boolean readUntilMatchEnd(DataOutputBuffer buf) throws IOException
- {
- if(slowMatch_) {
+
+ private boolean readUntilMatchEnd(DataOutputBuffer buf) throws IOException {
+ if (slowMatch_) {
return slowReadUntilMatch(endPat_, true, buf);
} else {
return fastReadUntilMatch(endMark_, true, buf);
}
}
-
-
- private boolean slowReadUntilMatch(Pattern markPattern, boolean includePat,
- DataOutputBuffer outBufOrNull)
- throws IOException
- {
+
+ private boolean slowReadUntilMatch(Pattern markPattern, boolean includePat,
+ DataOutputBuffer outBufOrNull) throws IOException {
try {
long inStart = in_.getPos();
byte[] buf = new byte[Math.max(lookAhead_, maxRecSize_)];
@@ -134,34 +125,33 @@
boolean success = true;
in_.mark(lookAhead_ + 2);
read = in_.read(buf);
- if( read == -1 )
- return false;
-
- String sbuf = new String(buf, 0, read, "UTF-8");
+ if (read == -1) return false;
+
+ String sbuf = new String(buf, 0, read, "UTF-8");
Matcher match = markPattern.matcher(sbuf);
firstMatchStart_ = NA;
firstMatchEnd_ = NA;
int bufPos = 0;
int state = synched_ ? CDATA_OUT : CDATA_UNK;
- int s=0;
+ int s = 0;
int matchLen = 0;
- while(match.find(bufPos)) {
+ while (match.find(bufPos)) {
int input;
matchLen = match.group(0).length();
- if(match.group(1) != null) {
+ if (match.group(1) != null) {
input = CDATA_BEGIN;
- } else if(match.group(2) != null) {
+ } else if (match.group(2) != null) {
input = CDATA_END;
firstMatchStart_ = NA; // |<DOC CDATA[ </DOC> ]]> should keep it
} else {
input = RECORD_MAYBE;
}
- if(input == RECORD_MAYBE) {
- if(firstMatchStart_ == NA) {
- firstMatchStart_ = match.start();
- firstMatchEnd_ = match.end();
- }
+ if (input == RECORD_MAYBE) {
+ if (firstMatchStart_ == NA) {
+ firstMatchStart_ = match.start();
+ firstMatchEnd_ = match.end();
+ }
}
state = nextState(state, input, match.start());
/*System.out.println("@@@" +
@@ -169,164 +159,153 @@
" state=" + state + " input=" + input +
" firstMatchStart_=" + firstMatchStart_ + " startinstream=" + (inStart+firstMatchStart_) +
" match=" + match.group(0) + " in=" + in_.getPos());*/
- if(state == RECORD_ACCEPT) {
+ if (state == RECORD_ACCEPT) {
break;
}
bufPos = match.end();
s++;
}
- if(state != CDATA_UNK) {
+ if (state != CDATA_UNK) {
synched_ = true;
}
- boolean matched = (firstMatchStart_ != NA) && (state == RECORD_ACCEPT || state == CDATA_UNK);
- if(matched) {
+ boolean matched = (firstMatchStart_ != NA) && (state == RECORD_ACCEPT || state == CDATA_UNK);
+ if (matched) {
int endPos = includePat ? firstMatchEnd_ : firstMatchStart_;
//System.out.println("firstMatchStart_=" + firstMatchStart_ + " firstMatchEnd_=" + firstMatchEnd_);
//String snip = sbuf.substring(firstMatchStart_, firstMatchEnd_);
//System.out.println(" match snip=|" + snip + "| markPattern=" + markPattern);
- if(outBufOrNull != null) {
- in_.reset();
- outBufOrNull.write(in_,endPos);
+ if (outBufOrNull != null) {
+ in_.reset();
+ outBufOrNull.write(in_, endPos);
} else {
//System.out.println("Skip to " + (inStart + endPos));
in_.seek(inStart + endPos);
}
}
return matched;
- } catch(Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
} finally {
// in_ ?
}
return false;
- }
-
+ }
+
// states
- final static int CDATA_IN = 10;
+ final static int CDATA_IN = 10;
final static int CDATA_OUT = 11;
final static int CDATA_UNK = 12;
final static int RECORD_ACCEPT = 13;
// inputs
final static int CDATA_BEGIN = 20;
- final static int CDATA_END = 21;
- final static int RECORD_MAYBE= 22;
-
+ final static int CDATA_END = 21;
+ final static int RECORD_MAYBE = 22;
+
/* also updates firstMatchStart_;*/
- int nextState(int state, int input, int bufPos)
- {
- switch(state) {
- case CDATA_UNK:
- case CDATA_OUT:
- switch(input) {
- case CDATA_BEGIN:
- return CDATA_IN;
- case CDATA_END:
- if(state==CDATA_OUT) {
- //System.out.println("buggy XML " + bufPos);
- }
- return CDATA_OUT;
- case RECORD_MAYBE:
- return (state==CDATA_UNK) ? CDATA_UNK : RECORD_ACCEPT;
+ int nextState(int state, int input, int bufPos) {
+ switch (state) {
+ case CDATA_UNK:
+ case CDATA_OUT:
+ switch (input) {
+ case CDATA_BEGIN:
+ return CDATA_IN;
+ case CDATA_END:
+ if (state == CDATA_OUT) {
+ //System.out.println("buggy XML " + bufPos);
}
- break;
- case CDATA_IN:
- return (input==CDATA_END) ? CDATA_OUT : CDATA_IN;
+ return CDATA_OUT;
+ case RECORD_MAYBE:
+ return (state == CDATA_UNK) ? CDATA_UNK : RECORD_ACCEPT;
+ }
+ break;
+ case CDATA_IN:
+ return (input == CDATA_END) ? CDATA_OUT : CDATA_IN;
}
throw new IllegalStateException(state + " " + input + " " + bufPos + " " + splitName_);
}
-
-
- Pattern makePatternCDataOrMark(String escapedMark)
- {
+
+ Pattern makePatternCDataOrMark(String escapedMark) {
StringBuffer pat = new StringBuffer();
- addGroup(pat, StreamUtil.regexpEscape("CDATA[")); // CDATA_BEGIN
- addGroup(pat, StreamUtil.regexpEscape("]]>")); // CDATA_END
- addGroup(pat, escapedMark); // RECORD_MAYBE
+ addGroup(pat, StreamUtil.regexpEscape("CDATA[")); // CDATA_BEGIN
+ addGroup(pat, StreamUtil.regexpEscape("]]>")); // CDATA_END
+ addGroup(pat, escapedMark); // RECORD_MAYBE
return Pattern.compile(pat.toString());
}
- void addGroup(StringBuffer pat, String escapedGroup)
- {
- if(pat.length() > 0) {
- pat.append("|");
+
+ void addGroup(StringBuffer pat, String escapedGroup) {
+ if (pat.length() > 0) {
+ pat.append("|");
}
pat.append("(");
pat.append(escapedGroup);
pat.append(")");
}
-
-
-
- boolean fastReadUntilMatch(String textPat,
- boolean includePat,
- DataOutputBuffer outBufOrNull) throws IOException
- {
+
+ boolean fastReadUntilMatch(String textPat, boolean includePat, DataOutputBuffer outBufOrNull) throws IOException {
//System.out.println("@@@BEGIN readUntilMatch inPos=" + in_.getPos());
byte[] cpat = textPat.getBytes("UTF-8");
int m = 0;
boolean match = false;
long markPos = -1;
int msup = cpat.length;
- if(!includePat) {
+ if (!includePat) {
int LL = 120000 * 10;
markPos = in_.getPos();
in_.mark(LL); // lookAhead_
}
while (true) {
int b = in_.read();
- if (b == -1)
- break;
+ if (b == -1) break;
- byte c = (byte)b; // this assumes eight-bit matching. OK with UTF-8
+ byte c = (byte) b; // this assumes eight-bit matching. OK with UTF-8
if (c == cpat[m]) {
m++;
- if(m==msup) {
+ if (m == msup) {
match = true;
break;
}
} else {
- if(outBufOrNull != null) {
+ if (outBufOrNull != null) {
outBufOrNull.write(cpat, 0, m);
outBufOrNull.write(c);
}
-
+
m = 0;
}
}
- if(!includePat && match) {
+ if (!includePat && match) {
long pos = in_.getPos() - textPat.length();
in_.reset();
in_.seek(pos);
- } else if(outBufOrNull != null){
+ } else if (outBufOrNull != null) {
outBufOrNull.write(cpat);
}
//System.out.println("@@@DONE readUntilMatch inPos=" + in_.getPos() + " includePat=" + includePat + " pat=" + textPat + ", buf=|" + outBufOrNull + "|");
return match;
}
-
- String checkJobGet(String prop) throws IOException
- {
+
+ String checkJobGet(String prop) throws IOException {
String val = job_.get(prop);
- if(val == null) {
- throw new IOException("JobConf: missing required property: " + prop);
+ if (val == null) {
+ throw new IOException("JobConf: missing required property: " + prop);
}
return val;
}
-
-
+
String beginMark_;
String endMark_;
-
+
Pattern beginPat_;
Pattern endPat_;
- boolean slowMatch_;
+ boolean slowMatch_;
int lookAhead_; // bytes to read to try to synch CDATA/non-CDATA. Should be more than max record size
int maxRecSize_;
- final static int NA = -1;
+ final static int NA = -1;
int firstMatchStart_ = 0; // candidate record boundary. Might just be CDATA.
int firstMatchEnd_ = 0;
-
+
boolean isRecordMatch_;
boolean synched_;
}
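
A note on configuring the reader above: everything StreamXmlRecordReader needs arrives through JobConf, so a streaming job selects XML records purely by configuration. A minimal sketch, assuming CONF_NS resolves to the "stream.recordreader." prefix defined in StreamBaseRecordReader (its value is not visible in this hunk):

    JobConf job = new JobConf();
    // required: checkJobGet() throws IOException if either mark is missing
    job.set("stream.recordreader.begin", "<doc>");
    job.set("stream.recordreader.end", "</doc>");
    // optional knobs, shown with the defaults used in the constructor
    job.set("stream.recordreader.maxrec", "50000");     // max record size in bytes
    job.set("stream.recordreader.lookahead", "100000"); // sync window, 2 * maxrec
    job.set("stream.recordreader.slowmatch", "false");  // true enables the CDATA-aware regex path
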
Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java?view=auto&rev=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java Mon Sep 18 16:08:06 2006
@@ -0,0 +1,86 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+/**
+ @author Michel Tourn
+ */
+public class TupleInputFormat extends InputFormatBase {
+
+ public TupleInputFormat() {
+ fmts_ = new ArrayList();
+ }
+
+ public void setPrimary(InputFormat fmt) {
+ if (fmts_.size() == 0) {
+ fmts_.add(fmt);
+ } else {
+ fmts_.set(0, fmt);
+ }
+ }
+
+ public void addSecondary(InputFormat fmt) {
+ if (fmts_.size() == 0) {
+ throw new IllegalStateException("this.setPrimary() has not been called");
+ }
+ fmts_.add(fmt);
+ }
+
+ /**
+ */
+ public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) throws IOException {
+
+ reporter.setStatus(split.toString());
+
+ return new MultiRecordReader();
+ }
+
+ class MultiRecordReader implements RecordReader {
+
+ MultiRecordReader() {
+ }
+
+ public boolean next(Writable key, Writable value) throws IOException {
+ return false;
+ }
+
+ public long getPos() throws IOException {
+ return 0;
+ }
+
+ public void close() throws IOException {
+ }
+
+ public WritableComparable createKey() {
+ return new UTF8();
+ }
+
+ public Writable createValue() {
+ return new UTF8();
+ }
+
+ }
+
+ ArrayList/*<InputFormat>*/fmts_;
+}
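
TupleInputFormat is only a skeleton at this point (MultiRecordReader.next() always returns false), but the registration contract is already fixed by setPrimary() and addSecondary(). A hypothetical caller, using TextInputFormat purely as a placeholder:

    TupleInputFormat tuple = new TupleInputFormat();
    tuple.setPrimary(new TextInputFormat());   // occupies slot 0; calling it again replaces slot 0
    tuple.addSecondary(new TextInputFormat()); // before setPrimary() this throws IllegalStateException
    tuple.addSecondary(new TextInputFormat()); // further streams append after the primary
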
Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java?view=auto&rev=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java Mon Sep 18 16:08:06 2006
@@ -0,0 +1,317 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.LineNumberInputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.DFSShell;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This JUnit test is not pure-Java and is not run as
+ * part of the standard ant test* targets.
+ * Two ways to run this:<pre>
+ * 1. main(), a Java application.
+ * 2. cd src/contrib/streaming/
+ * ant \
+ * [-Dfs.default.name=h:p] \
+ * [-Dhadoop.test.localoutputfile=/tmp/fifo] \
+ * test-unix
+ * </pre>
+ * @author michel
+ */
+public class TestStreamedMerge extends TestCase {
+
+ public TestStreamedMerge() throws IOException {
+ UtilTest utilTest = new UtilTest(getClass().getName());
+ utilTest.checkUserDir();
+ // utilTest.redirectIfAntJunit();
+ }
+
+ final static int NAME_PORT = 8200;
+ final static int SOC_PORT = 1888;
+
+ void addInput(String path, String contents) throws IOException {
+ OutputStream out = fs_.create(new Path(path));
+ DataOutputStream dout = new DataOutputStream(out);
+ dout.write(contents.getBytes("UTF-8"));
+ dout.close();
+ System.err.println("addInput done: " + path);
+ }
+
+ String createInputs(boolean tag) throws IOException {
+ fs_.delete(new Path("/input/"));
+
+ // i18n() replaces some ASCII with multibyte UTF-8 chars
+ addInput("/input/part-00", i18n("k1\tv1\n" + "k3\tv5\n"));
+ addInput("/input/part-01", i18n("k1\tv2\n" + "k2\tv4\n"));
+ addInput("/input/part-02", i18n("k1\tv3\n"));
+ addInput("/input/part-03", "");
+
+ // tags are one-based: ">1" corresponds to part-00, etc.
+    // Expected result is the merge-sort order of the records.
+ // keys are compared as Strings and ties are broken by stream index
+ // For example (k1; stream 2) < (k1; stream 3)
+ String expect = i18n(
+ unt(">1\tk1\tv1\n", tag) +
+ unt(">2\tk1\tv2\n", tag) +
+ unt(">3\tk1\tv3\n", tag) +
+ unt(">2\tk2\tv4\n", tag) +
+ unt(">1\tk3\tv5\n", tag)
+ );
+ return expect;
+ }
+
+ String unt(String line, boolean keepTag)
+ {
+ return keepTag ? line : line.substring(line.indexOf('\t')+1);
+ }
+ String i18n(String c) {
+ c = c.replace('k', '\u20ac'); // Euro sign, in UTF-8: E282AC
+ c = c.replace('v', '\u00a2'); // Cent sign, in UTF-8: C2A2 ; UTF-16 contains null
+ // "\ud800\udc00" // A surrogate pair, U+10000. OK also works
+ return c;
+ }
+
+ void lsr() {
+ try {
+ System.out.println("lsr /");
+ DFSShell shell = new DFSShell();
+ shell.setConf(conf_);
+ shell.init();
+ shell.ls("/", true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ void printSampleInput() {
+ try {
+ System.out.println("cat /input/part-00");
+ String content = StreamUtil.slurpHadoop(new Path("/input/part-00"), fs_);
+ System.out.println(content);
+ System.out.println("cat done.");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ void callStreaming(String argSideOutput, boolean inputTagged) throws IOException {
+ String[] testargs = new String[] {
+ //"-jobconf", "stream.debug=1",
+ "-verbose",
+ "-jobconf", "stream.testmerge=1",
+ "-input", "+/input/part-00 | /input/part-01 | /input/part-02",
+ "-mapper", StreamUtil.localizeBin("/bin/cat"),
+ "-reducer", "NONE",
+ "-output", "/my.output",
+ "-mapsideoutput", argSideOutput,
+ "-dfs", conf_.get("fs.default.name"),
+ "-jt", "local",
+ "-jobconf", "stream.sideoutput.localfs=true",
+ };
+ ArrayList argList = new ArrayList();
+ argList.addAll(Arrays.asList(testargs));
+ if (inputTagged) {
+ argList.add("-inputtagged");
+ }
+ testargs = (String[])argList.toArray(new String[0]);
+ String sss = StreamUtil.collate(argList, " ");
+ System.out.println("bin/hadoop jar build/hadoop-streaming.jar " + sss);
+ //HadoopStreaming.main(testargs);
+ StreamJob job = new StreamJob(testargs, false);
+ job.go();
+ }
+
+ SideEffectConsumer startSideEffectConsumer(StringBuffer outBuf) {
+ SideEffectConsumer t = new SideEffectConsumer(outBuf) {
+ ServerSocket listen;
+ Socket client;
+ InputStream in;
+
+ InputStream connectInputStream() throws IOException {
+ listen = new ServerSocket(SOC_PORT);
+ client = listen.accept();
+ in = client.getInputStream();
+ return in;
+ }
+
+ void close() throws IOException
+ {
+ listen.close();
+ System.out.println("@@@listen closed");
+ }
+ };
+ t.start();
+ return t;
+ }
+
+ abstract class SideEffectConsumer extends Thread {
+
+ SideEffectConsumer(StringBuffer buf) {
+ buf_ = buf;
+ setDaemon(true);
+ }
+
+ abstract InputStream connectInputStream() throws IOException;
+
+ abstract void close() throws IOException;
+
+ public void run() {
+ try {
+ in_ = connectInputStream();
+ while (true) {
+ byte[] b = UTF8ByteArrayUtils.readLine(in_);
+ if (b == null) {
+ break;
+ }
+ buf_.append(new String(b, "UTF-8"));
+ buf_.append('\n');
+ }
+ in_.close();
+ } catch (IOException io) {
+ throw new RuntimeException(io);
+ }
+ }
+
+ InputStream in_;
+ StringBuffer buf_;
+ }
+
+ public void testMain() throws IOException {
+ boolean success = false;
+ String base = new File(".").getAbsolutePath();
+ System.setProperty("hadoop.log.dir", base + "/logs");
+ conf_ = new Configuration();
+ String overrideFS = StreamUtil.getBoundAntProperty("fs.default.name", null);
+ MiniDFSCluster cluster = null;
+ try {
+ if(overrideFS == null) {
+ cluster = new MiniDFSCluster(NAME_PORT, conf_, false);
+ fs_ = cluster.getFileSystem();
+ } else {
+ System.out.println("overrideFS: " + overrideFS);
+ conf_.set("fs.default.name", overrideFS);
+ fs_ = FileSystem.get(conf_);
+ }
+ doAllTestJobs();
+ success = true;
+ } catch (IOException io) {
+ io.printStackTrace();
+ } finally {
+ try {
+ fs_.close();
+ } catch (IOException io) {
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ System.out.println("cluster.shutdown(); DONE");
+ }
+ System.out.println(getClass().getName() + ": success=" + success);
+ }
+ }
+
+ void doAllTestJobs() throws IOException
+ {
+ goSocketTagged(true, false);
+ goSocketTagged(false, false);
+ goSocketTagged(true, true);
+ }
+
+ void goSocketTagged(boolean socket, boolean inputTagged) throws IOException {
+ System.out.println("***** goSocketTagged: " + socket + ", " + inputTagged);
+ String expect = createInputs(inputTagged);
+ lsr();
+ printSampleInput();
+
+ StringBuffer outputBuf = new StringBuffer();
+ String sideOutput = null;
+ File f = null;
+ SideEffectConsumer consumer = null;
+ if (socket) {
+ consumer = startSideEffectConsumer(outputBuf);
+ sideOutput = "socket://localhost:" + SOC_PORT + "/";
+ } else {
+ String userOut = StreamUtil.getBoundAntProperty(
+ "hadoop.test.localoutputfile", null);
+ if(userOut != null) {
+ f = new File(userOut);
+ // don't delete so they can mkfifo
+ maybeFifoOutput_ = true;
+ } else {
+ f = new File("localoutputfile");
+ f.delete();
+ maybeFifoOutput_ = false;
+ }
+ String s = new Path(f.getAbsolutePath()).toString();
+ if(! s.startsWith("/")) {
+ s = "/" + s; // Windows "file:/C:/"
+ }
+ sideOutput = "file:" + s;
+ }
+ System.out.println("sideOutput=" + sideOutput);
+ callStreaming(sideOutput, inputTagged);
+ String output;
+ if (socket) {
+ try {
+ consumer.join();
+ consumer.close();
+ } catch (InterruptedException e) {
+ throw (IOException) new IOException().initCause(e);
+ }
+ output = outputBuf.toString();
+ } else {
+ if(maybeFifoOutput_) {
+ System.out.println("assertEquals will fail.");
+ output = "potential FIFO: not retrieving to avoid blocking on open() "
+ + f.getAbsoluteFile();
+ } else {
+ output = StreamUtil.slurp(f.getAbsoluteFile());
+ }
+ }
+
+ lsr();
+
+ System.out.println("output=|" + output + "|");
+ System.out.println("expect=|" + expect + "|");
+ assertEquals(expect, output);
+ }
+
+ Configuration conf_;
+ FileSystem fs_;
+ boolean maybeFifoOutput_;
+
+ public static void main(String[] args) throws Throwable {
+ TestStreamedMerge test = new TestStreamedMerge();
+ test.testMain();
+ }
+
+}
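
The expect string built in createInputs() pins down the merge rule this test verifies: records are emitted in key order, and equal keys fall back to the one-based stream index carried in the ">N" tag. A minimal sketch of that comparison (an illustration, not code from this commit):

    // keys compare as Strings; ties break on stream index, so
    // (k1, stream 1) < (k1, stream 2) < (k1, stream 3) < (k2, stream 2)
    int compareTagged(String key1, int stream1, String key2, int stream2) {
      int byKey = key1.compareTo(key2);
      return (byKey != 0) ? byKey : (stream1 - stream2);
    }
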
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Mon Sep 18 16:08:06 2006
@@ -35,74 +35,29 @@
String OUTPUT_DIR = "out";
String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
// map behaves like "/usr/bin/tr . \\n"; (split words into lines)
- String map = makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
+ String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
// combine, reduce behave like /usr/bin/uniq. But also prepend lines with C, R.
- String combine = makeJavaCommand(UniqApp.class, new String[]{"C"});
- String reduce = makeJavaCommand(UniqApp.class, new String[]{"R"});
+ String combine = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"C"});
+ String reduce = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"R"});
String outputExpect = "RCare\t\nRCblue\t\nRCbunnies\t\nRCpink\t\nRCred\t\nRCroses\t\nRCviolets\t\n";
StreamJob job;
public TestStreaming() throws IOException
{
- // trunk/src/contrib/streaming --> trunk/build/contrib/streaming/test/data
- String userDir = System.getProperty("user.dir");
- String antTestDir = System.getProperty("test.build.data", userDir);
- if(! userDir.equals(antTestDir)) {
- // because changes to user.dir are ignored by File
- throw new IllegalStateException("user.dir != test.build.data. The junit Ant task must be forked.");
- }
-
- boolean fromAntJunit = System.getProperty("test.build.data") != null;
- if(fromAntJunit) {
- new File(antTestDir).mkdirs();
- File outFile = new File(antTestDir, getClass().getName()+".log");
- PrintStream out = new PrintStream(new FileOutputStream(outFile));
- System.setOut(out);
- System.setErr(out);
- }
- System.out.println("test.build.data=" + antTestDir);
+ UtilTest utilTest = new UtilTest(getClass().getName());
+ utilTest.checkUserDir();
+ utilTest.redirectIfAntJunit();
}
void createInput() throws IOException
{
String path = new File(".", INPUT_FILE).getAbsolutePath();// needed from junit forked vm
DataOutputStream out = new DataOutputStream(new FileOutputStream(path));
- out.writeBytes(input);
+ out.write(input.getBytes("UTF-8"));
out.close();
}
- public String makeJavaCommand(Class main, String[] argv)
- {
- ArrayList vargs = new ArrayList();
- File javaHomeBin = new File(System.getProperty("java.home"), "bin");
- File jvm = new File(javaHomeBin, "java");
- vargs.add(jvm.toString());
- // copy parent classpath
- vargs.add("-classpath");
- vargs.add("\"" + System.getProperty("java.class.path") + "\"");
-
- // Add main class and its arguments
- vargs.add(main.getName());
- for(int i=0; i<argv.length; i++) {
- vargs.add(argv[i]);
- }
- return collate(vargs, " ");
- }
-
- String collate(ArrayList args, String sep)
- {
- StringBuffer buf = new StringBuffer();
- Iterator it = args.iterator();
- while(it.hasNext()) {
- if(buf.length() > 0) {
- buf.append(" ");
- }
- buf.append(it.next());
- }
- return buf.toString();
- }
-
public void testCommandLine()
{
try {
@@ -117,22 +72,21 @@
"-mapper", map,
"-combiner", combine,
"-reducer", reduce,
- /*"-debug",*/
- "-verbose"
+ //"-verbose",
+ //"-jobconf", "stream.debug=set"
+ "-jobconf", "keep.failed.task.files=true",
};
- job = new StreamJob(argv, mayExit);
+ job = new StreamJob(argv, mayExit);
job.go();
File outFile = new File(".", OUTPUT_DIR + "/part-00000").getAbsoluteFile();
String output = StreamUtil.slurp(outFile);
- System.out.println("outEx=" + outputExpect);
- System.out.println(" out=" + output);
+ System.err.println("outEx1=" + outputExpect);
+ System.err.println(" out1=" + output);
assertEquals(outputExpect, output);
} catch(Exception e) {
- failTrace(e);
+ failTrace(e);
}
-
-
}
void failTrace(Exception e)
@@ -141,8 +95,6 @@
e.printStackTrace(new PrintWriter(sw));
fail(sw.toString());
}
-
-
public static void main(String[]args) throws Exception
{
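
Besides moving makeJavaCommand()/collate() into StreamUtil so TestStreamedMerge can share them, the change to createInput() above is a correctness fix: DataOutputStream.writeBytes() discards the high byte of every char, while write(input.getBytes("UTF-8")) preserves multibyte characters like the ones the new i18n tests feed in. A small illustration of the difference (our example, not test code; out is assumed to be the DataOutputStream from createInput()):

    String euro = "\u20ac";            // Euro sign, U+20AC
    out.writeBytes(euro);              // writes the single byte 0xAC (lossy)
    out.write(euro.getBytes("UTF-8")); // writes 0xE2 0x82 0xAC (correct)
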
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Mon Sep 18 16:08:06 2006
@@ -17,12 +17,13 @@
package org.apache.hadoop.streaming;
import java.io.*;
+
import org.apache.hadoop.streaming.Environment;
/** A minimal Java implementation of /usr/bin/tr.
Used to test the usage of external applications without adding
platform-specific dependencies.
-*/
+ */
public class TrApp
{
@@ -35,16 +36,6 @@
void testParentJobConfToEnvVars() throws IOException
{
env = new Environment();
-
- /* To get some ideas of stable env.vars:
- Enumeration it = env.keys();
- while(it.hasMoreElements()) {
- String key = (String)it.nextElement();
- String val = (String)env.get(key);
- System.out.println("@@@" + key + "=" + val);
- }
- */
-
// test that some JobConf properties are exposed as expected
// Note the dots translated to underscore:
// property names have been escaped in PipeMapRed.safeEnvVarName()
@@ -52,7 +43,8 @@
expect("mapred_job_tracker", "local");
expect("mapred_input_key_class", "org.apache.hadoop.io.Text");
expect("mapred_input_value_class", "org.apache.hadoop.io.Text");
- expect("mapred_local_dir", "build/test/mapred/local");
+ //expect("mapred_local_dir", "build/test/mapred/local");
+ expectDefined("mapred_local_dir");
expect("mapred_output_format_class", "org.apache.hadoop.streaming.StreamOutputFormat");
expect("mapred_output_key_class", "org.apache.hadoop.io.Text");
expect("mapred_output_value_class", "org.apache.hadoop.io.Text");
@@ -64,12 +56,12 @@
expectDefined("map_input_file");
expect("map_input_start", "0");
expectDefined("map_input_length");
-
+
expectDefined("io_sort_factor");
// the FileSplit context properties are not available in local hadoop..
// so can't check them in this test.
-
+
}
// this runs in a subprocess; won't use JUnit's assertTrue()
@@ -81,7 +73,7 @@
throw new IOException(msg);
}
}
-
+
void expectDefined(String evName) throws IOException
{
String got = env.getProperty(evName);
@@ -90,7 +82,7 @@
throw new IOException(msg);
}
}
-
+
public void go() throws IOException
{
testParentJobConfToEnvVars();
@@ -98,8 +90,8 @@
String line;
while ((line = in.readLine()) != null) {
- String out = line.replace(find, replace);
- System.out.println(out);
+ String out = line.replace(find, replace);
+ System.out.println(out);
}
}
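
The expect() assertions above depend on JobConf property names being rewritten into legal environment variable names before the streaming subprocess is launched. This diff only shows the effect (dots become underscores, so "mapred.job.tracker" arrives as "mapred_job_tracker"); the following is a hypothetical reconstruction of such an escaping rule, not the actual body of PipeMapRed.safeEnvVarName():

    // assumption: every character that is not a letter or digit maps to '_'
    static String safeEnvVarName(String name) {
      StringBuffer safe = new StringBuffer();
      for (int i = 0; i < name.length(); i++) {
        char c = name.charAt(i);
        safe.append(Character.isLetterOrDigit(c) ? c : '_');
      }
      return safe.toString();
    }
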
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UniqApp.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UniqApp.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UniqApp.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UniqApp.java Mon Sep 18 16:08:06 2006
@@ -17,12 +17,13 @@
package org.apache.hadoop.streaming;
import java.io.*;
+import java.util.Date;
/** A minimal Java implementation of /usr/bin/uniq
Used to test the usage of external applications without adding
platform-specific dependencies.
Uniques lines and prepends a header on the line.
-*/
+ */
public class UniqApp
{
@@ -36,10 +37,10 @@
String line;
String prevLine = null;
while ((line = in.readLine()) != null) {
- if(! line.equals(prevLine)) {
- System.out.println(header + line);
- }
- prevLine = line;
+ if(! line.equals(prevLine)) {
+ System.out.println(header + line);
+ }
+ prevLine = line;
}
}
Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java?view=auto&rev=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java Mon Sep 18 16:08:06 2006
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+class UtilTest {
+
+ public UtilTest(String testName) {
+ testName_ = testName;
+ userDir_ = System.getProperty("user.dir");
+ antTestDir_ = System.getProperty("test.build.data", userDir_);
+ System.out.println("test.build.data-or-user.dir=" + antTestDir_);
+ }
+
+ void checkUserDir() {
+ // trunk/src/contrib/streaming --> trunk/build/contrib/streaming/test/data
+ if(! userDir_.equals(antTestDir_)) {
+ // because changes to user.dir are ignored by File static methods.
+ throw new IllegalStateException("user.dir != test.build.data. The junit Ant task must be forked.");
+ }
+ }
+
+ void redirectIfAntJunit() throws IOException
+ {
+ boolean fromAntJunit = System.getProperty("test.build.data") != null;
+ if(fromAntJunit) {
+ new File(antTestDir_).mkdirs();
+ File outFile = new File(antTestDir_, testName_+".log");
+ PrintStream out = new PrintStream(new FileOutputStream(outFile));
+ System.setOut(out);
+ System.setErr(out);
+ }
+ }
+
+ private String userDir_;
+ private String antTestDir_;
+ private String testName_;
+}