Posted to commits@hbase.apache.org by st...@apache.org on 2020/03/10 18:00:24 UTC
[hbase] branch master updated: HBASE-23851 Log networks and bind addresses when multicast publisher/listener enabled (#1173)
This is an automated email from the ASF dual-hosted git repository.
stack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new e1e8f39 HBASE-23851 Log networks and bind addresses when multicast publisher/listener enabled (#1173)
e1e8f39 is described below
commit e1e8f396ca078fda3199d29cc5a0d7f7e3bbc20c
Author: Michael Stack <sa...@users.noreply.github.com>
AuthorDate: Tue Mar 10 10:55:44 2020 -0700
HBASE-23851 Log networks and bind addresses when multicast publisher/listener enabled (#1173)
Signed-off-by: Sean Busbey <bu...@apache.org>
Signed-off-by: Nick Dimiduk <nd...@apache.org>
Signed-off-by: Viraj Jasani <vj...@apache.org>
---
.../hadoop/hbase/client/AsyncConnectionImpl.java | 3 +-
.../hadoop/hbase/client/ClusterStatusListener.java | 20 ++++++------
.../org/apache/hadoop/hbase/ScheduledChore.java | 4 +--
.../hbase/master/ClusterStatusPublisher.java | 37 ++++++++++++++++------
.../org/apache/hadoop/hbase/master/HMaster.java | 1 +
5 files changed, 40 insertions(+), 25 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index bd39ac3..0f12e90 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -171,8 +171,7 @@ class AsyncConnectionImpl implements AsyncConnection {
}
}, conf, listenerClass);
} catch (IOException e) {
- LOG.warn("Failed to create ClusterStatusListener, not a critical problem, ignoring...",
- e);
+ LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
}
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
index 7361238..ccdfec7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
@@ -37,10 +37,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
@@ -51,8 +47,10 @@ import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
-
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A class that receives the cluster status and provides it as a set of services to the client.
@@ -208,10 +206,9 @@ class ClusterStatusListener implements Closeable {
try {
Bootstrap b = new Bootstrap();
b.group(group)
- .channel(NioDatagramChannel.class)
- .option(ChannelOption.SO_REUSEADDR, true)
- .handler(new ClusterStatusHandler());
-
+ .channel(NioDatagramChannel.class)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .handler(new ClusterStatusHandler());
channel = (DatagramChannel)b.bind(bindAddress, port).sync().channel();
} catch (InterruptedException e) {
close();
@@ -225,9 +222,11 @@ class ClusterStatusListener implements Closeable {
ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
}
+ LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
channel.joinGroup(ina, ni, null, channel.newPromise());
}
+
@Override
public void close() {
if (channel != null) {
@@ -252,8 +251,7 @@ class ClusterStatusListener implements Closeable {
}
@Override
- public boolean acceptInboundMessage(Object msg)
- throws Exception {
+ public boolean acceptInboundMessage(Object msg) throws Exception {
return super.acceptInboundMessage(msg);
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java
index b5197a0..86dcb2c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java
@@ -373,7 +373,7 @@ public abstract class ScheduledChore implements Runnable {
@InterfaceAudience.Private
@Override
public String toString() {
- return "[ScheduledChore: Name: " + getName() + " Period: " + getPeriod() + " Unit: "
- + getTimeUnit() + "]";
+ return "ScheduledChore name=" + getName() + ", period=" + getPeriod() +
+ ", unit=" + getTimeUnit();
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
index af35ce4..8257466 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -49,12 +48,11 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;
-
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
-import org.apache.hbase.thirdparty.io.netty.bootstrap.ChannelFactory;
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelException;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFactory;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -65,6 +63,8 @@ import org.apache.hbase.thirdparty.io.netty.channel.socket.InternetProtocolFamil
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -75,6 +75,7 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
*/
@InterfaceAudience.Private
public class ClusterStatusPublisher extends ScheduledChore {
+ private static final Logger LOG = LoggerFactory.getLogger(ClusterStatusPublisher.class);
/**
* The implementation class used to publish the status. Default is null (no publish).
* Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the
@@ -113,7 +114,7 @@ public class ClusterStatusPublisher extends ScheduledChore {
public ClusterStatusPublisher(HMaster master, Configuration conf,
Class<? extends Publisher> publisherClass)
throws IOException {
- super("HBase clusterStatusPublisher for " + master.getName(), master, conf.getInt(
+ super("ClusterStatusPublisher for=" + master.getName(), master, conf.getInt(
STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
this.master = master;
this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
@@ -126,6 +127,11 @@ public class ClusterStatusPublisher extends ScheduledChore {
connected = true;
}
+ @Override
+ public String toString() {
+ return super.toString() + ", publisher=" + this.publisher + ", connected=" + this.connected;
+ }
+
// For tests only
protected ClusterStatusPublisher() {
master = null;
@@ -246,6 +252,11 @@ public class ClusterStatusPublisher extends ScheduledChore {
}
@Override
+ public String toString() {
+ return "channel=" + this.channel;
+ }
+
+ @Override
public void connect(Configuration conf) throws IOException {
String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
@@ -262,7 +273,6 @@ public class ClusterStatusPublisher extends ScheduledChore {
close();
throw new IOException("Can't connect to " + mcAddress, e);
}
-
final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);
InternetProtocolFamily family;
@@ -285,17 +295,23 @@ public class ClusterStatusPublisher extends ScheduledChore {
}
ni = NetworkInterface.getByInetAddress(localAddress);
}
-
Bootstrap b = new Bootstrap();
b.group(group)
.channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ClusterMetricsEncoder(isa));
-
try {
+ LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel();
channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
channel.connect(isa).sync();
+ // Set the interface name into the Configuration in case many networks are available,
+ // so that in tests the server and client use the same interface (presuming they
+ // share the same Configuration). TestAsyncTableRSCrashPublish failed when on a VPN:
+ // extra networks left the Master bound on one interface and the client on another.
+ if (ni != null) {
+ conf.set(HConstants.STATUS_MULTICAST_NI_NAME, ni.getName());
+ }
} catch (InterruptedException e) {
close();
throw ExceptionUtil.asInterrupt(e);
@@ -303,9 +319,9 @@ public class ClusterStatusPublisher extends ScheduledChore {
}
private static final class HBaseDatagramChannelFactory<T extends Channel>
- implements ChannelFactory<T> {
+ implements ChannelFactory<T> {
private final Class<? extends T> clazz;
- private InternetProtocolFamily family;
+ private final InternetProtocolFamily family;
HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
this.clazz = clazz;
@@ -347,6 +363,7 @@ public class ClusterStatusPublisher extends ScheduledChore {
@Override
public void publish(ClusterMetrics cs) {
+ LOG.info("PUBLISH {}", cs);
channel.writeAndFlush(cs).syncUninterruptibly();
}
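The publisher-side changes mirror the listener: log the bind address and NetworkInterface at connect time, announce each publish, and write the chosen interface name back into the Configuration so a test client sharing that Configuration joins on the same interface. On a multi-homed host (the VPN case in the comment above) several interfaces may qualify; this small JDK-only sketch enumerates the multicast-capable candidates, which can help when interpreting the new debug output:

    import java.net.NetworkInterface;
    import java.util.Collections;

    public class ListMulticastNics {
      public static void main(String[] args) throws Exception {
        // Print every interface that is up and multicast-capable, with its addresses.
        for (NetworkInterface ni : Collections.list(NetworkInterface.getNetworkInterfaces())) {
          if (ni.isUp() && ni.supportsMulticast()) {
            System.out.println(ni.getName() + " -> " + Collections.list(ni.getInetAddresses()));
          }
        }
      }
    }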
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 05117da..d0fd7f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -571,6 +571,7 @@ public class HMaster extends HRegionServer implements MasterServices {
" is not set - not publishing status");
} else {
clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
+ LOG.debug("Created {}", this.clusterStatusPublisherChore);
getChoreService().scheduleChore(clusterStatusPublisherChore);
}
}
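None of the new log lines appear unless multicast status publishing is enabled, which it is not by default. A hedged configuration sketch follows, assuming the usual HBase key names (hbase.status.published and the hbase.status.multicast.* keys; the interface-name key is the one behind the HConstants.STATUS_MULTICAST_NI_NAME constant this commit writes back) — verify these against the HBase version in use:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class EnableStatusMulticast {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Enable publish/listen of cluster status over multicast (off by default).
        conf.setBoolean("hbase.status.published", true);
        // Assumed default group/port; adjust for your network.
        conf.set("hbase.status.multicast.address.ip", "226.1.1.3");
        conf.setInt("hbase.status.multicast.address.port", 16100);
        // Optional: pin the NetworkInterface on multi-homed hosts, e.g. "en0";
        // key name assumed from HConstants.STATUS_MULTICAST_NI_NAME.
        conf.set("hbase.status.multicast.ni.name", "en0");
      }
    }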