You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/23 00:22:58 UTC
spark git commit: [SPARK-18972][CORE] Fix the netty thread names for
RPC
Repository: spark
Updated Branches:
refs/heads/master 2246ce88a -> f252cb5d1
[SPARK-18972][CORE] Fix the netty thread names for RPC
## What changes were proposed in this pull request?
Right now the name of threads created by Netty for Spark RPC are `shuffle-client-**` and `shuffle-server-**`. It's pretty confusing.
This PR just uses the module name in TransportConf to set the thread name. In addition, it also includes the following minor fixes:
- TransportChannelHandler.channelActive and channelInactive should call the corresponding super methods.
- Make ShuffleBlockFetcherIterator throw NoSuchElementException if it has no more elements. Otherwise, if the caller calls `next` without `hasNext`, it will just hang.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <sh...@databricks.com>
Closes #16380 from zsxwing/SPARK-18972.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f252cb5d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f252cb5d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f252cb5d
Branch: refs/heads/master
Commit: f252cb5d161e064d39cc1ed1d9299307a0636174
Parents: 2246ce8
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Thu Dec 22 16:22:55 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Thu Dec 22 16:22:55 2016 -0800
----------------------------------------------------------------------
.../spark/network/client/TransportClientFactory.java | 6 ++++--
.../spark/network/server/TransportChannelHandler.java | 12 ++++++------
.../apache/spark/network/server/TransportServer.java | 2 +-
.../org/apache/spark/network/util/TransportConf.java | 4 ++++
.../spark/storage/ShuffleBlockFetcherIterator.scala | 4 ++++
5 files changed, 19 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f252cb5d/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index e895f13..cb10edf 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -100,8 +100,10 @@ public class TransportClientFactory implements Closeable {
IOMode ioMode = IOMode.valueOf(conf.ioMode());
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
- // TODO: Make thread pool name configurable.
- this.workerGroup = NettyUtils.createEventLoop(ioMode, conf.clientThreads(), "shuffle-client");
+ this.workerGroup = NettyUtils.createEventLoop(
+ ioMode,
+ conf.clientThreads(),
+ conf.getModuleName() + "-client");
this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
}
http://git-wip-us.apache.org/repos/asf/spark/blob/f252cb5d/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index c33848c..c6ccae1 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
@@ -88,14 +88,14 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
try {
requestHandler.channelActive();
} catch (RuntimeException e) {
- logger.error("Exception from request handler while registering channel", e);
+ logger.error("Exception from request handler while channel is active", e);
}
try {
responseHandler.channelActive();
} catch (RuntimeException e) {
- logger.error("Exception from response handler while registering channel", e);
+ logger.error("Exception from response handler while channel is active", e);
}
- super.channelRegistered(ctx);
+ super.channelActive(ctx);
}
@Override
@@ -103,14 +103,14 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
try {
requestHandler.channelInactive();
} catch (RuntimeException e) {
- logger.error("Exception from request handler while unregistering channel", e);
+ logger.error("Exception from request handler while channel is inactive", e);
}
try {
responseHandler.channelInactive();
} catch (RuntimeException e) {
- logger.error("Exception from response handler while unregistering channel", e);
+ logger.error("Exception from response handler while channel is inactive", e);
}
- super.channelUnregistered(ctx);
+ super.channelInactive(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/spark/blob/f252cb5d/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
index 0d7a677..047c5f3 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -89,7 +89,7 @@ public class TransportServer implements Closeable {
IOMode ioMode = IOMode.valueOf(conf.ioMode());
EventLoopGroup bossGroup =
- NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
+ NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
EventLoopGroup workerGroup = bossGroup;
PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
http://git-wip-us.apache.org/repos/asf/spark/blob/f252cb5d/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 223d6d8..6a557fa 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -70,6 +70,10 @@ public class TransportConf {
return "spark." + module + "." + suffix;
}
+ public String getModuleName() {
+ return module;
+ }
+
/** IO mode: nio or epoll */
public String ioMode() { return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(); }
http://git-wip-us.apache.org/repos/asf/spark/blob/f252cb5d/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index b720aae..f890611 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -315,6 +315,10 @@ final class ShuffleBlockFetcherIterator(
* Throws a FetchFailedException if the next block could not be fetched.
*/
override def next(): (BlockId, InputStream) = {
+ if (!hasNext) {
+ throw new NoSuchElementException
+ }
+
numBlocksProcessed += 1
var result: FetchResult = null
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org