You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/07/26 01:37:25 UTC

svn commit: r1365839 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java

Author: szetszwo
Date: Wed Jul 25 23:37:25 2012
New Revision: 1365839

URL: http://svn.apache.org/viewvc?rev=1365839&view=rev
Log:
HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations to get around a Java library bug causing OutOfMemoryError.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1365839&r1=1365838&r2=1365839&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Jul 25 23:37:25 2012
@@ -1403,6 +1403,9 @@ Release 0.23.3 - UNRELEASED
     HDFS-3646. LeaseRenewer can hold reference to inactive DFSClient
     instances forever. (Kihwal Lee via daryn)
 
+    HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations
+    to get around a Java library bug causing OutOfMemoryError.  (szetszwo)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1365839&r1=1365838&r2=1365839&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Wed Jul 25 23:37:25 2012
@@ -423,6 +423,7 @@ public class WebHdfsFileSystem extends F
     //Step 2) Submit another Http request with the URL from the Location header with data.
     conn = (HttpURLConnection)new URL(redirect).openConnection();
     conn.setRequestMethod(op.getType().toString());
+    conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
     return conn;
   }
 
@@ -835,8 +836,7 @@ public class WebHdfsFileSystem extends F
     }
 
     private static WebHdfsFileSystem getWebHdfs(
-        final Token<?> token, final Configuration conf
-        ) throws IOException, InterruptedException, URISyntaxException {
+        final Token<?> token, final Configuration conf) throws IOException {
       
       final InetSocketAddress nnAddr = SecurityUtil.getTokenServiceAddr(token);
       final URI uri = DFSUtil.createUri(WebHdfsFileSystem.SCHEME, nnAddr);
@@ -850,12 +850,7 @@ public class WebHdfsFileSystem extends F
       // update the kerberos credentials, if they are coming from a keytab
       ugi.reloginFromKeytab();
 
-      try {
-        WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
-        return webhdfs.renewDelegationToken(token);
-      } catch (URISyntaxException e) {
-        throw new IOException(e);
-      }
+      return getWebHdfs(token, conf).renewDelegationToken(token);
     }
   
     @Override
@@ -865,12 +860,7 @@ public class WebHdfsFileSystem extends F
       // update the kerberos credentials, if they are coming from a keytab
       ugi.checkTGTAndReloginFromKeytab();
 
-      try {
-        final WebHdfsFileSystem webhdfs = getWebHdfs(token, conf);
-        webhdfs.cancelDelegationToken(token);
-      } catch (URISyntaxException e) {
-        throw new IOException(e);
-      }
+      getWebHdfs(token, conf).cancelDelegationToken(token);
     }
   }
   

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java?rev=1365839&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java Wed Jul 25 23:37:25 2012
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test WebHDFS: write a large file through the WebHDFS file system and
+ * verify that it can be read back with seek and with positional read.
+ */
+public class TestWebHDFS {
+  static final Log LOG = LogFactory.getLog(TestWebHDFS.class);
+
+  static final Random RANDOM = new Random();
+
+  /** Reference time of the elapsed minutes printed by {@link Ticker#tick}. */
+  static final long systemStartTime = System.nanoTime();
+
+  /** A timer for measuring performance. */
+  static class Ticker {
+    private static final double NANOS_PER_SECOND = 1000000000.0;
+    private static final double NANOS_PER_MINUTE = 60 * NANOS_PER_SECOND;
+    /** Minimum time between two progress messages: 10 seconds. */
+    private static final long TICK_INTERVAL_NANOS = 10000000000L;
+
+    final String name;
+    final long startTime = System.nanoTime();
+    private long previousTick = startTime;
+
+    /**
+     * Start the timer and log a START message.
+     *
+     * @param name label printed at the beginning of every message
+     * @param format format string describing the timed operation
+     * @param args arguments of the format string
+     */
+    Ticker(final String name, String format, Object... args) {
+      this.name = name;
+      LOG.info(String.format("\n\n%s START: %s\n",
+          name, String.format(format, args)));
+    }
+
+    /**
+     * Log a progress message if more than 10 seconds have passed since the
+     * previous one (or since the start); otherwise do nothing.
+     *
+     * @param nBytes number of bytes processed so far
+     * @param format format string of the progress details
+     * @param args arguments of the format string
+     */
+    void tick(final long nBytes, String format, Object... args) {
+      final long now = System.nanoTime();
+      if (now - previousTick > TICK_INTERVAL_NANOS) {
+        previousTick = now;
+        // The minutes are counted from systemStartTime, not from startTime.
+        final double minutes = (now - systemStartTime)/NANOS_PER_MINUTE;
+        LOG.info(String.format("\n\n%s %.2f min) %s %s\n", name, minutes,
+            String.format(format, args), toMpsString(nBytes, now)));
+      }
+    }
+
+    /**
+     * Log an END message with the total duration and the average speed.
+     *
+     * @param nBytes total number of bytes processed
+     */
+    void end(final long nBytes) {
+      final long now = System.nanoTime();
+      final double seconds = (now - startTime)/NANOS_PER_SECOND;
+      LOG.info(String.format("\n\n%s END: duration=%.2fs %s\n",
+          name, seconds, toMpsString(nBytes, now)));
+    }
+
+    /**
+     * @param nBytes number of bytes processed
+     * @param now current {@link System#nanoTime()} value
+     * @return the size in MB and the average speed in MB/s since the start
+     */
+    String toMpsString(final long nBytes, final long now) {
+      final double mb = nBytes/(double)(1<<20);
+      final double mps = mb*NANOS_PER_SECOND/(now - startTime);
+      return String.format("[nBytes=%.2fMB, speed=%.2fMB/s]", mb, mps);
+    }
+  }
+
+  @Test
+  public void testLargeFile() throws Exception {
+    largeFileTest(200L << 20); //200MB file length
+  }
+
+  /**
+   * Test read and write large files.
+   *
+   * @param fileLength length in bytes of the file to write and read back
+   */
+  static void largeFileTest(final long fileLength) throws Exception {
+    final Configuration conf = WebHdfsTestUtil.createConf();
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3)
+        .build();
+    try {
+      cluster.waitActive();
+
+      final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf);
+      final Path dir = new Path("/test/largeFile");
+      Assert.assertTrue(fs.mkdirs(dir));
+
+      // The file content is the same 1MB random block repeated.
+      final byte[] data = new byte[1 << 20];
+      RANDOM.nextBytes(data);
+
+      // Two copies of the block, so that a read starting anywhere inside the
+      // block can be compared against a contiguous range of this array.
+      final byte[] expected = new byte[2 * data.length];
+      System.arraycopy(data, 0, expected, 0, data.length);
+      System.arraycopy(data, 0, expected, data.length, data.length);
+
+      final Path p = new Path(dir, "file");
+      final Ticker t = new Ticker("WRITE", "fileLength=" + fileLength);
+      final FSDataOutputStream out = fs.create(p);
+      try {
+        long remaining = fileLength;
+        while (remaining > 0) {
+          t.tick(fileLength - remaining, "remaining=%d", remaining);
+
+          final int n = (int)Math.min(remaining, data.length);
+          out.write(data, 0, n);
+          remaining -= n;
+        }
+      } finally {
+        out.close();
+      }
+      t.end(fileLength);
+
+      Assert.assertEquals(fileLength, fs.getFileStatus(p).getLen());
+
+      final long smallOffset = RANDOM.nextInt(1 << 20) + (1 << 20);
+      final long largeOffset = fileLength - smallOffset;
+      final byte[] buf = new byte[data.length];
+
+      verifySeek(fs, p, largeOffset, fileLength, buf, expected);
+      verifySeek(fs, p, smallOffset, fileLength, buf, expected);
+
+      verifyPread(fs, p, largeOffset, fileLength, buf, expected);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Compare the first n bytes of actual with the expected file content at
+   * the given offset. To keep large reads fast, the comparison is only
+   * carried out for a random sample of about 1 in 100 calls.
+   *
+   * @param offset file offset of actual[0]
+   * @param remaining number of bytes left to read, used in failure messages
+   * @param n number of valid bytes in actual
+   * @param actual the bytes read from the file
+   * @param expected the expected repeating content, see largeFileTest
+   */
+  static void checkData(long offset, long remaining, int n,
+      byte[] actual, byte[] expected) {
+    if (RANDOM.nextInt(100) == 0) {
+      // NOTE: this relies on actual.length being the period of the content.
+      int j = (int)(offset % actual.length);
+      for(int i = 0; i < n; i++) {
+        if (expected[j] != actual[i]) {
+          Assert.fail("expected[" + j + "]=" + expected[j]
+              + " != actual[" + i + "]=" + actual[i]
+              + ", offset=" + offset + ", remaining=" + remaining + ", n=" + n);
+        }
+        j++;
+      }
+    }
+  }
+
+  /**
+   * Test seek: seek to the offset and read sequentially to the end of file.
+   *
+   * @param fs the file system to read from
+   * @param p the file to read
+   * @param offset the position to seek to
+   * @param length the file length
+   * @param buf read buffer
+   * @param expected the expected repeating content, see largeFileTest
+   */
+  static void verifySeek(FileSystem fs, Path p, long offset, long length,
+      byte[] buf, byte[] expected) throws IOException {
+    long remaining = length - offset;
+    long checked = 0;
+
+    final Ticker t = new Ticker("SEEK", "offset=%d, remaining=%d",
+        offset, remaining);
+    final FSDataInputStream in = fs.open(p, 64 << 10);
+    try {
+      in.seek(offset);
+      while (remaining > 0) {
+        t.tick(checked, "offset=%d, remaining=%d", offset, remaining);
+        final int n = (int)Math.min(remaining, buf.length);
+        in.readFully(buf, 0, n);
+        checkData(offset, remaining, n, buf, expected);
+
+        offset += n;
+        remaining -= n;
+        checked += n;
+      }
+    } finally {
+      // Close the stream even if a read or a data check fails.
+      in.close();
+    }
+    t.end(checked);
+  }
+
+  /**
+   * Test pread: read from the offset to the end of file with positional
+   * reads, without seeking.
+   *
+   * @param fs the file system to read from
+   * @param p the file to read
+   * @param offset the position of the first read
+   * @param length the file length
+   * @param buf read buffer
+   * @param expected the expected repeating content, see largeFileTest
+   */
+  static void verifyPread(FileSystem fs, Path p, long offset, long length,
+      byte[] buf, byte[] expected) throws IOException {
+    long remaining = length - offset;
+    long checked = 0;
+
+    final Ticker t = new Ticker("PREAD", "offset=%d, remaining=%d",
+        offset, remaining);
+    final FSDataInputStream in = fs.open(p, 64 << 10);
+    try {
+      while (remaining > 0) {
+        t.tick(checked, "offset=%d, remaining=%d", offset, remaining);
+        final int n = (int)Math.min(remaining, buf.length);
+        in.readFully(offset, buf, 0, n);
+        checkData(offset, remaining, n, buf, expected);
+
+        offset += n;
+        remaining -= n;
+        checked += n;
+      }
+    } finally {
+      // Close the stream even if a read or a data check fails.
+      in.close();
+    }
+    t.end(checked);
+  }
+}