You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by da...@apache.org on 2013/12/05 16:54:28 UTC

svn commit: r1548178 - in /hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp: ./ src/main/java/org/apache/hadoop/tools/ src/main/java/org/apache/hadoop/tools/mapred/ src/test/java/org/apache/hadoop/tools/ src/test/java/org/apache/hadoop/too...

Author: daryn
Date: Thu Dec  5 15:54:27 2013
New Revision: 1548178

URL: http://svn.apache.org/r1548178
Log:
HADOOP-10129. Distcp may succeed when it fails (daryn)

Added:
    hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/pom.xml
    hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
    hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
    hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java

Modified: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/pom.xml?rev=1548178&r1=1548177&r2=1548178&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/pom.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/pom.xml Thu Dec  5 15:54:27 2013
@@ -90,6 +90,11 @@
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

Modified: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java?rev=1548178&r1=1548177&r2=1548178&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java Thu Dec  5 15:54:27 2013
@@ -32,6 +32,8 @@ import org.apache.hadoop.tools.util.Dist
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.Credentials;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.*;
 import java.util.Stack;
 
@@ -107,12 +109,13 @@ public class SimpleCopyListing extends C
   /** {@inheritDoc} */
   @Override
   public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws IOException {
-
-    SequenceFile.Writer fileListWriter = null;
-
+    doBuildListing(getWriter(pathToListingFile), options);
+  }
+  
+  @VisibleForTesting
+  public void doBuildListing(SequenceFile.Writer fileListWriter,
+      DistCpOptions options) throws IOException {
     try {
-      fileListWriter = getWriter(pathToListingFile);
-
       for (Path path: options.getSourcePaths()) {
         FileSystem sourceFS = path.getFileSystem(getConf());
         path = makeQualified(path);
@@ -140,8 +143,10 @@ public class SimpleCopyListing extends C
           writeToFileListing(fileListWriter, rootStatus, sourcePathRoot, localFile);
         }
       }
+      fileListWriter.close();
+      fileListWriter = null;
     } finally {
-      IOUtils.closeStream(fileListWriter);
+      IOUtils.cleanup(LOG, fileListWriter);
     }
   }
 

Modified: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java?rev=1548178&r1=1548177&r2=1548178&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java Thu Dec  5 15:54:27 2013
@@ -30,6 +30,8 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.*;
 import java.util.EnumSet;
 
@@ -176,7 +178,8 @@ public class RetriableFileCopyCommand ex
     return new Path(root, ".distcp.tmp." + context.getTaskAttemptID().toString());
   }
 
-  private long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
+  @VisibleForTesting
+  long copyBytes(FileStatus sourceFileStatus, OutputStream outStream,
                          int bufferSize, Mapper.Context context)
       throws IOException {
     Path source = sourceFileStatus.getPath();
@@ -193,6 +196,8 @@ public class RetriableFileCopyCommand ex
         updateContextStatus(totalBytesRead, context, sourceFileStatus);
         bytesRead = inStream.read(buf);
       }
+      outStream.close();
+      outStream = null;
     } finally {
       IOUtils.cleanup(LOG, outStream, inStream);
     }

Modified: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java?rev=1548178&r1=1548177&r2=1548178&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java Thu Dec  5 15:54:27 2013
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.tools;
 
+import static org.mockito.Mockito.*;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -35,6 +37,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.AfterClass;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
@@ -249,4 +252,29 @@ public class TestCopyListing extends Sim
       IOUtils.closeStream(reader);
     }
   }
+  
+  @Test
+  public void testFailOnCloseError() throws IOException {
+    File inFile = File.createTempFile("TestCopyListingIn", null);
+    inFile.deleteOnExit();
+    File outFile = File.createTempFile("TestCopyListingOut", null);
+    outFile.deleteOnExit();
+    List<Path> srcs = new ArrayList<Path>();
+    srcs.add(new Path(inFile.toURI()));
+    
+    Exception expectedEx = new IOException("boom");
+    SequenceFile.Writer writer = mock(SequenceFile.Writer.class);
+    doThrow(expectedEx).when(writer).close();
+    
+    SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS);
+    DistCpOptions options = new DistCpOptions(srcs, new Path(outFile.toURI()));
+    Exception actualEx = null;
+    try {
+      listing.doBuildListing(writer, options);
+    } catch (Exception e) {
+      actualEx = e;
+    }
+    Assert.assertNotNull("close writer didn't fail", actualEx);
+    Assert.assertEquals(expectedEx, actualEx);
+  }
 }

Added: hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java?rev=1548178&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestRetriableFileCopyCommand.java Thu Dec  5 15:54:27 2013
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.*;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class TestRetriableFileCopyCommand {
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testFailOnCloseError() throws Exception {
+    Mapper.Context context = mock(Mapper.Context.class);
+    doReturn(new Configuration()).when(context).getConfiguration();
+
+    Exception expectedEx = new IOException("boom");
+    OutputStream out = mock(OutputStream.class);
+    doThrow(expectedEx).when(out).close();
+
+    File f = File.createTempFile(this.getClass().getSimpleName(), null);
+    f.deleteOnExit();
+    FileStatus stat =
+        new FileStatus(1L, false, 1, 1024, 0, new Path(f.toURI()));
+    
+    Exception actualEx = null;
+    try {
+      new RetriableFileCopyCommand("testFailOnCloseError")
+        .copyBytes(stat, out, 512, context);
+    } catch (Exception e) {
+      actualEx = e;
+    }
+    assertNotNull("close didn't fail", actualEx);
+    assertEquals(expectedEx, actualEx);
+  }  
+}