Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/09/17 10:42:21 UTC

[GitHub] [hbase] Apache9 commented on a change in pull request #2413: Add a new ReplicationSource for hbase:meta WAL files. The new

Apache9 commented on a change in pull request #2413:
URL: https://github.com/apache/hbase/pull/2413#discussion_r490111996



##########
File path: hbase-common/src/test/java/org/apache/hadoop/hbase/Waiter.java
##########
@@ -90,7 +91,7 @@ public static float getWaitForRatio(Configuration conf) {
      * @return the boolean result of the evaluation.
      * @throws E thrown if the predicate evaluation could not evaluate.
      */
-    boolean evaluate() throws E;
+    boolean evaluate() throws E, IOException;

Review comment:
       Why this change? E can be any exception type...
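       To illustrate: with the generic left as-is, a caller that needs IOException can already instantiate Predicate with E = IOException; a minimal sketch (the file check is just an illustrative condition):

       ```java
       import java.io.File;
       import java.io.IOException;
       import org.apache.hadoop.hbase.Waiter;

       public class PredicateSketch {
         // Instantiate the existing generic with E = IOException rather than widening the
         // interface's throws clause for every implementation.
         static Waiter.Predicate<IOException> exists(final File f) {
           return new Waiter.Predicate<IOException>() {
             @Override
             public boolean evaluate() throws IOException {
               // Any condition that may throw IOException works here; this one is illustrative.
               return f.getCanonicalFile().exists();
             }
           };
         }
       }
       ```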

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -2429,15 +2431,11 @@ public FlushResultImpl flushcache(List<byte[]> families,
         if (!writestate.flushing && writestate.writesEnabled) {
           this.writestate.flushing = true;
         } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("NOT flushing memstore for region " + this
-                + ", flushing=" + writestate.flushing + ", writesEnabled="
-                + writestate.writesEnabled);
-          }
-          String msg = "Not flushing since "
-              + (writestate.flushing ? "already flushing"
-              : "writes not enabled");
+          String msg = "NOT flushing " + this + "; " +

Review comment:
       Why remove the LOG.isDebugEnabled check? It is there to avoid the cost of String concatenation.
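       For reference, the two usual ways to avoid paying for message construction when DEBUG is off (a generic sketch, not tied to the exact message removed here):

       ```java
       import org.slf4j.Logger;
       import org.slf4j.LoggerFactory;

       public class DebugLoggingSketch {
         private static final Logger LOG = LoggerFactory.getLogger(DebugLoggingSketch.class);

         void logNotFlushing(Object region, boolean flushing, boolean writesEnabled) {
           // Option 1: explicit guard -- nothing is concatenated unless DEBUG is enabled.
           if (LOG.isDebugEnabled()) {
             LOG.debug("NOT flushing memstore for region " + region + ", flushing=" + flushing
                 + ", writesEnabled=" + writesEnabled);
           }
           // Option 2: SLF4J placeholders -- the message string is only assembled when DEBUG
           // is enabled (the arguments themselves are still evaluated/boxed).
           LOG.debug("NOT flushing memstore for region {}, flushing={}, writesEnabled={}",
               region, flushing, writesEnabled);
         }
       }
       ```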

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
##########
@@ -168,7 +180,7 @@ public static AssignRegionHandler create(HRegionServer server, RegionInfo region
       long openProcId, TableDescriptor tableDesc, long masterSystemTime) {
     EventType eventType;
     if (regionInfo.isMetaRegion()) {
-      eventType = EventType.M_RS_CLOSE_META;
+      eventType = EventType.M_RS_OPEN_META;

Review comment:
       Is this fixing an existing bug?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
##########
@@ -439,16 +439,38 @@ public void append(TableName tableName, byte[] encodedRegionName, byte[] row,
       // invalidate the cache and check from meta
       RegionLocations locations = null;
       boolean useCache = true;
-      while (true) {
+      RETRY: while (true) {
         // get the replicas of the primary region
         try {
           locations = RegionReplicaReplayCallable
               .getRegionLocations(connection, tableName, row, useCache, 0);
-
           if (locations == null) {
             throw new HBaseIOException("Cannot locate locations for "
                 + tableName + ", row:" + Bytes.toStringBinary(row));
           }
+          if (useCache && locations.size() <= 1) {
+            // If locations size is 1 or less, then that is odd. We are supposed to be replicating
+            // to replicas. If size is '1', then its probably just the primary -- no replication
+            // will be done (we only replicate edits to the secondaries). So, the replicas are not
+            // online yet -- not configured or are taking their time coming up after the primary.
+            // So, need to notice when they come on line, promptly. A getRegionLocations with
+            // useCache=true will not find newly-added replicas because it will satisfy the request
+            // out of cache w/o going back to the hbase:meta table. And we can't skip cache on
+            // every replication append -- it will kill performance. So, lets just do it on
+            // occasion. When replicas come up, they ask the primary to flush. Lets recheck
+            // locations on flush. Flush edits happen at start of the flush and at the end. We
+            // could perhaps avoid lookup in one of these cases by taking apart the edits flush
+            // description to figure if start or end of flush but that is tying the code here too
+            // close to the flush implementation. Lets just look for the flush meta-edit and if we
+            // find it AND locations==1, lets do a re-lookup. If still no replicas when flush is
+            // done, then lookup again at flush end.
+            for (Entry e: entries) {
+              if (e.getEdit().isFlushMarker()) {
+                useCache = false;
+                continue RETRY; // Go back up to the outer while-loop and redo locations get.

Review comment:
       No backoff here?
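       e.g. something like a capped exponential backoff before the `continue RETRY` (sketch only; the pause and cap would normally come from configuration):

       ```java
       import java.io.InterruptedIOException;

       final class BackoffSketch {
         // Illustrative helper: sleep with capped exponential backoff before retrying the
         // location lookup instead of immediately hitting hbase:meta again.
         static void pauseBeforeRetry(int attempt) throws InterruptedIOException {
           long basePauseMs = 100L;     // made-up base pause
           long maxPauseMs = 10_000L;   // made-up cap
           long pauseMs = Math.min(maxPauseMs, basePauseMs << Math.min(attempt, 10));
           try {
             Thread.sleep(pauseMs);
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new InterruptedIOException("Interrupted while backing off before retry");
           }
         }
       }
       ```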

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -564,9 +564,9 @@ public boolean isCompactionNeeded() {
     @Override
     public String toString() {
       return new StringBuilder()
-        .append("flush result:").append(result).append(", ")
-        .append("failureReason:").append(failureReason).append(",")
-        .append("flush seq id").append(flushSequenceId).toString();
+        .append("result: ").append(result).append("; ")
+        .append("reason: ").append(failureReason).append("; ")

Review comment:
       Why drop the word 'failure'? It is the failure reason.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
##########
@@ -2451,8 +2448,23 @@ private void triggerFlushInPrimaryRegion(final HRegion region) {
 
     // submit it to be handled by one of the handlers so that we do not block OpenRegionHandler
     if (this.executorService != null) {
-      this.executorService.submit(new RegionReplicaFlushHandler(this, clusterConnection,
-          rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region));
+      EventHandler eh = new RegionReplicaFlushHandler(this, clusterConnection,
+        rpcRetryingCallerFactory, rpcControllerFactory, operationTimeout, region);
+      if (this.executorService.hasExecutor(eh.getEventType())) {
+        this.executorService.submit(eh);
+      } else {
+        // There is no executor for this type of event. Run in a new thread.

Review comment:
       I think this is not the correct way to handle it. You should register the executor for this event type with the executorService in the startServices method instead.
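       i.e. register it up front so the fallback above is never needed; roughly something like the following in startServices (sketch only -- the startExecutorService(ExecutorType, int) overload, the ExecutorType constant, and the config key/default are all assumptions on my part):

       ```java
       // Sketch only: register an executor for RegionReplicaFlushHandler events when the
       // region server starts its services, so submit() above always finds an executor.
       // The overload, ExecutorType constant, config key and default are assumptions.
       this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
           conf.getInt("hbase.regionserver.region.replica.flusher.threads", 1));
       ```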

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
##########
@@ -129,8 +135,15 @@ public void process() throws IOException {
       }
       // pass null for the last parameter, which used to be a CancelableProgressable, as now the
       // opening can not be interrupted by a close request any more.
-      region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), rs.getConfiguration(),
-        rs, null);
+      Configuration conf = rs.getConfiguration();
+      TableName tn = htd.getTableName();
+      if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(conf, tn)) {
+        if (RegionReplicaUtil.isDefaultReplica(this.regionInfo.getReplicaId())) {
+          // Add the hbase:meta replication source on replica zero/default.
+          rs.getReplicationSourceService().getReplicationManager().addHBaseMetaSource();
+        }
+      }
+      region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), conf, rs, null);
     } catch (IOException e) {
       cleanUpAndReportFailure(e);

Review comment:
       Will the meta source added above ever be removed again in this method?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -435,28 +486,60 @@ public void refreshSources(String peerId) throws IOException {
    * @param src source to clear
    */
   void removeRecoveredSource(ReplicationSourceInterface src) {
-    LOG.info("Done with the recovered queue " + src.getQueueId());
+    LOG.info("Done with recovered replication queue {}", src.getStoreQueueId());
     this.oldsources.remove(src);
     // Delete queue from storage and memory
-    deleteQueue(src.getQueueId());
-    this.walsByIdRecoveredQueues.remove(src.getQueueId());
+    deleteQueue(src);
+    this.walsByIdRecoveredQueues.remove(src.getStoreQueueId());
   }
 
   /**
    * Clear the metrics and related replication queue of the specified old source
-   * @param src source to clear
    */
-  void removeSource(ReplicationSourceInterface src) {
-    LOG.info("Done with the queue " + src.getQueueId());
+  public void removeSource(ReplicationSourceInterface src) {
+    LOG.info("Done with replication queue {}", src.getStoreQueueId());
     this.sources.remove(src.getPeerId());
     // Delete queue from storage and memory
-    deleteQueue(src.getQueueId());
-    this.walsById.remove(src.getQueueId());
+    deleteQueue(src);
+    this.walsById.remove(src.getStoreQueueId());
   }
 
   /**
-   * Delete a complete queue of wals associated with a replication source
-   * @param queueId the id of replication queue to delete
+   * Add hbase:meta replication source. Called on open of hbase:meta.
+   * @see #removeHBaseMetaSource()
+   */
+  public ReplicationSourceInterface addHBaseMetaSource() throws IOException {

Review comment:
       Is it because of this method? Then let's add synchronized to this method and leave the logic of the addSource method unchanged?
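       i.e. something along these lines, keeping addSource as it was (sketch only; the null peer argument is my guess at how a peer-less meta source would be created):

       ```java
       // Sketch of the suggestion: make addHBaseMetaSource itself synchronized so concurrent
       // opens of hbase:meta replicas serialize here instead of changing addSource.
       public synchronized ReplicationSourceInterface addHBaseMetaSource() throws IOException {
         String id = META_REGION_REPLICA_REPLICATION_SOURCE;
         ReplicationSourceInterface src = this.sources.get(id);
         if (src == null) {
           src = createSource(id, null);  // no ReplicationPeer for the meta source (assumption)
           this.sources.put(id, src);
         }
         return src;
       }
       ```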

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -231,36 +233,41 @@ private void decorateConf() {
     }
   }
 
+  @Override public String toString() {
+    return "ReplicationSource[peerId=" + peerId + "]";
+  }
+
   @Override
   public void enqueueLog(Path wal) {
     if (!this.filterInWALs.test(wal)) {
       LOG.trace("NOT replicating {}", wal);
       return;
     }
-    String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
-    PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
+    // Use WAL prefix as the WALGroupId for this peer.
+    String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());

Review comment:
       Better to still name it walPrefix? The method is called getWALPrefixFromWALName; naming the result walGroupId adds a new constraint.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -160,9 +163,9 @@
    */
   private final List<WALEntryFilter> baseFilterOutWALEntries;
 
-  ReplicationSource() {
+  ReplicationSource(WALProvider walProvider) {

Review comment:
       I think the design here is to have no constructor parameters and to use the init method below for the initialization work? Otherwise, if we can pass a WALProvider here, why not also pass the other parameters at the same time? At least Configuration, FileSystem, and Server will not change and are easy to get when constructing a ReplicationSource.
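       i.e. keep the no-argument constructor and let init carry the WALProvider along with everything else it already carries; a rough sketch (the widened init signature is hypothetical):

       ```java
       // Sketch: no construction-time dependencies; init() carries the WALProvider the same
       // way it already carries conf, fs, server, etc. The extra parameter is hypothetical.
       ReplicationSource() {
       }

       public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
           ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
           String queueId, UUID clusterId, WALProvider walProvider, MetricsSource metrics)
           throws IOException {
         this.walProvider = walProvider;
         // ... the rest of the existing init work
       }
       ```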

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -320,53 +336,89 @@ public void removePeer(String peerId) {
   }
 
   /**
-   * Factory method to create a replication source
-   * @param queueId the id of the replication queue
+   * Factory method to create and initialize a replication source
+   * @param peerId the id of the replication queue
    * @return the created source
    */
-  private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
+  private ReplicationSourceInterface createSource(String peerId, ReplicationPeer replicationPeer)
       throws IOException {
-    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
+    ReplicationSourceInterface rs = ReplicationSourceFactory.create(conf, peerId, this.walFactory);
+    rs.init(conf, fs, this, queueStorage, replicationPeer, server, peerId, clusterId,
+      new MetricsSource(peerId));
+    return addWALActionsListener(rs);
+  }
 
-    MetricsSource metrics = new MetricsSource(queueId);
-    // init replication source
-    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
-      walFileLengthProvider, metrics);
-    return src;
+  /**
+   * Add a listener on the WALProvider used by the passed ReplicationSource. Only add the listener
+   * ONCE. addWALActionsListener is not idempotent and it would be tough to make it so; hence a bit
+   * of gymnastics are needed here to ensure we register the listener once only per walProvider.
+   * @return Returns the <code>rs</code> passed as a parameter.
+   */
+  private ReplicationSourceInterface addWALActionsListener(ReplicationSourceInterface rs) {
+    WALProvider walProvider = rs.getWALProvider();
+    if (walProvider != null) {
+      synchronized (this.walProvidersWithListenersInstalled) {
+        if (!this.walProvidersWithListenersInstalled.contains(walProvider)) {
+          walProvider.addWALActionsListener(new ReplicationSourceWALActionListener(conf, this));
+          this.walProvidersWithListenersInstalled.add(walProvider);
+        }
+      }
+    }
+    return rs;
   }
 
+
   /**
-   * Add a normal source for the given peer on this region server. Meanwhile, add new replication
-   * queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
-   * group and do replication
+   * Add a replication source for the given peer on this region server. Peer should have been
+   * added to {@link #replicationPeers} before calling this method because we expect to find
+   * the peerIds {@link ReplicationPeer} in {@link #replicationPeers}.
    * @param peerId the id of the replication peer
-   * @return the source that was created
    */
   @VisibleForTesting
   ReplicationSourceInterface addSource(String peerId) throws IOException {
-    ReplicationPeer peer = replicationPeers.getPeer(peerId);
-    ReplicationSourceInterface src = createSource(peerId, peer);
-    // synchronized on latestPaths to avoid missing the new log
+    return addSource(replicationPeers.getPeer(peerId));
+  }
+
+  /**
+   * Adds a replication source. Creates source if not present otherwise fetches current source
+   * from {@link #sources}.
+   * @see #addSource(String) for method to add peer AND persist to replication store.
+   */
+  private ReplicationSourceInterface addSource(ReplicationPeer peer) throws IOException {
+    String id = peer.getId();
+    ReplicationSourceInterface src;
+    // Synchronized on latestPaths to avoid missing the new WAL.
     synchronized (this.latestPaths) {
-      this.sources.put(peerId, src);
+      src = this.sources.get(id);

Review comment:
       Why this change? In the old code we did not need to test whether we had already added the replication source, so why drop that assumption? It makes the code much more complicated.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
##########
@@ -320,53 +336,89 @@ public void removePeer(String peerId) {
   }
 
   /**
-   * Factory method to create a replication source
-   * @param queueId the id of the replication queue
+   * Factory method to create and initialize a replication source
+   * @param peerId the id of the replication queue
    * @return the created source
    */
-  private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
+  private ReplicationSourceInterface createSource(String peerId, ReplicationPeer replicationPeer)
       throws IOException {
-    ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
+    ReplicationSourceInterface rs = ReplicationSourceFactory.create(conf, peerId, this.walFactory);
+    rs.init(conf, fs, this, queueStorage, replicationPeer, server, peerId, clusterId,
+      new MetricsSource(peerId));
+    return addWALActionsListener(rs);
+  }
 
-    MetricsSource metrics = new MetricsSource(queueId);
-    // init replication source
-    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
-      walFileLengthProvider, metrics);
-    return src;
+  /**
+   * Add a listener on the WALProvider used by the passed ReplicationSource. Only add the listener
+   * ONCE. addWALActionsListener is not idempotent and it would be tough to make it so; hence a bit
+   * of gymnastics are needed here to ensure we register the listener once only per walProvider.
+   * @return Returns the <code>rs</code> passed as a parameter.
+   */
+  private ReplicationSourceInterface addWALActionsListener(ReplicationSourceInterface rs) {
+    WALProvider walProvider = rs.getWALProvider();
+    if (walProvider != null) {
+      synchronized (this.walProvidersWithListenersInstalled) {
+        if (!this.walProvidersWithListenersInstalled.contains(walProvider)) {
+          walProvider.addWALActionsListener(new ReplicationSourceWALActionListener(conf, this));
+          this.walProvidersWithListenersInstalled.add(walProvider);
+        }
+      }
+    }
+    return rs;
   }
 
+
   /**
-   * Add a normal source for the given peer on this region server. Meanwhile, add new replication
-   * queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
-   * group and do replication
+   * Add a replication source for the given peer on this region server. Peer should have been
+   * added to {@link #replicationPeers} before calling this method because we expect to find
+   * the peerIds {@link ReplicationPeer} in {@link #replicationPeers}.
    * @param peerId the id of the replication peer
-   * @return the source that was created
    */
   @VisibleForTesting
   ReplicationSourceInterface addSource(String peerId) throws IOException {
-    ReplicationPeer peer = replicationPeers.getPeer(peerId);
-    ReplicationSourceInterface src = createSource(peerId, peer);
-    // synchronized on latestPaths to avoid missing the new log
+    return addSource(replicationPeers.getPeer(peerId));
+  }
+
+  /**
+   * Adds a replication source. Creates source if not present otherwise fetches current source
+   * from {@link #sources}.
+   * @see #addSource(String) for method to add peer AND persist to replication store.
+   */
+  private ReplicationSourceInterface addSource(ReplicationPeer peer) throws IOException {
+    String id = peer.getId();
+    ReplicationSourceInterface src;
+    // Synchronized on latestPaths to avoid missing the new WAL.
     synchronized (this.latestPaths) {
-      this.sources.put(peerId, src);
+      src = this.sources.get(id);
+      if (src == null) {
+        // The created source may not be added if we are beat to the creation by another. It is
+        // initialized only so it should be safe to let it go; nothing to cleanup if not started.
+        src = createSource(id, peer);
+        ReplicationSourceInterface newSrc = this.sources.putIfAbsent(id, src);
+        if (newSrc != null) {
+          src.terminate("Another beat us to the installation of an instance of " + id);
+          src = newSrc;
+        }
+      }
       Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
-      this.walsById.put(peerId, walsByGroup);
+      this.walsById.put(id, walsByGroup);
       // Add the latest wal to that source's queue
-      if (this.latestPaths.size() > 0) {
-        for (Path logPath : latestPaths) {
+      if (!this.latestPaths.isEmpty()) {
+        for (Path logPath: this.latestPaths) {
           String name = logPath.getName();
-          String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
           NavigableSet<String> logs = new TreeSet<>();
           logs.add(name);
-          walsByGroup.put(walPrefix, logs);
-          // Abort RS and throw exception to make add peer failed
-          abortAndThrowIOExceptionWhenFail(
-            () -> this.queueStorage.addWAL(server.getServerName(), peerId, name));
+          walsByGroup.put(AbstractFSWALProvider.getWALPrefixFromWALName(name), logs);
+          if (src.isQueueStored()) {

Review comment:
       I wonder whether we really need this flag here; could we just set queueStorage to null, or have an Optional<ReplicationQueueStorage>?
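       With an Optional, the call site above would read roughly like this (sketch; a getQueueStorage() accessor returning Optional is the hypothetical part, the rest follows the quoted diff):

       ```java
       // Sketch: guard on presence of the queue storage instead of an isQueueStored() flag.
       // src.getQueueStorage() returning Optional<ReplicationQueueStorage> is an assumption.
       Optional<ReplicationQueueStorage> storage = src.getQueueStorage();
       if (storage.isPresent()) {
         abortAndThrowIOExceptionWhenFail(
           () -> storage.get().addWAL(server.getServerName(), id, name));
       }
       ```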

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -780,9 +767,8 @@ public void postShipEdits(List<Entry> entries, int batchSize) {
     this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
   }
 
-  @Override
-  public WALFileLengthProvider getWALFileLengthProvider() {
-    return walFileLengthProvider;
+  @Override public WALProvider getWALProvider() {

Review comment:
       I think you'd better update your formatter config to put the Override annotation on the line before? I think that is the typical style in our code base, and there is no checkstyle issue with it.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
##########
@@ -18,37 +17,52 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 
 /**
- * Constructs a {@link ReplicationSourceInterface}
+ * Constructs appropriate {@link ReplicationSourceInterface}.
+ * Considers whether Recovery or not, whether hbase:meta Region Read Replicas or not, etc.
  */
 @InterfaceAudience.Private
-public class ReplicationSourceFactory {
-
+public final class ReplicationSourceFactory {
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceFactory.class);
 
-  static ReplicationSourceInterface create(Configuration conf, String queueId) {
+  private ReplicationSourceFactory() {}
+
+  static ReplicationSourceInterface create(Configuration conf, String queueId,
+      WALFactory walFactory) throws IOException {
+    // Check for the marker name used to enable a replication source for hbase:meta for region read
+    // replicas. There is no peer nor use of replication storage (or need for queue recovery) when
+    // running hbase:meta region read replicas.
+    if (ReplicationSourceManager.META_REGION_REPLICA_REPLICATION_SOURCE.equals(queueId)) {
+      return new HBaseMetaNoQueueStoreReplicationSource(walFactory == null?

Review comment:
       Why could walFactory be null? And if it is null, what is the value of creating a replication source here?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
##########
@@ -191,4 +194,17 @@ void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pai
   default boolean isRecovered() {
     return false;
   }
+
+  /**
+   * ReplicationSources keep their replication queues and queue state out in the replication
+   * store which is external to the current process -- usually zookeeper -- in case of crash so
+   * another process can pick up where the crashed process left off. A few specialized replication
+   * sources intentionally avoid keeping state in the replication store because they reset and
+   * start over after a crash so have no need of recovery. One such is the replication of the hbase:meta
+   * table. Its ReplicationSource will return false when this method is called.
+   * @return True if this source persists its queues and queue state to the replication store.
+   */
+  default boolean isQueueStored() {

Review comment:
       Maybe 'persistent' is better than 'stored'?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org