You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2020/03/10 18:00:24 UTC

[hbase] branch master updated: HBASE-23851 Log networks and bind addresses when multicast publisher/listener enabled (#1173)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new e1e8f39  HBASE-23851 Log networks and bind addresses when multicast publisher/listener enabled (#1173)
e1e8f39 is described below

commit e1e8f396ca078fda3199d29cc5a0d7f7e3bbc20c
Author: Michael Stack <sa...@users.noreply.github.com>
AuthorDate: Tue Mar 10 10:55:44 2020 -0700

    HBASE-23851 Log networks and bind addresses when multicast publisher/listener enabled (#1173)
    
    Signed-off-by: Sean Busbey <bu...@apache.org>
    Signed-off-by: Nick Dimiduk <nd...@apache.org>
    Signed-off-by: Viraj Jasani <vj...@apache.org>
---
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |  3 +-
 .../hadoop/hbase/client/ClusterStatusListener.java | 20 ++++++------
 .../org/apache/hadoop/hbase/ScheduledChore.java    |  4 +--
 .../hbase/master/ClusterStatusPublisher.java       | 37 ++++++++++++++++------
 .../org/apache/hadoop/hbase/master/HMaster.java    |  1 +
 5 files changed, 40 insertions(+), 25 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index bd39ac3..0f12e90 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -171,8 +171,7 @@ class AsyncConnectionImpl implements AsyncConnection {
               }
             }, conf, listenerClass);
         } catch (IOException e) {
-          LOG.warn("Failed to create ClusterStatusListener, not a critical problem, ignoring...",
-            e);
+          LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
         }
       }
     }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
index 7361238..ccdfec7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java
@@ -37,10 +37,6 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
@@ -51,8 +47,10 @@ import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramChannel;
 import org.apache.hbase.thirdparty.io.netty.channel.socket.DatagramPacket;
 import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
-
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A class that receives the cluster status, and provide it as a set of service to the client.
@@ -208,10 +206,9 @@ class ClusterStatusListener implements Closeable {
       try {
         Bootstrap b = new Bootstrap();
         b.group(group)
-            .channel(NioDatagramChannel.class)
-            .option(ChannelOption.SO_REUSEADDR, true)
-            .handler(new ClusterStatusHandler());
-
+          .channel(NioDatagramChannel.class)
+          .option(ChannelOption.SO_REUSEADDR, true)
+          .handler(new ClusterStatusHandler());
         channel = (DatagramChannel)b.bind(bindAddress, port).sync().channel();
       } catch (InterruptedException e) {
         close();
@@ -225,9 +222,11 @@ class ClusterStatusListener implements Closeable {
         ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress());
       }
 
+      LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
       channel.joinGroup(ina, ni, null, channel.newPromise());
     }
 
+
     @Override
     public void close() {
       if (channel != null) {
@@ -252,8 +251,7 @@ class ClusterStatusListener implements Closeable {
       }
 
       @Override
-      public boolean acceptInboundMessage(Object msg)
-          throws Exception {
+      public boolean acceptInboundMessage(Object msg) throws Exception {
         return super.acceptInboundMessage(msg);
       }
 
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java
index b5197a0..86dcb2c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ScheduledChore.java
@@ -373,7 +373,7 @@ public abstract class ScheduledChore implements Runnable {
   @InterfaceAudience.Private
   @Override
   public String toString() {
-    return "[ScheduledChore: Name: " + getName() + " Period: " + getPeriod() + " Unit: "
-        + getTimeUnit() + "]";
+    return "ScheduledChore name=" + getName() + ", period=" + getPeriod() +
+      ", unit=" + getTimeUnit();
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
index af35ce4..8257466 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -49,12 +48,11 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.yetus.audience.InterfaceAudience;
-
 import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
-import org.apache.hbase.thirdparty.io.netty.bootstrap.ChannelFactory;
 import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelException;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFactory;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -65,6 +63,8 @@ import org.apache.hbase.thirdparty.io.netty.channel.socket.InternetProtocolFamil
 import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioDatagramChannel;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
 import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -75,6 +75,7 @@ import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
  */
 @InterfaceAudience.Private
 public class ClusterStatusPublisher extends ScheduledChore {
+  private static Logger LOG = LoggerFactory.getLogger(ClusterStatusPublisher.class);
   /**
    * The implementation class used to publish the status. Default is null (no publish).
    * Use org.apache.hadoop.hbase.master.ClusterStatusPublisher.MulticastPublisher to multicast the
@@ -113,7 +114,7 @@ public class ClusterStatusPublisher extends ScheduledChore {
   public ClusterStatusPublisher(HMaster master, Configuration conf,
                                 Class<? extends Publisher> publisherClass)
       throws IOException {
-    super("HBase clusterStatusPublisher for " + master.getName(), master, conf.getInt(
+    super("ClusterStatusPublisher for=" + master.getName(), master, conf.getInt(
       STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD));
     this.master = master;
     this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, DEFAULT_STATUS_PUBLISH_PERIOD);
@@ -126,6 +127,11 @@ public class ClusterStatusPublisher extends ScheduledChore {
     connected = true;
   }
 
+  @Override
+  public String toString() {
+    return super.toString() + ", publisher=" + this.publisher + ", connected=" + this.connected;
+  }
+
   // For tests only
   protected ClusterStatusPublisher() {
     master = null;
@@ -246,6 +252,11 @@ public class ClusterStatusPublisher extends ScheduledChore {
     }
 
     @Override
+    public String toString() {
+      return "channel=" + this.channel;
+    }
+
+    @Override
     public void connect(Configuration conf) throws IOException {
       String mcAddress = conf.get(HConstants.STATUS_MULTICAST_ADDRESS,
           HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS);
@@ -262,7 +273,6 @@ public class ClusterStatusPublisher extends ScheduledChore {
         close();
         throw new IOException("Can't connect to " + mcAddress, e);
       }
-
       final InetSocketAddress isa = new InetSocketAddress(mcAddress, port);
 
       InternetProtocolFamily family;
@@ -285,17 +295,23 @@ public class ClusterStatusPublisher extends ScheduledChore {
         }
         ni = NetworkInterface.getByInetAddress(localAddress);
       }
-
       Bootstrap b = new Bootstrap();
       b.group(group)
         .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family))
         .option(ChannelOption.SO_REUSEADDR, true)
         .handler(new ClusterMetricsEncoder(isa));
-
       try {
+        LOG.debug("Channel bindAddress={}, networkInterface={}, INA={}", bindAddress, ni, ina);
         channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel();
         channel.joinGroup(ina, ni, null, channel.newPromise()).sync();
         channel.connect(isa).sync();
+        // Record the chosen interface in the configuration in case several networks are
+        // available. This is for tests, so that server and client use the same interface
+        // (presuming they share the same Configuration). TestAsyncTableRSCrashPublish failed when
+        // on a VPN: extra networks meant the Master bound one interface and the client another.
+        if (ni != null) {
+          conf.set(HConstants.STATUS_MULTICAST_NI_NAME, ni.getName());
+        }
       } catch (InterruptedException e) {
         close();
         throw ExceptionUtil.asInterrupt(e);
@@ -303,9 +319,9 @@ public class ClusterStatusPublisher extends ScheduledChore {
     }
 
     private static final class HBaseDatagramChannelFactory<T extends Channel>
-      implements ChannelFactory<T> {
+        implements ChannelFactory<T> {
       private final Class<? extends T> clazz;
-      private InternetProtocolFamily family;
+      private final InternetProtocolFamily family;
 
       HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
         this.clazz = clazz;
@@ -347,6 +363,7 @@ public class ClusterStatusPublisher extends ScheduledChore {
 
     @Override
     public void publish(ClusterMetrics cs) {
+      LOG.info("PUBLISH {}", cs);
       channel.writeAndFlush(cs).syncUninterruptibly();
     }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 05117da..d0fd7f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -571,6 +571,7 @@ public class HMaster extends HRegionServer implements MasterServices {
               " is not set - not publishing status");
         } else {
           clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
+          LOG.debug("Created {}", this.clusterStatusPublisherChore);
           getChoreService().scheduleChore(clusterStatusPublisherChore);
         }
       }