You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/14 06:55:05 UTC
[05/51] [abbrv] git commit: ACCUMULO-2582 Add a
replication-in-progress section to the monitor
ACCUMULO-2582 Add a replication-in-progress section to the monitor
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0f952069
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0f952069
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0f952069
Branch: refs/heads/master
Commit: 0f9520690808762fd1b63bff5159ef7e071cd64d
Parents: 177eabf
Author: Josh Elser <el...@apache.org>
Authored: Tue May 27 16:29:44 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 27 16:29:44 2014 -0400
----------------------------------------------------------------------
.../monitor/servlets/ReplicationServlet.java | 73 ++++++++++++++++++++
1 file changed, 73 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f952069/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index b8569d1..45f6baf 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -19,16 +19,20 @@ package org.apache.accumulo.monitor.servlets;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.NoSuchElementException;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.client.replication.ReplicaSystem;
import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
@@ -41,17 +45,24 @@ import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
import org.apache.accumulo.monitor.util.Table;
import org.apache.accumulo.monitor.util.celltypes.NumberType;
import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.replication.AbstractWorkAssigner;
import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.zookeeper.ZooCache;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
/**
*
@@ -61,6 +72,8 @@ public class ReplicationServlet extends BasicServlet {
private static final long serialVersionUID = 1L;
+ private ZooCache zooCache = new ZooCache();
+
@Override
protected String getTitle(HttpServletRequest req) {
return "Replication Overview";
@@ -180,6 +193,66 @@ public class ReplicationServlet extends BasicServlet {
}
replicationStats.generate(req, sb);
+
+ // Make a table for the replication data in progress
+ Table replicationInProgress = new Table("replicationInProgress", "In-Progress Replication");
+ replicationInProgress.addSortableColumn("File");
+ replicationInProgress.addSortableColumn("Peer");
+ replicationInProgress.addSortableColumn("Source Table ID");
+ replicationInProgress.addSortableColumn("Peer Identifier");
+ replicationInProgress.addUnsortableColumn("Status");
+
+ String zkRoot = ZooUtil.getRoot(inst);
+ ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
+
+ // Read the files from the workqueue in zk
+ final String workQueuePath = zkRoot + Constants.ZREPLICATION_WORK_QUEUE;
+ List<String> queuedReplication = zreader.getChildren(workQueuePath);
+ for (String queueKey : queuedReplication) {
+ Entry<String,ReplicationTarget> entry = AbstractWorkAssigner.fromQueueKey(queueKey);
+ String filename = entry.getKey();
+ ReplicationTarget target = entry.getValue();
+
+ byte[] data = zooCache.get(workQueuePath + "/" + queueKey);
+
+ if (null != data) {
+ Scanner s = ReplicationTable.getScanner(conn);
+ s.setRange(Range.exact(filename));
+ s.fetchColumn(WorkSection.NAME, target.toText());
+
+ // Fetch the work entry for this item
+ String status = "Unknown";
+ Entry<Key,Value> kv = null;
+ try {
+ kv = Iterables.getOnlyElement(s);
+ } catch (NoSuchElementException e) {
+ log.trace("Could not find status of {} replicating to {}", filename, target);
+ status = "Unknown";
+ } finally {
+ s.close();
+ }
+
+ // If we found the work entry for it, try to compute some progress
+ if (null != entry) {
+ try {
+ Status stat = Status.parseFrom(kv.getValue().get());
+ if (stat.getInfiniteEnd()) {
+ status = stat.getBegin() + "/∞";
+ } else {
+ status = stat.getBegin() + "/" + stat.getEnd();
+ }
+ } catch (InvalidProtocolBufferException e) {
+ log.warn("Could not deserialize protobuf for {}", kv.getKey(), e);
+ status = "Unknown";
+ }
+ }
+
+ // Add a row in the table
+ replicationInProgress.addRow(filename, target.getPeerName(), target.getSourceTableId(), target.getRemoteIdentifier(), status);
+ }
+ }
+
+ replicationInProgress.generate(req, sb);
}
protected Map<String,String> invert(Map<String,String> map) {