You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/14 06:55:05 UTC

[05/51] [abbrv] git commit: ACCUMULO-2582 Add a replication-in-progress section to the monitor

ACCUMULO-2582 Add a replication-in-progress section to the monitor


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0f952069
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0f952069
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0f952069

Branch: refs/heads/master
Commit: 0f9520690808762fd1b63bff5159ef7e071cd64d
Parents: 177eabf
Author: Josh Elser <el...@apache.org>
Authored: Tue May 27 16:29:44 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 27 16:29:44 2014 -0400

----------------------------------------------------------------------
 .../monitor/servlets/ReplicationServlet.java    | 73 ++++++++++++++++++++
 1 file changed, 73 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f952069/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
----------------------------------------------------------------------
diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
index b8569d1..45f6baf 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/ReplicationServlet.java
@@ -19,16 +19,20 @@ package org.apache.accumulo.monitor.servlets;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.Set;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.replication.ReplicaSystem;
 import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
@@ -41,17 +45,24 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.accumulo.monitor.util.Table;
 import org.apache.accumulo.monitor.util.celltypes.NumberType;
 import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.replication.AbstractWorkAssigner;
 import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
+import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * 
@@ -61,6 +72,8 @@ public class ReplicationServlet extends BasicServlet {
 
   private static final long serialVersionUID = 1L;
 
+  private ZooCache zooCache = new ZooCache();
+
   @Override
   protected String getTitle(HttpServletRequest req) {
     return "Replication Overview";
@@ -180,6 +193,66 @@ public class ReplicationServlet extends BasicServlet {
     }
 
     replicationStats.generate(req, sb);
+
+    // Make a table for the replication data in progress
+    Table replicationInProgress = new Table("replicationInProgress", "In-Progress Replication");
+    replicationInProgress.addSortableColumn("File");
+    replicationInProgress.addSortableColumn("Peer");
+    replicationInProgress.addSortableColumn("Source Table ID");
+    replicationInProgress.addSortableColumn("Peer Identifier");
+    replicationInProgress.addUnsortableColumn("Status");
+
+    String zkRoot = ZooUtil.getRoot(inst);
+    ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
+
+    // Read the files from the workqueue in zk
+    final String workQueuePath = zkRoot + Constants.ZREPLICATION_WORK_QUEUE;
+    List<String> queuedReplication = zreader.getChildren(workQueuePath);
+    for (String queueKey : queuedReplication) {
+      Entry<String,ReplicationTarget> entry = AbstractWorkAssigner.fromQueueKey(queueKey);
+      String filename = entry.getKey();
+      ReplicationTarget target = entry.getValue();
+
+      byte[] data = zooCache.get(workQueuePath + "/" + queueKey);
+
+      if (null != data) {
+        Scanner s = ReplicationTable.getScanner(conn);
+        s.setRange(Range.exact(filename));
+        s.fetchColumn(WorkSection.NAME, target.toText());
+
+        // Fetch the work entry for this item
+        String status = "Unknown";
+        Entry<Key,Value> kv = null;
+        try {
+          kv = Iterables.getOnlyElement(s);
+        } catch (NoSuchElementException e) {
+         log.trace("Could not find status of {} replicating to {}", filename, target);
+         status = "Unknown";
+        } finally {
+          s.close();
+        }
+
+        // Only parse a Status when the scan returned a work entry; kv is still null when none was found
+        if (null != kv) {
+          try {
+            Status stat = Status.parseFrom(kv.getValue().get());
+            if (stat.getInfiniteEnd()) {
+              status = stat.getBegin() + "/&infin;";
+            } else {
+              status = stat.getBegin() + "/" + stat.getEnd();
+            }
+          } catch (InvalidProtocolBufferException e) {
+            log.warn("Could not deserialize protobuf for {}", kv.getKey(), e);
+            status = "Unknown";
+          }
+        }
+
+        // Add a row in the table
+        replicationInProgress.addRow(filename, target.getPeerName(), target.getSourceTableId(), target.getRemoteIdentifier(), status);
+      }
+    }
+
+    replicationInProgress.generate(req, sb);
   }
 
   protected Map<String,String> invert(Map<String,String> map) {