You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2022/11/05 10:50:32 UTC

[GitHub] [hbase] carp84 commented on a diff in pull request #4808: HBASE-27218 Support rolling upgrading

carp84 commented on code in PR #4808:
URL: https://github.com/apache/hbase/pull/4808#discussion_r1014614083


##########
hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java:
##########
@@ -184,4 +185,13 @@ void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
    * @return Whether the replication queue table exists
    */
   boolean hasData() throws ReplicationException;
+
+  // the below 3 methods are used for migrating

Review Comment:
   Minor: would still be nice to have some javadoc for these 3 methods



##########
hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+
+/**
+ * Just retain a small set of the methods for the old zookeeper based replication queue storage, for
+ * migrating.
+ */
+@InterfaceAudience.Private
+public class ZKReplicationQueueStorage extends ZKReplicationStorageBase {

Review Comment:
   Suggest to rename the class to something like `ZKReplicationQueueStorageForMigration` to prevent future developers from misunderstanding it as a complete implementation of `ReplicationQueueStorage`



##########
hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java:
##########
@@ -541,4 +538,60 @@ public boolean hasData() throws ReplicationException {
       throw new ReplicationException("failed to get replication queue table", e);
     }
   }
+
+  @Override
+  public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
+    throws ReplicationException {
+    List<Put> puts = new ArrayList<>();
+    for (ReplicationQueueData data : datas) {
+      if (data.getOffsets().isEmpty()) {
+        continue;
+      }
+      Put put = new Put(Bytes.toBytes(data.getId().toString()));
+      data.getOffsets().forEach((walGroup, offset) -> {
+        put.addColumn(QUEUE_FAMILY, Bytes.toBytes(walGroup), Bytes.toBytes(offset.toString()));
+      });
+      puts.add(put);
+    }
+    try (Table table = conn.getTable(tableName)) {
+      table.put(puts);
+    } catch (IOException e) {
+      throw new ReplicationException("failed to batch update queues", e);
+    }
+  }
+
+  @Override
+  public void batchUpdateLastSequenceIds(List<ZkLastPushedSeqId> lastPushedSeqIds)
+    throws ReplicationException {
+    Map<String, Put> peerId2Put = new HashMap<>();
+    for (ZkLastPushedSeqId lastPushedSeqId : lastPushedSeqIds) {
+      peerId2Put
+        .computeIfAbsent(lastPushedSeqId.getPeerId(), peerId -> new Put(Bytes.toBytes(peerId)))
+        .addColumn(LAST_SEQUENCE_ID_FAMILY, Bytes.toBytes(lastPushedSeqId.getEncodedRegionName()),
+          Bytes.toBytes(lastPushedSeqId.getLastPushedSeqId()));
+    }
+    try (Table table = conn.getTable(tableName)) {
+      table
+        .put(peerId2Put.values().stream().filter(p -> !p.isEmpty()).collect(Collectors.toList()));
+    } catch (IOException e) {
+      throw new ReplicationException("failed to batch update last pushed sequence ids", e);
+    }
+  }
+
+  @Override
+  public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs)
+    throws ReplicationException {
+    if (hfileRefs.isEmpty()) {
+      return;
+    }
+    Put put = new Put(Bytes.toBytes(peerId));
+    for (String ref : hfileRefs) {
+      put.addColumn(HFILE_REF_FAMILY, Bytes.toBytes(ref), HConstants.EMPTY_BYTE_ARRAY);
+    }
+    try (Table table = conn.getTable(tableName)) {
+      table.put(put);
+    } catch (IOException e) {
+      throw new ReplicationException("failed to batch update hfile references", e);

Review Comment:
   What happens if the batch update partially failed? I could see that we could have eventual consistency, but would there be any problem in between? It seems we also don't have test case to cover such scenario, so I'm a little bit curious.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@hbase.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org