Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2010/07/05 12:40:35 UTC

svn commit: r960534 - in /hadoop/mapreduce/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/

Author: amareshwari
Date: Mon Jul  5 10:40:35 2010
New Revision: 960534

URL: http://svn.apache.org/viewvc?rev=960534&view=rev
Log:
MAPREDUCE-577. Fixes duplicate records in StreamXmlRecordReader. Contributed by Ravi Gummadi

Added:
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlMultipleRecords.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
    hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=960534&r1=960533&r2=960534&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Jul  5 10:40:35 2010
@@ -140,6 +140,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1888. Fixes Streaming to override output key and value types, 
     only if mapper/reducer is a command. (Ravi Gummadi via amareshwari)
 
+    MAPREDUCE-577. Fixes duplicate records in StreamXmlRecordReader.
+    (Ravi Gummadi via amareshwari)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java?rev=960534&r1=960533&r2=960534&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Mon Jul  5 10:40:35 2010
@@ -88,7 +88,7 @@ public class StreamXmlRecordReader exten
     if (!readUntilMatchBegin()) {
       return false;
     }
-    if (!readUntilMatchEnd(buf)) {
+    if (pos_ >= end_ || !readUntilMatchEnd(buf)) {
       return false;
     }
 
@@ -258,8 +258,8 @@ public class StreamXmlRecordReader exten
         if (outBufOrNull != null) {
           outBufOrNull.write(cpat, 0, m);
           outBufOrNull.write(c);
-          pos_ += m;
         }
+        pos_ += m + 1; // skip m chars, +1 for 'c'
         m = 0;
       }
     }
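
The two hunks above make up the fix: pos_ now advances for every character
consumed (the extra +1 accounts for the current character c), whether or not
an output buffer is being written, and the reader refuses to start a record
whose begin-tag was only matched at or beyond end_, leaving that record to
the reader of the following split. The sketch below is a minimal,
self-contained illustration of those two ideas; it is not the project code,
and the class name, constructor and simplified matcher are invented for
illustration.

// A minimal sketch (not the project code) of the two ideas in the fix above.
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class SplitScanSketch {

  private final byte[] data;
  private long pos;        // mirrors pos_: offset of the next byte to consume
  private final long end;  // mirrors end_: exclusive end of this split

  public SplitScanSketch(byte[] data, long start, long end) {
    this.data = data;
    this.pos = start;
    this.end = end;
  }

  /** Returns the next <line>...</line> record owned by this split, or null. */
  public String next() {
    if (!readUntilMatch("<line>", null)) {
      return null;                      // no more begin-tags in the stream
    }
    if (pos >= end) {
      return null;                      // mirrors the new pos_ >= end_ guard
    }
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    if (!readUntilMatch("</line>", buf)) {
      return null;                      // truncated record, nothing to emit
    }
    return "<line>" + new String(buf.toByteArray(), StandardCharsets.UTF_8);
  }

  /**
   * Consumes bytes until pat has been matched, copying them to out when out
   * is non-null. pos advances for every byte consumed either way; the bug
   * fixed above was that pos_ advanced only when an output buffer existed.
   */
  private boolean readUntilMatch(String pat, ByteArrayOutputStream out) {
    byte[] p = pat.getBytes(StandardCharsets.UTF_8);
    int m = 0;                          // pattern bytes matched so far
    while (pos < data.length) {
      byte c = data[(int) pos];
      pos++;                            // always advance, output or not
      if (out != null) {
        out.write(c);
      }
      if (c == p[m]) {
        if (++m == p.length) {
          return true;
        }
      } else {
        m = (c == p[0]) ? 1 : 0;        // simplified (non-KMP) match restart
      }
    }
    return false;
  }

  public static void main(String[] args) {
    String text = "<line>first record</line>   <line>second record</line>";
    byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
    // Two readers over adjacent "splits"; the boundary at offset 27 falls in
    // the spaces between the two records, like the block-size-80 tests below.
    SplitScanSketch first = new SplitScanSketch(bytes, 0, 27);
    SplitScanSketch second = new SplitScanSketch(bytes, 27, bytes.length);
    for (String r; (r = first.next()) != null; ) {
      System.out.println("split 1: " + r);
    }
    for (String r; (r = second.next()) != null; ) {
      System.out.println("split 2: " + r);
    }
  }
}

Running main() prints each record exactly once, one per split; dropping the
pos >= end check makes the first reader also emit the second record, which
is the duplicate-record behavior this commit removes.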

Added: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlMultipleRecords.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlMultipleRecords.java?rev=960534&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlMultipleRecords.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamXmlMultipleRecords.java Mon Jul  5 10:40:35 2010
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests if StreamXmlRecordReader will read the next record, _after_ the
+ * end of a split, if the split falls before the end of the end-tag of a
+ * record. Also tests if StreamXmlRecordReader will read a record twice if
+ * the end of a split is a few characters after the end-tag of a record but
+ * before the begin-tag of the next record.
+ */
+public class TestStreamXmlMultipleRecords extends TestStreaming
+{
+  private static final Log LOG = LogFactory.getLog(
+      TestStreamXmlMultipleRecords.class);
+
+  private boolean hasPerl = false;
+  private long blockSize;
+  private String isSlowMatch;
+
+  // Our own configuration, used for creating the FileSystem object, where
+  // fs.local.block.size is set to 60 OR 80.
+  // See the 60th char in the input: it is before the end of the end-tag
+  // of a record. See the 80th char in the input: it is between the end-tag
+  // of a record and the begin-tag of the next record.
+  private Configuration conf = null;
+
+  private String myPerlMapper =
+      "perl -n -a -e 'print join(\"\\n\", map { \"$_\\t1\" } @F), \"\\n\";'";
+  private String myPerlReducer =
+      "perl -n -a -e '$freq{$F[0]}++; END { print \"is\\t$freq{is}\\n\"; }'";
+
+  public TestStreamXmlMultipleRecords() throws IOException {
+    super();
+
+    input = "<line>This is a single line,\nand it is containing multiple" +
+        " words.</line>                     <line>Only is appears more than" +
+        " once.</line>\n";
+    outputExpect = "is\t3\n";
+
+    map = myPerlMapper;
+    reduce = myPerlReducer;
+
+    hasPerl = UtilTest.hasPerlSupport();
+  }
+
+  @Override
+  @Before
+  public void setUp() throws IOException {
+    super.setUp();
+    // Without this closeAll() call, setting the FileSystem block size has
+    // no effect and the old block size set in an earlier test is used.
+    FileSystem.closeAll();
+  }
+
+  // Set the file system block size such that the split falls
+  // (a) before the end of the end-tag of a record (testStreamXmlMultiInner...), OR
+  // (b) between records (testStreamXmlMultiOuter...).
+  @Override
+  protected Configuration getConf() {
+    conf = new Configuration();
+    conf.setLong("fs.local.block.size", blockSize);
+    return conf;
+  }
+
+  @Override
+  protected String[] genArgs() {
+    args.add("-inputreader");
+    args.add("StreamXmlRecordReader,begin=<line>,end=</line>,slowmatch=" +
+        isSlowMatch);
+    return super.genArgs();
+  }
+
+  /**
+   * Tests if StreamXmlRecordReader will read the next record, _after_ the
+   * end of a split, if the split falls before the end of the end-tag of a
+   * record. Tests with slowmatch=false.
+   * @throws Exception
+   */
+  @Test
+  public void testStreamXmlMultiInnerFast() throws Exception {
+    if (hasPerl) {
+      blockSize = 60;
+
+      isSlowMatch = "false";
+      super.testCommandLine();
+    }
+    else {
+      LOG.warn("No perl; skipping test.");
+    }
+  }
+
+  /**
+   * Tests if StreamXmlRecordReader will read a record twice if the end of a
+   * split is a few characters after the end-tag of a record but before the
+   * begin-tag of the next record.
+   * Tests with slowmatch=false.
+   * @throws Exception
+   */
+  @Test
+  public void testStreamXmlMultiOuterFast() throws Exception {
+    if (hasPerl) {
+      blockSize = 80;
+
+      isSlowMatch = "false";
+      super.testCommandLine();
+    }
+    else {
+      LOG.warn("No perl; skipping test.");
+    }
+  }
+
+  /**
+   * Tests if StreamXmlRecordReader will read the next record, _after_ the
+   * end of a split, if the split falls before the end of the end-tag of a
+   * record. Tests with slowmatch=true.
+   * @throws Exception
+   */
+  @Test
+  public void testStreamXmlMultiInnerSlow() throws Exception {
+    if (hasPerl) {
+      blockSize = 60;
+
+      isSlowMatch = "true";
+      super.testCommandLine();
+    }
+    else {
+      LOG.warn("No perl; skipping test.");
+    }
+  }
+
+  /**
+   * Tests if StreamXmlRecordReader will read a record twice if the end of a
+   * split is a few characters after the end-tag of a record but before the
+   * begin-tag of the next record.
+   * Tests with slowmatch=true.
+   * @throws Exception
+   */
+  @Test
+  public void testStreamXmlMultiOuterSlow() throws Exception {
+    if (hasPerl) {
+      blockSize = 80;
+
+      isSlowMatch = "true";
+      super.testCommandLine();
+    }
+    else {
+      LOG.warn("No perl; skipping test.");
+    }
+  }
+
+  @Override
+  @Test
+  public void testCommandLine() {
+    // Do nothing
+  }
+}
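
For reference, the snippet below shows where the two block sizes used above
(60 and 80) place the first split boundary in the test input. It is
illustrative only, not part of the commit, and the class name is invented;
it simply prints the characters around offsets 60 and 80 of the input string
copied from the constructor above.

public class SplitOffsetCheck {
  public static void main(String[] args) {
    String input = "<line>This is a single line,\nand it is containing multiple" +
        " words.</line>                     <line>Only is appears more than" +
        " once.</line>\n";
    // Offset 60 falls inside the first record's body, before its </line>
    // end-tag completes (the "inner" tests, blockSize = 60).
    System.out.println("around offset 60: [" + input.substring(55, 75) + "]");
    // Offset 80 falls in the spaces between the first record's </line> and
    // the second record's <line> (the "outer" tests, blockSize = 80).
    System.out.println("around offset 80: [" + input.substring(70, 95) + "]");
  }
}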

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?rev=960534&r1=960533&r2=960534&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Mon Jul  5 10:40:35 2010
@@ -69,6 +69,7 @@ public class TestStreaming
   public void setUp() throws IOException {
     UtilTest.recursiveDelete(TEST_DIR);
     assertTrue("Creating " + TEST_DIR, TEST_DIR.mkdirs());
+    args.clear();
   }
 
   @After

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java?rev=960534&r1=960533&r2=960534&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/UtilTest.java Mon Jul  5 10:40:35 2010
@@ -23,8 +23,14 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
 class UtilTest {
 
+  private static final Log LOG = LogFactory.getLog(UtilTest.class);
+
   /**
    * Utility routine to recursively delete a directory.
    * On normal return, the file does not exist.
@@ -76,6 +82,28 @@ class UtilTest {
     }
   }
 
+  /**
+   * Is perl supported on this machine?
+   * @return true if perl is available and is working as expected
+   */
+  public static boolean hasPerlSupport() {
+    boolean hasPerl = false;
+    ShellCommandExecutor shexec = new ShellCommandExecutor(
+      new String[] { "perl", "-e", "print 42" });
+    try {
+      shexec.execute();
+      if (shexec.getOutput().equals("42")) {
+        hasPerl = true;
+      }
+      else {
+        LOG.warn("Perl is installed, but isn't behaving as expected.");
+      }
+    } catch (Exception e) {
+      LOG.warn("Could not run perl: " + e);
+    }
+    return hasPerl;
+  }
+
   private String userDir_;
   private String antTestDir_;
   private String testName_;