You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this plain-text copy.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2022/11/26 10:06:01 UTC

[incubator-celeborn] branch branch-0.1 updated: [CELEBORN-68] Client might fetch incorrect data chunk (#1007)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.1
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.1 by this push:
     new 3bedfc82 [CELEBORN-68] Client might fetch incorrect data chunk (#1007)
3bedfc82 is described below

commit 3bedfc82f23bb3882d7c57307fac115f2b079569
Author: Keyong Zhou <zh...@apache.org>
AuthorDate: Sat Nov 26 18:05:58 2022 +0800

    [CELEBORN-68] Client might fetch incorrect data chunk (#1007)
---
 .../emr/rss/client/read/RetryingChunkClient.java   | 301 ----------------
 .../aliyun/emr/rss/client/read/RssInputStream.java | 125 ++++++-
 .../rss/client/read/RetryingChunkClientSuiteJ.java | 394 ---------------------
 .../rss/common/network/client/TransportClient.java |   8 +-
 .../scala/com/aliyun/emr/rss/common/RssConf.scala  |   8 +
 .../service/deploy/worker/FileWriterSuiteJ.java    |   2 +-
 6 files changed, 119 insertions(+), 719 deletions(-)

diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/RetryingChunkClient.java b/client/src/main/java/com/aliyun/emr/rss/client/read/RetryingChunkClient.java
deleted file mode 100644
index 13672d9b..00000000
--- a/client/src/main/java/com/aliyun/emr/rss/client/read/RetryingChunkClient.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.aliyun.emr.rss.client.read;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.aliyun.emr.rss.common.RssConf;
-import com.aliyun.emr.rss.common.network.buffer.ManagedBuffer;
-import com.aliyun.emr.rss.common.network.client.ChunkReceivedCallback;
-import com.aliyun.emr.rss.common.network.client.TransportClient;
-import com.aliyun.emr.rss.common.network.client.TransportClientFactory;
-import com.aliyun.emr.rss.common.network.protocol.Message;
-import com.aliyun.emr.rss.common.network.protocol.OpenStream;
-import com.aliyun.emr.rss.common.network.protocol.StreamHandle;
-import com.aliyun.emr.rss.common.network.util.NettyUtils;
-import com.aliyun.emr.rss.common.network.util.TransportConf;
-import com.aliyun.emr.rss.common.protocol.PartitionLocation;
-import com.aliyun.emr.rss.common.protocol.TransportModuleConstants;
-import com.aliyun.emr.rss.common.util.Utils;
-
-/**
- * Encapsulate the Partition Location information, so that for the file corresponding to this
- * Partition Location, you can ignore whether there is a retry and whether the Master/Slave
- * switch is performed.
- *
- * Specifically, for a file, we can try maxTries times, and each attempt actually includes attempts
- * to all available copies in a Partition Location. In this way, we can simply take advantage of
- * the ability of multiple copies, and can also ensure that the number of retries for a file will
- * not be too many. Each retry is actually a switch between Master and Slave. Therefore, each retry
- * needs to create a new connection and reopen the file to generate the stream id.
- */
-public class RetryingChunkClient {
-  private static final Logger logger = LoggerFactory.getLogger(RetryingChunkClient.class);
-  private static final ExecutorService executorService = Executors.newCachedThreadPool(
-      NettyUtils.createThreadFactory("Chunk Fetch Retry"));
-
-  private final ChunkReceivedCallback callback;
-  private final List<Replica> replicas;
-  private final long retryWaitMs;
-  private final int maxTries;
-
-  private volatile int numTries = 0;
-
-  public RetryingChunkClient(
-      RssConf conf,
-      String shuffleKey,
-      PartitionLocation location,
-      ChunkReceivedCallback callback,
-      TransportClientFactory clientFactory) {
-    this(conf, shuffleKey, location, callback, clientFactory, 0, Integer.MAX_VALUE);
-  }
-
-  public RetryingChunkClient(
-      RssConf conf,
-      String shuffleKey,
-      PartitionLocation location,
-      ChunkReceivedCallback callback,
-      TransportClientFactory clientFactory,
-      int startMapIndex,
-      int endMapIndex) {
-    TransportConf transportConf = Utils.fromRssConf(conf, TransportModuleConstants.DATA_MODULE, 0);
-
-    this.replicas = new ArrayList<>(2);
-    this.callback = callback;
-    this.retryWaitMs = transportConf.ioRetryWaitTimeMs();
-
-    long timeoutMs = RssConf.fetchChunkTimeoutMs(conf);
-    if (location != null) {
-      replicas.add(new Replica(timeoutMs, shuffleKey, location,
-        clientFactory, startMapIndex, endMapIndex));
-      if (location.getPeer() != null) {
-        replicas.add(new Replica(timeoutMs, shuffleKey, location.getPeer(),
-          clientFactory, startMapIndex, endMapIndex));
-      }
-    }
-
-    if (this.replicas.size() <= 0) {
-      throw new IllegalArgumentException("Must contain at least one available PartitionLocation.");
-    }
-
-    this.maxTries = (transportConf.maxIORetries() + 1) * replicas.size();
-  }
-
-  /**
-   * This method should only be called once after RetryingChunkReader is initialized, so it is
-   * assumed that there is no concurrency problem when it is called.
-   *
-   * @return numChunks.
-   */
-  public int openChunks() throws IOException {
-    int numChunks = -1;
-    while (numChunks == -1 && hasRemainingRetries()) {
-      Replica replica = getCurrentReplica();
-      try {
-        replica.getOrOpenStream();
-        numChunks = replica.getNumChunks();
-      } catch (Exception e) {
-        if (e instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-          throw new IOException(e);
-        }
-
-        logger.warn("Exception raised while sending open chunks message to {}.", replica, e);
-
-        numChunks = -1;
-        if (shouldRetry(e)) {
-          numTries += 1; // openChunks will not be concurrently called.
-        } else {
-          break;
-        }
-      }
-    }
-    if (numChunks == -1) {
-      throw new IOException(String.format("Could not open chunks after %d tries.", numTries));
-    }
-    return numChunks;
-  }
-
-  /**
-   * Fetch for a chunk. It can be retried multiple times, so there is no guarantee that the order
-   * will arrive on the server side, nor can it guarantee an orderly return. Therefore, the chunks
-   * should be as orderly as possible when calling.
-   *
-   * @param chunkIndex the index of the chunk to be fetched.
-   */
-  public void fetchChunk(int chunkIndex) {
-    Replica replica;
-    RetryingChunkReceiveCallback callback;
-    synchronized (this) {
-      replica = getCurrentReplica();
-      callback = new RetryingChunkReceiveCallback(numTries);
-    }
-    try {
-      TransportClient client = replica.getOrOpenStream();
-      client.fetchChunk(replica.getStreamId(), chunkIndex, callback);
-    } catch (Exception e) {
-      logger.error("Exception raised while beginning fetch chunk {} {}.",
-          chunkIndex, numTries > 0 ? "(after " + numTries + " retries)" : "", e);
-
-      if (shouldRetry(e)) {
-        initiateRetry(chunkIndex, callback.currentNumTries);
-      } else {
-        callback.onFailure(chunkIndex, e);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  Replica getCurrentReplica() {
-    int currentReplicaIndex = numTries % replicas.size();
-    return replicas.get(currentReplicaIndex);
-  }
-
-  @VisibleForTesting
-  int getNumTries() {
-    return numTries;
-  }
-
-  private boolean hasRemainingRetries() {
-    return numTries < maxTries;
-  }
-
-  private synchronized boolean shouldRetry(Throwable e) {
-    boolean isIOException = e instanceof IOException
-        || e instanceof TimeoutException
-        || (e.getCause() != null && e.getCause() instanceof TimeoutException)
-        || (e.getCause() != null && e.getCause() instanceof IOException);
-    return isIOException && hasRemainingRetries();
-  }
-
-  @SuppressWarnings("UnstableApiUsage")
-  private synchronized void initiateRetry(final int chunkIndex, int currentNumTries) {
-    numTries = Math.max(numTries, currentNumTries + 1);
-
-    logger.info("Retrying fetch ({}/{}) for chunk {} from {} after {} ms.",
-        currentNumTries, maxTries, chunkIndex, getCurrentReplica(), retryWaitMs);
-
-    executorService.submit(() -> {
-      Uninterruptibles.sleepUninterruptibly(retryWaitMs, TimeUnit.MILLISECONDS);
-      fetchChunk(chunkIndex);
-    });
-  }
-
-  private class RetryingChunkReceiveCallback implements ChunkReceivedCallback {
-    final int currentNumTries;
-
-    RetryingChunkReceiveCallback(int currentNumTries) {
-      this.currentNumTries = currentNumTries;
-    }
-
-    @Override
-    public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
-      callback.onSuccess(chunkIndex, buffer);
-    }
-
-    @Override
-    public void onFailure(int chunkIndex, Throwable e) {
-      if (shouldRetry(e)) {
-        initiateRetry(chunkIndex, this.currentNumTries);
-      } else {
-        logger.error("Failed to fetch chunk {}, and will not retry({} tries).",
-          chunkIndex, this.currentNumTries);
-        callback.onFailure(chunkIndex, e);
-      }
-    }
-  }
-}
-
-class Replica {
-  private static final Logger logger = LoggerFactory.getLogger(Replica.class);
-  private final long timeoutMs;
-  private final String shuffleKey;
-  private final PartitionLocation location;
-  private final TransportClientFactory clientFactory;
-
-  private StreamHandle streamHandle;
-  private TransportClient client;
-  private int startMapIndex;
-  private int endMapIndex;
-
-  Replica(
-      long timeoutMs,
-      String shuffleKey,
-      PartitionLocation location,
-      TransportClientFactory clientFactory,
-      int startMapIndex,
-      int endMapIndex) {
-    this.timeoutMs = timeoutMs;
-    this.shuffleKey = shuffleKey;
-    this.location = location;
-    this.clientFactory = clientFactory;
-    this.startMapIndex = startMapIndex;
-    this.endMapIndex = endMapIndex;
-  }
-
-  Replica(
-      long timeoutMs,
-      String shuffleKey,
-      PartitionLocation location,
-      TransportClientFactory clientFactory) {
-    this(timeoutMs, shuffleKey, location, clientFactory, 0, Integer.MAX_VALUE);
-  }
-
-  public synchronized TransportClient getOrOpenStream()
-      throws IOException, InterruptedException {
-    if (client == null || !client.isActive()) {
-      client = clientFactory.createClient(location.getHost(), location.getFetchPort());
-
-      OpenStream openBlocks = new OpenStream(shuffleKey, location.getFileName(),
-        startMapIndex, endMapIndex);
-      ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(), timeoutMs);
-      streamHandle = (StreamHandle) Message.decode(response);
-    }
-    return client;
-  }
-
-  public long getStreamId() {
-    return streamHandle.streamId;
-  }
-
-  public int getNumChunks() {
-    return streamHandle.numChunks;
-  }
-
-  @Override
-  public String toString() {
-    return location.getHost() + ":" + location.getFetchPort();
-  }
-
-  @VisibleForTesting
-  PartitionLocation getLocation() {
-    return location;
-  }
-}
diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java
index 822fb63f..1729eb2a 100644
--- a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java
+++ b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java
@@ -19,6 +19,7 @@ package com.aliyun.emr.rss.client.read;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -32,6 +33,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 
+import com.aliyun.emr.rss.common.network.client.TransportClient;
+import com.aliyun.emr.rss.common.network.protocol.Message;
+import com.aliyun.emr.rss.common.network.protocol.OpenStream;
+import com.aliyun.emr.rss.common.network.protocol.StreamHandle;
 import io.netty.buffer.ByteBuf;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
@@ -108,6 +113,8 @@ public abstract class RssInputStream extends InputStream {
 
     private ByteBuf currentChunk;
     private PartitionReader currentReader;
+    private final int fetchChunkMaxRetry;
+    private int fetchChunkRetryCnt = 0;
     private int fileIndex;
     private int position;
     private int limit;
@@ -155,6 +162,8 @@ public abstract class RssInputStream extends InputStream {
 
       decompressor = Decompressor.getDecompressor(conf);
 
+      fetchChunkMaxRetry = RssConf.fetchChunkMaxRetries(conf);
+
       moveToNextReader();
     }
 
@@ -191,6 +200,9 @@ public abstract class RssInputStream extends InputStream {
         }
         currentLocation = locations[fileIndex];
       }
+
+      fetchChunkRetryCnt = 0;
+
       return currentLocation;
     }
 
@@ -203,7 +215,7 @@ public abstract class RssInputStream extends InputStream {
       if (currentLocation == null) {
         return;
       }
-      currentReader = createReader(currentLocation);
+      currentReader = createReaderWithRetry(currentLocation);
       fileIndex++;
       while (!currentReader.hasNext()) {
         currentReader.close();
@@ -212,13 +224,60 @@ public abstract class RssInputStream extends InputStream {
         if (currentLocation == null) {
           return;
         }
-        currentReader = createReader(currentLocation);
+        currentReader = createReaderWithRetry(currentLocation);
         fileIndex++;
       }
-      currentChunk = currentReader.next();
+      currentChunk = getNextChunk();
     }
 
-    private PartitionReader createReader(PartitionLocation location) throws IOException {
+    private PartitionReader createReaderWithRetry(PartitionLocation location) throws IOException {
+      while (fetchChunkRetryCnt < fetchChunkMaxRetry) {
+        try {
+          return createReader(location, fetchChunkRetryCnt, fetchChunkMaxRetry);
+        } catch (Exception e) {
+          fetchChunkRetryCnt++;
+          if (location.getPeer() != null) {
+            location = location.getPeer();
+            logger.warn("CreatePartitionReader failed {}/{} times, change to peer",
+              fetchChunkRetryCnt, fetchChunkMaxRetry);
+          } else {
+            logger.warn("CreatePartitionReader failed {}/{} times, retry the same location",
+              fetchChunkRetryCnt, fetchChunkMaxRetry);
+          }
+        }
+      }
+      throw new IOException("createPartitionReader failed!");
+    }
+
+    private ByteBuf getNextChunk() throws IOException {
+      while (fetchChunkRetryCnt < fetchChunkMaxRetry) {
+        try {
+          return currentReader.next();
+        } catch (Exception e) {
+          fetchChunkRetryCnt++;
+          currentReader.close();
+          if (fetchChunkRetryCnt == fetchChunkMaxRetry) {
+            logger.warn("Fetch chunk fail exceeds max retry {}", fetchChunkRetryCnt);
+            throw new IOException("Fetch chunk failed for " + fetchChunkRetryCnt + " times");
+          } else {
+            if (currentReader.getLocation().getPeer() != null) {
+              logger.warn("Fetch chunk failed {}/{} times, change to peer",
+                fetchChunkRetryCnt, fetchChunkMaxRetry);
+              currentReader = createReaderWithRetry(currentReader.location.getPeer());
+            } else {
+              logger.warn("Fetch chunk failed {}/{} times", fetchChunkRetryCnt, fetchChunkMaxRetry);
+              currentReader = createReaderWithRetry(currentReader.location);
+            }
+          }
+        }
+      }
+      throw new IOException("Fetch chunk failed!");
+    }
+
+    private PartitionReader createReader(
+      PartitionLocation location,
+      int fetchChunkRetryCnt,
+      int fetchChunkMaxRetry) throws IOException {
       if (location.getPeer() == null) {
         logger.debug("Partition {} has only one partition replica.", location);
       }
@@ -227,7 +286,7 @@ public abstract class RssInputStream extends InputStream {
         logger.debug("Read peer {} for attempt {}.", location, attemptNumber);
       }
 
-      return new PartitionReader(location);
+      return new PartitionReader(location, fetchChunkRetryCnt, fetchChunkMaxRetry);
     }
 
     public void setCallback(MetricsCallback callback) {
@@ -286,7 +345,7 @@ public abstract class RssInputStream extends InputStream {
     @Override
     public void close() {
       int locationsCount = locations.length;
-      logger.info(
+      logger.debug(
           "total location count {} read {} skip {}",
           locationsCount,
           locationsCount - skipCount.sum(),
@@ -309,7 +368,7 @@ public abstract class RssInputStream extends InputStream {
       }
       currentChunk = null;
       if (currentReader.hasNext()) {
-        currentChunk = currentReader.next();
+        currentChunk = getNextChunk();
         return true;
       } else if (fileIndex < locations.length) {
         moveToNextReader();
@@ -377,8 +436,9 @@ public abstract class RssInputStream extends InputStream {
     }
 
     private final class PartitionReader {
-      private final RetryingChunkClient client;
-      private final int numChunks;
+      private PartitionLocation location;
+      private TransportClient client;
+      private StreamHandle streamHandle;
 
       private int returnedChunks;
       private int chunkIndex;
@@ -390,7 +450,17 @@ public abstract class RssInputStream extends InputStream {
 
       private boolean closed = false;
 
-      PartitionReader(PartitionLocation location) throws IOException {
+      // for test
+      private int fetchChunkRetryCnt;
+      private int fetchChunkMaxRetry;
+      private final boolean testFetch;
+
+      PartitionReader(
+        PartitionLocation location,
+        int fetchChunkRetryCnt,
+        int fetchChunkMaxRetry) throws IOException {
+        this.location = location;
+
         results = new LinkedBlockingQueue<>();
         callback = new ChunkReceivedCallback() {
           @Override
@@ -412,18 +482,29 @@ public abstract class RssInputStream extends InputStream {
             exception.set(new IOException(errorMsg, e));
           }
         };
-        client = new RetryingChunkClient(conf, shuffleKey, location,
-          callback, clientFactory, startMapIndex, endMapIndex);
-        numChunks = client.openChunks();
+        try {
+          client = clientFactory.createClient(location.getHost(), location.getFetchPort());
+        } catch (InterruptedException ie) {
+          throw new IOException("Interrupted when createClient", ie);
+        }
+        OpenStream openBlocks = new OpenStream(shuffleKey, location.getFileName(),
+          startMapIndex, endMapIndex);
+        long timeoutMs = RssConf.fetchChunkTimeoutMs(conf);
+        ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(), timeoutMs);
+        streamHandle = (StreamHandle) Message.decode(response);
+
+        this.fetchChunkRetryCnt = fetchChunkRetryCnt;
+        this.fetchChunkMaxRetry = fetchChunkMaxRetry;
+        testFetch = RssConf.testFetchFailure(conf);
       }
 
       boolean hasNext() {
-        return returnedChunks < numChunks;
+        return returnedChunks < streamHandle.numChunks;
       }
 
       ByteBuf next() throws IOException {
         checkException();
-        if (chunkIndex < numChunks) {
+        if (chunkIndex < streamHandle.numChunks) {
           fetchChunks();
         }
         ByteBuf chunk = null;
@@ -455,9 +536,15 @@ public abstract class RssInputStream extends InputStream {
       private void fetchChunks() {
         final int inFlight = chunkIndex - returnedChunks;
         if (inFlight < maxInFlight) {
-          final int toFetch = Math.min(maxInFlight - inFlight + 1, numChunks - chunkIndex);
+          final int toFetch = Math.min(maxInFlight - inFlight + 1,
+            streamHandle.numChunks - chunkIndex);
           for (int i = 0; i < toFetch; i++) {
-            client.fetchChunk(chunkIndex++);
+            if (testFetch && fetchChunkRetryCnt < fetchChunkMaxRetry - 1 && chunkIndex == 3) {
+              callback.onFailure(chunkIndex, new IOException("Test fetch chunk failure"));
+            } else {
+              client.fetchChunk(streamHandle.streamId, chunkIndex, callback);
+              chunkIndex++;
+            }
           }
         }
       }
@@ -468,6 +555,10 @@ public abstract class RssInputStream extends InputStream {
           throw e;
         }
       }
+
+      public PartitionLocation getLocation() {
+        return location;
+      }
     }
   }
 }
diff --git a/client/src/test/java/com/aliyun/emr/rss/client/read/RetryingChunkClientSuiteJ.java b/client/src/test/java/com/aliyun/emr/rss/client/read/RetryingChunkClientSuiteJ.java
deleted file mode 100644
index 1a41e6e8..00000000
--- a/client/src/test/java/com/aliyun/emr/rss/client/read/RetryingChunkClientSuiteJ.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.aliyun.emr.rss.client.read;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-import io.netty.channel.Channel;
-import org.junit.Test;
-import org.mockito.stubbing.Answer;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyObject;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-
-import com.aliyun.emr.rss.common.RssConf;
-import com.aliyun.emr.rss.common.network.buffer.ManagedBuffer;
-import com.aliyun.emr.rss.common.network.buffer.NioManagedBuffer;
-import com.aliyun.emr.rss.common.network.client.ChunkReceivedCallback;
-import com.aliyun.emr.rss.common.network.client.TransportClient;
-import com.aliyun.emr.rss.common.network.client.TransportClientFactory;
-import com.aliyun.emr.rss.common.network.client.TransportResponseHandler;
-import com.aliyun.emr.rss.common.network.protocol.StreamHandle;
-import com.aliyun.emr.rss.common.protocol.PartitionLocation;
-import com.aliyun.emr.rss.common.util.ThreadUtils;
-
-public class RetryingChunkClientSuiteJ {
-
-  private static final int MASTER_RPC_PORT = 1234;
-  private static final int MASTER_PUSH_PORT = 1235;
-  private static final int MASTER_FETCH_PORT = 1236;
-  private static final int MASTER_REPLICATE_PORT = 1237;
-  private static final int SLAVE_RPC_PORT = 4321;
-  private static final int SLAVE_PUSH_PORT = 4322;
-  private static final int SLAVE_FETCH_PORT = 4323;
-  private static final int SLAVE_REPLICATE_PORT = 4324;
-  private static final PartitionLocation masterLocation = new PartitionLocation(
-    0, 1, "localhost", MASTER_RPC_PORT, MASTER_PUSH_PORT, MASTER_FETCH_PORT,
-    MASTER_REPLICATE_PORT, PartitionLocation.Mode.Master);
-  private static final PartitionLocation slaveLocation = new PartitionLocation(
-    0, 1, "localhost", SLAVE_RPC_PORT, SLAVE_PUSH_PORT, SLAVE_FETCH_PORT,
-    SLAVE_REPLICATE_PORT, PartitionLocation.Mode.Slave);
-
-  static {
-    masterLocation.setPeer(slaveLocation);
-    slaveLocation.setPeer(masterLocation);
-  }
-
-  ManagedBuffer chunk0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
-  ManagedBuffer chunk1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
-  ManagedBuffer chunk2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
-
-  @Test
-  public void testNoFailures() throws IOException, InterruptedException {
-    ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
-    Map<Integer, List<Object>> interactions = ImmutableMap.<Integer, List<Object>>builder()
-        .put(0, Arrays.asList(chunk0))
-        .put(1, Arrays.asList(chunk1))
-        .put(2, Arrays.asList(chunk2))
-        .build();
-
-    RetryingChunkClient client = performInteractions(interactions, callback);
-
-    verify(callback, timeout(5000)).onSuccess(eq(0), eq(chunk0));
-    verify(callback, timeout(5000)).onSuccess(eq(1), eq(chunk1));
-    verify(callback, timeout(5000)).onSuccess(eq(2), eq(chunk2));
-    verifyNoMoreInteractions(callback);
-
-    assertEquals(0, client.getNumTries());
-    assertEquals(masterLocation, client.getCurrentReplica().getLocation());
-  }
-
-  @Test
-  public void testUnrecoverableFailure() throws IOException, InterruptedException {
-    ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
-    Map<Integer, List<Object>> interactions = ImmutableMap.<Integer, List<Object>>builder()
-        .put(0, Arrays.asList(new RuntimeException("Ouch!")))
-        .put(1, Arrays.asList(chunk1))
-        .put(2, Arrays.asList(chunk2))
-        .build();
-    RetryingChunkClient client = performInteractions(interactions, callback);
-
-    verify(callback, timeout(5000)).onFailure(eq(0), any());
-    verify(callback, timeout(5000)).onSuccess(eq(1), eq(chunk1));
-    verify(callback, timeout(5000)).onSuccess(eq(2), eq(chunk2));
-    verifyNoMoreInteractions(callback);
-
-    assertEquals(0, client.getNumTries());
-    assertEquals(masterLocation, client.getCurrentReplica().getLocation());
-  }
-
-  @Test
-  public void testDuplicateSuccess() throws IOException, InterruptedException {
-    ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
-
-    Map<Integer, List<Object>> interactions = ImmutableMap.<Integer, List<Object>>builder()
-        .put(0, Arrays.asList(chunk0, chunk1))
-        .build();
-    RetryingChunkClient client = performInteractions(interactions, callback);
-    verify(callback, timeout(5000)).onSuccess(eq(0), eq(chunk0));
-    verifyNoMoreInteractions(callback);
-
-    assertEquals(0, client.getNumTries());
-    assertEquals(masterLocation, client.getCurrentReplica().getLocation());
-  }
-
-  @Test
-  public void testSingleIOException() throws IOException, InterruptedException {
-    Map<Integer, Object> result = new ConcurrentHashMap<>();
-    ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
-    Semaphore signal = new Semaphore(3);
-    signal.acquire(3);
-
-    Answer<Void> answer = invocation -> {
-      synchronized (signal) {
-        int chunkIndex = (Integer) invocation.getArguments()[0];
-        assertFalse(result.containsKey(chunkIndex));
-        Object value = invocation.getArguments()[1];
-        result.put(chunkIndex, value);
-        signal.release();
-      }
-      return null;
-    };
-    doAnswer(answer).when(callback).onSuccess(anyInt(), anyObject());
-    doAnswer(answer).when(callback).onFailure(anyInt(), anyObject());
-
-    Map<Integer, List<Object>> interactions = ImmutableMap.<Integer, List<Object>>builder()
-      .put(0, Arrays.asList(new IOException(), chunk0))
-      .put(1, Arrays.asList(chunk1))
-      .put(2, Arrays.asList(chunk2))
-      .build();
-    RetryingChunkClient client = performInteractions(interactions, callback);
-
-    while (!signal.tryAcquire(3, 500, TimeUnit.MILLISECONDS));
-    assertEquals(1, client.getNumTries());
-    assertEquals(slaveLocation, client.getCurrentReplica().getLocation());
-    assertEquals(chunk0, result.get(0));
-    assertEquals(chunk1, result.get(1));
-    assertEquals(chunk2, result.get(2));
-  }
-
-  @Test
-  public void testTwoIOExceptions() throws IOException, InterruptedException {
-    Map<Integer, Object> result = new ConcurrentHashMap<>();
-    ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
-    Semaphore signal = new Semaphore(3);
-    signal.acquire(3);
-
-    Answer<Void> answer = invocation -> {
-      synchronized (signal) {
-        int chunkIndex = (Integer) invocation.getArguments()[0];
-        assertFalse(result.containsKey(chunkIndex));
-        Object value = invocation.getArguments()[1];
-        result.put(chunkIndex, value);
-        signal.release();
-      }
-      return null;
-    };
-    doAnswer(answer).when(callback).onSuccess(anyInt(), anyObject());
-    doAnswer(answer).when(callback).onFailure(anyInt(), anyObject());
-
-    Map<Integer, List<Object>> interactions = ImmutableMap.<Integer, List<Object>>builder()
-      .put(0, Arrays.asList(new IOException("first ioexception"), chunk0))
-      .put(1, Arrays.asList(new IOException("second ioexception"), chunk1))
-      .put(2, Arrays.asList(chunk2))
-      .build();
-    RetryingChunkClient client = performInteractions(interactions, callback);
-
-    while (!signal.tryAcquire(3, 500, TimeUnit.MILLISECONDS));
-    assertEquals(1, client.getNumTries());
-    assertEquals(slaveLocation, client.getCurrentReplica().getLocation());
-    assertEquals(chunk0, result.get(0));
-    assertEquals(chunk1, result.get(1));
-    assertEquals(chunk2, result.get(2));
-  }
-
-  @Test
-  public void testThreeIOExceptions() throws IOException, InterruptedException {
-    Map<Integer, Object> result = new ConcurrentHashMap<>();
-    ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
-    Semaphore signal = new Semaphore(3);
-    signal.acquire(3);
-
-    Answer<Void> answer = invocation -> {
-      synchronized (signal) {
-        int chunkIndex = (Integer) invocation.getArguments()[0];
-        assertFalse(result.containsKey(chunkIndex));
-        Object value = invocation.getArguments()[1];
-        result.put(chunkIndex, value);
-        signal.release();
-      }
-      return null;
-    };
-    doAnswer(answer).when(callback).onSuccess(anyInt(), anyObject());
-    doAnswer(answer).when(callback).onFailure(anyInt(), anyObject());
-
-    Map<Integer, List<Object>> interactions = ImmutableMap.<Integer, List<Object>>builder()
-      .put(0, Arrays.asList(new IOException("first ioexception"), chunk0))
-      .put(1, Arrays.asList(new IOException("second ioexception"), chunk1))
-      .put(2, Arrays.asList(new IOException("third ioexception"), chunk2))
-      .build();
-
-    RetryingChunkClient client = performInteractions(interactions, callback);
-    while (!signal.tryAcquire(3, 500, TimeUnit.MILLISECONDS));
-    assertEquals(1, client.getNumTries());
-    assertEquals(slaveLocation, client.getCurrentReplica().getLocation());
-    assertEquals(chunk0, result.get(0));
-    assertEquals(chunk1, result.get(1));
-    assertEquals(chunk2, result.get(2));
-  }
-
-  @Test
-  public void testFailedWithIOExceptions() throws IOException, InterruptedException {
-    Map<Integer, Object> result = new ConcurrentHashMap<>();
-    ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
-    Semaphore signal = new Semaphore(3);
-    signal.acquire(3);
-
-    Answer<Void> answer = invocation -> {
-      synchronized (signal) {
-        int chunkIndex = (Integer) invocation.getArguments()[0];
-        assertFalse(result.containsKey(chunkIndex));
-        Object value = invocation.getArguments()[1];
-        result.put(chunkIndex, value);
-        signal.release();
-      }
-      return null;
-    };
-    doAnswer(answer).when(callback).onSuccess(anyInt(), anyObject());
-    doAnswer(answer).when(callback).onFailure(anyInt(), anyObject());
-
-    IOException ioe = new IOException("failed exception");
-    Map<Integer, List<Object>> interactions = ImmutableMap.<Integer, List<Object>>builder()
-      .put(0, Arrays.asList(ioe, ioe, ioe, ioe, ioe, chunk0))
-      .put(1, Arrays.asList(ioe, ioe, ioe, ioe, ioe, chunk1))
-      .put(2, Arrays.asList(ioe, ioe, ioe, ioe, ioe, chunk2))
-      .build();
-
-    RetryingChunkClient client = performInteractions(interactions, callback);
-    while (!signal.tryAcquire(3, 500, TimeUnit.MILLISECONDS));
-    // Note: this may exceeds the max retries we want, but it doesn't master.
-    assertEquals(4, client.getNumTries());
-    assertEquals(masterLocation, client.getCurrentReplica().getLocation());
-    assertEquals(ioe, result.get(0));
-    assertEquals(ioe, result.get(1));
-    assertEquals(ioe, result.get(2));
-  }
-
-  @Test
-  public void testRetryAndUnrecoverable() throws IOException, InterruptedException {
-    Map<Integer, Object> result = new ConcurrentHashMap<>();
-    ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
-    Semaphore signal = new Semaphore(3);
-    signal.acquire(3);
-
-    Answer<Void> answer = invocation -> {
-      synchronized (signal) {
-        int chunkIndex = (Integer) invocation.getArguments()[0];
-        assertFalse(result.containsKey(chunkIndex));
-        Object value = invocation.getArguments()[1];
-        result.put(chunkIndex, value);
-        signal.release();
-      }
-      return null;
-    };
-    doAnswer(answer).when(callback).onSuccess(anyInt(), anyObject());
-    doAnswer(answer).when(callback).onFailure(anyInt(), anyObject());
-
-    Exception re = new RuntimeException("failed exception");
-    Map<Integer, List<Object>> interactions = ImmutableMap.<Integer, List<Object>>builder()
-      .put(0, Arrays.asList(new IOException("first ioexception"), re, chunk0))
-      .put(1, Arrays.asList(chunk1))
-      .put(2, Arrays.asList(new IOException("second ioexception"), chunk2))
-      .build();
-
-    performInteractions(interactions, callback);
-    while (!signal.tryAcquire(3, 500, TimeUnit.MILLISECONDS));
-    assertEquals(re, result.get(0));
-    assertEquals(chunk1, result.get(1));
-    assertEquals(chunk2, result.get(2));
-  }
-
-  private static RetryingChunkClient performInteractions(
-      Map<Integer, List<Object>> interactions,
-      ChunkReceivedCallback callback) throws IOException, InterruptedException {
-    RssConf conf = new RssConf();
-    conf.set("rss.data.io.maxRetries", "1");
-    conf.set("rss.data.io.retryWait", "0");
-
-    // Contains all chunk ids that are referenced across all interactions.
-    LinkedHashSet<Integer> chunkIds = Sets.newLinkedHashSet(interactions.keySet());
-
-    final TransportClient client = new DummyTransportClient(chunkIds.size(), interactions);
-    final TransportClientFactory clientFactory = mock(TransportClientFactory.class);
-    doAnswer(invocation -> client).when(clientFactory).createClient(anyString(), anyInt());
-
-    RetryingChunkClient retryingChunkClient = new RetryingChunkClient(
-        conf, "test", masterLocation, callback, clientFactory);
-    chunkIds.stream().sorted().forEach(retryingChunkClient::fetchChunk);
-    return retryingChunkClient;
-  }
-
-  private static class DummyTransportClient extends TransportClient {
-
-    private static final Channel channel = mock(Channel.class);
-    private static final TransportResponseHandler handler = mock(TransportResponseHandler.class);
-
-    private final long streamId = new Random().nextInt(Integer.MAX_VALUE) * 1000L;
-    private final int numChunks;
-    private final Map<Integer, List<Object>> interactions;
-    private final Map<Integer, Integer> chunkIdToInterActionIndex;
-
-    private final ScheduledExecutorService schedule =
-      ThreadUtils.newDaemonThreadPoolScheduledExecutor("test-fetch-chunk", 3);
-
-    DummyTransportClient(int numChunks, Map<Integer, List<Object>> interactions) {
-      super(channel, handler);
-      this.numChunks = numChunks;
-      this.interactions = interactions;
-      this.chunkIdToInterActionIndex = new ConcurrentHashMap<>();
-      interactions.keySet().forEach((chunkId) -> chunkIdToInterActionIndex.putIfAbsent(chunkId, 0));
-    }
-
-    @Override
-    public void fetchChunk(long streamId, int chunkId, ChunkReceivedCallback callback) {
-      schedule.schedule(() -> {
-        Object action;
-        List<Object> interaction = interactions.get(chunkId);
-        synchronized (chunkIdToInterActionIndex) {
-          int index = chunkIdToInterActionIndex.get(chunkId);
-          assertTrue(index < interaction.size());
-          action = interaction.get(index);
-          chunkIdToInterActionIndex.put(chunkId, index + 1);
-        }
-
-        if (action instanceof ManagedBuffer) {
-          callback.onSuccess(chunkId, (ManagedBuffer) action);
-        } else if (action instanceof Exception) {
-          callback.onFailure(chunkId, (Exception) action);
-        } else {
-          fail("Can only handle ManagedBuffers and Exceptions, got " + action);
-        }
-      }, 500, TimeUnit.MILLISECONDS);
-    }
-
-    @Override
-    public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) {
-      StreamHandle handle = new StreamHandle(streamId, numChunks);
-      return handle.toByteBuffer();
-    }
-
-    @Override
-    public void close() {
-      super.close();
-      schedule.shutdownNow();
-    }
-  }
-}
diff --git a/common/src/main/java/com/aliyun/emr/rss/common/network/client/TransportClient.java b/common/src/main/java/com/aliyun/emr/rss/common/network/client/TransportClient.java
index 6c97c1d3..c2080e17 100644
--- a/common/src/main/java/com/aliyun/emr/rss/common/network/client/TransportClient.java
+++ b/common/src/main/java/com/aliyun/emr/rss/common/network/client/TransportClient.java
@@ -21,14 +21,12 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.SettableFuture;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -195,7 +193,7 @@ public class TransportClient implements Closeable {
    * Synchronously sends an opaque message to the RpcHandler on the server-side, waiting for up to
    * a specified timeout for a response.
    */
-  public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) {
+  public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) throws IOException {
     final SettableFuture<ByteBuffer> result = SettableFuture.create();
 
     sendRpc(message, new RpcResponseCallback() {
@@ -216,10 +214,8 @@ public class TransportClient implements Closeable {
 
     try {
       return result.get(timeoutMs, TimeUnit.MILLISECONDS);
-    } catch (ExecutionException e) {
-      throw Throwables.propagate(e.getCause());
     } catch (Exception e) {
-      throw Throwables.propagate(e);
+      throw new IOException("Exception in sendRpcSync", e);
     }
   }
 
diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
index fed1e2aa..1e3f6298 100644
--- a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
+++ b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
@@ -437,6 +437,14 @@ object RssConf extends Logging {
     conf.getInt("rss.fetch.chunk.maxReqsInFlight", 3)
   }
 
+  def fetchChunkMaxRetries(conf: RssConf): Int = {
+    conf.getInt("rss.fetch.chunk.max.retry", 3)
+  }
+
+  def testFetchFailure(conf: RssConf): Boolean = {
+    conf.getBoolean("rss.test.fetch.chunk.failure", false)
+  }
+
   def replicate(conf: RssConf): Boolean = {
     conf.getBoolean("rss.push.data.replicate", false)
   }
diff --git a/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/FileWriterSuiteJ.java b/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/FileWriterSuiteJ.java
index 6f7c49d1..0eae9313 100644
--- a/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/FileWriterSuiteJ.java
+++ b/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/FileWriterSuiteJ.java
@@ -163,7 +163,7 @@ public class FileWriterSuiteJ {
     return openBlocks.toByteBuffer();
   }
 
-  private void setUpConn(TransportClient client) {
+  private void setUpConn(TransportClient client) throws IOException {
     ByteBuffer resp = client.sendRpcSync(createOpenMessage(), 10000);
     StreamHandle streamHandle = (StreamHandle) Message.decode(resp);
     streamId = streamHandle.streamId;