You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2022/11/26 10:06:01 UTC
[incubator-celeborn] branch branch-0.1 updated: [CELEBORN-68] Client might fetch incorrect data chunk (#1007)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.1
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.1 by this push:
new 3bedfc82 [CELEBORN-68] Client might fetch incorrect data chunk (#1007)
3bedfc82 is described below
commit 3bedfc82f23bb3882d7c57307fac115f2b079569
Author: Keyong Zhou <zh...@apache.org>
AuthorDate: Sat Nov 26 18:05:58 2022 +0800
[CELEBORN-68] Client might fetch incorrect data chunk (#1007)
---
.../emr/rss/client/read/RetryingChunkClient.java | 301 ----------------
.../aliyun/emr/rss/client/read/RssInputStream.java | 125 ++++++-
.../rss/client/read/RetryingChunkClientSuiteJ.java | 394 ---------------------
.../rss/common/network/client/TransportClient.java | 8 +-
.../scala/com/aliyun/emr/rss/common/RssConf.scala | 8 +
.../service/deploy/worker/FileWriterSuiteJ.java | 2 +-
6 files changed, 119 insertions(+), 719 deletions(-)
diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/RetryingChunkClient.java b/client/src/main/java/com/aliyun/emr/rss/client/read/RetryingChunkClient.java
deleted file mode 100644
index 13672d9b..00000000
--- a/client/src/main/java/com/aliyun/emr/rss/client/read/RetryingChunkClient.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.aliyun.emr.rss.client.read;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.aliyun.emr.rss.common.RssConf;
-import com.aliyun.emr.rss.common.network.buffer.ManagedBuffer;
-import com.aliyun.emr.rss.common.network.client.ChunkReceivedCallback;
-import com.aliyun.emr.rss.common.network.client.TransportClient;
-import com.aliyun.emr.rss.common.network.client.TransportClientFactory;
-import com.aliyun.emr.rss.common.network.protocol.Message;
-import com.aliyun.emr.rss.common.network.protocol.OpenStream;
-import com.aliyun.emr.rss.common.network.protocol.StreamHandle;
-import com.aliyun.emr.rss.common.network.util.NettyUtils;
-import com.aliyun.emr.rss.common.network.util.TransportConf;
-import com.aliyun.emr.rss.common.protocol.PartitionLocation;
-import com.aliyun.emr.rss.common.protocol.TransportModuleConstants;
-import com.aliyun.emr.rss.common.util.Utils;
-
-/**
- * Encapsulates the Partition Location information so that, when reading the file corresponding
- * to this Partition Location, callers need not care whether a retry happens or whether a
- * Master/Slave switch is performed.
- *
- * Specifically, for a file, we can try maxTries times, and each attempt actually includes attempts
- * to all available copies in a Partition Location. In this way, we can simply take advantage of
- * the ability of multiple copies, and can also ensure that the number of retries for a file will
- * not be too many. Each retry is actually a switch between Master and Slave. Therefore, each retry
- * needs to create a new connection and reopen the file to generate the stream id.
- */
-public class RetryingChunkClient {
- private static final Logger logger = LoggerFactory.getLogger(RetryingChunkClient.class);
- private static final ExecutorService executorService = Executors.newCachedThreadPool(
- NettyUtils.createThreadFactory("Chunk Fetch Retry"));
-
- private final ChunkReceivedCallback callback;
- private final List<Replica> replicas;
- private final long retryWaitMs;
- private final int maxTries;
-
- private volatile int numTries = 0;
-
- public RetryingChunkClient(
- RssConf conf,
- String shuffleKey,
- PartitionLocation location,
- ChunkReceivedCallback callback,
- TransportClientFactory clientFactory) {
- this(conf, shuffleKey, location, callback, clientFactory, 0, Integer.MAX_VALUE);
- }
-
- public RetryingChunkClient(
- RssConf conf,
- String shuffleKey,
- PartitionLocation location,
- ChunkReceivedCallback callback,
- TransportClientFactory clientFactory,
- int startMapIndex,
- int endMapIndex) {
- TransportConf transportConf = Utils.fromRssConf(conf, TransportModuleConstants.DATA_MODULE, 0);
-
- this.replicas = new ArrayList<>(2);
- this.callback = callback;
- this.retryWaitMs = transportConf.ioRetryWaitTimeMs();
-
- long timeoutMs = RssConf.fetchChunkTimeoutMs(conf);
- if (location != null) {
- replicas.add(new Replica(timeoutMs, shuffleKey, location,
- clientFactory, startMapIndex, endMapIndex));
- if (location.getPeer() != null) {
- replicas.add(new Replica(timeoutMs, shuffleKey, location.getPeer(),
- clientFactory, startMapIndex, endMapIndex));
- }
- }
-
- if (this.replicas.size() <= 0) {
- throw new IllegalArgumentException("Must contain at least one available PartitionLocation.");
- }
-
- this.maxTries = (transportConf.maxIORetries() + 1) * replicas.size();
- }
-
- /**
- * This method should only be called once after RetryingChunkClient is initialized, so it is
- * assumed that there is no concurrency problem when it is called.
- *
- * @return numChunks.
- */
- public int openChunks() throws IOException {
- int numChunks = -1;
- while (numChunks == -1 && hasRemainingRetries()) {
- Replica replica = getCurrentReplica();
- try {
- replica.getOrOpenStream();
- numChunks = replica.getNumChunks();
- } catch (Exception e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- throw new IOException(e);
- }
-
- logger.warn("Exception raised while sending open chunks message to {}.", replica, e);
-
- numChunks = -1;
- if (shouldRetry(e)) {
- numTries += 1; // openChunks will not be concurrently called.
- } else {
- break;
- }
- }
- }
- if (numChunks == -1) {
- throw new IOException(String.format("Could not open chunks after %d tries.", numTries));
- }
- return numChunks;
- }
-
- /**
- * Fetch a chunk. A fetch may be retried multiple times, so there is no guarantee that requests
- * arrive at the server in order, nor that responses are returned in order. Therefore, callers
- * should request chunks in as orderly a sequence as possible.
- *
- * @param chunkIndex the index of the chunk to be fetched.
- */
- public void fetchChunk(int chunkIndex) {
- Replica replica;
- RetryingChunkReceiveCallback callback;
- synchronized (this) {
- replica = getCurrentReplica();
- callback = new RetryingChunkReceiveCallback(numTries);
- }
- try {
- TransportClient client = replica.getOrOpenStream();
- client.fetchChunk(replica.getStreamId(), chunkIndex, callback);
- } catch (Exception e) {
- logger.error("Exception raised while beginning fetch chunk {} {}.",
- chunkIndex, numTries > 0 ? "(after " + numTries + " retries)" : "", e);
-
- if (shouldRetry(e)) {
- initiateRetry(chunkIndex, callback.currentNumTries);
- } else {
- callback.onFailure(chunkIndex, e);
- }
- }
- }
-
- @VisibleForTesting
- Replica getCurrentReplica() {
- int currentReplicaIndex = numTries % replicas.size();
- return replicas.get(currentReplicaIndex);
- }
-
- @VisibleForTesting
- int getNumTries() {
- return numTries;
- }
-
- private boolean hasRemainingRetries() {
- return numTries < maxTries;
- }
-
- private synchronized boolean shouldRetry(Throwable e) {
- boolean isIOException = e instanceof IOException
- || e instanceof TimeoutException
- || (e.getCause() != null && e.getCause() instanceof TimeoutException)
- || (e.getCause() != null && e.getCause() instanceof IOException);
- return isIOException && hasRemainingRetries();
- }
-
- @SuppressWarnings("UnstableApiUsage")
- private synchronized void initiateRetry(final int chunkIndex, int currentNumTries) {
- numTries = Math.max(numTries, currentNumTries + 1);
-
- logger.info("Retrying fetch ({}/{}) for chunk {} from {} after {} ms.",
- currentNumTries, maxTries, chunkIndex, getCurrentReplica(), retryWaitMs);
-
- executorService.submit(() -> {
- Uninterruptibles.sleepUninterruptibly(retryWaitMs, TimeUnit.MILLISECONDS);
- fetchChunk(chunkIndex);
- });
- }
-
- private class RetryingChunkReceiveCallback implements ChunkReceivedCallback {
- final int currentNumTries;
-
- RetryingChunkReceiveCallback(int currentNumTries) {
- this.currentNumTries = currentNumTries;
- }
-
- @Override
- public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
- callback.onSuccess(chunkIndex, buffer);
- }
-
- @Override
- public void onFailure(int chunkIndex, Throwable e) {
- if (shouldRetry(e)) {
- initiateRetry(chunkIndex, this.currentNumTries);
- } else {
- logger.error("Failed to fetch chunk {}, and will not retry({} tries).",
- chunkIndex, this.currentNumTries);
- callback.onFailure(chunkIndex, e);
- }
- }
- }
-}
-
-class Replica {
- private static final Logger logger = LoggerFactory.getLogger(Replica.class);
- private final long timeoutMs;
- private final String shuffleKey;
- private final PartitionLocation location;
- private final TransportClientFactory clientFactory;
-
- private StreamHandle streamHandle;
- private TransportClient client;
- private int startMapIndex;
- private int endMapIndex;
-
- Replica(
- long timeoutMs,
- String shuffleKey,
- PartitionLocation location,
- TransportClientFactory clientFactory,
- int startMapIndex,
- int endMapIndex) {
- this.timeoutMs = timeoutMs;
- this.shuffleKey = shuffleKey;
- this.location = location;
- this.clientFactory = clientFactory;
- this.startMapIndex = startMapIndex;
- this.endMapIndex = endMapIndex;
- }
-
- Replica(
- long timeoutMs,
- String shuffleKey,
- PartitionLocation location,
- TransportClientFactory clientFactory) {
- this(timeoutMs, shuffleKey, location, clientFactory, 0, Integer.MAX_VALUE);
- }
-
- public synchronized TransportClient getOrOpenStream()
- throws IOException, InterruptedException {
- if (client == null || !client.isActive()) {
- client = clientFactory.createClient(location.getHost(), location.getFetchPort());
-
- OpenStream openBlocks = new OpenStream(shuffleKey, location.getFileName(),
- startMapIndex, endMapIndex);
- ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(), timeoutMs);
- streamHandle = (StreamHandle) Message.decode(response);
- }
- return client;
- }
-
- public long getStreamId() {
- return streamHandle.streamId;
- }
-
- public int getNumChunks() {
- return streamHandle.numChunks;
- }
-
- @Override
- public String toString() {
- return location.getHost() + ":" + location.getFetchPort();
- }
-
- @VisibleForTesting
- PartitionLocation getLocation() {
- return location;
- }
-}
diff --git a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java
index 822fb63f..1729eb2a 100644
--- a/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java
+++ b/client/src/main/java/com/aliyun/emr/rss/client/read/RssInputStream.java
@@ -19,6 +19,7 @@ package com.aliyun.emr.rss.client.read;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -32,6 +33,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
+import com.aliyun.emr.rss.common.network.client.TransportClient;
+import com.aliyun.emr.rss.common.network.protocol.Message;
+import com.aliyun.emr.rss.common.network.protocol.OpenStream;
+import com.aliyun.emr.rss.common.network.protocol.StreamHandle;
import io.netty.buffer.ByteBuf;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
@@ -108,6 +113,8 @@ public abstract class RssInputStream extends InputStream {
private ByteBuf currentChunk;
private PartitionReader currentReader;
+ private final int fetchChunkMaxRetry;
+ private int fetchChunkRetryCnt = 0;
private int fileIndex;
private int position;
private int limit;
@@ -155,6 +162,8 @@ public abstract class RssInputStream extends InputStream {
decompressor = Decompressor.getDecompressor(conf);
+ fetchChunkMaxRetry = RssConf.fetchChunkMaxRetries(conf);
+
moveToNextReader();
}
@@ -191,6 +200,9 @@ public abstract class RssInputStream extends InputStream {
}
currentLocation = locations[fileIndex];
}
+
+ fetchChunkRetryCnt = 0;
+
return currentLocation;
}
@@ -203,7 +215,7 @@ public abstract class RssInputStream extends InputStream {
if (currentLocation == null) {
return;
}
- currentReader = createReader(currentLocation);
+ currentReader = createReaderWithRetry(currentLocation);
fileIndex++;
while (!currentReader.hasNext()) {
currentReader.close();
@@ -212,13 +224,60 @@ public abstract class RssInputStream extends InputStream {
if (currentLocation == null) {
return;
}
- currentReader = createReader(currentLocation);
+ currentReader = createReaderWithRetry(currentLocation);
fileIndex++;
}
- currentChunk = currentReader.next();
+ currentChunk = getNextChunk();
}
- private PartitionReader createReader(PartitionLocation location) throws IOException {
+ private PartitionReader createReaderWithRetry(PartitionLocation location) throws IOException {
+ while (fetchChunkRetryCnt < fetchChunkMaxRetry) {
+ try {
+ return createReader(location, fetchChunkRetryCnt, fetchChunkMaxRetry);
+ } catch (Exception e) {
+ fetchChunkRetryCnt++;
+ if (location.getPeer() != null) {
+ location = location.getPeer();
+ logger.warn("CreatePartitionReader failed {}/{} times, change to peer",
+ fetchChunkRetryCnt, fetchChunkMaxRetry);
+ } else {
+ logger.warn("CreatePartitionReader failed {}/{} times, retry the same location",
+ fetchChunkRetryCnt, fetchChunkMaxRetry);
+ }
+ }
+ }
+ throw new IOException("createPartitionReader failed!");
+ }
+
+ private ByteBuf getNextChunk() throws IOException {
+ while (fetchChunkRetryCnt < fetchChunkMaxRetry) {
+ try {
+ return currentReader.next();
+ } catch (Exception e) {
+ fetchChunkRetryCnt++;
+ currentReader.close();
+ if (fetchChunkRetryCnt == fetchChunkMaxRetry) {
+ logger.warn("Fetch chunk fail exceeds max retry {}", fetchChunkRetryCnt);
+ throw new IOException("Fetch chunk failed for " + fetchChunkRetryCnt + " times");
+ } else {
+ if (currentReader.getLocation().getPeer() != null) {
+ logger.warn("Fetch chunk failed {}/{} times, change to peer",
+ fetchChunkRetryCnt, fetchChunkMaxRetry);
+ currentReader = createReaderWithRetry(currentReader.location.getPeer());
+ } else {
+ logger.warn("Fetch chunk failed {}/{} times", fetchChunkRetryCnt, fetchChunkMaxRetry);
+ currentReader = createReaderWithRetry(currentReader.location);
+ }
+ }
+ }
+ }
+ throw new IOException("Fetch chunk failed!");
+ }
+
+ private PartitionReader createReader(
+ PartitionLocation location,
+ int fetchChunkRetryCnt,
+ int fetchChunkMaxRetry) throws IOException {
if (location.getPeer() == null) {
logger.debug("Partition {} has only one partition replica.", location);
}
@@ -227,7 +286,7 @@ public abstract class RssInputStream extends InputStream {
logger.debug("Read peer {} for attempt {}.", location, attemptNumber);
}
- return new PartitionReader(location);
+ return new PartitionReader(location, fetchChunkRetryCnt, fetchChunkMaxRetry);
}
public void setCallback(MetricsCallback callback) {
@@ -286,7 +345,7 @@ public abstract class RssInputStream extends InputStream {
@Override
public void close() {
int locationsCount = locations.length;
- logger.info(
+ logger.debug(
"total location count {} read {} skip {}",
locationsCount,
locationsCount - skipCount.sum(),
@@ -309,7 +368,7 @@ public abstract class RssInputStream extends InputStream {
}
currentChunk = null;
if (currentReader.hasNext()) {
- currentChunk = currentReader.next();
+ currentChunk = getNextChunk();
return true;
} else if (fileIndex < locations.length) {
moveToNextReader();
@@ -377,8 +436,9 @@ public abstract class RssInputStream extends InputStream {
}
private final class PartitionReader {
- private final RetryingChunkClient client;
- private final int numChunks;
+ private PartitionLocation location;
+ private TransportClient client;
+ private StreamHandle streamHandle;
private int returnedChunks;
private int chunkIndex;
@@ -390,7 +450,17 @@ public abstract class RssInputStream extends InputStream {
private boolean closed = false;
- PartitionReader(PartitionLocation location) throws IOException {
+ // for test
+ private int fetchChunkRetryCnt;
+ private int fetchChunkMaxRetry;
+ private final boolean testFetch;
+
+ PartitionReader(
+ PartitionLocation location,
+ int fetchChunkRetryCnt,
+ int fetchChunkMaxRetry) throws IOException {
+ this.location = location;
+
results = new LinkedBlockingQueue<>();
callback = new ChunkReceivedCallback() {
@Override
@@ -412,18 +482,29 @@ public abstract class RssInputStream extends InputStream {
exception.set(new IOException(errorMsg, e));
}
};
- client = new RetryingChunkClient(conf, shuffleKey, location,
- callback, clientFactory, startMapIndex, endMapIndex);
- numChunks = client.openChunks();
+ try {
+ client = clientFactory.createClient(location.getHost(), location.getFetchPort());
+ } catch (InterruptedException ie) {
+ throw new IOException("Interrupted when createClient", ie);
+ }
+ OpenStream openBlocks = new OpenStream(shuffleKey, location.getFileName(),
+ startMapIndex, endMapIndex);
+ long timeoutMs = RssConf.fetchChunkTimeoutMs(conf);
+ ByteBuffer response = client.sendRpcSync(openBlocks.toByteBuffer(), timeoutMs);
+ streamHandle = (StreamHandle) Message.decode(response);
+
+ this.fetchChunkRetryCnt = fetchChunkRetryCnt;
+ this.fetchChunkMaxRetry = fetchChunkMaxRetry;
+ testFetch = RssConf.testFetchFailure(conf);
}
boolean hasNext() {
- return returnedChunks < numChunks;
+ return returnedChunks < streamHandle.numChunks;
}
ByteBuf next() throws IOException {
checkException();
- if (chunkIndex < numChunks) {
+ if (chunkIndex < streamHandle.numChunks) {
fetchChunks();
}
ByteBuf chunk = null;
@@ -455,9 +536,15 @@ public abstract class RssInputStream extends InputStream {
private void fetchChunks() {
final int inFlight = chunkIndex - returnedChunks;
if (inFlight < maxInFlight) {
- final int toFetch = Math.min(maxInFlight - inFlight + 1, numChunks - chunkIndex);
+ final int toFetch = Math.min(maxInFlight - inFlight + 1,
+ streamHandle.numChunks - chunkIndex);
for (int i = 0; i < toFetch; i++) {
- client.fetchChunk(chunkIndex++);
+ if (testFetch && fetchChunkRetryCnt < fetchChunkMaxRetry - 1 && chunkIndex == 3) {
+ callback.onFailure(chunkIndex, new IOException("Test fetch chunk failure"));
+ } else {
+ client.fetchChunk(streamHandle.streamId, chunkIndex, callback);
+ chunkIndex++;
+ }
}
}
}
@@ -468,6 +555,10 @@ public abstract class RssInputStream extends InputStream {
throw e;
}
}
+
+ public PartitionLocation getLocation() {
+ return location;
+ }
}
}
}
diff --git a/client/src/test/java/com/aliyun/emr/rss/client/read/RetryingChunkClientSuiteJ.java b/client/src/test/java/com/aliyun/emr/rss/client/read/RetryingChunkClientSuiteJ.java
deleted file mode 100644
index 1a41e6e8..00000000
--- a/client/src/test/java/com/aliyun/emr/rss/client/read/RetryingChunkClientSuiteJ.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.aliyun.emr.rss.client.read;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-import io.netty.channel.Channel;
-import org.junit.Test;
-import org.mockito.stubbing.Answer;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyObject;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-
-import com.aliyun.emr.rss.common.RssConf;
-import com.aliyun.emr.rss.common.network.buffer.ManagedBuffer;
-import com.aliyun.emr.rss.common.network.buffer.NioManagedBuffer;
-import com.aliyun.emr.rss.common.network.client.ChunkReceivedCallback;
-import com.aliyun.emr.rss.common.network.client.TransportClient;
-import com.aliyun.emr.rss.common.network.client.TransportClientFactory;
-import com.aliyun.emr.rss.common.network.client.TransportResponseHandler;
-import com.aliyun.emr.rss.common.network.protocol.StreamHandle;
-import com.aliyun.emr.rss.common.protocol.PartitionLocation;
-import com.aliyun.emr.rss.common.util.ThreadUtils;
-
-public class RetryingChunkClientSuiteJ {
-
- private static final int MASTER_RPC_PORT = 1234;
- private static final int MASTER_PUSH_PORT = 1235;
- private static final int MASTER_FETCH_PORT = 1236;
- private static final int MASTER_REPLICATE_PORT = 1237;
- private static final int SLAVE_RPC_PORT = 4321;
- private static final int SLAVE_PUSH_PORT = 4322;
- private static final int SLAVE_FETCH_PORT = 4323;
- private static final int SLAVE_REPLICATE_PORT = 4324;
- private static final PartitionLocation masterLocation = new PartitionLocation(
- 0, 1, "localhost", MASTER_RPC_PORT, MASTER_PUSH_PORT, MASTER_FETCH_PORT,
- MASTER_REPLICATE_PORT, PartitionLocation.Mode.Master);
- private static final PartitionLocation slaveLocation = new PartitionLocation(
- 0, 1, "localhost", SLAVE_RPC_PORT, SLAVE_PUSH_PORT, SLAVE_FETCH_PORT,
- SLAVE_REPLICATE_PORT, PartitionLocation.Mode.Slave);
-
- static {
- masterLocation.setPeer(slaveLocation);
- slaveLocation.setPeer(masterLocation);
- }
-
- ManagedBuffer chunk0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
- ManagedBuffer chunk1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
- ManagedBuffer chunk2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
-
- @Test
- public void testNoFailures() throws IOException, InterruptedException {
- ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
- Map<Integer, List<Object>> interactions = ImmutableMap.<Integer, List<Object>>builder()
- .put(0, Arrays.asList(chunk0))
- .put(1, Arrays.asList(chunk1))
- .put(2, Arrays.asList(chunk2))
- .build();
-
- RetryingChunkClient client = performInteractions(interactions, callback);
-
- verify(callback, timeout(5000)).onSuccess(eq(0), eq(chunk0));
- verify(callback, timeout(5000)).onSuccess(eq(1), eq(chunk1));
- verify(callback, timeout(5000)).onSuccess(eq(2), eq(chunk2));
- verifyNoMoreInteractions(callback);
-
- assertEquals(0, client.getNumTries());
- assertEquals(masterLocation, client.getCurrentReplica().getLocation());
- }
-
- @Test
- public void testUnrecoverableFailure() throws IOException, InterruptedException {
- ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
- Map<Integer, List<Object>> interactions = ImmutableMap.<Integer, List<Object>>builder()
- .put(0, Arrays.asList(new RuntimeException("Ouch!")))
- .put(1, Arrays.asList(chunk1))
- .put(2, Arrays.asList(chunk2))
- .build();
- RetryingChunkClient client = performInteractions(interactions, callback);
-
- verify(callback, timeout(5000)).onFailure(eq(0), any());
- verify(callback, timeout(5000)).onSuccess(eq(1), eq(chunk1));
- verify(callback, timeout(5000)).onSuccess(eq(2), eq(chunk2));
- verifyNoMoreInteractions(callback);
-
- assertEquals(0, client.getNumTries());
- assertEquals(masterLocation, client.getCurrentReplica().getLocation());
- }
-
- @Test
- public void testDuplicateSuccess() throws IOException, InterruptedException {
- ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
-
- Map<Integer, List<Object>> interactions = ImmutableMap.<Integer, List<Object>>builder()
- .put(0, Arrays.asList(chunk0, chunk1))
- .build();
- RetryingChunkClient client = performInteractions(interactions, callback);
- verify(callback, timeout(5000)).onSuccess(eq(0), eq(chunk0));
- verifyNoMoreInteractions(callback);
-
- assertEquals(0, client.getNumTries());
- assertEquals(masterLocation, client.getCurrentReplica().getLocation());
- }
-
- @Test
- public void testSingleIOException() throws IOException, InterruptedException {
- Map<Integer, Object> result = new ConcurrentHashMap<>();
- ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
- Semaphore signal = new Semaphore(3);
- signal.acquire(3);
-
- Answer<Void> answer = invocation -> {
- synchronized (signal) {
- int chunkIndex = (Integer) invocation.getArguments()[0];
- assertFalse(result.containsKey(chunkIndex));
- Object value = invocation.getArguments()[1];
- result.put(chunkIndex, value);
- signal.release();
- }
- return null;
- };
- doAnswer(answer).when(callback).onSuccess(anyInt(), anyObject());
- doAnswer(answer).when(callback).onFailure(anyInt(), anyObject());
-
- Map<Integer, List<Object>> interactions = ImmutableMap.<Integer, List<Object>>builder()
- .put(0, Arrays.asList(new IOException(), chunk0))
- .put(1, Arrays.asList(chunk1))
- .put(2, Arrays.asList(chunk2))
- .build();
- RetryingChunkClient client = performInteractions(interactions, callback);
-
- while (!signal.tryAcquire(3, 500, TimeUnit.MILLISECONDS));
- assertEquals(1, client.getNumTries());
- assertEquals(slaveLocation, client.getCurrentReplica().getLocation());
- assertEquals(chunk0, result.get(0));
- assertEquals(chunk1, result.get(1));
- assertEquals(chunk2, result.get(2));
- }
-
- @Test
- public void testTwoIOExceptions() throws IOException, InterruptedException {
- Map<Integer, Object> result = new ConcurrentHashMap<>();
- ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
- Semaphore signal = new Semaphore(3);
- signal.acquire(3);
-
- Answer<Void> answer = invocation -> {
- synchronized (signal) {
- int chunkIndex = (Integer) invocation.getArguments()[0];
- assertFalse(result.containsKey(chunkIndex));
- Object value = invocation.getArguments()[1];
- result.put(chunkIndex, value);
- signal.release();
- }
- return null;
- };
- doAnswer(answer).when(callback).onSuccess(anyInt(), anyObject());
- doAnswer(answer).when(callback).onFailure(anyInt(), anyObject());
-
- Map<Integer, List<Object>> interactions = ImmutableMap.<Integer, List<Object>>builder()
- .put(0, Arrays.asList(new IOException("first ioexception"), chunk0))
- .put(1, Arrays.asList(new IOException("second ioexception"), chunk1))
- .put(2, Arrays.asList(chunk2))
- .build();
- RetryingChunkClient client = performInteractions(interactions, callback);
-
- while (!signal.tryAcquire(3, 500, TimeUnit.MILLISECONDS));
- assertEquals(1, client.getNumTries());
- assertEquals(slaveLocation, client.getCurrentReplica().getLocation());
- assertEquals(chunk0, result.get(0));
- assertEquals(chunk1, result.get(1));
- assertEquals(chunk2, result.get(2));
- }
-
- @Test
- public void testThreeIOExceptions() throws IOException, InterruptedException {
- Map<Integer, Object> result = new ConcurrentHashMap<>();
- ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
- Semaphore signal = new Semaphore(3);
- signal.acquire(3);
-
- Answer<Void> answer = invocation -> {
- synchronized (signal) {
- int chunkIndex = (Integer) invocation.getArguments()[0];
- assertFalse(result.containsKey(chunkIndex));
- Object value = invocation.getArguments()[1];
- result.put(chunkIndex, value);
- signal.release();
- }
- return null;
- };
- doAnswer(answer).when(callback).onSuccess(anyInt(), anyObject());
- doAnswer(answer).when(callback).onFailure(anyInt(), anyObject());
-
- Map<Integer, List<Object>> interactions = ImmutableMap.<Integer, List<Object>>builder()
- .put(0, Arrays.asList(new IOException("first ioexception"), chunk0))
- .put(1, Arrays.asList(new IOException("second ioexception"), chunk1))
- .put(2, Arrays.asList(new IOException("third ioexception"), chunk2))
- .build();
-
- RetryingChunkClient client = performInteractions(interactions, callback);
- while (!signal.tryAcquire(3, 500, TimeUnit.MILLISECONDS));
- assertEquals(1, client.getNumTries());
- assertEquals(slaveLocation, client.getCurrentReplica().getLocation());
- assertEquals(chunk0, result.get(0));
- assertEquals(chunk1, result.get(1));
- assertEquals(chunk2, result.get(2));
- }
-
- @Test
- public void testFailedWithIOExceptions() throws IOException, InterruptedException {
- Map<Integer, Object> result = new ConcurrentHashMap<>();
- ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
- Semaphore signal = new Semaphore(3);
- signal.acquire(3);
-
- Answer<Void> answer = invocation -> {
- synchronized (signal) {
- int chunkIndex = (Integer) invocation.getArguments()[0];
- assertFalse(result.containsKey(chunkIndex));
- Object value = invocation.getArguments()[1];
- result.put(chunkIndex, value);
- signal.release();
- }
- return null;
- };
- doAnswer(answer).when(callback).onSuccess(anyInt(), anyObject());
- doAnswer(answer).when(callback).onFailure(anyInt(), anyObject());
-
- IOException ioe = new IOException("failed exception");
- Map<Integer, List<Object>> interactions = ImmutableMap.<Integer, List<Object>>builder()
- .put(0, Arrays.asList(ioe, ioe, ioe, ioe, ioe, chunk0))
- .put(1, Arrays.asList(ioe, ioe, ioe, ioe, ioe, chunk1))
- .put(2, Arrays.asList(ioe, ioe, ioe, ioe, ioe, chunk2))
- .build();
-
- RetryingChunkClient client = performInteractions(interactions, callback);
- while (!signal.tryAcquire(3, 500, TimeUnit.MILLISECONDS));
- // Note: this may exceed the max retries we want, but it doesn't matter.
- assertEquals(4, client.getNumTries());
- assertEquals(masterLocation, client.getCurrentReplica().getLocation());
- assertEquals(ioe, result.get(0));
- assertEquals(ioe, result.get(1));
- assertEquals(ioe, result.get(2));
- }
-
- @Test
- public void testRetryAndUnrecoverable() throws IOException, InterruptedException {
- Map<Integer, Object> result = new ConcurrentHashMap<>();
- ChunkReceivedCallback callback = mock(ChunkReceivedCallback.class);
- Semaphore signal = new Semaphore(3);
- signal.acquire(3);
-
- Answer<Void> answer = invocation -> {
- synchronized (signal) {
- int chunkIndex = (Integer) invocation.getArguments()[0];
- assertFalse(result.containsKey(chunkIndex));
- Object value = invocation.getArguments()[1];
- result.put(chunkIndex, value);
- signal.release();
- }
- return null;
- };
- doAnswer(answer).when(callback).onSuccess(anyInt(), anyObject());
- doAnswer(answer).when(callback).onFailure(anyInt(), anyObject());
-
- Exception re = new RuntimeException("failed exception");
- Map<Integer, List<Object>> interactions = ImmutableMap.<Integer, List<Object>>builder()
- .put(0, Arrays.asList(new IOException("first ioexception"), re, chunk0))
- .put(1, Arrays.asList(chunk1))
- .put(2, Arrays.asList(new IOException("second ioexception"), chunk2))
- .build();
-
- performInteractions(interactions, callback);
- while (!signal.tryAcquire(3, 500, TimeUnit.MILLISECONDS));
- assertEquals(re, result.get(0));
- assertEquals(chunk1, result.get(1));
- assertEquals(chunk2, result.get(2));
- }
-
- private static RetryingChunkClient performInteractions(
- Map<Integer, List<Object>> interactions,
- ChunkReceivedCallback callback) throws IOException, InterruptedException {
- RssConf conf = new RssConf();
- conf.set("rss.data.io.maxRetries", "1");
- conf.set("rss.data.io.retryWait", "0");
-
- // Contains all chunk ids that are referenced across all interactions.
- LinkedHashSet<Integer> chunkIds = Sets.newLinkedHashSet(interactions.keySet());
-
- final TransportClient client = new DummyTransportClient(chunkIds.size(), interactions);
- final TransportClientFactory clientFactory = mock(TransportClientFactory.class);
- doAnswer(invocation -> client).when(clientFactory).createClient(anyString(), anyInt());
-
- RetryingChunkClient retryingChunkClient = new RetryingChunkClient(
- conf, "test", masterLocation, callback, clientFactory);
- chunkIds.stream().sorted().forEach(retryingChunkClient::fetchChunk);
- return retryingChunkClient;
- }
-
- private static class DummyTransportClient extends TransportClient {
-
- private static final Channel channel = mock(Channel.class);
- private static final TransportResponseHandler handler = mock(TransportResponseHandler.class);
-
- private final long streamId = new Random().nextInt(Integer.MAX_VALUE) * 1000L;
- private final int numChunks;
- private final Map<Integer, List<Object>> interactions;
- private final Map<Integer, Integer> chunkIdToInterActionIndex;
-
- private final ScheduledExecutorService schedule =
- ThreadUtils.newDaemonThreadPoolScheduledExecutor("test-fetch-chunk", 3);
-
- DummyTransportClient(int numChunks, Map<Integer, List<Object>> interactions) {
- super(channel, handler);
- this.numChunks = numChunks;
- this.interactions = interactions;
- this.chunkIdToInterActionIndex = new ConcurrentHashMap<>();
- interactions.keySet().forEach((chunkId) -> chunkIdToInterActionIndex.putIfAbsent(chunkId, 0));
- }
-
- @Override
- public void fetchChunk(long streamId, int chunkId, ChunkReceivedCallback callback) {
- schedule.schedule(() -> {
- Object action;
- List<Object> interaction = interactions.get(chunkId);
- synchronized (chunkIdToInterActionIndex) {
- int index = chunkIdToInterActionIndex.get(chunkId);
- assertTrue(index < interaction.size());
- action = interaction.get(index);
- chunkIdToInterActionIndex.put(chunkId, index + 1);
- }
-
- if (action instanceof ManagedBuffer) {
- callback.onSuccess(chunkId, (ManagedBuffer) action);
- } else if (action instanceof Exception) {
- callback.onFailure(chunkId, (Exception) action);
- } else {
- fail("Can only handle ManagedBuffers and Exceptions, got " + action);
- }
- }, 500, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) {
- StreamHandle handle = new StreamHandle(streamId, numChunks);
- return handle.toByteBuffer();
- }
-
- @Override
- public void close() {
- super.close();
- schedule.shutdownNow();
- }
- }
-}
diff --git a/common/src/main/java/com/aliyun/emr/rss/common/network/client/TransportClient.java b/common/src/main/java/com/aliyun/emr/rss/common/network/client/TransportClient.java
index 6c97c1d3..c2080e17 100644
--- a/common/src/main/java/com/aliyun/emr/rss/common/network/client/TransportClient.java
+++ b/common/src/main/java/com/aliyun/emr/rss/common/network/client/TransportClient.java
@@ -21,14 +21,12 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -195,7 +193,7 @@ public class TransportClient implements Closeable {
* Synchronously sends an opaque message to the RpcHandler on the server-side, waiting for up to
* a specified timeout for a response.
*/
- public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) {
+ public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) throws IOException {
final SettableFuture<ByteBuffer> result = SettableFuture.create();
sendRpc(message, new RpcResponseCallback() {
@@ -216,10 +214,8 @@ public class TransportClient implements Closeable {
try {
return result.get(timeoutMs, TimeUnit.MILLISECONDS);
- } catch (ExecutionException e) {
- throw Throwables.propagate(e.getCause());
} catch (Exception e) {
- throw Throwables.propagate(e);
+ throw new IOException("Exception in sendRpcSync", e);
}
}
diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
index fed1e2aa..1e3f6298 100644
--- a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
+++ b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
@@ -437,6 +437,14 @@ object RssConf extends Logging {
conf.getInt("rss.fetch.chunk.maxReqsInFlight", 3)
}
+ def fetchChunkMaxRetries(conf: RssConf): Int = {
+ conf.getInt("rss.fetch.chunk.max.retry", 3)
+ }
+
+ def testFetchFailure(conf: RssConf): Boolean = {
+ conf.getBoolean("rss.test.fetch.chunk.failure", false)
+ }
+
def replicate(conf: RssConf): Boolean = {
conf.getBoolean("rss.push.data.replicate", false)
}
diff --git a/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/FileWriterSuiteJ.java b/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/FileWriterSuiteJ.java
index 6f7c49d1..0eae9313 100644
--- a/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/FileWriterSuiteJ.java
+++ b/server-worker/src/test/java/com/aliyun/emr/rss/service/deploy/worker/FileWriterSuiteJ.java
@@ -163,7 +163,7 @@ public class FileWriterSuiteJ {
return openBlocks.toByteBuffer();
}
- private void setUpConn(TransportClient client) {
+ private void setUpConn(TransportClient client) throws IOException {
ByteBuffer resp = client.sendRpcSync(createOpenMessage(), 10000);
StreamHandle streamHandle = (StreamHandle) Message.decode(resp);
streamId = streamHandle.streamId;