You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/02/24 21:02:55 UTC

[13/50] [abbrv] hadoop git commit: HADOOP-10865. Add a Crc32 chunked verification benchmark for both direct and non-direct buffer cases.

HADOOP-10865. Add a Crc32 chunked verification benchmark for both direct and non-direct buffer cases.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bbfaf3c2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bbfaf3c2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bbfaf3c2

Branch: refs/heads/HDFS-7240
Commit: bbfaf3c2712c9ba82b0f8423bdeb314bf505a692
Parents: ba6d5ed
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Thu Feb 18 11:47:33 2016 -0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Thu Feb 18 11:47:33 2016 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |   3 +
 .../org/apache/hadoop/util/DataChecksum.java    | 147 ++++----
 .../hadoop/util/Crc32PerformanceTest.java       | 350 +++++++++++++++++++
 .../apache/hadoop/util/TestDataChecksum.java    |   6 +
 .../hadoop/hdfs/TestDFSClientRetries.java       |   8 +-
 5 files changed, 450 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbfaf3c2/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 41ba87d..6cdcb7a 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -1175,6 +1175,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12758. Extend CSRF Filter with UserAgent Checks
     (Larry McCay via cnauroth)
 
+    HADOOP-10865. Add a Crc32 chunked verification benchmark for both directly
+    and non-directly buffer cases.  (szetszwo)
+
   BUG FIXES
 
     HADOOP-12352. Delay in checkpointing Trash can leave trash for 2 intervals

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbfaf3c2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
index d9dc7af..faac587 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
@@ -286,94 +286,119 @@ public class DataChecksum implements Checksum {
    * @throws ChecksumException if the checksums do not match
    */
   public void verifyChunkedSums(ByteBuffer data, ByteBuffer checksums,
-      String fileName, long basePos)
-  throws ChecksumException {
+      String fileName, long basePos) throws ChecksumException {
     if (type.size == 0) return;
     
     if (data.hasArray() && checksums.hasArray()) {
-      verifyChunkedSums(
-          data.array(), data.arrayOffset() + data.position(), data.remaining(),
-          checksums.array(), checksums.arrayOffset() + checksums.position(),
-          fileName, basePos);
+      final int dataOffset = data.arrayOffset() + data.position();
+      final int crcsOffset = checksums.arrayOffset() + checksums.position();
+      verifyChunked(type, summer, data.array(), dataOffset, data.remaining(),
+          bytesPerChecksum, checksums.array(), crcsOffset, fileName, basePos);
       return;
     }
     if (NativeCrc32.isAvailable()) {
       NativeCrc32.verifyChunkedSums(bytesPerChecksum, type.id, checksums, data,
           fileName, basePos);
-      return;
+    } else {
+      verifyChunked(type, summer, data, bytesPerChecksum, checksums, fileName,
+          basePos);
     }
-    
-    int startDataPos = data.position();
+  }
+
+  static void verifyChunked(final Type type, final Checksum algorithm,
+      final ByteBuffer data, final int bytesPerCrc, final ByteBuffer crcs,
+      final String filename, final long basePos) throws ChecksumException {
+    final byte[] bytes = new byte[bytesPerCrc];
+    final int dataOffset = data.position();
+    final int dataLength = data.remaining();
     data.mark();
-    checksums.mark();
+    crcs.mark();
+
     try {
-      byte[] buf = new byte[bytesPerChecksum];
-      byte[] sum = new byte[type.size];
-      while (data.remaining() > 0) {
-        int n = Math.min(data.remaining(), bytesPerChecksum);
-        checksums.get(sum);
-        data.get(buf, 0, n);
-        summer.reset();
-        summer.update(buf, 0, n);
-        int calculated = (int)summer.getValue();
-        int stored = (sum[0] << 24 & 0xff000000) |
-          (sum[1] << 16 & 0xff0000) |
-          (sum[2] << 8 & 0xff00) |
-          sum[3] & 0xff;
-        if (calculated != stored) {
-          long errPos = basePos + data.position() - startDataPos - n;
-          throw new ChecksumException(
-              "Checksum error: "+ fileName + " at "+ errPos +
-              " exp: " + stored + " got: " + calculated, errPos);
+      int i = 0;
+      for(final int n = dataLength - bytesPerCrc + 1; i < n; i += bytesPerCrc) {
+        data.get(bytes);
+        algorithm.reset();
+        algorithm.update(bytes, 0, bytesPerCrc);
+        final int computed = (int)algorithm.getValue();
+        final int expected = crcs.getInt();
+
+        if (computed != expected) {
+          long errPos = basePos + data.position() - dataOffset - bytesPerCrc;
+          throwChecksumException(type, algorithm, filename, errPos, expected,
+              computed);
+        }
+      }
+
+      final int remainder = dataLength - i;
+      if (remainder > 0) {
+        data.get(bytes, 0, remainder);
+        algorithm.reset();
+        algorithm.update(bytes, 0, remainder);
+        final int computed = (int)algorithm.getValue();
+        final int expected = crcs.getInt();
+
+        if (computed != expected) {
+          long errPos = basePos + data.position() - dataOffset - remainder;
+          throwChecksumException(type, algorithm, filename, errPos, expected,
+              computed);
         }
       }
     } finally {
       data.reset();
-      checksums.reset();
+      crcs.reset();
     }
   }
-  
+
   /**
    * Implementation of chunked verification specifically on byte arrays. This
    * is to avoid the copy when dealing with ByteBuffers that have array backing.
    */
-  private void verifyChunkedSums(
-      byte[] data, int dataOff, int dataLen,
-      byte[] checksums, int checksumsOff, String fileName,
-      long basePos) throws ChecksumException {
-    if (type.size == 0) return;
+  static void verifyChunked(final Type type, final Checksum algorithm,
+      final byte[] data, final int dataOffset, final int dataLength,
+      final int bytesPerCrc, final byte[] crcs, final int crcsOffset,
+      final String filename, final long basePos) throws ChecksumException {
+    final int dataEnd = dataOffset + dataLength;
+    int i = dataOffset;
+    int j = crcsOffset;
+    for(final int n = dataEnd-bytesPerCrc+1; i < n; i += bytesPerCrc, j += 4) {
+      algorithm.reset();
+      algorithm.update(data, i, bytesPerCrc);
+      final int computed = (int)algorithm.getValue();
+      final int expected = ((crcs[j] << 24) + ((crcs[j + 1] << 24) >>> 8))
+          + (((crcs[j + 2] << 24) >>> 16) + ((crcs[j + 3] << 24) >>> 24));
 
-    if (NativeCrc32.isAvailable()) {
-      NativeCrc32.verifyChunkedSumsByteArray(bytesPerChecksum, type.id,
-          checksums, checksumsOff, data, dataOff, dataLen, fileName, basePos);
-      return;
+      if (computed != expected) {
+        final long errPos = basePos + i - dataOffset;
+        throwChecksumException(type, algorithm, filename, errPos, expected,
+            computed);
+      }
     }
-    
-    int remaining = dataLen;
-    int dataPos = 0;
-    while (remaining > 0) {
-      int n = Math.min(remaining, bytesPerChecksum);
-      
-      summer.reset();
-      summer.update(data, dataOff + dataPos, n);
-      dataPos += n;
-      remaining -= n;
-      
-      int calculated = (int)summer.getValue();
-      int stored = (checksums[checksumsOff] << 24 & 0xff000000) |
-        (checksums[checksumsOff + 1] << 16 & 0xff0000) |
-        (checksums[checksumsOff + 2] << 8 & 0xff00) |
-        checksums[checksumsOff + 3] & 0xff;
-      checksumsOff += 4;
-      if (calculated != stored) {
-        long errPos = basePos + dataPos - n;
-        throw new ChecksumException(
-            "Checksum error: "+ fileName + " at "+ errPos +
-            " exp: " + stored + " got: " + calculated, errPos);
+    final int remainder = dataEnd - i;
+    if (remainder > 0) {
+      algorithm.reset();
+      algorithm.update(data, i, remainder);
+      final int computed = (int)algorithm.getValue();
+      final int expected = ((crcs[j] << 24) + ((crcs[j + 1] << 24) >>> 8))
+          + (((crcs[j + 2] << 24) >>> 16) + ((crcs[j + 3] << 24) >>> 24));
+
+      if (computed != expected) {
+        final long errPos = basePos + i - dataOffset;
+        throwChecksumException(type, algorithm, filename, errPos, expected,
+            computed);
       }
     }
   }
 
+  private static void throwChecksumException(Type type, Checksum algorithm,
+      String filename, long errPos, int expected, int computed)
+          throws ChecksumException {
+    throw new ChecksumException("Checksum " + type
+        + " not matched for file " + filename + " at position "+ errPos
+        + String.format(": expected=%X but computed=%X", expected, computed)
+        + ", algorithm=" + algorithm.getClass().getSimpleName(), errPos);
+  }
+
   /**
    * Calculate checksums for the given data.
    * 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbfaf3c2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/Crc32PerformanceTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/Crc32PerformanceTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/Crc32PerformanceTest.java
new file mode 100644
index 0000000..d8963df
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/Crc32PerformanceTest.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.io.PrintStream;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.log4j.Level;
+
+/**
+ * Performance tests to compare performance of Crc32 implementations
+ * This can be run from the command line with:
+ *
+ *   java -cp path/to/test/classes:path/to/common/classes \
+ *      'org.apache.hadoop.util.Crc32PerformanceTest'
+ *
+ *      or
+ *
+ *  hadoop org.apache.hadoop.util.Crc32PerformanceTest
+ *
+ * The output is in JIRA table format.
+ */
+public class Crc32PerformanceTest {
+  static final int MB = 1024 * 1024;
+
+  static interface Crc32 {
+
+    public void verifyChunked(ByteBuffer data, int bytesPerCrc, ByteBuffer crcs,
+        String filename, long basePos) throws ChecksumException;
+
+    static final class Native implements Crc32 {
+      @Override
+      public void verifyChunked(ByteBuffer data, int bytesPerSum,
+          ByteBuffer sums, String fileName, long basePos)
+              throws ChecksumException {
+        NativeCrc32.verifyChunkedSums(bytesPerSum, DataChecksum.Type.CRC32.id,
+            sums, data, fileName, basePos);
+      }
+    }
+
+
+    static abstract class AbstractCrc32<T extends Checksum> implements Crc32 {
+      abstract T newAlgorithm();
+
+      @Override
+      public void verifyChunked(ByteBuffer data, int bytesPerCrc,
+          ByteBuffer crcs, String filename, long basePos)
+              throws ChecksumException {
+        final Checksum algorithm = newAlgorithm();
+        if (data.hasArray() && crcs.hasArray()) {
+          DataChecksum.verifyChunked(DataChecksum.Type.CRC32, algorithm,
+              data.array(), data.position(), data.remaining(), bytesPerCrc,
+              crcs.array(), crcs.position(), filename, basePos);
+        } else {
+          DataChecksum.verifyChunked(DataChecksum.Type.CRC32, algorithm,
+              data, bytesPerCrc, crcs, filename, basePos);
+        }
+      }
+    }
+
+    static final class Zip extends AbstractCrc32<CRC32> {
+      @Override
+      public CRC32 newAlgorithm() {
+        return new CRC32();
+      }
+    }
+
+    static final class PureJava extends AbstractCrc32<PureJavaCrc32> {
+      @Override
+      public PureJavaCrc32 newAlgorithm() {
+        return new PureJavaCrc32();
+      }
+    }
+  }
+
+  final int dataLengthMB;
+  final int trials;
+  final boolean direct;
+
+  final PrintStream out = System.out;
+
+  final List<Class<? extends Crc32>> crcs = new ArrayList<>();
+
+  Crc32PerformanceTest(final int dataLengthMB, final int trials,
+      final boolean direct) {
+    this.dataLengthMB = dataLengthMB;
+    this.trials = trials;
+    this.direct = direct;
+
+    crcs.add(Crc32.Zip.class);
+    crcs.add(Crc32.PureJava.class);
+
+    if (direct && NativeCrc32.isAvailable()) {
+      crcs.add(Crc32.Native.class);
+      ((Log4JLogger)LogFactory.getLog(NativeCodeLoader.class))
+          .getLogger().setLevel(Level.ALL);
+    }
+  }
+
+  void run() throws Exception {
+    final long startTime = System.nanoTime();
+    printSystemProperties(out);
+    out.println("Data Length = " + dataLengthMB + " MB");
+    out.println("Trials      = " + trials);
+    doBench(crcs);
+    out.printf("Elapsed %.1fs\n", secondsElapsed(startTime));
+  }
+
+  public static void main(String args[]) throws Exception {
+    new Crc32PerformanceTest(64, 5, true).run();
+  }
+
+  private static void printCell(String s, int width, PrintStream out) {
+    final int w = s.length() > width? s.length(): width;
+    out.printf(" %" + w + "s |", s);
+  }
+
+  private ByteBuffer allocateByteBuffer(int length) {
+    return direct? ByteBuffer.allocateDirect(length)
+      : ByteBuffer.allocate(length);
+  }
+
+  private ByteBuffer newData() {
+    final byte[] bytes = new byte[dataLengthMB << 20];
+    new Random().nextBytes(bytes);
+    final ByteBuffer dataBufs = allocateByteBuffer(bytes.length);
+    dataBufs.mark();
+    dataBufs.put(bytes);
+    dataBufs.reset();
+    return dataBufs;
+  }
+
+  private ByteBuffer computeCrc(ByteBuffer dataBufs, int bytePerCrc) {
+    final int size = 4 * (dataBufs.remaining() - 1) / bytePerCrc + 1;
+    final ByteBuffer crcBufs = allocateByteBuffer(size);
+    final DataChecksum checksum = DataChecksum.newDataChecksum(
+        DataChecksum.Type.CRC32, bytePerCrc);
+    checksum.calculateChunkedSums(dataBufs, crcBufs);
+    return crcBufs;
+  }
+
+  private void doBench(final List<Class<? extends Crc32>> crcs)
+      throws Exception {
+    final ByteBuffer[] dataBufs = new ByteBuffer[16];
+    for(int i = 0; i < dataBufs.length; i++) {
+      dataBufs[i] = newData();
+    }
+
+    // Print header
+    out.printf("\n%s Buffer Performance Table", direct? "Direct": "Non-direct");
+    out.printf(" (bpc: byte-per-crc in MB/sec; #T: #Theads)\n");
+
+    // Warm up implementations to get jit going.
+    final ByteBuffer[] crc32 = {computeCrc(dataBufs[0], 32)};
+    final ByteBuffer[] crc512 = {computeCrc(dataBufs[0], 512)};
+    for (Class<? extends Crc32> c : crcs) {
+      doBench(c, 1, dataBufs, crc32, 32);
+      doBench(c, 1, dataBufs, crc512, 512);
+    }
+
+    // Test on a variety of sizes with different number of threads
+    for (int i = 5; i <= 16; i++) {
+      doBench(crcs, dataBufs, 1 << i, out);
+    }
+  }
+
+  private void doBench(final List<Class<? extends Crc32>> crcs,
+      final ByteBuffer[] dataBufs, final int bytePerCrc, final PrintStream out)
+          throws Exception {
+    final ByteBuffer[] crcBufs = new ByteBuffer[dataBufs.length];
+    for(int i = 0; i < crcBufs.length; i++) {
+      crcBufs[i] = computeCrc(dataBufs[i], bytePerCrc);
+    }
+
+    final String numBytesStr = " bpc ";
+    final String numThreadsStr = "#T";
+    final String diffStr = "% diff";
+
+    out.print('|');
+    printCell(numBytesStr, 0, out);
+    printCell(numThreadsStr, 0, out);
+    for (int i = 0; i < crcs.size(); i++) {
+      final Class<? extends Crc32> c = crcs.get(i);
+      out.print('|');
+      printCell(c.getSimpleName(), 8, out);
+      for(int j = 0; j < i; j++) {
+        printCell(diffStr, diffStr.length(), out);
+      }
+    }
+    out.printf("\n");
+
+    for(int numThreads = 1; numThreads <= dataBufs.length; numThreads <<= 1) {
+      out.printf("|");
+      printCell(String.valueOf(bytePerCrc), numBytesStr.length(), out);
+      printCell(String.valueOf(numThreads), numThreadsStr.length(), out);
+
+      final List<BenchResult> previous = new ArrayList<BenchResult>();
+      for(Class<? extends Crc32> c : crcs) {
+        System.gc();
+
+        final BenchResult result = doBench(c, numThreads, dataBufs, crcBufs,
+            bytePerCrc);
+        printCell(String.format("%9.1f", result.mbps),
+            c.getSimpleName().length() + 1, out);
+
+        //compare result with previous
+        for(BenchResult p : previous) {
+          final double diff = (result.mbps - p.mbps) / p.mbps * 100;
+          printCell(String.format("%5.1f%%", diff), diffStr.length(), out);
+        }
+        previous.add(result);
+      }
+      out.printf("\n");
+    }
+  }
+
+
+  private BenchResult doBench(Class<? extends Crc32> clazz,
+      final int numThreads, final ByteBuffer[] dataBufs,
+      final ByteBuffer[] crcBufs, final int bytePerCrc)
+          throws Exception {
+
+    final Thread[] threads = new Thread[numThreads];
+    final BenchResult[] results = new BenchResult[threads.length];
+
+    {
+      final Constructor<? extends Crc32> ctor = clazz.getConstructor();
+
+      for(int i = 0; i < threads.length; i++) {
+        final Crc32 crc = ctor.newInstance();
+        final long byteProcessed = dataBufs[i].remaining() * trials;
+        final int index = i;
+        threads[i] = new Thread() {
+          @Override
+          public void run() {
+            final long startTime = System.nanoTime();
+            for (int i = 0; i < trials; i++) {
+              dataBufs[index].mark();
+              crcBufs[index].mark();
+              try {
+                crc.verifyChunked(dataBufs[index], bytePerCrc, crcBufs[index],
+                    crc.getClass().getSimpleName(), dataBufs[index].position());
+              } catch (Throwable t) {
+                results[index] = new BenchResult(t);
+                return;
+              } finally {
+                dataBufs[index].reset();
+                crcBufs[index].reset();
+              }
+            }
+            final double secsElapsed = secondsElapsed(startTime);
+            results[index] = new BenchResult(byteProcessed/secsElapsed/MB);
+          }
+        };
+      }
+    }
+
+    for(Thread t : threads) {
+      t.start();
+    }
+    for(Thread t : threads) {
+      t.join();
+    }
+
+    double sum = 0;
+    for(int i = 0; i < results.length; i++) {
+      sum += results[i].getMbps();
+    }
+    return new BenchResult(sum/results.length);
+  }
+
+  private static class BenchResult {
+    /** Speed (MB per second) */
+    final double mbps;
+    final Throwable thrown;
+
+    BenchResult(double mbps) {
+      this.mbps = mbps;
+      this.thrown = null;
+    }
+
+    BenchResult(Throwable e) {
+      this.mbps = 0;
+      this.thrown = e;
+    }
+
+    double getMbps() {
+      if (thrown != null) {
+        throw new AssertionError(thrown);
+      }
+      return mbps;
+    }
+  }
+
+  static double secondsElapsed(final long startTime) {
+    return (System.nanoTime() - startTime) / 1000000000.0d;
+  }
+
+  static void printSystemProperties(PrintStream out) {
+    final String[] names = {
+        "java.version",
+        "java.runtime.name",
+        "java.runtime.version",
+        "java.vm.version",
+        "java.vm.vendor",
+        "java.vm.name",
+        "java.vm.specification.version",
+        "java.specification.version",
+        "os.arch",
+        "os.name",
+        "os.version"
+    };
+    int max = 0;
+    for(String n : names) {
+      if (n.length() > max) {
+        max = n.length();
+      }
+    }
+
+    final Properties p = System.getProperties();
+    for(String n : names) {
+      out.printf("%" + max + "s = %s\n", n, p.getProperty(n));
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbfaf3c2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDataChecksum.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDataChecksum.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDataChecksum.java
index 73fd25a..8841809 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDataChecksum.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestDataChecksum.java
@@ -197,4 +197,10 @@ public class TestDataChecksum {
     newBuf.limit(dataBuf.limit());
     return newBuf;
   }
+
+  @Test
+  public void testCrc32() throws Exception {
+    new Crc32PerformanceTest(8, 3, true).run();
+    new Crc32PerformanceTest(8, 3, false).run();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbfaf3c2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
index 1f783f6..62dfe79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
@@ -52,6 +52,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -62,8 +63,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.client.HdfsUtils;
+import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -929,11 +930,12 @@ public class TestDFSClientRetries {
         try {
           dis.read(arr, 0, (int)FILE_LENGTH);
           fail("Expected ChecksumException not thrown");
-        } catch (Exception ex) {
+        } catch (ChecksumException ex) {
           GenericTestUtils.assertExceptionContains(
-              "Checksum error", ex);
+              "Checksum", ex);
         }
       }
+      client.close();
     } finally {
       cluster.shutdown();
     }