You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by we...@apache.org on 2020/09/11 18:00:03 UTC
[hbase] 01/01: Revert "HBASE-25003 Backport HBASE-24350 and
HBASE-24779 to branch-2.2 (#2372)"
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch revert-2372-HBASE-25003_branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit acf068c28d22c1fc56df594492be04f1580de682
Author: Wei-Chiu Chuang <jo...@gmail.com>
AuthorDate: Fri Sep 11 10:59:41 2020 -0700
Revert "HBASE-25003 Backport HBASE-24350 and HBASE-24779 to branch-2.2 (#2372)"
This reverts commit 7cea31869ee836f051fa2298b053676babeb78dc.
---
.../MetricsReplicationGlobalSourceSource.java | 39 ------
.../MetricsReplicationSourceFactory.java | 3 +-
.../MetricsReplicationTableSource.java | 32 -----
...a => MetricsReplicationGlobalSourceSource.java} | 18 +--
.../MetricsReplicationSourceFactoryImpl.java | 8 +-
.../MetricsReplicationSourceSourceImpl.java | 2 +-
.../MetricsReplicationTableSourceImpl.java | 134 ---------------------
.../replication/regionserver/MetricsSource.java | 55 ++-------
.../replication/regionserver/Replication.java | 7 +-
.../regionserver/ReplicationSource.java | 6 +-
.../regionserver/ReplicationSourceManager.java | 25 +---
.../regionserver/ReplicationSourceShipper.java | 10 +-
.../regionserver/ReplicationSourceWALReader.java | 17 ++-
.../replication/regionserver/WALEntryBatch.java | 24 ++--
.../hbase/replication/TestReplicationEndpoint.java | 97 +++------------
.../regionserver/TestWALEntryStream.java | 5 -
16 files changed, 52 insertions(+), 430 deletions(-)
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
deleted file mode 100644
index e373a6c..0000000
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-@InterfaceAudience.Private
-public interface MetricsReplicationGlobalSourceSource extends MetricsReplicationSourceSource {
-
- public static final String SOURCE_WAL_READER_EDITS_BUFFER = "source.walReaderEditsBufferUsage";
-
- /**
- * Sets the total usage of memory used by edits in memory read from WALs. The memory represented
- * by this usage measure is across peers/sources. For example, we may batch the same WAL edits
- * multiple times for the sake of replicating them to multiple peers..
- * @param usage The memory used by edits in bytes
- */
- void setWALReaderEditsBufferBytes(long usage);
-
- /**
- * Returns the size, in bytes, of edits held in memory to be replicated across all peers.
- */
- long getWALReaderEditsBufferBytes();
-}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
index 5e4ad27..6534b11 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactory.java
@@ -24,6 +24,5 @@ import org.apache.yetus.audience.InterfaceAudience;
public interface MetricsReplicationSourceFactory {
public MetricsReplicationSinkSource getSink();
public MetricsReplicationSourceSource getSource(String id);
- public MetricsReplicationTableSource getTableSource(String tableName);
- public MetricsReplicationGlobalSourceSource getGlobalSource();
+ public MetricsReplicationSourceSource getGlobalSource();
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
deleted file mode 100644
index faa944a..0000000
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSource.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.hadoop.hbase.metrics.BaseSource;
-import org.apache.yetus.audience.InterfaceAudience;
-
-@InterfaceAudience.Private
-public interface MetricsReplicationTableSource extends BaseSource {
-
- void setLastShippedAge(long age);
- void incrShippedBytes(long size);
- long getShippedBytes();
- void clear();
- long getLastShippedAge();
-}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
similarity index 93%
rename from hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
rename to hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 963abba..4e8c810 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -24,8 +24,7 @@ import org.apache.hadoop.metrics2.lib.MutableHistogram;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
-public class MetricsReplicationGlobalSourceSourceImpl
- implements MetricsReplicationGlobalSourceSource {
+public class MetricsReplicationGlobalSourceSource implements MetricsReplicationSourceSource{
private static final String KEY_PREFIX = "source.";
private final MetricsReplicationSourceImpl rms;
@@ -54,9 +53,8 @@ public class MetricsReplicationGlobalSourceSourceImpl
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
private final MutableFastCounter failedRecoveryQueue;
- private final MutableGaugeLong walReaderBufferUsageBytes;
- public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms) {
+ public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
this.rms = rms;
ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(SOURCE_AGE_OF_LAST_SHIPPED_OP);
@@ -94,8 +92,6 @@ public class MetricsReplicationGlobalSourceSourceImpl
.getCounter(SOURCE_COMPLETED_RECOVERY_QUEUES, 0L);
failedRecoveryQueue = rms.getMetricsRegistry()
.getCounter(SOURCE_FAILED_RECOVERY_QUEUES, 0L);
- walReaderBufferUsageBytes = rms.getMetricsRegistry()
- .getGauge(SOURCE_WAL_READER_EDITS_BUFFER, 0L);
}
@Override public void setLastShippedAge(long age) {
@@ -264,14 +260,4 @@ public class MetricsReplicationGlobalSourceSourceImpl
public String getMetricsName() {
return rms.getMetricsName();
}
-
- @Override
- public void setWALReaderEditsBufferBytes(long usage) {
- this.walReaderBufferUsageBytes.set(usage);
- }
-
- @Override
- public long getWALReaderEditsBufferBytes() {
- return this.walReaderBufferUsageBytes.value();
- }
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
index c0cd1c7..af310f0 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceFactoryImpl.java
@@ -35,11 +35,7 @@ public class MetricsReplicationSourceFactoryImpl implements MetricsReplicationSo
return new MetricsReplicationSourceSourceImpl(SourceHolder.INSTANCE.source, id);
}
- @Override public MetricsReplicationTableSource getTableSource(String tableName) {
- return new MetricsReplicationTableSourceImpl(SourceHolder.INSTANCE.source, tableName);
- }
-
- @Override public MetricsReplicationGlobalSourceSource getGlobalSource() {
- return new MetricsReplicationGlobalSourceSourceImpl(SourceHolder.INSTANCE.source);
+ @Override public MetricsReplicationSourceSource getGlobalSource() {
+ return new MetricsReplicationGlobalSourceSource(SourceHolder.INSTANCE.source);
}
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 79e9bc1..0ad5052 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -162,7 +162,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
@Override public void incrShippedBytes(long size) {
shippedBytesCounter.incr(size);
- MetricsReplicationGlobalSourceSourceImpl
+ MetricsReplicationGlobalSourceSource
.incrementKBsCounter(shippedBytesCounter, shippedKBsCounter);
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java
deleted file mode 100644
index 7120a73..0000000
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationTableSourceImpl.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.hadoop.metrics2.lib.MutableFastCounter;
-import org.apache.hadoop.metrics2.lib.MutableHistogram;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * This is the metric source for table level replication metrics.
- * We can easy monitor some useful table level replication metrics such as
- * ageOfLastShippedOp and shippedBytes
- */
-@InterfaceAudience.Private
-public class MetricsReplicationTableSourceImpl implements MetricsReplicationTableSource {
-
- private final MetricsReplicationSourceImpl rms;
- private final String tableName;
- private final String ageOfLastShippedOpKey;
- private String keyPrefix;
-
- private final String shippedBytesKey;
-
- private final MutableHistogram ageOfLastShippedOpHist;
- private final MutableFastCounter shippedBytesCounter;
-
- public MetricsReplicationTableSourceImpl(MetricsReplicationSourceImpl rms, String tableName) {
- this.rms = rms;
- this.tableName = tableName;
- this.keyPrefix = "source." + this.tableName + ".";
-
- ageOfLastShippedOpKey = this.keyPrefix + "ageOfLastShippedOp";
- ageOfLastShippedOpHist = rms.getMetricsRegistry().getHistogram(ageOfLastShippedOpKey);
-
- shippedBytesKey = this.keyPrefix + "shippedBytes";
- shippedBytesCounter = rms.getMetricsRegistry().getCounter(shippedBytesKey, 0L);
- }
-
- @Override
- public void setLastShippedAge(long age) {
- ageOfLastShippedOpHist.add(age);
- }
-
- @Override
- public void incrShippedBytes(long size) {
- shippedBytesCounter.incr(size);
- }
-
- @Override
- public void clear() {
- rms.removeMetric(ageOfLastShippedOpKey);
- rms.removeMetric(shippedBytesKey);
- }
-
- @Override
- public long getLastShippedAge() {
- return ageOfLastShippedOpHist.getMax();
- }
-
- @Override
- public long getShippedBytes() {
- return shippedBytesCounter.value();
- }
-
- @Override
- public void init() {
- rms.init();
- }
-
- @Override
- public void setGauge(String gaugeName, long value) {
- rms.setGauge(this.keyPrefix + gaugeName, value);
- }
-
- @Override
- public void incGauge(String gaugeName, long delta) {
- rms.incGauge(this.keyPrefix + gaugeName, delta);
- }
-
- @Override
- public void decGauge(String gaugeName, long delta) {
- rms.decGauge(this.keyPrefix + gaugeName, delta);
- }
-
- @Override
- public void removeMetric(String key) {
- rms.removeMetric(this.keyPrefix + key);
- }
-
- @Override
- public void incCounters(String counterName, long delta) {
- rms.incCounters(this.keyPrefix + counterName, delta);
- }
-
- @Override
- public void updateHistogram(String name, long value) {
- rms.updateHistogram(this.keyPrefix + name, value);
- }
-
- @Override
- public String getMetricsContext() {
- return rms.getMetricsContext();
- }
-
- @Override
- public String getMetricsDescription() {
- return rms.getMetricsDescription();
- }
-
- @Override
- public String getMetricsJmxContext() {
- return rms.getMetricsJmxContext();
- }
-
- @Override
- public String getMetricsName() {
- return rms.getMetricsName();
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 49ef392..830ebe1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -19,11 +19,8 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,8 +47,8 @@ public class MetricsSource implements BaseSource {
private String id;
private final MetricsReplicationSourceSource singleSourceSource;
- private final MetricsReplicationGlobalSourceSource globalSourceSource;
- private Map<String, MetricsReplicationTableSource> singleSourceSourceByTable;
+ private final MetricsReplicationSourceSource globalSourceSource;
+ private Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable;
/**
* Constructor used to register the metrics
@@ -74,8 +71,8 @@ public class MetricsSource implements BaseSource {
* @param globalSourceSource Class to monitor global-scoped metrics
*/
public MetricsSource(String id, MetricsReplicationSourceSource singleSourceSource,
- MetricsReplicationGlobalSourceSource globalSourceSource,
- Map<String, MetricsReplicationTableSource> singleSourceSourceByTable) {
+ MetricsReplicationSourceSource globalSourceSource,
+ Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable) {
this.id = id;
this.singleSourceSource = singleSourceSource;
this.globalSourceSource = globalSourceSource;
@@ -96,29 +93,6 @@ public class MetricsSource implements BaseSource {
}
/**
- * Update the table level replication metrics per table
- *
- * @param walEntries List of pairs of WAL entry and it's size
- */
- public void updateTableLevelMetrics(List<Pair<Entry, Long>> walEntries) {
- for (Pair<Entry, Long> walEntryWithSize : walEntries) {
- Entry entry = walEntryWithSize.getFirst();
- long entrySize = walEntryWithSize.getSecond();
- String tableName = entry.getKey().getTableName().getNameAsString();
- long writeTime = entry.getKey().getWriteTime();
- long age = EnvironmentEdgeManager.currentTime() - writeTime;
-
- // get the replication metrics source for table at the run time
- MetricsReplicationTableSource tableSource = this.getSingleSourceSourceByTable()
- .computeIfAbsent(tableName,
- t -> CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
- .getTableSource(t));
- tableSource.setLastShippedAge(age);
- tableSource.incrShippedBytes(entrySize);
- }
- }
-
- /**
* Set the age of the last edit that was shipped group by table
* @param timestamp write time of the edit
* @param tableName String as group and tableName
@@ -127,7 +101,7 @@ public class MetricsSource implements BaseSource {
long age = EnvironmentEdgeManager.currentTime() - timestamp;
this.getSingleSourceSourceByTable().computeIfAbsent(
tableName, t -> CompatibilitySingletonFactory
- .getInstance(MetricsReplicationSourceFactory.class).getTableSource(t))
+ .getInstance(MetricsReplicationSourceFactory.class).getSource(t))
.setLastShippedAge(age);
}
@@ -145,7 +119,7 @@ public class MetricsSource implements BaseSource {
* @param walGroup which group we are getting
* @return age
*/
- public long getAgeOfLastShippedOp(String walGroup) {
+ public long getAgeofLastShippedOp(String walGroup) {
return this.ageOfLastShippedOp.get(walGroup) == null ? 0 : ageOfLastShippedOp.get(walGroup);
}
@@ -419,22 +393,7 @@ public class MetricsSource implements BaseSource {
}
@VisibleForTesting
- public Map<String, MetricsReplicationTableSource> getSingleSourceSourceByTable() {
+ public Map<String, MetricsReplicationSourceSource> getSingleSourceSourceByTable() {
return singleSourceSourceByTable;
}
-
- /**
- * Sets the amount of memory in bytes used in this RegionServer by edits pending replication.
- */
- public void setWALReaderEditsBufferUsage(long usageInBytes) {
- globalSourceSource.setWALReaderEditsBufferBytes(usageInBytes);
- }
-
- /**
- * Returns the amount of memory in bytes used in this RegionServer by edits pending replication.
- * @return the amount of memory in bytes used in this RegionServer by edits pending replication.
- */
- public long getWALReaderEditsBufferUsage() {
- return globalSourceSource.getWALReaderEditsBufferBytes();
- }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index bec9042..752cfb8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
@@ -73,7 +72,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
private int statsThreadPeriod;
// ReplicationLoad to access replication metrics
private ReplicationLoad replicationLoad;
- private MetricsReplicationGlobalSourceSource globalMetricsSource;
private PeerProcedureHandler peerProcedureHandler;
@@ -121,12 +119,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
} catch (KeeperException ke) {
throw new IOException("Could not read cluster id", ke);
}
- this.globalMetricsSource = CompatibilitySingletonFactory
- .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf,
this.server, fs, logDir, oldLogDir, clusterId,
- walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
- globalMetricsSource);
+ walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty());
if (walProvider != null) {
walProvider
.addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index c5f72c9..4726949 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -329,7 +329,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
String walGroupId = walGroupShipper.getKey();
ReplicationSourceShipper shipper = walGroupShipper.getValue();
lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
- ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
+ ageOfLastShippedOp = metrics.getAgeofLastShippedOp(walGroupId);
int queueSize = queues.get(walGroupId).size();
replicationDelay =
ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
@@ -706,9 +706,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
throttler.addPushSize(batchSize);
}
totalReplicatedEdits.addAndGet(entries.size());
- long newBufferUsed = totalBufferUsed.addAndGet(-batchSize);
- // Record the new buffer usage
- this.manager.getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+ totalBufferUsed.addAndGet(-batchSize);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 2806110..a6fc313 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -155,9 +155,6 @@ public class ReplicationSourceManager implements ReplicationListener {
private AtomicLong totalBufferUsed = new AtomicLong();
- // Total buffer size on this RegionServer for holding batched edits to be shipped.
- private final long totalBufferLimit;
- private final MetricsReplicationGlobalSourceSource globalMetrics;
/**
* Creates a replication manager and sets the watch on all the other registered region servers
@@ -174,8 +171,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
- WALFileLengthProvider walFileLengthProvider,
- MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
+ WALFileLengthProvider walFileLengthProvider) throws IOException {
// CopyOnWriteArrayList is thread-safe.
// Generally, reading is more than modifying.
this.sources = new ConcurrentHashMap<>();
@@ -209,9 +205,6 @@ public class ReplicationSourceManager implements ReplicationListener {
this.latestPaths = new HashSet<Path>();
replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
- this.totalBufferLimit = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
- HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
- this.globalMetrics = globalMetrics;
}
/**
@@ -869,14 +862,6 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
- * Returns the maximum size in bytes of edits held in memory which are pending replication
- * across all sources inside this RegionServer.
- */
- public long getTotalBufferLimit() {
- return totalBufferLimit;
- }
-
- /**
* Get the directory where wals are archived
* @return the directory where wals are archived
*/
@@ -913,10 +898,6 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
public String getStats() {
StringBuilder stats = new StringBuilder();
- // Print stats that apply across all Replication Sources
- stats.append("Global stats: ");
- stats.append("WAL Edits Buffer Used=").append(getTotalBufferUsed().get()).append("B, Limit=")
- .append(getTotalBufferLimit()).append("B\n");
for (ReplicationSourceInterface source : this.sources.values()) {
stats.append("Normal source for cluster " + source.getPeerId() + ": ");
stats.append(source.getStats() + "\n");
@@ -942,8 +923,4 @@ public class ReplicationSourceManager implements ReplicationListener {
int activeFailoverTaskCount() {
return executor.getActiveCount();
}
-
- MetricsReplicationGlobalSourceSource getGlobalMetrics() {
- return this.globalMetrics;
- }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 78e5521..23f736f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
@@ -200,14 +201,17 @@ public class ReplicationSourceShipper extends Thread {
// Clean up hfile references
for (Entry entry : entries) {
cleanUpHFileRefs(entry.getEdit());
- LOG.trace("shipped entry {}: ", entry);
+
+ TableName tableName = entry.getKey().getTableName();
+ source.getSourceMetrics().setAgeOfLastShippedOpByTable(entry.getKey().getWriteTime(),
+ tableName.getNameAsString());
}
// Log and clean up WAL logs
updateLogPosition(entryBatch);
//offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
//this sizeExcludeBulkLoad has to use same calculation that when calling
- //acquireBufferQuota() in ReplicationSourceWALReader because they maintain
+ //acquireBufferQuota() in ReplicationSourceWALReader because they maintain
//same variable: totalBufferUsed
source.postShipEdits(entries, sizeExcludeBulkLoad);
// FIXME check relationship between wal group and overall
@@ -215,8 +219,6 @@ public class ReplicationSourceShipper extends Thread {
entryBatch.getNbHFiles());
source.getSourceMetrics().setAgeOfLastShippedOp(
entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
- source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());
-
if (LOG.isTraceEnabled()) {
LOG.trace("Replicated {} entries or {} operations in {} ms",
entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 328cb8f..12653f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
@@ -103,7 +104,8 @@ class ReplicationSourceWALReader extends Thread {
// the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1);
this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
- this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
+ this.totalBufferQuota = conf.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY,
+ HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
@@ -170,7 +172,7 @@ class ReplicationSourceWALReader extends Thread {
}
long entrySize = getEntrySizeIncludeBulkLoad(entry);
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
- batch.addEntry(entry, entrySize);
+ batch.addEntry(entry);
updateBatchStats(batch, entry, entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
@@ -270,8 +272,6 @@ class ReplicationSourceWALReader extends Thread {
private boolean checkQuota() {
// try not to go over total quota
if (totalBufferUsed.get() > totalBufferQuota) {
- LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
- this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
Threads.sleep(sleepForRetries);
return false;
}
@@ -306,7 +306,9 @@ class ReplicationSourceWALReader extends Thread {
private long getEntrySizeIncludeBulkLoad(Entry entry) {
WALEdit edit = entry.getEdit();
- return getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
+ WALKey key = entry.getKey();
+ return edit.heapSize() + sizeOfStoreFilesIncludeBulkLoad(edit) +
+ key.estimatedSerializedSizeOf();
}
public static long getEntrySizeExcludeBulkLoad(Entry entry) {
@@ -399,10 +401,7 @@ class ReplicationSourceWALReader extends Thread {
* @return true if we should clear buffer and push all
*/
private boolean acquireBufferQuota(long size) {
- long newBufferUsed = totalBufferUsed.addAndGet(size);
- // Record the new buffer usage
- this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
- return newBufferUsed >= totalBufferQuota;
+ return totalBufferUsed.addAndGet(size) >= totalBufferQuota;
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index 4f96c96..22b2de7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -21,9 +21,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
@@ -36,8 +34,7 @@ class WALEntryBatch {
// used by recovered replication queue to indicate that all the entries have been read.
public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
- private List<Pair<Entry, Long>> walEntriesWithSize;
-
+ private List<Entry> walEntries;
// last WAL that was read
private Path lastWalPath;
// position in WAL of last entry in this batch
@@ -57,7 +54,7 @@ class WALEntryBatch {
* @param lastWalPath Path of the WAL the last entry in this batch was read from
*/
WALEntryBatch(int maxNbEntries, Path lastWalPath) {
- this.walEntriesWithSize = new ArrayList<>(maxNbEntries);
+ this.walEntries = new ArrayList<>(maxNbEntries);
this.lastWalPath = lastWalPath;
}
@@ -69,22 +66,15 @@ class WALEntryBatch {
return batch;
}
- public void addEntry(Entry entry, long entrySize) {
- walEntriesWithSize.add(new Pair<>(entry, entrySize));
+ public void addEntry(Entry entry) {
+ walEntries.add(entry);
}
/**
* @return the WAL Entries.
*/
public List<Entry> getWalEntries() {
- return walEntriesWithSize.stream().map(Pair::getFirst).collect(Collectors.toList());
- }
-
- /**
- * @return the WAL Entries.
- */
- public List<Pair<Entry, Long>> getWalEntriesWithSize() {
- return walEntriesWithSize;
+ return walEntries;
}
/**
@@ -106,7 +96,7 @@ class WALEntryBatch {
}
public int getNbEntries() {
- return walEntriesWithSize.size();
+ return walEntries.size();
}
/**
@@ -170,7 +160,7 @@ class WALEntryBatch {
@Override
public String toString() {
- return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath +
+ return "WALEntryBatch [walEntries=" + walEntries + ", lastWalPath=" + lastWalPath +
", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" +
nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" +
endOfFile + "]";
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index cc7e20c..5d7366d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -28,7 +28,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,8 +35,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -47,21 +44,16 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
-import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
-import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationTableSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.junit.AfterClass;
@@ -312,12 +304,11 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
@Test
- public void testMetricsSourceBaseSourcePassThrough() {
+ public void testMetricsSourceBaseSourcePassthrough() {
/*
- * The replication MetricsSource wraps a MetricsReplicationTableSourceImpl,
- * MetricsReplicationSourceSourceImpl and a MetricsReplicationGlobalSourceSource,
- * so that metrics get written to both namespaces. Both of those classes wrap a
- * MetricsReplicationSourceImpl that implements BaseSource, which allows
+ * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
+ * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of
+ * those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows
* for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on
* MetricsSource actually calls down through the two layers of wrapping to the actual
* BaseSource.
@@ -331,15 +322,14 @@ public class TestReplicationEndpoint extends TestReplicationBase {
MetricsReplicationSourceSource singleSourceSource =
new MetricsReplicationSourceSourceImpl(singleRms, id);
- MetricsReplicationGlobalSourceSource globalSourceSource =
- new MetricsReplicationGlobalSourceSourceImpl(globalRms);
- MetricsReplicationGlobalSourceSource spyglobalSourceSource = spy(globalSourceSource);
+ MetricsReplicationSourceSource globalSourceSource =
+ new MetricsReplicationGlobalSourceSource(globalRms);
+ MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
- Map<String, MetricsReplicationTableSource> singleSourceSourceByTable =
- new HashMap<>();
- MetricsSource source = new MetricsSource(id, singleSourceSource,
- spyglobalSourceSource, singleSourceSourceByTable);
+ Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
+ MetricsSource source =
+ new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);
String gaugeName = "gauge";
@@ -388,37 +378,16 @@ public class TestReplicationEndpoint extends TestReplicationBase {
boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
.containsKey("RandomNewTable");
Assert.assertEquals(false, containsRandomNewTable);
- source.updateTableLevelMetrics(createWALEntriesWithSize("RandomNewTable"));
+ source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
containsRandomNewTable = source.getSingleSourceSourceByTable()
.containsKey("RandomNewTable");
Assert.assertEquals(true, containsRandomNewTable);
- MetricsReplicationTableSource msr = source.getSingleSourceSourceByTable()
+ MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
.get("RandomNewTable");
-
- // age should be greater than zero we created the entry with time in the past
+ // cannot put more concrete value here to verify because the age is arbitrary.
+ // as long as it's greater than 0, we see it as correct answer.
Assert.assertTrue(msr.getLastShippedAge() > 0);
- Assert.assertTrue(msr.getShippedBytes() > 0);
-
- }
-
- private List<Pair<Entry, Long>> createWALEntriesWithSize(String tableName) {
- List<Pair<Entry, Long>> walEntriesWithSize = new ArrayList<>();
- byte[] a = new byte[] { 'a' };
- Entry entry = createEntry(tableName, null, a);
- walEntriesWithSize.add(new Pair<>(entry, 10L));
- return walEntriesWithSize;
- }
- private Entry createEntry(String tableName, TreeMap<byte[], Integer> scopes, byte[]... kvs) {
- WALKeyImpl key1 = new WALKeyImpl(new byte[0], TableName.valueOf(tableName),
- System.currentTimeMillis() - 1L,
- scopes);
- WALEdit edit1 = new WALEdit();
-
- for (byte[] kv : kvs) {
- edit1.add(new KeyValue(kv, kv, kv));
- }
- return new Entry(key1, edit1);
}
private void doPut(byte[] row) throws IOException {
@@ -494,44 +463,6 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
}
- /**
- * Not used by unit tests, helpful for manual testing with replication.
- * <p>
- * Snippet for `hbase shell`:
- * <pre>
- * create 't', 'f'
- * add_peer '1', ENDPOINT_CLASSNAME => 'org.apache.hadoop.hbase.replication.' + \
- * 'TestReplicationEndpoint$SleepingReplicationEndpointForTest'
- * alter 't', {NAME=>'f', REPLICATION_SCOPE=>1}
- * </pre>
- */
- public static class SleepingReplicationEndpointForTest extends ReplicationEndpointForTest {
- private long duration;
- public SleepingReplicationEndpointForTest() {
- super();
- }
-
- @Override
- public void init(Context context) throws IOException {
- super.init(context);
- if (this.ctx != null) {
- duration = this.ctx.getConfiguration().getLong(
- "hbase.test.sleep.replication.endpoint.duration.millis", 5000L);
- }
- }
-
- @Override
- public boolean replicate(ReplicateContext context) {
- try {
- Thread.sleep(duration);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- }
- return super.replicate(context);
- }
- }
-
public static class InterClusterReplicationEndpointForTest
extends HBaseInterClusterReplicationEndpoint {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 067ab63..b4af38b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -336,8 +336,6 @@ public class TestWALEntryStream {
private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
- when(mockSourceManager.getTotalBufferLimit()).thenReturn(
- (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
Server mockServer = Mockito.mock(Server.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
@@ -345,9 +343,6 @@ public class TestWALEntryStream {
when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer);
when(source.isRecovered()).thenReturn(recovered);
- MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
- MetricsReplicationGlobalSourceSource.class);
- when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
return source;
}