You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2010/07/27 06:58:32 UTC
svn commit: r979532 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/replication/regionserver/
src/test/java/org/apache/hadoop/hbase/replication/
Author: jdcryans
Date: Tue Jul 27 04:58:31 2010
New Revision: 979532
URL: http://svn.apache.org/viewvc?rev=979532&view=rev
Log:
HBASE-2838 Replication metrics
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=979532&r1=979531&r2=979532&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Jul 27 04:58:31 2010
@@ -854,6 +854,7 @@ Release 0.21.0 - Unreleased
HBASE-2223 Handle 10min+ network partitions between clusters
HBASE-2862 Name DFSClient for Improved Debugging
(Nicolas Spiegelberg via Stack)
+ HBASE-2838 Replication metrics
OPTIMIZATIONS
HBASE-410 [testing] Speed up the test suite
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=979532&r1=979531&r2=979532&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Tue Jul 27 04:58:31 2010
@@ -22,8 +22,6 @@ package org.apache.hadoop.hbase.replicat
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableNotFoundException;
@@ -35,16 +33,10 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
-import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
/**
* This class is responsible for replicating the edits coming
@@ -70,6 +62,7 @@ public class ReplicationSink {
private final HTablePool pool;
// boolean coming from HRS to know when the process stops
private final AtomicBoolean stop;
+ private final ReplicationSinkMetrics metrics;
/**
* Create a sink for replication
@@ -84,6 +77,7 @@ public class ReplicationSink {
this.pool = new HTablePool(this.conf,
conf.getInt("replication.sink.htablepool.capacity", 10));
this.stop = stopper;
+ this.metrics = new ReplicationSinkMetrics();
}
/**
@@ -95,6 +89,9 @@ public class ReplicationSink {
*/
public synchronized void replicateEntries(HLog.Entry[] entries)
throws IOException {
+ if (entries.length == 0) {
+ return;
+ }
// Very simple optimization where we batch sequences of rows going
// to the same table.
try {
@@ -139,6 +136,9 @@ public class ReplicationSink {
totalReplicated++;
}
put(lastTable, puts);
+ this.metrics.setAgeOfLastAppliedOp(
+ entries[entries.length-1].getKey().getWriteTime());
+ this.metrics.appliedBatchesRate.inc(1);
LOG.info("Total replicated: " + totalReplicated);
} catch (IOException ex) {
if (ex.getCause() instanceof TableNotFoundException) {
@@ -173,6 +173,7 @@ public class ReplicationSink {
try {
table = this.pool.getTable(tableName);
table.put(puts);
+ this.metrics.appliedOpsRate.inc(puts.size());
this.pool.putTable(table);
puts.clear();
} finally {
@@ -193,6 +194,7 @@ public class ReplicationSink {
try {
table = this.pool.getTable(tableName);
table.delete(delete);
+ this.metrics.appliedOpsRate.inc(1);
this.pool.putTable(table);
} finally {
if (table != null) {
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java?rev=979532&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkMetrics.java Tue Jul 27 04:58:31 2010
@@ -0,0 +1,82 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+import org.apache.hadoop.hbase.metrics.MetricsRate;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics.util.MetricsIntValue;
+import org.apache.hadoop.metrics.util.MetricsLongValue;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+
+/**
+ * This class is for maintaining the various replication statistics
+ * for a sink and publishing them through the metrics interfaces.
+ * Values are pushed to the "hbase" metrics context on every
+ * {@link #doUpdates(MetricsContext)} call and exported over JMX.
+ */
+public class ReplicationSinkMetrics implements Updater {
+  private final MetricsRecord metricsRecord;
+  private final MetricsRegistry registry = new MetricsRegistry();
+
+  /** Rate of operations applied by the sink */
+  public final MetricsRate appliedOpsRate =
+      new MetricsRate("appliedOpsRate", registry);
+
+  /** Rate of batches (of operations) applied by the sink */
+  public final MetricsRate appliedBatchesRate =
+      new MetricsRate("appliedBatchesRate", registry);
+
+  /** Age of the last operation that was applied by the sink */
+  private final MetricsLongValue ageOfLastAppliedOp =
+      new MetricsLongValue("ageOfLastAppliedOp", registry);
+
+  /**
+   * Constructor used to register the metrics: creates the "replication"
+   * record in the "hbase" context (tagged with the current thread's name),
+   * registers this object as its updater, initializes the JVM metrics and
+   * exports the registry over JMX under the name "ReplicationSink".
+   */
+  public ReplicationSinkMetrics() {
+    MetricsContext context = MetricsUtil.getContext("hbase");
+    String name = Thread.currentThread().getName();
+    metricsRecord = MetricsUtil.createRecord(context, "replication");
+    metricsRecord.setTag("RegionServer", name);
+    context.registerUpdater(this);
+    // Add jvmmetrics.
+    JvmMetrics.init("RegionServer", name);
+    // export for JMX
+    new ReplicationStatistics(this.registry, "ReplicationSink");
+  }
+
+  /**
+   * Set the age of the last edit that was applied
+   * @param timestamp write time of the edit; the age is computed as
+   * System.currentTimeMillis() minus this value
+   */
+  public void setAgeOfLastAppliedOp(long timestamp) {
+    ageOfLastAppliedOp.set(System.currentTimeMillis() - timestamp);
+  }
+
+  /**
+   * Push all sink metrics into the metrics record and emit it.
+   * @param metricsContext the context calling this updater (unused)
+   */
+  @Override
+  public void doUpdates(MetricsContext metricsContext) {
+    synchronized (this) {
+      this.appliedOpsRate.pushMetric(this.metricsRecord);
+      this.appliedBatchesRate.pushMetric(this.metricsRecord);
+      this.ageOfLastAppliedOp.pushMetric(this.metricsRecord);
+      // pushMetric only sets values on the record; update() is what actually
+      // hands them to the metrics context. Without it nothing is published.
+      this.metricsRecord.update();
+    }
+  }
+}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=979532&r1=979531&r2=979532&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue Jul 27 04:58:31 2010
@@ -53,7 +53,6 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
/**
* Class that handles the source of a replication stream.
@@ -119,8 +118,12 @@ public class ReplicationSource extends T
private long maxRetriesMultiplier;
// Current number of entries that we need to replicate
private int currentNbEntries = 0;
+ // Current number of operations (Put/Delete) that we need to replicate
+ private int currentNbOperations = 0;
// Indicates if this particular source is running
private volatile boolean running = true;
+ // Metrics for this source
+ private ReplicationSourceMetrics metrics;
/**
* Instantiation method used by region servers
@@ -167,6 +170,7 @@ public class ReplicationSource extends T
this.conf.getLong("replication.source.sleepforretries", 1000);
this.fs = fs;
this.clusterId = Byte.valueOf(zkHelper.getClusterId());
+ this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
// Finally look if this is a recovered queue
this.checkIfQueueRecovered(peerClusterZnode);
@@ -213,6 +217,7 @@ public class ReplicationSource extends T
@Override
public void enqueueLog(Path log) {
this.queue.put(log);
+ this.metrics.sizeOfLogQueue.set(queue.size());
}
@Override
@@ -334,6 +339,7 @@ public class ReplicationSource extends T
HLog.Entry entry = this.reader.next(this.entriesArray[currentNbEntries]);
while (entry != null) {
WALEdit edit = entry.getEdit();
+ this.metrics.logEditsReadRate.inc(1);
seenEntries++;
// Remove all KVs that should not be replicated
removeNonReplicableEdits(edit);
@@ -344,7 +350,10 @@ public class ReplicationSource extends T
Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
edit.size() != 0 && replicating.get()) {
logKey.setClusterId(this.clusterId);
+ currentNbOperations += countDistinctRowKeys(edit);
currentNbEntries++;
+ } else {
+ this.metrics.logEditsFilteredRate.inc(1);
}
// Stop if too many entries or too big
if ((this.reader.getPosition() - this.position)
@@ -354,7 +363,7 @@ public class ReplicationSource extends T
}
entry = this.reader.next(entriesArray[currentNbEntries]);
}
- LOG.debug("currentNbEntries:" + currentNbEntries +
+ LOG.debug("currentNbOperations:" + currentNbOperations +
" and seenEntries:" + seenEntries +
" and size: " + (this.reader.getPosition() - this.position));
// If we didn't get anything and the queue has an object, it means we
@@ -382,6 +391,7 @@ public class ReplicationSource extends T
try {
if (this.currentPath == null) {
this.currentPath = queue.poll(this.sleepForRetries, TimeUnit.MILLISECONDS);
+ this.metrics.sizeOfLogQueue.set(queue.size());
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while reading edits", e);
@@ -485,6 +495,24 @@ public class ReplicationSource extends T
}
/**
+   * Count the number of different row keys in the given edit because of
+   * mini-batching. A new row is counted each time a KeyValue's row differs
+   * from the row of the KeyValue just before it, so the result equals the
+   * number of distinct rows as long as the KeyValues of one row are adjacent
+   * in the edit (assumed here — NOTE(review): confirm for all WALEdit writers).
+   * @param edit edit to count row keys from
+   * @return number of different row keys, 0 if the edit holds no KeyValue
+   */
+  private int countDistinctRowKeys(WALEdit edit) {
+    List<KeyValue> kvs = edit.getKeyValues();
+    if (kvs.isEmpty()) {
+      return 0;
+    }
+    int distinctRowKeys = 1;
+    KeyValue lastKV = kvs.get(0);
+    for (KeyValue kv : kvs) {
+      if (!kv.matchingRow(lastKV)) {
+        distinctRowKeys++;
+        // Compare against the most recent row, not the first one; otherwise
+        // every KV that isn't on the first row would count as a new row.
+        lastKV = kv;
+      }
+    }
+    return distinctRowKeys;
+  }
+
+ /**
* Do the shipping logic
*/
protected void shipEdits() {
@@ -497,6 +525,11 @@ public class ReplicationSource extends T
this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.position, queueRecovered);
this.totalReplicatedEdits += currentNbEntries;
+ this.metrics.shippedBatchesRate.inc(1);
+ this.metrics.shippedOpsRate.inc(
+ this.currentNbOperations);
+ this.metrics.setAgeOfLastShippedOp(
+ this.entriesArray[this.entriesArray.length-1].getKey().getWriteTime());
LOG.debug("Replicated in total: " + this.totalReplicatedEdits);
break;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=979532&r1=979531&r2=979532&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Tue Jul 27 04:58:31 2010
@@ -219,6 +219,7 @@ public class ReplicationSourceManager im
this.hlogs.add(newLog.getName());
}
this.latestPath = newLog;
+ // This only updates the sources we own, not the recovered ones
for (ReplicationSourceInterface source : this.sources) {
source.enqueueLog(newLog);
}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java?rev=979532&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceMetrics.java Tue Jul 27 04:58:31 2010
@@ -0,0 +1,101 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+import org.apache.hadoop.hbase.metrics.MetricsRate;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.hadoop.metrics.util.MetricsIntValue;
+import org.apache.hadoop.metrics.util.MetricsLongValue;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+
+/**
+ * This class is for maintaining the various replication statistics
+ * for a source and publishing them through the metrics interfaces.
+ * Values are pushed to the "hbase" metrics context on every
+ * {@link #doUpdates(MetricsContext)} call and exported over JMX.
+ */
+public class ReplicationSourceMetrics implements Updater {
+  private final MetricsRecord metricsRecord;
+  private final MetricsRegistry registry = new MetricsRegistry();
+
+  /** Rate of shipped operations by the source */
+  public final MetricsRate shippedOpsRate =
+      new MetricsRate("shippedOpsRate", registry);
+
+  /** Rate of shipped batches by the source */
+  public final MetricsRate shippedBatchesRate =
+      new MetricsRate("shippedBatchesRate", registry);
+
+  /** Rate of log entries (can be multiple Puts) read from the logs */
+  public final MetricsRate logEditsReadRate =
+      new MetricsRate("logEditsReadRate", registry);
+
+  /** Rate of log entries filtered by the source */
+  public final MetricsRate logEditsFilteredRate =
+      new MetricsRate("logEditsFilteredRate", registry);
+
+  /** Age of the last operation that was shipped by the source */
+  private final MetricsLongValue ageOfLastShippedOp =
+      new MetricsLongValue("ageOfLastShippedOp", registry);
+
+  /**
+   * Current size of the queue of logs to replicate,
+   * excluding the one being processed at the moment
+   */
+  public final MetricsIntValue sizeOfLogQueue =
+      new MetricsIntValue("sizeOfLogQueue", registry);
+
+  /**
+   * Constructor used to register the metrics: creates the "replication"
+   * record in the "hbase" context (tagged with the current thread's name),
+   * registers this object as its updater, initializes the JVM metrics and
+   * exports the registry over JMX.
+   * @param id Name of the source this class is monitoring
+   */
+  public ReplicationSourceMetrics(String id) {
+    MetricsContext context = MetricsUtil.getContext("hbase");
+    String name = Thread.currentThread().getName();
+    metricsRecord = MetricsUtil.createRecord(context, "replication");
+    metricsRecord.setTag("RegionServer", name);
+    context.registerUpdater(this);
+    // Add jvmmetrics.
+    JvmMetrics.init("RegionServer", name);
+    // The id ends up inside a JMX ObjectName, where characters such as
+    // ',' ':' '=' are not legal in an unquoted value; source ids may contain
+    // them, so encode it to keep MBean registration from failing.
+    String beanId;
+    try {
+      beanId = java.net.URLEncoder.encode(id, "UTF-8");
+    } catch (java.io.UnsupportedEncodingException e) {
+      // UTF-8 support is mandatory in every JVM; fall back to the raw id.
+      beanId = id;
+    }
+    // export for JMX
+    new ReplicationStatistics(this.registry, "ReplicationSource for " + beanId);
+  }
+
+  /**
+   * Set the age of the last edit that was shipped
+   * @param timestamp write time of the edit; the age is computed as
+   * System.currentTimeMillis() minus this value
+   */
+  public void setAgeOfLastShippedOp(long timestamp) {
+    ageOfLastShippedOp.set(System.currentTimeMillis() - timestamp);
+  }
+
+  /**
+   * Push all source metrics into the metrics record and emit it.
+   * @param metricsContext the context calling this updater (unused)
+   */
+  @Override
+  public void doUpdates(MetricsContext metricsContext) {
+    synchronized (this) {
+      this.shippedOpsRate.pushMetric(this.metricsRecord);
+      this.shippedBatchesRate.pushMetric(this.metricsRecord);
+      this.logEditsReadRate.pushMetric(this.metricsRecord);
+      this.logEditsFilteredRate.pushMetric(this.metricsRecord);
+      this.ageOfLastShippedOp.pushMetric(this.metricsRecord);
+      this.sizeOfLogQueue.pushMetric(this.metricsRecord);
+      // pushMetric only sets values on the record; update() is what actually
+      // hands them to the metrics context. Without it nothing is published.
+      this.metricsRecord.update();
+    }
+  }
+}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java?rev=979532&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationStatistics.java Tue Jul 27 04:58:31 2010
@@ -0,0 +1,45 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.hbase.metrics.MetricsMBeanBase;
+import org.apache.hadoop.metrics.util.MBeanUtil;
+import org.apache.hadoop.metrics.util.MetricsRegistry;
+
+import javax.management.ObjectName;
+
+/**
+ * Exports metrics recorded by {@link ReplicationSourceMetrics} or
+ * {@link ReplicationSinkMetrics} as an MBean for JMX monitoring.
+ */
+public class ReplicationStatistics extends MetricsMBeanBase {
+
+  /** Name under which this bean was registered; used to unregister it. */
+  private final ObjectName mbeanName;
+
+  /**
+   * Constructor to register the MBean
+   * @param registry which registry to use
+   * @param name name to get to this bean
+   */
+  public ReplicationStatistics(MetricsRegistry registry, String name) {
+    super(registry, name);
+    mbeanName = MBeanUtil.registerMBean("Replication", name, this);
+  }
+
+  /**
+   * Unregister the MBean that the constructor registered. Does nothing if
+   * no ObjectName was recorded at registration time.
+   */
+  public void shutdown() {
+    if (mbeanName != null) {
+      MBeanUtil.unregisterMBean(mbeanName);
+    }
+  }
+}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=979532&r1=979531&r2=979532&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java Tue Jul 27 04:58:31 2010
@@ -180,7 +180,7 @@ public class TestReplication {
* Add a row, check it's replicated, delete it, check's gone
* @throws Exception
*/
- //@Test
+ @Test
public void testSimplePutDelete() throws Exception {
LOG.info("testSimplePutDelete");
Put put = new Put(row);
@@ -228,7 +228,7 @@ public class TestReplication {
* Try a small batch upload using the write buffer, check it's replicated
* @throws Exception
*/
- //@Test
+ @Test
public void testSmallBatch() throws Exception {
LOG.info("testSmallBatch");
Put put;
@@ -272,7 +272,7 @@ public class TestReplication {
* replicated, enable it, try replicating and it should work
* @throws Exception
*/
- //@Test
+ @Test
public void testStartStop() throws Exception {
// Test stopping replication
@@ -341,7 +341,7 @@ public class TestReplication {
* hlog rolling and other non-trivial code paths
* @throws Exception
*/
- //@Test
+ @Test
public void loadTesting() throws Exception {
htable1.setWriteBufferSize(1024);
htable1.setAutoFlush(false);