Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/19 01:08:08 UTC

svn commit: r447626 [3/3] - in /lucene/hadoop/trunk: ./ src/contrib/ src/contrib/streaming/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Mon Sep 18 16:08:06 2006
@@ -42,91 +42,82 @@
  *
  *  @author Michel Tourn
  */
-public class StreamXmlRecordReader extends StreamBaseRecordReader 
-{
-  public StreamXmlRecordReader(
-    FSDataInputStream in, FileSplit split, Reporter reporter, JobConf job, FileSystem fs)
-    throws IOException
-  {
+public class StreamXmlRecordReader extends StreamBaseRecordReader {
+
+  public StreamXmlRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
+      JobConf job, FileSystem fs) throws IOException {
     super(in, split, reporter, job, fs);
-    
+
     beginMark_ = checkJobGet(CONF_NS + "begin");
-    endMark_   = checkJobGet(CONF_NS + "end");
+    endMark_ = checkJobGet(CONF_NS + "end");
 
-    maxRecSize_= job_.getInt(CONF_NS + "maxrec", 50*1000);
-    lookAhead_ = job_.getInt(CONF_NS + "lookahead", 2*maxRecSize_);
+    maxRecSize_ = job_.getInt(CONF_NS + "maxrec", 50 * 1000);
+    lookAhead_ = job_.getInt(CONF_NS + "lookahead", 2 * maxRecSize_);
     synched_ = false;
-    
+
     slowMatch_ = job_.getBoolean(CONF_NS + "slowmatch", false);
-    if(slowMatch_) {
-      beginPat_  = makePatternCDataOrMark(beginMark_);
-      endPat_    = makePatternCDataOrMark(endMark_);
+    if (slowMatch_) {
+      beginPat_ = makePatternCDataOrMark(beginMark_);
+      endPat_ = makePatternCDataOrMark(endMark_);
     }
   }
-  
+
   int numNext = 0;
-  public synchronized boolean next(Writable key, Writable value)
-   throws IOException
-  {
+
+  public synchronized boolean next(Writable key, Writable value) throws IOException {
     long pos = in_.getPos();
     numNext++;
     if (pos >= end_) {
       return false;
     }
-    
+
     DataOutputBuffer buf = new DataOutputBuffer();
-    if(!readUntilMatchBegin()) {
-        return false;
+    if (!readUntilMatchBegin()) {
+      return false;
     }
-    if(!readUntilMatchEnd(buf)) {
-        return false;
+    if (!readUntilMatchEnd(buf)) {
+      return false;
     }
-    
+
     // There is only one elem..key/value splitting is not done here.
-    byte [] record = new byte[buf.getLength()];
+    byte[] record = new byte[buf.getLength()];
     System.arraycopy(buf.getData(), 0, record, 0, record.length);
-    
-    numRecStats(record, 0, record.length); 
 
-    ((Text)key).set(record);
-    ((Text)value).set("");
-    
+    numRecStats(record, 0, record.length);
+
+    ((Text) key).set(record);
+    ((Text) value).set("");
+
     /*if(numNext < 5) {
-        System.out.println("@@@ " + numNext + ". true next k=|" + key.toString().replaceAll("[\\r\\n]", " ")
-        + "|, len=" + buf.length() + " v=|" + value.toString().replaceAll("[\\r\\n]", " ") + "|");
-    }*/
+     System.out.println("@@@ " + numNext + ". true next k=|" + key.toString().replaceAll("[\\r\\n]", " ")
+     + "|, len=" + buf.length() + " v=|" + value.toString().replaceAll("[\\r\\n]", " ") + "|");
+     }*/
 
     return true;
   }
-  
-  public void seekNextRecordBoundary() throws IOException
-  {
+
+  public void seekNextRecordBoundary() throws IOException {
     readUntilMatchBegin();
   }
-  
-  boolean readUntilMatchBegin() throws IOException
-  {
-    if(slowMatch_) {
-        return slowReadUntilMatch(beginPat_, false, null);
+
+  boolean readUntilMatchBegin() throws IOException {
+    if (slowMatch_) {
+      return slowReadUntilMatch(beginPat_, false, null);
     } else {
-        return fastReadUntilMatch(beginMark_, false, null);
+      return fastReadUntilMatch(beginMark_, false, null);
     }
   }
-  
-  private boolean readUntilMatchEnd(DataOutputBuffer buf) throws IOException
-  {
-    if(slowMatch_) {
+
+  private boolean readUntilMatchEnd(DataOutputBuffer buf) throws IOException {
+    if (slowMatch_) {
       return slowReadUntilMatch(endPat_, true, buf);
     } else {
       return fastReadUntilMatch(endMark_, true, buf);
     }
   }
-  
-  
-  private boolean slowReadUntilMatch(Pattern markPattern, boolean includePat, 
-          DataOutputBuffer outBufOrNull) 
-    throws IOException   
-  {
+
+  private boolean slowReadUntilMatch(Pattern markPattern, boolean includePat,
+      DataOutputBuffer outBufOrNull) throws IOException {
     try {
       long inStart = in_.getPos();
       byte[] buf = new byte[Math.max(lookAhead_, maxRecSize_)];
@@ -134,34 +125,33 @@
       boolean success = true;
       in_.mark(lookAhead_ + 2);
       read = in_.read(buf);
-      if( read == -1 )
-          return false;
-      
-      String sbuf = new String(buf, 0, read, "UTF-8");        
+      if (read == -1) return false;
+
+      String sbuf = new String(buf, 0, read, "UTF-8");
       Matcher match = markPattern.matcher(sbuf);
 
       firstMatchStart_ = NA;
       firstMatchEnd_ = NA;
       int bufPos = 0;
       int state = synched_ ? CDATA_OUT : CDATA_UNK;
-      int s=0;
+      int s = 0;
       int matchLen = 0;
-      while(match.find(bufPos)) {
+      while (match.find(bufPos)) {
         int input;
         matchLen = match.group(0).length();
-        if(match.group(1) != null) {
+        if (match.group(1) != null) {
           input = CDATA_BEGIN;
-        } else if(match.group(2) != null) {
+        } else if (match.group(2) != null) {
           input = CDATA_END;
           firstMatchStart_ = NA; // |<DOC CDATA[ </DOC> ]]> should keep it
         } else {
           input = RECORD_MAYBE;
         }
-        if(input == RECORD_MAYBE) {
-            if(firstMatchStart_ == NA) {
-              firstMatchStart_ = match.start();
-              firstMatchEnd_   = match.end();
-            }
+        if (input == RECORD_MAYBE) {
+          if (firstMatchStart_ == NA) {
+            firstMatchStart_ = match.start();
+            firstMatchEnd_ = match.end();
+          }
         }
         state = nextState(state, input, match.start());
         /*System.out.println("@@@" +
@@ -169,164 +159,153 @@
          " state=" + state + " input=" + input + 
          " firstMatchStart_=" + firstMatchStart_ + " startinstream=" + (inStart+firstMatchStart_) + 
          " match=" + match.group(0) + " in=" + in_.getPos());*/
-        if(state == RECORD_ACCEPT) {
+        if (state == RECORD_ACCEPT) {
           break;
         }
         bufPos = match.end();
         s++;
       }
-      if(state != CDATA_UNK) {
+      if (state != CDATA_UNK) {
         synched_ = true;
       }
-      boolean matched = (firstMatchStart_ != NA) && (state == RECORD_ACCEPT || state == CDATA_UNK); 
-      if(matched) {
+      boolean matched = (firstMatchStart_ != NA) && (state == RECORD_ACCEPT || state == CDATA_UNK);
+      if (matched) {
         int endPos = includePat ? firstMatchEnd_ : firstMatchStart_;
         //System.out.println("firstMatchStart_=" + firstMatchStart_ + " firstMatchEnd_=" + firstMatchEnd_);
         //String snip = sbuf.substring(firstMatchStart_, firstMatchEnd_);
         //System.out.println(" match snip=|" + snip + "| markPattern=" + markPattern);
-        if(outBufOrNull != null) {
-          in_.reset();      
-          outBufOrNull.write(in_,endPos);  
+        if (outBufOrNull != null) {
+          in_.reset();
+          outBufOrNull.write(in_, endPos);
         } else {
           //System.out.println("Skip to " + (inStart + endPos));
           in_.seek(inStart + endPos);
         }
       }
       return matched;
-    } catch(Exception e) {
+    } catch (Exception e) {
       e.printStackTrace();
     } finally {
       // in_ ?
     }
     return false;
-  }  
-  
+  }
+
   // states
-  final static int CDATA_IN  = 10;
+  final static int CDATA_IN = 10;
   final static int CDATA_OUT = 11;
   final static int CDATA_UNK = 12;
   final static int RECORD_ACCEPT = 13;
   // inputs
   final static int CDATA_BEGIN = 20;
-  final static int CDATA_END   = 21;
-  final static int RECORD_MAYBE= 22;
-  
+  final static int CDATA_END = 21;
+  final static int RECORD_MAYBE = 22;
+
   /* also updates firstMatchStart_;*/
-  int nextState(int state, int input, int bufPos)
-  {
-    switch(state) {
-      case CDATA_UNK:
-      case CDATA_OUT:
-        switch(input) {
-          case CDATA_BEGIN:
-            return CDATA_IN;
-          case CDATA_END:
-            if(state==CDATA_OUT) {
-              //System.out.println("buggy XML " + bufPos);
-            }
-            return CDATA_OUT;
-          case RECORD_MAYBE:
-            return (state==CDATA_UNK) ? CDATA_UNK : RECORD_ACCEPT;
+  int nextState(int state, int input, int bufPos) {
+    switch (state) {
+    case CDATA_UNK:
+    case CDATA_OUT:
+      switch (input) {
+      case CDATA_BEGIN:
+        return CDATA_IN;
+      case CDATA_END:
+        if (state == CDATA_OUT) {
+          //System.out.println("buggy XML " + bufPos);
         }
-      break;
-      case CDATA_IN:
-       return (input==CDATA_END) ? CDATA_OUT : CDATA_IN;
+        return CDATA_OUT;
+      case RECORD_MAYBE:
+        return (state == CDATA_UNK) ? CDATA_UNK : RECORD_ACCEPT;
+      }
+    break;
+    case CDATA_IN:
+      return (input == CDATA_END) ? CDATA_OUT : CDATA_IN;
     }
     throw new IllegalStateException(state + " " + input + " " + bufPos + " " + splitName_);
   }
-  
-    
-  Pattern makePatternCDataOrMark(String escapedMark)
-  {
+
+  Pattern makePatternCDataOrMark(String escapedMark) {
     StringBuffer pat = new StringBuffer();
-    addGroup(pat, StreamUtil.regexpEscape("CDATA["));   // CDATA_BEGIN
-    addGroup(pat, StreamUtil.regexpEscape("]]>"));      // CDATA_END
-    addGroup(pat, escapedMark);                         // RECORD_MAYBE
+    addGroup(pat, StreamUtil.regexpEscape("CDATA[")); // CDATA_BEGIN
+    addGroup(pat, StreamUtil.regexpEscape("]]>")); // CDATA_END
+    addGroup(pat, escapedMark); // RECORD_MAYBE
     return Pattern.compile(pat.toString());
   }
-  void addGroup(StringBuffer pat, String escapedGroup)
-  {
-    if(pat.length() > 0) {
-        pat.append("|");
+
+  void addGroup(StringBuffer pat, String escapedGroup) {
+    if (pat.length() > 0) {
+      pat.append("|");
     }
     pat.append("(");
     pat.append(escapedGroup);
     pat.append(")");
   }
-  
-  
-  
-  boolean fastReadUntilMatch(String textPat, 
-          boolean includePat, 
-          DataOutputBuffer outBufOrNull) throws IOException 
-  {
+
+  boolean fastReadUntilMatch(String textPat, boolean includePat, DataOutputBuffer outBufOrNull) throws IOException {
     //System.out.println("@@@BEGIN readUntilMatch inPos=" + in_.getPos());  
     byte[] cpat = textPat.getBytes("UTF-8");
     int m = 0;
     boolean match = false;
     long markPos = -1;
     int msup = cpat.length;
-    if(!includePat) {
+    if (!includePat) {
       int LL = 120000 * 10;
       markPos = in_.getPos();
       in_.mark(LL); // lookAhead_
     }
     while (true) {
       int b = in_.read();
-      if (b == -1)
-        break;
+      if (b == -1) break;
 
-      byte c = (byte)b; // this assumes eight-bit matching. OK with UTF-8
+      byte c = (byte) b; // this assumes eight-bit matching. OK with UTF-8
       if (c == cpat[m]) {
         m++;
-        if(m==msup) {
+        if (m == msup) {
           match = true;
           break;
         }
       } else {
-        if(outBufOrNull != null) {
+        if (outBufOrNull != null) {
           outBufOrNull.write(cpat, 0, m);
           outBufOrNull.write(c);
         }
-        
+
         m = 0;
       }
     }
-    if(!includePat && match) {
+    if (!includePat && match) {
       long pos = in_.getPos() - textPat.length();
       in_.reset();
       in_.seek(pos);
-    } else if(outBufOrNull != null){
+    } else if (outBufOrNull != null) {
       outBufOrNull.write(cpat);
     }
     //System.out.println("@@@DONE  readUntilMatch inPos=" + in_.getPos() + " includePat=" + includePat + " pat=" + textPat + ", buf=|" + outBufOrNull + "|");
     return match;
   }
-  
-  String checkJobGet(String prop) throws IOException
-  {
+
+  String checkJobGet(String prop) throws IOException {
     String val = job_.get(prop);
-    if(val == null) {
-        throw new IOException("JobConf: missing required property: " + prop);
+    if (val == null) {
+      throw new IOException("JobConf: missing required property: " + prop);
     }
     return val;
   }
-  
-  
+
   String beginMark_;
   String endMark_;
-  
+
   Pattern beginPat_;
   Pattern endPat_;
 
-  boolean slowMatch_;  
+  boolean slowMatch_;
   int lookAhead_; // bytes to read to try to synch CDATA/non-CDATA. Should be more than max record size
   int maxRecSize_;
 
-  final static int NA = -1;  
+  final static int NA = -1;
   int firstMatchStart_ = 0; // candidate record boundary. Might just be CDATA.
   int firstMatchEnd_ = 0;
-  
+
   boolean isRecordMatch_;
   boolean synched_;
 }
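
For context on the configuration surface shown above: StreamXmlRecordReader is driven entirely by JobConf properties under the CONF_NS prefix. "begin" and "end" are required (checkJobGet() throws an IOException if either is missing), while "maxrec" (default 50*1000 bytes), "lookahead" (default 2*maxrec) and "slowmatch" (default false) are optional. The sketch below is illustrative only and assumes CONF_NS resolves to the "stream.recordreader." prefix defined in StreamBaseRecordReader, which is not part of this hunk; verify the prefix against that class before relying on the exact property names.

    import org.apache.hadoop.mapred.JobConf;

    // Minimal configuration sketch for StreamXmlRecordReader (assumed property names).
    public class StreamXmlReaderConfigSketch {
      public static JobConf configure() {
        JobConf job = new JobConf();
        job.set("stream.recordreader.begin", "<doc>");      // required: record start mark
        job.set("stream.recordreader.end", "</doc>");       // required: record end mark
        job.set("stream.recordreader.maxrec", "50000");     // max record size in bytes
        job.set("stream.recordreader.lookahead", "100000"); // bytes scanned to re-synch on CDATA
        job.set("stream.recordreader.slowmatch", "true");   // regex path that tracks CDATA sections
        return job;
      }
    }

With slowmatch=true the reader matches begin/end marks through the nextState() state machine so that marks appearing inside CDATA sections are ignored; the default fast path in fastReadUntilMatch() simply compares the incoming bytes against the UTF-8 bytes of the mark.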

Added: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java?view=auto&rev=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/TupleInputFormat.java Mon Sep 18 16:08:06 2006
@@ -0,0 +1,86 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+/**
+ @author Michel Tourn
+ */
+public class TupleInputFormat extends InputFormatBase {
+
+  public TupleInputFormat() {
+    fmts_ = new ArrayList();
+  }
+
+  public void setPrimary(InputFormat fmt) {
+    if (fmts_.size() == 0) {
+      fmts_.add(fmt);
+    } else {
+      fmts_.set(0, fmt);
+    }
+  }
+
+  public void addSecondary(InputFormat fmt) {
+    if (fmts_.size() == 0) {
+      throw new IllegalStateException("this.setPrimary() has not been called");
+    }
+    fmts_.add(fmt);
+  }
+
+  /**
+   */
+  public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter) throws IOException {
+
+    reporter.setStatus(split.toString());
+
+    return new MultiRecordReader();
+  }
+
+  class MultiRecordReader implements RecordReader {
+
+    MultiRecordReader() {
+    }
+
+    public boolean next(Writable key, Writable value) throws IOException {
+      return false;
+    }
+
+    public long getPos() throws IOException {
+      return 0;
+    }
+
+    public void close() throws IOException {
+    }
+
+    public WritableComparable createKey() {
+      return new UTF8();
+    }
+
+    public Writable createValue() {
+      return new UTF8();
+    }
+
+  }
+
+  ArrayList/*<InputFormat>*/fmts_;
+}
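
Note on the class above: at this revision TupleInputFormat only establishes the intended API. setPrimary() must be called before any addSecondary() (an IllegalStateException is thrown otherwise), and the MultiRecordReader returned by getRecordReader() is still a stub whose next() always returns false. A hypothetical caller might look like the following sketch, which assumes the standard org.apache.hadoop.mapred.TextInputFormat as the per-stream format:

    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.streaming.TupleInputFormat;

    // Hypothetical usage sketch of the intended TupleInputFormat API.
    public class TupleInputFormatUsageSketch {
      public static TupleInputFormat build() {
        TupleInputFormat fmt = new TupleInputFormat();
        fmt.setPrimary(new TextInputFormat());    // must come first
        fmt.addSecondary(new TextInputFormat());  // one call per additional input stream
        fmt.addSecondary(new TextInputFormat());
        return fmt;
      }
    }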

Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java?view=auto&rev=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java Mon Sep 18 16:08:06 2006
@@ -0,0 +1,317 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.LineNumberInputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.DFSShell;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This JUnit test is not pure-Java and is not run as 
+ * part of the standard ant test* targets.   
+ * Two ways to run this:<pre>
+ * 1. main(), a Java application.
+ * 2. cd src/contrib/streaming/ 
+ *    ant \
+ *     [-Dfs.default.name=h:p] \ 
+ *     [-Dhadoop.test.localoutputfile=/tmp/fifo] \ 
+ *     test-unix 
+ * </pre>
+ * @author michel
+ */
+public class TestStreamedMerge extends TestCase {
+
+  public TestStreamedMerge() throws IOException {
+    UtilTest utilTest = new UtilTest(getClass().getName());
+    utilTest.checkUserDir();
+    //  utilTest.redirectIfAntJunit();
+  }
+
+  final static int NAME_PORT = 8200;
+  final static int SOC_PORT = 1888;
+
+  void addInput(String path, String contents) throws IOException {
+    OutputStream out = fs_.create(new Path(path));
+    DataOutputStream dout = new DataOutputStream(out);
+    dout.write(contents.getBytes("UTF-8"));
+    dout.close();
+    System.err.println("addInput done: " + path);
+  }
+
+  String createInputs(boolean tag) throws IOException {
+    fs_.delete(new Path("/input/"));
+
+    // i18n() replaces some ASCII with multibyte UTF-8 chars
+    addInput("/input/part-00", i18n("k1\tv1\n" + "k3\tv5\n"));
+    addInput("/input/part-01", i18n("k1\tv2\n" + "k2\tv4\n"));
+    addInput("/input/part-02", i18n("k1\tv3\n"));
+    addInput("/input/part-03", "");
+    
+    // tags are one-based: ">1" corresponds to part-00, etc.
+    // Expected result is the merge-sort order of the records.
+    // keys are compared as Strings and ties are broken by stream index
+    // For example (k1; stream 2) < (k1; stream 3)
+    String expect = i18n(
+        unt(">1\tk1\tv1\n", tag) + 
+        unt(">2\tk1\tv2\n", tag) + 
+        unt(">3\tk1\tv3\n", tag) + 
+        unt(">2\tk2\tv4\n", tag) +
+        unt(">1\tk3\tv5\n", tag)
+    );
+    return expect;
+  }
+  
+  String unt(String line, boolean keepTag)
+  {
+    return keepTag ? line : line.substring(line.indexOf('\t')+1);
+  }
+  String i18n(String c) {
+    c = c.replace('k', '\u20ac'); // Euro sign, in UTF-8: E282AC
+    c = c.replace('v', '\u00a2'); // Cent sign, in UTF-8: C2A2 ; UTF-16 contains null
+    // "\ud800\udc00" // A surrogate pair, U+10000. OK also works
+    return c;
+  }
+
+  void lsr() {
+    try {
+      System.out.println("lsr /");
+      DFSShell shell = new DFSShell();
+      shell.setConf(conf_);
+      shell.init();
+      shell.ls("/", true);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  void printSampleInput() {
+    try {
+      System.out.println("cat /input/part-00");
+      String content = StreamUtil.slurpHadoop(new Path("/input/part-00"), fs_);
+      System.out.println(content);
+      System.out.println("cat done.");
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  void callStreaming(String argSideOutput, boolean inputTagged) throws IOException {
+    String[] testargs = new String[] {
+        //"-jobconf", "stream.debug=1",
+        "-verbose", 
+        "-jobconf", "stream.testmerge=1", 
+        "-input", "+/input/part-00 | /input/part-01 | /input/part-02", 
+        "-mapper", StreamUtil.localizeBin("/bin/cat"), 
+        "-reducer", "NONE", 
+        "-output", "/my.output",
+        "-mapsideoutput", argSideOutput, 
+        "-dfs", conf_.get("fs.default.name"), 
+        "-jt", "local",
+        "-jobconf", "stream.sideoutput.localfs=true", 
+    };
+    ArrayList argList = new ArrayList();
+    argList.addAll(Arrays.asList(testargs));
+    if (inputTagged) {
+      argList.add("-inputtagged");
+    }
+    testargs = (String[])argList.toArray(new String[0]);
+    String sss = StreamUtil.collate(argList, " ");
+    System.out.println("bin/hadoop jar build/hadoop-streaming.jar " + sss);
+    //HadoopStreaming.main(testargs);
+    StreamJob job = new StreamJob(testargs, false);
+    job.go();
+  }
+
+  SideEffectConsumer startSideEffectConsumer(StringBuffer outBuf) {
+    SideEffectConsumer t = new SideEffectConsumer(outBuf) {
+      ServerSocket listen;
+      Socket client;
+      InputStream in;
+      
+      InputStream connectInputStream() throws IOException {
+        listen = new ServerSocket(SOC_PORT);
+        client = listen.accept();
+        in = client.getInputStream();
+        return in;
+      }
+      
+      void close() throws IOException
+      {
+        listen.close();
+        System.out.println("@@@listen closed");
+      }
+    };
+    t.start();
+    return t;
+  }
+
+  abstract class SideEffectConsumer extends Thread {
+
+    SideEffectConsumer(StringBuffer buf) {
+      buf_ = buf;
+      setDaemon(true);
+    }
+
+    abstract InputStream connectInputStream() throws IOException;
+    
+    abstract void close() throws IOException;
+    
+    public void run() {
+      try {
+        in_ = connectInputStream();
+        while (true) {
+          byte[] b = UTF8ByteArrayUtils.readLine(in_);
+          if (b == null) {
+            break;
+          }
+          buf_.append(new String(b, "UTF-8"));
+          buf_.append('\n');
+        }
+        in_.close();
+      } catch (IOException io) {
+        throw new RuntimeException(io);
+      }
+    }
+    
+    InputStream in_;
+    StringBuffer buf_;
+  }
+
+  public void testMain() throws IOException {
+    boolean success = false;
+    String base = new File(".").getAbsolutePath();
+    System.setProperty("hadoop.log.dir", base + "/logs");
+    conf_ = new Configuration();
+    String overrideFS = StreamUtil.getBoundAntProperty("fs.default.name", null);
+    MiniDFSCluster cluster = null;
+    try {
+      if(overrideFS == null) {
+        cluster = new MiniDFSCluster(NAME_PORT, conf_, false);
+        fs_ = cluster.getFileSystem();
+      } else {
+        System.out.println("overrideFS: " + overrideFS);
+        conf_.set("fs.default.name", overrideFS);
+        fs_ = FileSystem.get(conf_);
+      }
+      doAllTestJobs();
+      success = true;
+    } catch (IOException io) {
+      io.printStackTrace();
+    } finally {
+      try {
+        fs_.close();
+      } catch (IOException io) {
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+        System.out.println("cluster.shutdown(); DONE");
+      }
+      System.out.println(getClass().getName() + ": success=" + success);
+    }
+  }
+
+  void doAllTestJobs() throws IOException
+  {
+    goSocketTagged(true, false);
+    goSocketTagged(false, false);
+    goSocketTagged(true, true);
+  }
+  
+  void goSocketTagged(boolean socket, boolean inputTagged) throws IOException {
+    System.out.println("***** goSocketTagged: " + socket + ", " + inputTagged);
+    String expect = createInputs(inputTagged);
+    lsr();
+    printSampleInput();
+
+    StringBuffer outputBuf = new StringBuffer();
+    String sideOutput = null;
+    File f = null;
+    SideEffectConsumer consumer = null;
+    if (socket) {
+      consumer = startSideEffectConsumer(outputBuf);
+      sideOutput = "socket://localhost:" + SOC_PORT + "/";
+    } else {
+      String userOut = StreamUtil.getBoundAntProperty(
+          "hadoop.test.localoutputfile", null);
+      if(userOut != null) {
+        f = new File(userOut);
+        // don't delete so they can mkfifo
+        maybeFifoOutput_ = true;
+      } else {
+        f = new File("localoutputfile");
+        f.delete();
+        maybeFifoOutput_ = false;
+      }
+      String s = new Path(f.getAbsolutePath()).toString();
+      if(! s.startsWith("/")) {
+        s = "/" + s; // Windows "file:/C:/"
+      }
+      sideOutput = "file:" + s;
+    }
+    System.out.println("sideOutput=" + sideOutput);
+    callStreaming(sideOutput, inputTagged);
+    String output;
+    if (socket) {
+      try {
+        consumer.join();
+        consumer.close();
+      } catch (InterruptedException e) {
+        throw (IOException) new IOException().initCause(e);
+      }
+      output = outputBuf.toString();
+    } else {
+      if(maybeFifoOutput_) {
+        System.out.println("assertEquals will fail.");
+        output = "potential FIFO: not retrieving to avoid blocking on open() "
+          + f.getAbsoluteFile();
+      } else {
+        output = StreamUtil.slurp(f.getAbsoluteFile());
+      }
+    }
+
+    lsr();
+    
+    System.out.println("output=|" + output + "|");
+    System.out.println("expect=|" + expect + "|");
+    assertEquals(expect, output);
+  }
+
+  Configuration conf_;
+  FileSystem fs_;
+  boolean maybeFifoOutput_;
+
+  public static void main(String[] args) throws Throwable {
+    TestStreamedMerge test = new TestStreamedMerge();
+    test.testMain();
+  }
+  
+}
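
To make the expected output constructed in createInputs() concrete: with the three non-empty inputs part-00 (k1/v1, k3/v5), part-01 (k1/v2, k2/v4) and part-02 (k1/v3), and tags kept, the merged result before the i18n() character substitution is (fields are tab-separated):

    >1  k1  v1
    >2  k1  v2
    >3  k1  v3
    >2  k2  v4
    >1  k3  v5

Keys compare as Strings and ties are broken by the one-based stream index, which is why the three k1 records appear in stream order; with tag=false, unt() strips the leading ">N\t" prefix from each line.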

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Mon Sep 18 16:08:06 2006
@@ -35,74 +35,29 @@
   String OUTPUT_DIR = "out";
   String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";
   // map behaves like "/usr/bin/tr . \\n"; (split words into lines)
-  String map = makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
+  String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "\\n"});
   // combine, reduce behave like /usr/bin/uniq. But also prepend lines with C, R.
-  String combine  = makeJavaCommand(UniqApp.class, new String[]{"C"});
-  String reduce = makeJavaCommand(UniqApp.class, new String[]{"R"});
+  String combine  = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"C"});
+  String reduce = StreamUtil.makeJavaCommand(UniqApp.class, new String[]{"R"});
   String outputExpect = "RCare\t\nRCblue\t\nRCbunnies\t\nRCpink\t\nRCred\t\nRCroses\t\nRCviolets\t\n";
 
   StreamJob job;
 
   public TestStreaming() throws IOException
   {
-    // trunk/src/contrib/streaming --> trunk/build/contrib/streaming/test/data
-    String userDir = System.getProperty("user.dir");
-    String antTestDir = System.getProperty("test.build.data", userDir);
-    if(! userDir.equals(antTestDir)) {
-        // because changes to user.dir are ignored by File
-        throw new IllegalStateException("user.dir != test.build.data. The junit Ant task must be forked.");
-    }
-
-    boolean fromAntJunit = System.getProperty("test.build.data") != null;
-    if(fromAntJunit) {
-      new File(antTestDir).mkdirs();
-      File outFile = new File(antTestDir, getClass().getName()+".log");
-      PrintStream out = new PrintStream(new FileOutputStream(outFile));
-      System.setOut(out);
-      System.setErr(out);
-    }
-    System.out.println("test.build.data=" + antTestDir);
+    UtilTest utilTest = new UtilTest(getClass().getName());
+    utilTest.checkUserDir();
+    utilTest.redirectIfAntJunit();
   }
 
   void createInput() throws IOException
   {
     String path = new File(".", INPUT_FILE).getAbsolutePath();// needed from junit forked vm
     DataOutputStream out = new DataOutputStream(new FileOutputStream(path));
-    out.writeBytes(input);
+    out.write(input.getBytes("UTF-8"));
     out.close();
   }
 
-  public String makeJavaCommand(Class main, String[] argv)
-  {
-    ArrayList vargs = new ArrayList();
-    File javaHomeBin = new File(System.getProperty("java.home"), "bin");
-    File jvm = new File(javaHomeBin, "java");
-    vargs.add(jvm.toString());
-    // copy parent classpath
-    vargs.add("-classpath");
-    vargs.add("\"" + System.getProperty("java.class.path") + "\"");
-
-    // Add main class and its arguments
-    vargs.add(main.getName());
-    for(int i=0; i<argv.length; i++) {
-      vargs.add(argv[i]);
-    }
-    return collate(vargs, " ");
-  }
-
-  String collate(ArrayList args, String sep)
-  {
-    StringBuffer buf = new StringBuffer();
-    Iterator it = args.iterator();
-    while(it.hasNext()) {
-      if(buf.length() > 0) {
-        buf.append(" ");
-      }
-      buf.append(it.next());
-    }
-    return buf.toString();
-  }
-
   public void testCommandLine()
   {
     try {
@@ -117,22 +72,21 @@
           "-mapper", map,
           "-combiner", combine,
           "-reducer", reduce,
-          /*"-debug",*/
-          "-verbose"
+          //"-verbose",
+          //"-jobconf", "stream.debug=set"
+          "-jobconf", "keep.failed.task.files=true",
       };
-      job = new StreamJob(argv, mayExit);
+      job = new StreamJob(argv, mayExit);      
       job.go();
       File outFile = new File(".", OUTPUT_DIR + "/part-00000").getAbsoluteFile();
       String output = StreamUtil.slurp(outFile);
-      System.out.println("outEx=" + outputExpect);
-      System.out.println("  out=" + output);
+      System.err.println("outEx1=" + outputExpect);
+      System.err.println("  out1=" + output);
       assertEquals(outputExpect, output);
 
     } catch(Exception e) {
-        failTrace(e);
+      failTrace(e);
     }
-
-
   }
 
   void failTrace(Exception e)
@@ -141,8 +95,6 @@
     e.printStackTrace(new PrintWriter(sw));
     fail(sw.toString());
   }
-
-
 
   public static void main(String[]args) throws Exception
   {

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Mon Sep 18 16:08:06 2006
@@ -17,12 +17,13 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
+
 import org.apache.hadoop.streaming.Environment;
 
 /** A minimal Java implementation of /usr/bin/tr.
     Used to test the usage of external applications without adding
     platform-specific dependencies.
-*/
+ */
 public class TrApp
 {
 
@@ -35,16 +36,6 @@
   void testParentJobConfToEnvVars() throws IOException
   {
     env = new Environment();
-
-    /* To get some ideas of stable env.vars:
-    Enumeration it = env.keys();
-     while(it.hasMoreElements()) {
-        String key = (String)it.nextElement();
-        String val = (String)env.get(key);
-        System.out.println("@@@" + key + "=" + val);
-     }
-     */
-
     // test that some JobConf properties are exposed as expected     
     // Note the dots translated to underscore: 
     // property names have been escaped in PipeMapRed.safeEnvVarName()
@@ -52,7 +43,8 @@
     expect("mapred_job_tracker", "local");
     expect("mapred_input_key_class", "org.apache.hadoop.io.Text");
     expect("mapred_input_value_class", "org.apache.hadoop.io.Text");
-    expect("mapred_local_dir", "build/test/mapred/local");
+    //expect("mapred_local_dir", "build/test/mapred/local");
+    expectDefined("mapred_local_dir");
     expect("mapred_output_format_class", "org.apache.hadoop.streaming.StreamOutputFormat");
     expect("mapred_output_key_class", "org.apache.hadoop.io.Text");
     expect("mapred_output_value_class", "org.apache.hadoop.io.Text");
@@ -64,12 +56,12 @@
     expectDefined("map_input_file");
     expect("map_input_start", "0");
     expectDefined("map_input_length");
-    
+
     expectDefined("io_sort_factor");
 
     // the FileSplit context properties are not available in local hadoop..
     // so can't check them in this test.
-    
+
   }
 
   // this runs in a subprocess; won't use JUnit's assertTrue()    
@@ -81,7 +73,7 @@
       throw new IOException(msg);
     }
   }
-  
+
   void expectDefined(String evName) throws IOException
   {
     String got = env.getProperty(evName);
@@ -90,7 +82,7 @@
       throw new IOException(msg);
     }
   }
-  
+
   public void go() throws IOException
   {
     testParentJobConfToEnvVars();
@@ -98,8 +90,8 @@
     String line;
 
     while ((line = in.readLine()) != null) {
-        String out = line.replace(find, replace);
-        System.out.println(out);
+      String out = line.replace(find, replace);
+      System.out.println(out);
     }
   }
 

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UniqApp.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UniqApp.java?view=diff&rev=447626&r1=447625&r2=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UniqApp.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UniqApp.java Mon Sep 18 16:08:06 2006
@@ -17,12 +17,13 @@
 package org.apache.hadoop.streaming;
 
 import java.io.*;
+import java.util.Date;
 
 /** A minimal Java implementation of /usr/bin/uniq
     Used to test the usage of external applications without adding
     platform-specific dependencies.
     Uniques lines and prepends a header on the line.
-*/
+ */
 public class UniqApp
 {
 
@@ -36,10 +37,10 @@
     String line;
     String prevLine = null;
     while ((line = in.readLine()) != null) {
-        if(! line.equals(prevLine)) {
-          System.out.println(header + line);
-        }
-        prevLine = line;
+      if(! line.equals(prevLine)) {
+        System.out.println(header + line);
+      }
+      prevLine = line;
     }
   }
 

Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java?view=auto&rev=447626
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java Mon Sep 18 16:08:06 2006
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+class UtilTest {
+
+  public UtilTest(String testName) {
+    testName_ = testName;
+    userDir_ = System.getProperty("user.dir");
+    antTestDir_ = System.getProperty("test.build.data", userDir_);
+    System.out.println("test.build.data-or-user.dir=" + antTestDir_);
+  }
+
+  void checkUserDir() {
+    // trunk/src/contrib/streaming --> trunk/build/contrib/streaming/test/data
+    if(! userDir_.equals(antTestDir_)) {
+      // because changes to user.dir are ignored by File static methods.
+      throw new IllegalStateException("user.dir != test.build.data. The junit Ant task must be forked.");
+    }
+  }
+
+  void redirectIfAntJunit() throws IOException
+  {
+    boolean fromAntJunit = System.getProperty("test.build.data") != null;
+    if(fromAntJunit) {
+      new File(antTestDir_).mkdirs();
+      File outFile = new File(antTestDir_, testName_+".log");
+      PrintStream out = new PrintStream(new FileOutputStream(outFile));
+      System.setOut(out);
+      System.setErr(out);
+    }
+  }
+
+  private String userDir_;
+  private String antTestDir_;
+  private String testName_;
+}